actix_http/body/
message_body.rs

1//! [`MessageBody`] trait and foreign implementations.
2
3use std::{
4    convert::Infallible,
5    error::Error as StdError,
6    mem,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use bytes::{Bytes, BytesMut};
12use futures_core::ready;
13use pin_project_lite::pin_project;
14
15use super::{BodySize, BoxBody};
16
17/// An interface for types that can be used as a response body.
18///
19/// It is not usually necessary to create custom body types, this trait is already [implemented for
20/// a large number of sensible body types](#foreign-impls) including:
21/// - Empty body: `()`
22/// - Text-based: `String`, `&'static str`, [`ByteString`](https://docs.rs/bytestring/1).
23/// - Byte-based: `Bytes`, `BytesMut`, `Vec<u8>`, `&'static [u8]`;
24/// - Streams: [`BodyStream`](super::BodyStream), [`SizedStream`](super::SizedStream)
25///
26/// # Examples
27/// ```
28/// # use std::convert::Infallible;
29/// # use std::task::{Poll, Context};
30/// # use std::pin::Pin;
31/// # use bytes::Bytes;
32/// # use actix_http::body::{BodySize, MessageBody};
33/// struct Repeat {
34///     chunk: String,
35///     n_times: usize,
36/// }
37///
38/// impl MessageBody for Repeat {
39///     type Error = Infallible;
40///
41///     fn size(&self) -> BodySize {
42///         BodySize::Sized((self.chunk.len() * self.n_times) as u64)
43///     }
44///
45///     fn poll_next(
46///         self: Pin<&mut Self>,
47///         _cx: &mut Context<'_>,
48///     ) -> Poll<Option<Result<Bytes, Self::Error>>> {
49///         let payload_string = self.chunk.repeat(self.n_times);
50///         let payload_bytes = Bytes::from(payload_string);
51///         Poll::Ready(Some(Ok(payload_bytes)))
52///     }
53/// }
54/// ```
55pub trait MessageBody {
56    /// The type of error that will be returned if streaming body fails.
57    ///
58    /// Since it is not appropriate to generate a response mid-stream, it only requires `Error` for
59    /// internal use and logging.
60    type Error: Into<Box<dyn StdError>>;
61
62    /// Body size hint.
63    ///
64    /// If [`BodySize::None`] is returned, optimizations that skip reading the body are allowed.
65    fn size(&self) -> BodySize;
66
67    /// Attempt to pull out the next chunk of body bytes.
68    ///
69    /// # Return Value
70    /// Similar to the `Stream` interface, there are several possible return values, each indicating
71    /// a distinct state:
72    /// - `Poll::Pending` means that this body's next chunk is not ready yet. Implementations must
73    ///   ensure that the current task will be notified when the next chunk may be ready.
74    /// - `Poll::Ready(Some(val))` means that the body has successfully produced a chunk, `val`,
75    ///   and may produce further values on subsequent `poll_next` calls.
76    /// - `Poll::Ready(None)` means that the body is complete, and `poll_next` should not be
77    ///   invoked again.
78    ///
79    /// # Panics
80    /// Once a body is complete (i.e., `poll_next` returned `Ready(None)`), calling its `poll_next`
81    /// method again may panic, block forever, or cause other kinds of problems; this trait places
82    /// no requirements on the effects of such a call. However, as the `poll_next` method is not
83    /// marked unsafe, Rust’s usual rules apply: calls must never cause UB, regardless of its state.
84    fn poll_next(
85        self: Pin<&mut Self>,
86        cx: &mut Context<'_>,
87    ) -> Poll<Option<Result<Bytes, Self::Error>>>;
88
89    /// Try to convert into the complete chunk of body bytes.
90    ///
91    /// Override this method if the complete body can be trivially extracted. This is useful for
92    /// optimizations where `poll_next` calls can be avoided.
93    ///
94    /// Body types with [`BodySize::None`] are allowed to return empty `Bytes`. Although, if calling
95    /// this method, it is recommended to check `size` first and return early.
96    ///
97    /// # Errors
98    /// The default implementation will error and return the original type back to the caller for
99    /// further use.
100    #[inline]
101    fn try_into_bytes(self) -> Result<Bytes, Self>
102    where
103        Self: Sized,
104    {
105        Err(self)
106    }
107
108    /// Wraps this body into a `BoxBody`.
109    ///
110    /// No-op when called on a `BoxBody`, meaning there is no risk of double boxing when calling
111    /// this on a generic `MessageBody`. Prefer this over [`BoxBody::new`] when a boxed body
112    /// is required.
113    #[inline]
114    fn boxed(self) -> BoxBody
115    where
116        Self: Sized + 'static,
117    {
118        BoxBody::new(self)
119    }
120}
121
122mod foreign_impls {
123    use std::{borrow::Cow, ops::DerefMut};
124
125    use super::*;
126
127    impl<B> MessageBody for &mut B
128    where
129        B: MessageBody + Unpin + ?Sized,
130    {
131        type Error = B::Error;
132
133        fn size(&self) -> BodySize {
134            (**self).size()
135        }
136
137        fn poll_next(
138            mut self: Pin<&mut Self>,
139            cx: &mut Context<'_>,
140        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
141            Pin::new(&mut **self).poll_next(cx)
142        }
143    }
144
145    impl MessageBody for Infallible {
146        type Error = Infallible;
147
148        fn size(&self) -> BodySize {
149            match *self {}
150        }
151
152        fn poll_next(
153            self: Pin<&mut Self>,
154            _cx: &mut Context<'_>,
155        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
156            match *self {}
157        }
158    }
159
160    impl MessageBody for () {
161        type Error = Infallible;
162
163        #[inline]
164        fn size(&self) -> BodySize {
165            BodySize::Sized(0)
166        }
167
168        #[inline]
169        fn poll_next(
170            self: Pin<&mut Self>,
171            _cx: &mut Context<'_>,
172        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
173            Poll::Ready(None)
174        }
175
176        #[inline]
177        fn try_into_bytes(self) -> Result<Bytes, Self> {
178            Ok(Bytes::new())
179        }
180    }
181
182    impl<B> MessageBody for Box<B>
183    where
184        B: MessageBody + Unpin + ?Sized,
185    {
186        type Error = B::Error;
187
188        #[inline]
189        fn size(&self) -> BodySize {
190            self.as_ref().size()
191        }
192
193        #[inline]
194        fn poll_next(
195            self: Pin<&mut Self>,
196            cx: &mut Context<'_>,
197        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
198            Pin::new(self.get_mut().as_mut()).poll_next(cx)
199        }
200    }
201
202    impl<T, B> MessageBody for Pin<T>
203    where
204        T: DerefMut<Target = B> + Unpin,
205        B: MessageBody + ?Sized,
206    {
207        type Error = B::Error;
208
209        #[inline]
210        fn size(&self) -> BodySize {
211            self.as_ref().size()
212        }
213
214        #[inline]
215        fn poll_next(
216            self: Pin<&mut Self>,
217            cx: &mut Context<'_>,
218        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
219            self.get_mut().as_mut().poll_next(cx)
220        }
221    }
222
223    impl MessageBody for &'static [u8] {
224        type Error = Infallible;
225
226        #[inline]
227        fn size(&self) -> BodySize {
228            BodySize::Sized(self.len() as u64)
229        }
230
231        #[inline]
232        fn poll_next(
233            self: Pin<&mut Self>,
234            _cx: &mut Context<'_>,
235        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
236            if self.is_empty() {
237                Poll::Ready(None)
238            } else {
239                Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self.get_mut())))))
240            }
241        }
242
243        #[inline]
244        fn try_into_bytes(self) -> Result<Bytes, Self> {
245            Ok(Bytes::from_static(self))
246        }
247    }
248
249    impl MessageBody for Bytes {
250        type Error = Infallible;
251
252        #[inline]
253        fn size(&self) -> BodySize {
254            BodySize::Sized(self.len() as u64)
255        }
256
257        #[inline]
258        fn poll_next(
259            self: Pin<&mut Self>,
260            _cx: &mut Context<'_>,
261        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
262            if self.is_empty() {
263                Poll::Ready(None)
264            } else {
265                Poll::Ready(Some(Ok(mem::take(self.get_mut()))))
266            }
267        }
268
269        #[inline]
270        fn try_into_bytes(self) -> Result<Bytes, Self> {
271            Ok(self)
272        }
273    }
274
275    impl MessageBody for BytesMut {
276        type Error = Infallible;
277
278        #[inline]
279        fn size(&self) -> BodySize {
280            BodySize::Sized(self.len() as u64)
281        }
282
283        #[inline]
284        fn poll_next(
285            self: Pin<&mut Self>,
286            _cx: &mut Context<'_>,
287        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
288            if self.is_empty() {
289                Poll::Ready(None)
290            } else {
291                Poll::Ready(Some(Ok(mem::take(self.get_mut()).freeze())))
292            }
293        }
294
295        #[inline]
296        fn try_into_bytes(self) -> Result<Bytes, Self> {
297            Ok(self.freeze())
298        }
299    }
300
301    impl MessageBody for Vec<u8> {
302        type Error = Infallible;
303
304        #[inline]
305        fn size(&self) -> BodySize {
306            BodySize::Sized(self.len() as u64)
307        }
308
309        #[inline]
310        fn poll_next(
311            self: Pin<&mut Self>,
312            _cx: &mut Context<'_>,
313        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
314            if self.is_empty() {
315                Poll::Ready(None)
316            } else {
317                Poll::Ready(Some(Ok(mem::take(self.get_mut()).into())))
318            }
319        }
320
321        #[inline]
322        fn try_into_bytes(self) -> Result<Bytes, Self> {
323            Ok(Bytes::from(self))
324        }
325    }
326
327    impl MessageBody for Cow<'static, [u8]> {
328        type Error = Infallible;
329
330        #[inline]
331        fn size(&self) -> BodySize {
332            BodySize::Sized(self.len() as u64)
333        }
334
335        #[inline]
336        fn poll_next(
337            self: Pin<&mut Self>,
338            _cx: &mut Context<'_>,
339        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
340            if self.is_empty() {
341                Poll::Ready(None)
342            } else {
343                let bytes = match mem::take(self.get_mut()) {
344                    Cow::Borrowed(b) => Bytes::from_static(b),
345                    Cow::Owned(b) => Bytes::from(b),
346                };
347                Poll::Ready(Some(Ok(bytes)))
348            }
349        }
350
351        #[inline]
352        fn try_into_bytes(self) -> Result<Bytes, Self> {
353            match self {
354                Cow::Borrowed(b) => Ok(Bytes::from_static(b)),
355                Cow::Owned(b) => Ok(Bytes::from(b)),
356            }
357        }
358    }
359
360    impl MessageBody for &'static str {
361        type Error = Infallible;
362
363        #[inline]
364        fn size(&self) -> BodySize {
365            BodySize::Sized(self.len() as u64)
366        }
367
368        #[inline]
369        fn poll_next(
370            self: Pin<&mut Self>,
371            _cx: &mut Context<'_>,
372        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
373            if self.is_empty() {
374                Poll::Ready(None)
375            } else {
376                let string = mem::take(self.get_mut());
377                let bytes = Bytes::from_static(string.as_bytes());
378                Poll::Ready(Some(Ok(bytes)))
379            }
380        }
381
382        #[inline]
383        fn try_into_bytes(self) -> Result<Bytes, Self> {
384            Ok(Bytes::from_static(self.as_bytes()))
385        }
386    }
387
388    impl MessageBody for String {
389        type Error = Infallible;
390
391        #[inline]
392        fn size(&self) -> BodySize {
393            BodySize::Sized(self.len() as u64)
394        }
395
396        #[inline]
397        fn poll_next(
398            self: Pin<&mut Self>,
399            _cx: &mut Context<'_>,
400        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
401            if self.is_empty() {
402                Poll::Ready(None)
403            } else {
404                let string = mem::take(self.get_mut());
405                Poll::Ready(Some(Ok(Bytes::from(string))))
406            }
407        }
408
409        #[inline]
410        fn try_into_bytes(self) -> Result<Bytes, Self> {
411            Ok(Bytes::from(self))
412        }
413    }
414
415    impl MessageBody for Cow<'static, str> {
416        type Error = Infallible;
417
418        #[inline]
419        fn size(&self) -> BodySize {
420            BodySize::Sized(self.len() as u64)
421        }
422
423        #[inline]
424        fn poll_next(
425            self: Pin<&mut Self>,
426            _cx: &mut Context<'_>,
427        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
428            if self.is_empty() {
429                Poll::Ready(None)
430            } else {
431                let bytes = match mem::take(self.get_mut()) {
432                    Cow::Borrowed(s) => Bytes::from_static(s.as_bytes()),
433                    Cow::Owned(s) => Bytes::from(s.into_bytes()),
434                };
435                Poll::Ready(Some(Ok(bytes)))
436            }
437        }
438
439        #[inline]
440        fn try_into_bytes(self) -> Result<Bytes, Self> {
441            match self {
442                Cow::Borrowed(s) => Ok(Bytes::from_static(s.as_bytes())),
443                Cow::Owned(s) => Ok(Bytes::from(s.into_bytes())),
444            }
445        }
446    }
447
448    impl MessageBody for bytestring::ByteString {
449        type Error = Infallible;
450
451        #[inline]
452        fn size(&self) -> BodySize {
453            BodySize::Sized(self.len() as u64)
454        }
455
456        #[inline]
457        fn poll_next(
458            self: Pin<&mut Self>,
459            _cx: &mut Context<'_>,
460        ) -> Poll<Option<Result<Bytes, Self::Error>>> {
461            let string = mem::take(self.get_mut());
462            Poll::Ready(Some(Ok(string.into_bytes())))
463        }
464
465        #[inline]
466        fn try_into_bytes(self) -> Result<Bytes, Self> {
467            Ok(self.into_bytes())
468        }
469    }
470}
471
472pin_project! {
473    pub(crate) struct MessageBodyMapErr<B, F> {
474        #[pin]
475        body: B,
476        mapper: Option<F>,
477    }
478}
479
480impl<B, F, E> MessageBodyMapErr<B, F>
481where
482    B: MessageBody,
483    F: FnOnce(B::Error) -> E,
484{
485    pub(crate) fn new(body: B, mapper: F) -> Self {
486        Self {
487            body,
488            mapper: Some(mapper),
489        }
490    }
491}
492
493impl<B, F, E> MessageBody for MessageBodyMapErr<B, F>
494where
495    B: MessageBody,
496    F: FnOnce(B::Error) -> E,
497    E: Into<Box<dyn StdError>>,
498{
499    type Error = E;
500
501    #[inline]
502    fn size(&self) -> BodySize {
503        self.body.size()
504    }
505
506    fn poll_next(
507        mut self: Pin<&mut Self>,
508        cx: &mut Context<'_>,
509    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
510        let this = self.as_mut().project();
511
512        match ready!(this.body.poll_next(cx)) {
513            Some(Err(err)) => {
514                let f = self.as_mut().project().mapper.take().unwrap();
515                let mapped_err = (f)(err);
516                Poll::Ready(Some(Err(mapped_err)))
517            }
518            Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
519            None => Poll::Ready(None),
520        }
521    }
522
523    #[inline]
524    fn try_into_bytes(self) -> Result<Bytes, Self> {
525        let Self { body, mapper } = self;
526        body.try_into_bytes().map_err(|body| Self { body, mapper })
527    }
528}
529
530#[cfg(test)]
531mod tests {
532    use actix_rt::pin;
533    use actix_utils::future::poll_fn;
534    use futures_util::stream;
535
536    use super::*;
537    use crate::body::{self, EitherBody};
538
539    macro_rules! assert_poll_next {
540        ($pin:expr, $exp:expr) => {
541            assert_eq!(
542                poll_fn(|cx| $pin.as_mut().poll_next(cx))
543                    .await
544                    .unwrap() // unwrap option
545                    .unwrap(), // unwrap result
546                $exp
547            );
548        };
549    }
550
551    macro_rules! assert_poll_next_none {
552        ($pin:expr) => {
553            assert!(poll_fn(|cx| $pin.as_mut().poll_next(cx)).await.is_none());
554        };
555    }
556
557    #[allow(unused_allocation)] // triggered by `Box::new(()).size()`
558    #[actix_rt::test]
559    async fn boxing_equivalence() {
560        assert_eq!(().size(), BodySize::Sized(0));
561        assert_eq!(().size(), Box::new(()).size());
562        assert_eq!(().size(), Box::pin(()).size());
563
564        let pl = Box::new(());
565        pin!(pl);
566        assert_poll_next_none!(pl);
567
568        let mut pl = Box::pin(());
569        assert_poll_next_none!(pl);
570    }
571
572    #[actix_rt::test]
573    async fn mut_equivalence() {
574        assert_eq!(().size(), BodySize::Sized(0));
575        assert_eq!(().size(), (&(&mut ())).size());
576
577        let pl = &mut ();
578        pin!(pl);
579        assert_poll_next_none!(pl);
580
581        let pl = &mut Box::new(());
582        pin!(pl);
583        assert_poll_next_none!(pl);
584
585        let mut body = body::SizedStream::new(
586            8,
587            stream::iter([
588                Ok::<_, std::io::Error>(Bytes::from("1234")),
589                Ok(Bytes::from("5678")),
590            ]),
591        );
592        let body = &mut body;
593        assert_eq!(body.size(), BodySize::Sized(8));
594        pin!(body);
595        assert_poll_next!(body, Bytes::from_static(b"1234"));
596        assert_poll_next!(body, Bytes::from_static(b"5678"));
597        assert_poll_next_none!(body);
598    }
599
600    #[allow(clippy::let_unit_value)]
601    #[actix_rt::test]
602    async fn test_unit() {
603        let pl = ();
604        assert_eq!(pl.size(), BodySize::Sized(0));
605        pin!(pl);
606        assert_poll_next_none!(pl);
607    }
608
609    #[actix_rt::test]
610    async fn test_static_str() {
611        assert_eq!("".size(), BodySize::Sized(0));
612        assert_eq!("test".size(), BodySize::Sized(4));
613
614        let pl = "test";
615        pin!(pl);
616        assert_poll_next!(pl, Bytes::from("test"));
617    }
618
619    #[actix_rt::test]
620    async fn test_static_bytes() {
621        assert_eq!(b"".as_ref().size(), BodySize::Sized(0));
622        assert_eq!(b"test".as_ref().size(), BodySize::Sized(4));
623
624        let pl = b"test".as_ref();
625        pin!(pl);
626        assert_poll_next!(pl, Bytes::from("test"));
627    }
628
629    #[actix_rt::test]
630    async fn test_vec() {
631        assert_eq!(vec![0; 0].size(), BodySize::Sized(0));
632        assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
633
634        let pl = Vec::from("test");
635        pin!(pl);
636        assert_poll_next!(pl, Bytes::from("test"));
637    }
638
639    #[actix_rt::test]
640    async fn test_bytes() {
641        assert_eq!(Bytes::new().size(), BodySize::Sized(0));
642        assert_eq!(Bytes::from_static(b"test").size(), BodySize::Sized(4));
643
644        let pl = Bytes::from_static(b"test");
645        pin!(pl);
646        assert_poll_next!(pl, Bytes::from("test"));
647    }
648
649    #[actix_rt::test]
650    async fn test_bytes_mut() {
651        assert_eq!(BytesMut::new().size(), BodySize::Sized(0));
652        assert_eq!(BytesMut::from(b"test".as_ref()).size(), BodySize::Sized(4));
653
654        let pl = BytesMut::from("test");
655        pin!(pl);
656        assert_poll_next!(pl, Bytes::from("test"));
657    }
658
659    #[actix_rt::test]
660    async fn test_string() {
661        assert_eq!(String::new().size(), BodySize::Sized(0));
662        assert_eq!("test".to_owned().size(), BodySize::Sized(4));
663
664        let pl = "test".to_owned();
665        pin!(pl);
666        assert_poll_next!(pl, Bytes::from("test"));
667    }
668
669    #[actix_rt::test]
670    async fn complete_body_combinators() {
671        let body = Bytes::from_static(b"test");
672        let body = BoxBody::new(body);
673        let body = EitherBody::<_, ()>::left(body);
674        let body = EitherBody::<(), _>::right(body);
675        // Do not support try_into_bytes:
676        // let body = Box::new(body);
677        // let body = Box::pin(body);
678
679        assert_eq!(body.try_into_bytes().unwrap(), Bytes::from("test"));
680    }
681
682    #[actix_rt::test]
683    async fn complete_body_combinators_poll() {
684        let body = Bytes::from_static(b"test");
685        let body = BoxBody::new(body);
686        let body = EitherBody::<_, ()>::left(body);
687        let body = EitherBody::<(), _>::right(body);
688        let mut body = body;
689
690        assert_eq!(body.size(), BodySize::Sized(4));
691        assert_poll_next!(Pin::new(&mut body), Bytes::from("test"));
692        assert_poll_next_none!(Pin::new(&mut body));
693    }
694
695    #[actix_rt::test]
696    async fn none_body_combinators() {
697        fn none_body() -> BoxBody {
698            let body = body::None;
699            let body = BoxBody::new(body);
700            let body = EitherBody::<_, ()>::left(body);
701            let body = EitherBody::<(), _>::right(body);
702            body.boxed()
703        }
704
705        assert_eq!(none_body().size(), BodySize::None);
706        assert_eq!(none_body().try_into_bytes().unwrap(), Bytes::new());
707        assert_poll_next_none!(Pin::new(&mut none_body()));
708    }
709
710    // down-casting used to be done with a method on MessageBody trait
711    // test is kept to demonstrate equivalence of Any trait
712    #[actix_rt::test]
713    async fn test_body_casting() {
714        let mut body = String::from("hello cast");
715        // let mut resp_body: &mut dyn MessageBody<Error = Error> = &mut body;
716        let resp_body: &mut dyn std::any::Any = &mut body;
717        let body = resp_body.downcast_ref::<String>().unwrap();
718        assert_eq!(body, "hello cast");
719        let body = &mut resp_body.downcast_mut::<String>().unwrap();
720        body.push('!');
721        let body = resp_body.downcast_ref::<String>().unwrap();
722        assert_eq!(body, "hello cast!");
723        let not_body = resp_body.downcast_ref::<()>();
724        assert!(not_body.is_none());
725    }
726
727    #[actix_rt::test]
728    async fn non_owning_to_bytes() {
729        let mut body = BoxBody::new(());
730        let bytes = body::to_bytes(&mut body).await.unwrap();
731        assert_eq!(bytes, Bytes::new());
732
733        let mut body = body::BodyStream::new(stream::iter([
734            Ok::<_, std::io::Error>(Bytes::from("1234")),
735            Ok(Bytes::from("5678")),
736        ]));
737        let bytes = body::to_bytes(&mut body).await.unwrap();
738        assert_eq!(bytes, Bytes::from_static(b"12345678"));
739    }
740}