// actix_http/body/body_stream.rs

1use std::{
2    error::Error as StdError,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures_core::{ready, Stream};
9use pin_project_lite::pin_project;
10
11use super::{BodySize, MessageBody};
12
13pin_project! {
14    /// Streaming response wrapper.
15    ///
16    /// Response does not contain `Content-Length` header and appropriate transfer encoding is used.
17    pub struct BodyStream<S> {
18        #[pin]
19        stream: S,
20    }
21}
22
23// TODO: from_infallible method
24
25impl<S, E> BodyStream<S>
26where
27    S: Stream<Item = Result<Bytes, E>>,
28    E: Into<Box<dyn StdError>> + 'static,
29{
30    #[inline]
31    pub fn new(stream: S) -> Self {
32        BodyStream { stream }
33    }
34}
35
36impl<S, E> MessageBody for BodyStream<S>
37where
38    S: Stream<Item = Result<Bytes, E>>,
39    E: Into<Box<dyn StdError>> + 'static,
40{
41    type Error = E;
42
43    #[inline]
44    fn size(&self) -> BodySize {
45        BodySize::Stream
46    }
47
48    /// Attempts to pull out the next value of the underlying [`Stream`].
49    ///
50    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being ended on a
51    /// zero-length chunk, but rather proceed until the underlying [`Stream`] ends.
52    fn poll_next(
53        mut self: Pin<&mut Self>,
54        cx: &mut Context<'_>,
55    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
56        loop {
57            let stream = self.as_mut().project().stream;
58
59            let chunk = match ready!(stream.poll_next(cx)) {
60                Some(Ok(ref bytes)) if bytes.is_empty() => continue,
61                opt => opt,
62            };
63
64            return Poll::Ready(chunk);
65        }
66    }
67}
68
#[cfg(test)]
mod tests {
    use std::{convert::Infallible, time::Duration};

    use actix_rt::{
        pin,
        time::{sleep, Sleep},
    };
    use actix_utils::future::poll_fn;
    use derive_more::{Display, Error};
    use futures_core::ready;
    use futures_util::{stream, FutureExt as _};
    use pin_project_lite::pin_project;
    use static_assertions::{assert_impl_all, assert_not_impl_any};

    use super::*;
    use crate::body::to_bytes;

    // Streams of `Result<Bytes, E>` yield a usable body for a range of error types.
    assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, crate::Error>>>: MessageBody);
    assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, &'static str>>>: MessageBody);
    assert_impl_all!(BodyStream<stream::Repeat<Result<Bytes, &'static str>>>: MessageBody);
    assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, Infallible>>>: MessageBody);
    assert_impl_all!(BodyStream<stream::Repeat<Result<Bytes, Infallible>>>: MessageBody);

    // Streams whose items are not `Result<Bytes, _>` must not produce a body.
    assert_not_impl_any!(BodyStream<stream::Empty<Bytes>>: MessageBody);
    assert_not_impl_any!(BodyStream<stream::Repeat<Bytes>>: MessageBody);
    // crate::Error is not Clone
    assert_not_impl_any!(BodyStream<stream::Repeat<Result<Bytes, crate::Error>>>: MessageBody);

    /// Polling chunk by chunk never surfaces the empty chunk in the middle.
    #[actix_rt::test]
    async fn skips_empty_chunks() {
        let chunks = ["1", "", "2"]
            .iter()
            .map(|&v| Ok::<_, Infallible>(Bytes::from(v)));
        let body = BodyStream::new(stream::iter(chunks));
        pin!(body);

        for expected in ["1", "2"] {
            let polled = poll_fn(|cx| body.as_mut().poll_next(cx)).await;
            assert_eq!(polled.unwrap().ok(), Some(Bytes::from(expected)));
        }
    }

    /// Collecting the whole body concatenates the non-empty chunks.
    #[actix_rt::test]
    async fn read_to_bytes() {
        let chunks = ["1", "", "2"]
            .iter()
            .map(|&v| Ok::<_, Infallible>(Bytes::from(v)));
        let body = BodyStream::new(stream::iter(chunks));

        let collected = to_bytes(body).await;
        assert_eq!(collected.ok(), Some(Bytes::from("12")));
    }

    /// Error type used by the failing-stream tests below.
    #[derive(Debug, Display, Error)]
    #[display("stream error")]
    struct StreamErr;

    /// A stream whose very first item is an error surfaces that error.
    #[actix_rt::test]
    async fn stream_immediate_error() {
        let body = BodyStream::new(stream::once(async { Err(StreamErr) }));

        let outcome = to_bytes(body).await;
        assert!(matches!(outcome, Err(StreamErr)));
    }

    #[actix_rt::test]
    async fn stream_string_error() {
        // `&'static str` is not an `Error` itself,
        // yet it converts `Into<Box<dyn Error>>`, which is all the bounds ask for.

        let body = BodyStream::new(stream::once(async { Err("stringy error") }));

        let outcome = to_bytes(body).await;
        assert!(matches!(outcome, Err("stringy error")));
    }

    #[actix_rt::test]
    async fn stream_boxed_error() {
        // `Box<dyn Error>` is not an `Error` itself,
        // yet it (trivially) converts `Into<Box<dyn Error>>`.

        let body = BodyStream::new(stream::once(async {
            Err(Box::<dyn StdError>::from("stringy error"))
        }));

        let err = to_bytes(body).await.unwrap_err();
        assert_eq!(err.to_string(), "stringy error");
    }

    #[actix_rt::test]
    async fn stream_delayed_error() {
        // Error arriving after a successful chunk.
        let body = BodyStream::new(stream::iter(vec![Ok(Bytes::from("1")), Err(StreamErr)]));
        let outcome = to_bytes(body).await;
        assert!(matches!(outcome, Err(StreamErr)));

        pin_project! {
            #[derive(Debug)]
            #[project = TimeDelayStreamProj]
            enum TimeDelayStream {
                Start,
                Sleep { delay: Pin<Box<Sleep>> },
                Done,
            }
        }

        impl Stream for TimeDelayStream {
            type Item = Result<Bytes, StreamErr>;

            fn poll_next(
                mut self: Pin<&mut Self>,
                cx: &mut Context<'_>,
            ) -> Poll<Option<Self::Item>> {
                // Work out which state to move to; only `Done` yields an item.
                let next_state = match self.as_mut().get_mut() {
                    TimeDelayStream::Done => return Poll::Ready(Some(Err(StreamErr))),

                    TimeDelayStream::Start => TimeDelayStream::Sleep {
                        delay: Box::pin(sleep(Duration::from_millis(1))),
                    },

                    TimeDelayStream::Sleep { delay } => {
                        // Stay in this state until the timer has fired.
                        ready!(delay.poll_unpin(cx));
                        TimeDelayStream::Done
                    }
                };

                // Every transition reports `Pending` and asks to be polled again right away.
                self.set(next_state);
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        // Error arriving only after the stream has been pending for a while.
        let body = BodyStream::new(TimeDelayStream::Start);
        let outcome = to_bytes(body).await;
        assert!(matches!(outcome, Err(StreamErr)));
    }
}