1use std::{
2 mem,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures_core::Stream;
9use pin_project_lite::pin_project;
10
11use crate::error::PayloadError;
12
13pub type BoxedPayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>;
15
16#[doc(hidden)]
17#[deprecated(since = "3.0.0", note = "Renamed to `BoxedPayloadStream`.")]
18pub type PayloadStream = BoxedPayloadStream;
19
20#[cfg(not(feature = "http2"))]
21pin_project! {
22 #[project = PayloadProj]
24 pub enum Payload<S = BoxedPayloadStream> {
25 None,
26 H1 { payload: crate::h1::Payload },
27 Stream { #[pin] payload: S },
28 }
29}
30
31#[cfg(feature = "http2")]
32pin_project! {
33 #[project = PayloadProj]
35 pub enum Payload<S = BoxedPayloadStream> {
36 None,
37 H1 { payload: crate::h1::Payload },
38 H2 { payload: crate::h2::Payload },
39 Stream { #[pin] payload: S },
40 }
41}
42
43impl<S> From<crate::h1::Payload> for Payload<S> {
44 #[inline]
45 fn from(payload: crate::h1::Payload) -> Self {
46 Payload::H1 { payload }
47 }
48}
49
50impl<S> From<Bytes> for Payload<S> {
51 #[inline]
52 fn from(bytes: Bytes) -> Self {
53 let (_, mut pl) = crate::h1::Payload::create(true);
54 pl.unread_data(bytes);
55 self::Payload::from(pl)
56 }
57}
58
59impl<S> From<Vec<u8>> for Payload<S> {
60 #[inline]
61 fn from(vec: Vec<u8>) -> Self {
62 Payload::from(Bytes::from(vec))
63 }
64}
65
66#[cfg(feature = "http2")]
67impl<S> From<crate::h2::Payload> for Payload<S> {
68 #[inline]
69 fn from(payload: crate::h2::Payload) -> Self {
70 Payload::H2 { payload }
71 }
72}
73
74#[cfg(feature = "http2")]
75impl<S> From<::h2::RecvStream> for Payload<S> {
76 #[inline]
77 fn from(stream: ::h2::RecvStream) -> Self {
78 Payload::H2 {
79 payload: crate::h2::Payload::new(stream),
80 }
81 }
82}
83
84impl From<BoxedPayloadStream> for Payload {
85 #[inline]
86 fn from(payload: BoxedPayloadStream) -> Self {
87 Payload::Stream { payload }
88 }
89}
90
91impl<S> Payload<S> {
92 #[must_use]
94 pub fn take(&mut self) -> Payload<S> {
95 mem::replace(self, Payload::None)
96 }
97}
98
99impl<S> Stream for Payload<S>
100where
101 S: Stream<Item = Result<Bytes, PayloadError>>,
102{
103 type Item = Result<Bytes, PayloadError>;
104
105 #[inline]
106 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
107 match self.project() {
108 PayloadProj::None => Poll::Ready(None),
109 PayloadProj::H1 { payload } => Pin::new(payload).poll_next(cx),
110
111 #[cfg(feature = "http2")]
112 PayloadProj::H2 { payload } => Pin::new(payload).poll_next(cx),
113
114 PayloadProj::Stream { payload } => payload.poll_next(cx),
115 }
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use static_assertions::{assert_impl_all, assert_not_impl_any};
122
123 use super::*;
124
125 assert_impl_all!(Payload: Unpin);
126 assert_not_impl_any!(Payload: Send, Sync);
127}