1use std::{
2 cmp,
3 error::Error as StdError,
4 future::Future,
5 marker::PhantomData,
6 net,
7 pin::{pin, Pin},
8 rc::Rc,
9 task::{Context, Poll},
10};
11
12use actix_codec::{AsyncRead, AsyncWrite};
13use actix_rt::time::{sleep, Sleep};
14use actix_service::Service;
15use actix_utils::future::poll_fn;
16use bytes::{Bytes, BytesMut};
17use futures_core::ready;
18use h2::{
19 server::{Connection, SendResponse},
20 Ping, PingPong,
21};
22use pin_project_lite::pin_project;
23
24use crate::{
25 body::{BodySize, BoxBody, MessageBody},
26 config::ServiceConfig,
27 header::{
28 HeaderName, HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, UPGRADE,
29 },
30 service::HttpFlow,
31 Extensions, Method, OnConnectData, Payload, Request, Response, ResponseHead,
32};
33
/// Upper bound, in bytes, on the send capacity requested per attempt while streaming a response
/// body chunk (see `handle_response`).
const CHUNK_SIZE: usize = 16_384;
35
pin_project! {
    /// Connection-level dispatcher for HTTP/2.
    ///
    /// When polled as a future it accepts request streams from `connection`, passes each request
    /// to `flow.service` and spawns a task that writes the response back on that stream.
    pub struct Dispatcher<T, S, B, X, U> {
        // shared service flow; `flow.service` is called once per accepted request
        flow: Rc<HttpFlow<S, X, U>>,
        // h2 server connection that request streams are accepted from
        connection: Connection<T, Bytes>,
        // optional per-connection extensions, cloned into every request's `conn_data`
        conn_data: Option<Rc<Extensions>>,
        // service configuration (keep-alive deadline, date header value)
        config: ServiceConfig,
        // address copied into every request head
        peer_addr: Option<net::SocketAddr>,
        // keep-alive ping state; `None` when the config has no keep-alive duration
        ping_pong: Option<H2PingPong>,
        // ties the response body type `B` to the dispatcher without storing a value of it
        _phantom: PhantomData<B>
    }
}
48
49impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
50where
51 T: AsyncRead + AsyncWrite + Unpin,
52{
53 pub(crate) fn new(
54 mut conn: Connection<T, Bytes>,
55 flow: Rc<HttpFlow<S, X, U>>,
56 config: ServiceConfig,
57 peer_addr: Option<net::SocketAddr>,
58 conn_data: OnConnectData,
59 timer: Option<Pin<Box<Sleep>>>,
60 ) -> Self {
61 let ping_pong = config.keep_alive().duration().map(|dur| H2PingPong {
62 timer: timer
63 .map(|mut timer| {
64 timer.as_mut().reset((config.now() + dur).into());
66 timer
67 })
68 .unwrap_or_else(|| Box::pin(sleep(dur))),
69 in_flight: false,
70 ping_pong: conn.ping_pong().unwrap(),
71 });
72
73 Self {
74 flow,
75 config,
76 peer_addr,
77 connection: conn,
78 conn_data: conn_data.0.map(Rc::new),
79 ping_pong,
80 _phantom: PhantomData,
81 }
82 }
83}
84
/// Keep-alive state for one HTTP/2 connection.
struct H2PingPong {
    /// h2 handle used to send pings and poll for the matching pong.
    ping_pong: PingPong,

    /// `true` after a ping has been sent and until its pong has been received.
    in_flight: bool,

    /// While no ping is in flight: fires when the next ping should be sent.
    /// While a ping is in flight: fires when the pong is overdue, which completes the dispatcher.
    timer: Pin<Box<Sleep>>,
}
95
96impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
97where
98 T: AsyncRead + AsyncWrite + Unpin,
99
100 S: Service<Request>,
101 S::Error: Into<Response<BoxBody>>,
102 S::Future: 'static,
103 S::Response: Into<Response<B>>,
104
105 B: MessageBody,
106{
107 type Output = Result<(), crate::error::DispatchError>;
108
109 #[inline]
110 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111 let this = self.get_mut();
112
113 loop {
114 match Pin::new(&mut this.connection).poll_accept(cx)? {
115 Poll::Ready(Some((req, tx))) => {
116 let (parts, body) = req.into_parts();
117 let payload = crate::h2::Payload::new(body);
118 let pl = Payload::H2 { payload };
119 let mut req = Request::with_payload(pl);
120 let head_req = parts.method == Method::HEAD;
121
122 let head = req.head_mut();
123 head.uri = parts.uri;
124 head.method = parts.method;
125 head.version = parts.version;
126 head.headers = parts.headers.into();
127 head.peer_addr = this.peer_addr;
128
129 req.conn_data.clone_from(&this.conn_data);
130
131 let fut = this.flow.service.call(req);
132 let config = this.config.clone();
133
134 actix_rt::spawn(async move {
136 let res = match fut.await {
138 Ok(res) => handle_response(res.into(), tx, config, head_req).await,
139 Err(err) => {
140 let res: Response<BoxBody> = err.into();
141 handle_response(res, tx, config, head_req).await
142 }
143 };
144
145 if let Err(err) = res {
147 match err {
148 DispatchError::SendResponse(err) => {
149 tracing::trace!("Error sending response: {err:?}");
150 }
151 DispatchError::SendData(err) => {
152 tracing::warn!("Send data error: {err:?}");
153 }
154 DispatchError::ResponseBody(err) => {
155 tracing::error!("Response payload stream error: {err:?}");
156 }
157 }
158 }
159 });
160 }
161 Poll::Ready(None) => return Poll::Ready(Ok(())),
162
163 Poll::Pending => match this.ping_pong.as_mut() {
164 Some(ping_pong) => loop {
165 if ping_pong.in_flight {
166 match ping_pong.ping_pong.poll_pong(cx)? {
170 Poll::Ready(_) => {
171 ping_pong.in_flight = false;
172
173 let dead_line = this.config.keep_alive_deadline().unwrap();
174 ping_pong.timer.as_mut().reset(dead_line.into());
175 }
176 Poll::Pending => {
177 return ping_pong.timer.as_mut().poll(cx).map(|_| Ok(()));
178 }
179 }
180 } else {
181 ready!(ping_pong.timer.as_mut().poll(cx));
185
186 ping_pong.ping_pong.send_ping(Ping::opaque())?;
187
188 let dead_line = this.config.keep_alive_deadline().unwrap();
189 ping_pong.timer.as_mut().reset(dead_line.into());
190
191 ping_pong.in_flight = true;
192 }
193 },
194 None => return Poll::Pending,
195 },
196 }
197 }
198 }
199}
200
/// Ways in which writing a single response can fail; returned by `handle_response` and only
/// logged by the task that `Dispatcher::poll` spawns.
enum DispatchError {
    /// Sending the response head with `send_response` failed.
    SendResponse(h2::Error),
    /// Waiting for send capacity or sending a data frame failed.
    SendData(h2::Error),
    /// The response body stream yielded an error.
    ResponseBody(Box<dyn StdError>),
}
206
207async fn handle_response<B>(
208 res: Response<B>,
209 mut tx: SendResponse<Bytes>,
210 config: ServiceConfig,
211 head_req: bool,
212) -> Result<(), DispatchError>
213where
214 B: MessageBody,
215{
216 let (res, body) = res.replace_body(());
217
218 let mut size = body.size();
220 let res = prepare_response(config, res.head(), &mut size);
221 let eof_or_head = size.is_eof() || head_req;
222
223 let mut stream = tx
225 .send_response(res, eof_or_head)
226 .map_err(DispatchError::SendResponse)?;
227
228 if eof_or_head {
229 return Ok(());
230 }
231
232 let mut body = pin!(body);
233
234 while let Some(res) = poll_fn(|cx| body.as_mut().poll_next(cx)).await {
236 let mut chunk = res.map_err(|err| DispatchError::ResponseBody(err.into()))?;
237
238 'send: loop {
239 let chunk_size = cmp::min(chunk.len(), CHUNK_SIZE);
240
241 stream.reserve_capacity(chunk_size);
243
244 match poll_fn(|cx| stream.poll_capacity(cx)).await {
245 None => return Ok(()),
247
248 Some(Err(err)) => return Err(DispatchError::SendData(err)),
249
250 Some(Ok(cap)) => {
251 let len = chunk.len();
253 let bytes = chunk.split_to(cmp::min(len, cap));
254
255 stream
256 .send_data(bytes, false)
257 .map_err(DispatchError::SendData)?;
258
259 if chunk.is_empty() {
261 break 'send;
262 }
263 }
264 }
265 }
266 }
267
268 stream
270 .send_data(Bytes::new(), true)
271 .map_err(DispatchError::SendData)?;
272
273 Ok(())
274}
275
276fn prepare_response(
277 config: ServiceConfig,
278 head: &ResponseHead,
279 size: &mut BodySize,
280) -> http::Response<()> {
281 let mut has_date = false;
282 let mut skip_len = size != &BodySize::Stream;
283
284 let mut res = http::Response::new(());
285 *res.status_mut() = head.status;
286 *res.version_mut() = http::Version::HTTP_2;
287
288 match head.status {
290 http::StatusCode::NO_CONTENT
291 | http::StatusCode::CONTINUE
292 | http::StatusCode::PROCESSING => *size = BodySize::None,
293 http::StatusCode::SWITCHING_PROTOCOLS => {
294 skip_len = true;
295 *size = BodySize::Stream;
296 }
297 _ => {}
298 }
299
300 match size {
301 BodySize::None | BodySize::Stream => {}
302
303 BodySize::Sized(0) => {
304 #[allow(clippy::declare_interior_mutable_const)]
305 const HV_ZERO: HeaderValue = HeaderValue::from_static("0");
306 res.headers_mut().insert(CONTENT_LENGTH, HV_ZERO);
307 }
308
309 BodySize::Sized(len) => {
310 let mut buf = itoa::Buffer::new();
311
312 res.headers_mut().insert(
313 CONTENT_LENGTH,
314 HeaderValue::from_str(buf.format(*len)).unwrap(),
315 );
316 }
317 };
318
319 for (key, value) in head.headers.iter() {
321 match key {
322 &CONNECTION | &TRANSFER_ENCODING | &UPGRADE => continue,
325
326 &CONTENT_LENGTH if skip_len => continue,
327 &DATE => has_date = true,
328
329 hdr if hdr == HeaderName::from_static("keep-alive")
332 || hdr == HeaderName::from_static("proxy-connection") =>
333 {
334 continue
335 }
336
337 _ => {}
338 }
339
340 res.headers_mut().append(key, value.clone());
341 }
342
343 if !has_date {
345 let mut bytes = BytesMut::with_capacity(29);
346 config.write_date_header_value(&mut bytes);
347 res.headers_mut().insert(
348 DATE,
349 unsafe { HeaderValue::from_maybe_shared_unchecked(bytes.freeze()) },
351 );
352 }
353
354 res
355}