1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use actix_codec::{AsyncRead, AsyncWrite, Framed};
8use pin_project_lite::pin_project;
9
10use crate::{
11 body::{BodySize, MessageBody},
12 h1::{Codec, Message},
13 Error, Response,
14};
15
16pin_project! {
17 pub struct SendResponse<T, B> {
19 res: Option<Message<(Response<()>, BodySize)>>,
20
21 #[pin]
22 body: Option<B>,
23
24 #[pin]
25 framed: Option<Framed<T, Codec>>,
26 }
27}
28
29impl<T, B> SendResponse<T, B>
30where
31 B: MessageBody,
32 B::Error: Into<Error>,
33{
34 pub fn new(framed: Framed<T, Codec>, response: Response<B>) -> Self {
35 let (res, body) = response.into_parts();
36
37 SendResponse {
38 res: Some((res, body.size()).into()),
39 body: Some(body),
40 framed: Some(framed),
41 }
42 }
43}
44
45impl<T, B> Future for SendResponse<T, B>
46where
47 T: AsyncRead + AsyncWrite + Unpin,
48 B: MessageBody,
49 B::Error: Into<Error>,
50{
51 type Output = Result<Framed<T, Codec>, Error>;
52
53 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55 let mut this = self.as_mut().project();
56
57 let mut body_done = this.body.is_none();
58 loop {
59 let mut body_ready = !body_done;
60
61 if this.res.is_none() && body_ready {
63 while body_ready
64 && !body_done
65 && !this
66 .framed
67 .as_ref()
68 .as_pin_ref()
69 .unwrap()
70 .is_write_buf_full()
71 {
72 let next = match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx) {
73 Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)),
74 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
75 Poll::Ready(None) => Poll::Ready(None),
76 Poll::Pending => Poll::Pending,
77 };
78
79 match next {
80 Poll::Ready(item) => {
81 body_done = item.is_none();
83 if body_done {
84 this.body.set(None);
85 }
86 let framed = this.framed.as_mut().as_pin_mut().unwrap();
87 framed
88 .write(Message::Chunk(item))
89 .map_err(|err| Error::new_send_response().with_cause(err))?;
90 }
91 Poll::Pending => body_ready = false,
92 }
93 }
94 }
95
96 let framed = this.framed.as_mut().as_pin_mut().unwrap();
97
98 if !framed.is_write_buf_empty() {
100 match framed
101 .flush(cx)
102 .map_err(|err| Error::new_send_response().with_cause(err))?
103 {
104 Poll::Ready(_) => {
105 if body_ready {
106 continue;
107 } else {
108 return Poll::Pending;
109 }
110 }
111 Poll::Pending => return Poll::Pending,
112 }
113 }
114
115 if let Some(res) = this.res.take() {
117 framed
118 .write(res)
119 .map_err(|err| Error::new_send_response().with_cause(err))?;
120 continue;
121 }
122
123 if !body_done {
124 if body_ready {
125 continue;
126 } else {
127 return Poll::Pending;
128 }
129 } else {
130 break;
131 }
132 }
133
134 let framed = this.framed.take().unwrap();
135
136 Poll::Ready(Ok(framed))
137 }
138}