1use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use actix_codec::{AsyncRead, AsyncWrite};
10use actix_rt::time::{sleep_until, Sleep};
11use bytes::Bytes;
12use futures_core::{ready, Stream};
13use h2::{
14 server::{handshake, Connection, Handshake},
15 RecvStream,
16};
17
18use crate::{
19 config::ServiceConfig,
20 error::{DispatchError, PayloadError},
21};
22
23mod dispatcher;
24mod service;
25
26pub use self::{dispatcher::Dispatcher, service::H2Service};
27
28pub struct Payload {
30 stream: RecvStream,
31}
32
33impl Payload {
34 pub(crate) fn new(stream: RecvStream) -> Self {
35 Self { stream }
36 }
37}
38
39impl Stream for Payload {
40 type Item = Result<Bytes, PayloadError>;
41
42 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 let this = self.get_mut();
44
45 match ready!(Pin::new(&mut this.stream).poll_data(cx)) {
46 Some(Ok(chunk)) => {
47 let len = chunk.len();
48
49 match this.stream.flow_control().release_capacity(len) {
50 Ok(()) => Poll::Ready(Some(Ok(chunk))),
51 Err(err) => Poll::Ready(Some(Err(err.into()))),
52 }
53 }
54 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
55 None => Poll::Ready(None),
56 }
57 }
58}
59
60pub(crate) fn handshake_with_timeout<T>(io: T, config: &ServiceConfig) -> HandshakeWithTimeout<T>
61where
62 T: AsyncRead + AsyncWrite + Unpin,
63{
64 HandshakeWithTimeout {
65 handshake: handshake(io),
66 timer: config
67 .client_request_deadline()
68 .map(|deadline| Box::pin(sleep_until(deadline.into()))),
69 }
70}
71
72pub(crate) struct HandshakeWithTimeout<T: AsyncRead + AsyncWrite + Unpin> {
73 handshake: Handshake<T>,
74 timer: Option<Pin<Box<Sleep>>>,
75}
76
77impl<T> Future for HandshakeWithTimeout<T>
78where
79 T: AsyncRead + AsyncWrite + Unpin,
80{
81 type Output = Result<(Connection<T, Bytes>, Option<Pin<Box<Sleep>>>), DispatchError>;
82
83 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84 let this = self.get_mut();
85
86 match Pin::new(&mut this.handshake).poll(cx)? {
87 Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
89 Poll::Pending => match this.timer.as_mut() {
90 Some(timer) => {
91 ready!(timer.as_mut().poll(cx));
92 Poll::Ready(Err(DispatchError::SlowRequestTimeout))
93 }
94 None => Poll::Pending,
95 },
96 }
97 }
98}
99
#[cfg(test)]
mod tests {
    use static_assertions::assert_impl_all;

    use super::Payload;

    // Compile-time guarantee: `Payload` must stay `Unpin`, `Send` and `Sync`.
    assert_impl_all!(Payload: Unpin, Send, Sync);
}