actix_http/h1/
utils.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use actix_codec::{AsyncRead, AsyncWrite, Framed};
8use pin_project_lite::pin_project;
9
10use crate::{
11    body::{BodySize, MessageBody},
12    h1::{Codec, Message},
13    Error, Response,
14};
15
16pin_project! {
17    /// Send HTTP/1 response
18    pub struct SendResponse<T, B> {
19        res: Option<Message<(Response<()>, BodySize)>>,
20
21        #[pin]
22        body: Option<B>,
23
24        #[pin]
25        framed: Option<Framed<T, Codec>>,
26    }
27}
28
29impl<T, B> SendResponse<T, B>
30where
31    B: MessageBody,
32    B::Error: Into<Error>,
33{
34    pub fn new(framed: Framed<T, Codec>, response: Response<B>) -> Self {
35        let (res, body) = response.into_parts();
36
37        SendResponse {
38            res: Some((res, body.size()).into()),
39            body: Some(body),
40            framed: Some(framed),
41        }
42    }
43}
44
45impl<T, B> Future for SendResponse<T, B>
46where
47    T: AsyncRead + AsyncWrite + Unpin,
48    B: MessageBody,
49    B::Error: Into<Error>,
50{
51    type Output = Result<Framed<T, Codec>, Error>;
52
53    // TODO: rethink if we need loops in polls
54    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55        let mut this = self.as_mut().project();
56
57        let mut body_done = this.body.is_none();
58        loop {
59            let mut body_ready = !body_done;
60
61            // send body
62            if this.res.is_none() && body_ready {
63                while body_ready
64                    && !body_done
65                    && !this
66                        .framed
67                        .as_ref()
68                        .as_pin_ref()
69                        .unwrap()
70                        .is_write_buf_full()
71                {
72                    let next = match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx) {
73                        Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)),
74                        Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
75                        Poll::Ready(None) => Poll::Ready(None),
76                        Poll::Pending => Poll::Pending,
77                    };
78
79                    match next {
80                        Poll::Ready(item) => {
81                            // body is done when item is None
82                            body_done = item.is_none();
83                            if body_done {
84                                this.body.set(None);
85                            }
86                            let framed = this.framed.as_mut().as_pin_mut().unwrap();
87                            framed
88                                .write(Message::Chunk(item))
89                                .map_err(|err| Error::new_send_response().with_cause(err))?;
90                        }
91                        Poll::Pending => body_ready = false,
92                    }
93                }
94            }
95
96            let framed = this.framed.as_mut().as_pin_mut().unwrap();
97
98            // flush write buffer
99            if !framed.is_write_buf_empty() {
100                match framed
101                    .flush(cx)
102                    .map_err(|err| Error::new_send_response().with_cause(err))?
103                {
104                    Poll::Ready(_) => {
105                        if body_ready {
106                            continue;
107                        } else {
108                            return Poll::Pending;
109                        }
110                    }
111                    Poll::Pending => return Poll::Pending,
112                }
113            }
114
115            // send response
116            if let Some(res) = this.res.take() {
117                framed
118                    .write(res)
119                    .map_err(|err| Error::new_send_response().with_cause(err))?;
120                continue;
121            }
122
123            if !body_done {
124                if body_ready {
125                    continue;
126                } else {
127                    return Poll::Pending;
128                }
129            } else {
130                break;
131            }
132        }
133
134        let framed = this.framed.take().unwrap();
135
136        Poll::Ready(Ok(framed))
137    }
138}