actix_http/
payload.rs

1use std::{
2    mem,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures_core::Stream;
9use pin_project_lite::pin_project;
10
11use crate::error::PayloadError;
12
13/// A boxed payload stream.
14pub type BoxedPayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>;
15
16#[doc(hidden)]
17#[deprecated(since = "3.0.0", note = "Renamed to `BoxedPayloadStream`.")]
18pub type PayloadStream = BoxedPayloadStream;
19
20#[cfg(not(feature = "http2"))]
21pin_project! {
22    /// A streaming payload.
23    #[project = PayloadProj]
24    pub enum Payload<S = BoxedPayloadStream> {
25        None,
26        H1 { payload: crate::h1::Payload },
27        Stream { #[pin] payload: S },
28    }
29}
30
31#[cfg(feature = "http2")]
32pin_project! {
33    /// A streaming payload.
34    #[project = PayloadProj]
35    pub enum Payload<S = BoxedPayloadStream> {
36        None,
37        H1 { payload: crate::h1::Payload },
38        H2 { payload: crate::h2::Payload },
39        Stream { #[pin] payload: S },
40    }
41}
42
43impl<S> From<crate::h1::Payload> for Payload<S> {
44    #[inline]
45    fn from(payload: crate::h1::Payload) -> Self {
46        Payload::H1 { payload }
47    }
48}
49
50impl<S> From<Bytes> for Payload<S> {
51    #[inline]
52    fn from(bytes: Bytes) -> Self {
53        let (_, mut pl) = crate::h1::Payload::create(true);
54        pl.unread_data(bytes);
55        self::Payload::from(pl)
56    }
57}
58
59impl<S> From<Vec<u8>> for Payload<S> {
60    #[inline]
61    fn from(vec: Vec<u8>) -> Self {
62        Payload::from(Bytes::from(vec))
63    }
64}
65
66#[cfg(feature = "http2")]
67impl<S> From<crate::h2::Payload> for Payload<S> {
68    #[inline]
69    fn from(payload: crate::h2::Payload) -> Self {
70        Payload::H2 { payload }
71    }
72}
73
74#[cfg(feature = "http2")]
75impl<S> From<::h2::RecvStream> for Payload<S> {
76    #[inline]
77    fn from(stream: ::h2::RecvStream) -> Self {
78        Payload::H2 {
79            payload: crate::h2::Payload::new(stream),
80        }
81    }
82}
83
84impl From<BoxedPayloadStream> for Payload {
85    #[inline]
86    fn from(payload: BoxedPayloadStream) -> Self {
87        Payload::Stream { payload }
88    }
89}
90
91impl<S> Payload<S> {
92    /// Takes current payload and replaces it with `None` value.
93    #[must_use]
94    pub fn take(&mut self) -> Payload<S> {
95        mem::replace(self, Payload::None)
96    }
97}
98
99impl<S> Stream for Payload<S>
100where
101    S: Stream<Item = Result<Bytes, PayloadError>>,
102{
103    type Item = Result<Bytes, PayloadError>;
104
105    #[inline]
106    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
107        match self.project() {
108            PayloadProj::None => Poll::Ready(None),
109            PayloadProj::H1 { payload } => Pin::new(payload).poll_next(cx),
110
111            #[cfg(feature = "http2")]
112            PayloadProj::H2 { payload } => Pin::new(payload).poll_next(cx),
113
114            PayloadProj::Stream { payload } => payload.poll_next(cx),
115        }
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use static_assertions::{assert_impl_all, assert_not_impl_any};
122
123    use super::*;
124
125    assert_impl_all!(Payload: Unpin);
126    assert_not_impl_any!(Payload: Send, Sync);
127}