actix_http/h2/
dispatcher.rs

1use std::{
2    cmp,
3    error::Error as StdError,
4    future::Future,
5    marker::PhantomData,
6    net,
7    pin::{pin, Pin},
8    rc::Rc,
9    task::{Context, Poll},
10};
11
12use actix_codec::{AsyncRead, AsyncWrite};
13use actix_rt::time::{sleep, Sleep};
14use actix_service::Service;
15use actix_utils::future::poll_fn;
16use bytes::{Bytes, BytesMut};
17use futures_core::ready;
18use h2::{
19    server::{Connection, SendResponse},
20    Ping, PingPong,
21};
22use pin_project_lite::pin_project;
23
24use crate::{
25    body::{BodySize, BoxBody, MessageBody},
26    config::ServiceConfig,
27    header::{
28        HeaderName, HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, UPGRADE,
29    },
30    service::HttpFlow,
31    Extensions, Method, OnConnectData, Payload, Request, Response, ResponseHead,
32};
33
34const CHUNK_SIZE: usize = 16_384;
35
36pin_project! {
37    /// Dispatcher for HTTP/2 protocol.
38    pub struct Dispatcher<T, S, B, X, U> {
39        flow: Rc<HttpFlow<S, X, U>>,
40        connection: Connection<T, Bytes>,
41        conn_data: Option<Rc<Extensions>>,
42        config: ServiceConfig,
43        peer_addr: Option<net::SocketAddr>,
44        ping_pong: Option<H2PingPong>,
45        _phantom: PhantomData<B>
46    }
47}
48
49impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
50where
51    T: AsyncRead + AsyncWrite + Unpin,
52{
53    pub(crate) fn new(
54        mut conn: Connection<T, Bytes>,
55        flow: Rc<HttpFlow<S, X, U>>,
56        config: ServiceConfig,
57        peer_addr: Option<net::SocketAddr>,
58        conn_data: OnConnectData,
59        timer: Option<Pin<Box<Sleep>>>,
60    ) -> Self {
61        let ping_pong = config.keep_alive().duration().map(|dur| H2PingPong {
62            timer: timer
63                .map(|mut timer| {
64                    // reuse timer slot if it was initialized for handshake
65                    timer.as_mut().reset((config.now() + dur).into());
66                    timer
67                })
68                .unwrap_or_else(|| Box::pin(sleep(dur))),
69            in_flight: false,
70            ping_pong: conn.ping_pong().unwrap(),
71        });
72
73        Self {
74            flow,
75            config,
76            peer_addr,
77            connection: conn,
78            conn_data: conn_data.0.map(Rc::new),
79            ping_pong,
80            _phantom: PhantomData,
81        }
82    }
83}
84
85struct H2PingPong {
86    /// Handle to send ping frames from the peer.
87    ping_pong: PingPong,
88
89    /// True when a ping has been sent and is waiting for a reply.
90    in_flight: bool,
91
92    /// Timeout for pong response.
93    timer: Pin<Box<Sleep>>,
94}
95
96impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
97where
98    T: AsyncRead + AsyncWrite + Unpin,
99
100    S: Service<Request>,
101    S::Error: Into<Response<BoxBody>>,
102    S::Future: 'static,
103    S::Response: Into<Response<B>>,
104
105    B: MessageBody,
106{
107    type Output = Result<(), crate::error::DispatchError>;
108
109    #[inline]
110    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111        let this = self.get_mut();
112
113        loop {
114            match Pin::new(&mut this.connection).poll_accept(cx)? {
115                Poll::Ready(Some((req, tx))) => {
116                    let (parts, body) = req.into_parts();
117                    let payload = crate::h2::Payload::new(body);
118                    let pl = Payload::H2 { payload };
119                    let mut req = Request::with_payload(pl);
120                    let head_req = parts.method == Method::HEAD;
121
122                    let head = req.head_mut();
123                    head.uri = parts.uri;
124                    head.method = parts.method;
125                    head.version = parts.version;
126                    head.headers = parts.headers.into();
127                    head.peer_addr = this.peer_addr;
128
129                    req.conn_data.clone_from(&this.conn_data);
130
131                    let fut = this.flow.service.call(req);
132                    let config = this.config.clone();
133
134                    // multiplex request handling with spawn task
135                    actix_rt::spawn(async move {
136                        // resolve service call and send response.
137                        let res = match fut.await {
138                            Ok(res) => handle_response(res.into(), tx, config, head_req).await,
139                            Err(err) => {
140                                let res: Response<BoxBody> = err.into();
141                                handle_response(res, tx, config, head_req).await
142                            }
143                        };
144
145                        // log error.
146                        if let Err(err) = res {
147                            match err {
148                                DispatchError::SendResponse(err) => {
149                                    tracing::trace!("Error sending response: {err:?}");
150                                }
151                                DispatchError::SendData(err) => {
152                                    tracing::warn!("Send data error: {err:?}");
153                                }
154                                DispatchError::ResponseBody(err) => {
155                                    tracing::error!("Response payload stream error: {err:?}");
156                                }
157                            }
158                        }
159                    });
160                }
161                Poll::Ready(None) => return Poll::Ready(Ok(())),
162
163                Poll::Pending => match this.ping_pong.as_mut() {
164                    Some(ping_pong) => loop {
165                        if ping_pong.in_flight {
166                            // When there is an in-flight ping-pong, poll pong and and keep-alive
167                            // timer. On successful pong received, update keep-alive timer to
168                            // determine the next timing of ping pong.
169                            match ping_pong.ping_pong.poll_pong(cx)? {
170                                Poll::Ready(_) => {
171                                    ping_pong.in_flight = false;
172
173                                    let dead_line = this.config.keep_alive_deadline().unwrap();
174                                    ping_pong.timer.as_mut().reset(dead_line.into());
175                                }
176                                Poll::Pending => {
177                                    return ping_pong.timer.as_mut().poll(cx).map(|_| Ok(()));
178                                }
179                            }
180                        } else {
181                            // When there is no in-flight ping-pong, keep-alive timer is used to
182                            // wait for next timing of ping-pong. Therefore, at this point it serves
183                            // as an interval instead.
184                            ready!(ping_pong.timer.as_mut().poll(cx));
185
186                            ping_pong.ping_pong.send_ping(Ping::opaque())?;
187
188                            let dead_line = this.config.keep_alive_deadline().unwrap();
189                            ping_pong.timer.as_mut().reset(dead_line.into());
190
191                            ping_pong.in_flight = true;
192                        }
193                    },
194                    None => return Poll::Pending,
195                },
196            }
197        }
198    }
199}
200
201enum DispatchError {
202    SendResponse(h2::Error),
203    SendData(h2::Error),
204    ResponseBody(Box<dyn StdError>),
205}
206
207async fn handle_response<B>(
208    res: Response<B>,
209    mut tx: SendResponse<Bytes>,
210    config: ServiceConfig,
211    head_req: bool,
212) -> Result<(), DispatchError>
213where
214    B: MessageBody,
215{
216    let (res, body) = res.replace_body(());
217
218    // prepare response.
219    let mut size = body.size();
220    let res = prepare_response(config, res.head(), &mut size);
221    let eof_or_head = size.is_eof() || head_req;
222
223    // send response head and return on eof.
224    let mut stream = tx
225        .send_response(res, eof_or_head)
226        .map_err(DispatchError::SendResponse)?;
227
228    if eof_or_head {
229        return Ok(());
230    }
231
232    let mut body = pin!(body);
233
234    // poll response body and send chunks to client
235    while let Some(res) = poll_fn(|cx| body.as_mut().poll_next(cx)).await {
236        let mut chunk = res.map_err(|err| DispatchError::ResponseBody(err.into()))?;
237
238        'send: loop {
239            let chunk_size = cmp::min(chunk.len(), CHUNK_SIZE);
240
241            // reserve enough space and wait for stream ready.
242            stream.reserve_capacity(chunk_size);
243
244            match poll_fn(|cx| stream.poll_capacity(cx)).await {
245                // No capacity left. drop body and return.
246                None => return Ok(()),
247
248                Some(Err(err)) => return Err(DispatchError::SendData(err)),
249
250                Some(Ok(cap)) => {
251                    // split chunk to writeable size and send to client
252                    let len = chunk.len();
253                    let bytes = chunk.split_to(cmp::min(len, cap));
254
255                    stream
256                        .send_data(bytes, false)
257                        .map_err(DispatchError::SendData)?;
258
259                    // Current chuck completely sent. break send loop and poll next one.
260                    if chunk.is_empty() {
261                        break 'send;
262                    }
263                }
264            }
265        }
266    }
267
268    // response body streaming finished. send end of stream and return.
269    stream
270        .send_data(Bytes::new(), true)
271        .map_err(DispatchError::SendData)?;
272
273    Ok(())
274}
275
276fn prepare_response(
277    config: ServiceConfig,
278    head: &ResponseHead,
279    size: &mut BodySize,
280) -> http::Response<()> {
281    let mut has_date = false;
282    let mut skip_len = size != &BodySize::Stream;
283
284    let mut res = http::Response::new(());
285    *res.status_mut() = head.status;
286    *res.version_mut() = http::Version::HTTP_2;
287
288    // Content length
289    match head.status {
290        http::StatusCode::NO_CONTENT
291        | http::StatusCode::CONTINUE
292        | http::StatusCode::PROCESSING => *size = BodySize::None,
293        http::StatusCode::SWITCHING_PROTOCOLS => {
294            skip_len = true;
295            *size = BodySize::Stream;
296        }
297        _ => {}
298    }
299
300    match size {
301        BodySize::None | BodySize::Stream => {}
302
303        BodySize::Sized(0) => {
304            #[allow(clippy::declare_interior_mutable_const)]
305            const HV_ZERO: HeaderValue = HeaderValue::from_static("0");
306            res.headers_mut().insert(CONTENT_LENGTH, HV_ZERO);
307        }
308
309        BodySize::Sized(len) => {
310            let mut buf = itoa::Buffer::new();
311
312            res.headers_mut().insert(
313                CONTENT_LENGTH,
314                HeaderValue::from_str(buf.format(*len)).unwrap(),
315            );
316        }
317    };
318
319    // copy headers
320    for (key, value) in head.headers.iter() {
321        match key {
322            // omit HTTP/1.x only headers according to:
323            // https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2
324            &CONNECTION | &TRANSFER_ENCODING | &UPGRADE => continue,
325
326            &CONTENT_LENGTH if skip_len => continue,
327            &DATE => has_date = true,
328
329            // omit HTTP/1.x only headers according to:
330            // https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2
331            hdr if hdr == HeaderName::from_static("keep-alive")
332                || hdr == HeaderName::from_static("proxy-connection") =>
333            {
334                continue
335            }
336
337            _ => {}
338        }
339
340        res.headers_mut().append(key, value.clone());
341    }
342
343    // set date header
344    if !has_date {
345        let mut bytes = BytesMut::with_capacity(29);
346        config.write_date_header_value(&mut bytes);
347        res.headers_mut().insert(
348            DATE,
349            // SAFETY: serialized date-times are known ASCII strings
350            unsafe { HeaderValue::from_maybe_shared_unchecked(bytes.freeze()) },
351        );
352    }
353
354    res
355}