actix_http/h2/
mod.rs

1//! HTTP/2 protocol.
2
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use actix_codec::{AsyncRead, AsyncWrite};
10use actix_rt::time::{sleep_until, Sleep};
11use bytes::Bytes;
12use futures_core::{ready, Stream};
13use h2::{
14    server::{handshake, Connection, Handshake},
15    RecvStream,
16};
17
18use crate::{
19    config::ServiceConfig,
20    error::{DispatchError, PayloadError},
21};
22
23mod dispatcher;
24mod service;
25
26pub use self::{dispatcher::Dispatcher, service::H2Service};
27
28/// HTTP/2 peer stream.
29pub struct Payload {
30    stream: RecvStream,
31}
32
33impl Payload {
34    pub(crate) fn new(stream: RecvStream) -> Self {
35        Self { stream }
36    }
37}
38
39impl Stream for Payload {
40    type Item = Result<Bytes, PayloadError>;
41
42    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        let this = self.get_mut();
44
45        match ready!(Pin::new(&mut this.stream).poll_data(cx)) {
46            Some(Ok(chunk)) => {
47                let len = chunk.len();
48
49                match this.stream.flow_control().release_capacity(len) {
50                    Ok(()) => Poll::Ready(Some(Ok(chunk))),
51                    Err(err) => Poll::Ready(Some(Err(err.into()))),
52                }
53            }
54            Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
55            None => Poll::Ready(None),
56        }
57    }
58}
59
60pub(crate) fn handshake_with_timeout<T>(io: T, config: &ServiceConfig) -> HandshakeWithTimeout<T>
61where
62    T: AsyncRead + AsyncWrite + Unpin,
63{
64    HandshakeWithTimeout {
65        handshake: handshake(io),
66        timer: config
67            .client_request_deadline()
68            .map(|deadline| Box::pin(sleep_until(deadline.into()))),
69    }
70}
71
72pub(crate) struct HandshakeWithTimeout<T: AsyncRead + AsyncWrite + Unpin> {
73    handshake: Handshake<T>,
74    timer: Option<Pin<Box<Sleep>>>,
75}
76
77impl<T> Future for HandshakeWithTimeout<T>
78where
79    T: AsyncRead + AsyncWrite + Unpin,
80{
81    type Output = Result<(Connection<T, Bytes>, Option<Pin<Box<Sleep>>>), DispatchError>;
82
83    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84        let this = self.get_mut();
85
86        match Pin::new(&mut this.handshake).poll(cx)? {
87            // return the timer on success handshake; its slot can be re-used for h2 ping-pong
88            Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
89            Poll::Pending => match this.timer.as_mut() {
90                Some(timer) => {
91                    ready!(timer.as_mut().poll(cx));
92                    Poll::Ready(Err(DispatchError::SlowRequestTimeout))
93                }
94                None => Poll::Pending,
95            },
96        }
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time assertion that `Payload` implements `Unpin`, `Send` and `Sync`.
    static_assertions::assert_impl_all!(Payload: Unpin, Send, Sync);
}