actix_http/body/
body_stream.rs1use std::{
2 error::Error as StdError,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures_core::{ready, Stream};
9use pin_project_lite::pin_project;
10
11use super::{BodySize, MessageBody};
12
13pin_project! {
14 pub struct BodyStream<S> {
18 #[pin]
19 stream: S,
20 }
21}
22
23impl<S, E> BodyStream<S>
26where
27 S: Stream<Item = Result<Bytes, E>>,
28 E: Into<Box<dyn StdError>> + 'static,
29{
30 #[inline]
31 pub fn new(stream: S) -> Self {
32 BodyStream { stream }
33 }
34}
35
36impl<S, E> MessageBody for BodyStream<S>
37where
38 S: Stream<Item = Result<Bytes, E>>,
39 E: Into<Box<dyn StdError>> + 'static,
40{
41 type Error = E;
42
43 #[inline]
44 fn size(&self) -> BodySize {
45 BodySize::Stream
46 }
47
48 fn poll_next(
53 mut self: Pin<&mut Self>,
54 cx: &mut Context<'_>,
55 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
56 loop {
57 let stream = self.as_mut().project().stream;
58
59 let chunk = match ready!(stream.poll_next(cx)) {
60 Some(Ok(ref bytes)) if bytes.is_empty() => continue,
61 opt => opt,
62 };
63
64 return Poll::Ready(chunk);
65 }
66 }
67}
68
69#[cfg(test)]
70mod tests {
71 use std::{convert::Infallible, time::Duration};
72
73 use actix_rt::{
74 pin,
75 time::{sleep, Sleep},
76 };
77 use actix_utils::future::poll_fn;
78 use derive_more::{Display, Error};
79 use futures_core::ready;
80 use futures_util::{stream, FutureExt as _};
81 use pin_project_lite::pin_project;
82 use static_assertions::{assert_impl_all, assert_not_impl_any};
83
84 use super::*;
85 use crate::body::to_bytes;
86
87 assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, crate::Error>>>: MessageBody);
88 assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, &'static str>>>: MessageBody);
89 assert_impl_all!(BodyStream<stream::Repeat<Result<Bytes, &'static str>>>: MessageBody);
90 assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, Infallible>>>: MessageBody);
91 assert_impl_all!(BodyStream<stream::Repeat<Result<Bytes, Infallible>>>: MessageBody);
92
93 assert_not_impl_any!(BodyStream<stream::Empty<Bytes>>: MessageBody);
94 assert_not_impl_any!(BodyStream<stream::Repeat<Bytes>>: MessageBody);
95 assert_not_impl_any!(BodyStream<stream::Repeat<Result<Bytes, crate::Error>>>: MessageBody);
97
98 #[actix_rt::test]
99 async fn skips_empty_chunks() {
100 let body = BodyStream::new(stream::iter(
101 ["1", "", "2"]
102 .iter()
103 .map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
104 ));
105 pin!(body);
106
107 assert_eq!(
108 poll_fn(|cx| body.as_mut().poll_next(cx))
109 .await
110 .unwrap()
111 .ok(),
112 Some(Bytes::from("1")),
113 );
114 assert_eq!(
115 poll_fn(|cx| body.as_mut().poll_next(cx))
116 .await
117 .unwrap()
118 .ok(),
119 Some(Bytes::from("2")),
120 );
121 }
122
123 #[actix_rt::test]
124 async fn read_to_bytes() {
125 let body = BodyStream::new(stream::iter(
126 ["1", "", "2"]
127 .iter()
128 .map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
129 ));
130
131 assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));
132 }
133 #[derive(Debug, Display, Error)]
134 #[display("stream error")]
135 struct StreamErr;
136
137 #[actix_rt::test]
138 async fn stream_immediate_error() {
139 let body = BodyStream::new(stream::once(async { Err(StreamErr) }));
140 assert!(matches!(to_bytes(body).await, Err(StreamErr)));
141 }
142
143 #[actix_rt::test]
144 async fn stream_string_error() {
145 let body = BodyStream::new(stream::once(async { Err("stringy error") }));
149 assert!(matches!(to_bytes(body).await, Err("stringy error")));
150 }
151
152 #[actix_rt::test]
153 async fn stream_boxed_error() {
154 let body = BodyStream::new(stream::once(async {
158 Err(Box::<dyn StdError>::from("stringy error"))
159 }));
160
161 assert_eq!(
162 to_bytes(body).await.unwrap_err().to_string(),
163 "stringy error"
164 );
165 }
166
167 #[actix_rt::test]
168 async fn stream_delayed_error() {
169 let body = BodyStream::new(stream::iter(vec![Ok(Bytes::from("1")), Err(StreamErr)]));
170 assert!(matches!(to_bytes(body).await, Err(StreamErr)));
171
172 pin_project! {
173 #[derive(Debug)]
174 #[project = TimeDelayStreamProj]
175 enum TimeDelayStream {
176 Start,
177 Sleep { delay: Pin<Box<Sleep>> },
178 Done,
179 }
180 }
181
182 impl Stream for TimeDelayStream {
183 type Item = Result<Bytes, StreamErr>;
184
185 fn poll_next(
186 mut self: Pin<&mut Self>,
187 cx: &mut Context<'_>,
188 ) -> Poll<Option<Self::Item>> {
189 match self.as_mut().get_mut() {
190 TimeDelayStream::Start => {
191 let sleep = sleep(Duration::from_millis(1));
192 self.as_mut().set(TimeDelayStream::Sleep {
193 delay: Box::pin(sleep),
194 });
195 cx.waker().wake_by_ref();
196 Poll::Pending
197 }
198
199 TimeDelayStream::Sleep { ref mut delay } => {
200 ready!(delay.poll_unpin(cx));
201 self.set(TimeDelayStream::Done);
202 cx.waker().wake_by_ref();
203 Poll::Pending
204 }
205
206 TimeDelayStream::Done => Poll::Ready(Some(Err(StreamErr))),
207 }
208 }
209 }
210
211 let body = BodyStream::new(TimeDelayStream::Start);
212 assert!(matches!(to_bytes(body).await, Err(StreamErr)));
213 }
214}