1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{ready, Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10use pin_project_lite::pin_project;
11#[cfg(feature = "stream")]
12use tokio::fs::File;
13use tokio::time::Sleep;
14#[cfg(feature = "stream")]
15use tokio_util::io::ReaderStream;
16
17pub struct Body {
19 inner: Inner,
20}
21
22enum Inner {
23 Reusable(Bytes),
24 Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
25}
26
27pin_project! {
28 pub(crate) struct TotalTimeoutBody<B> {
33 #[pin]
34 inner: B,
35 timeout: Pin<Box<Sleep>>,
36 }
37}
38
39pin_project! {
40 pub(crate) struct ReadTimeoutBody<B> {
41 #[pin]
42 inner: B,
43 #[pin]
44 sleep: Option<Sleep>,
45 timeout: Duration,
46 }
47}
48
/// Adapter that exposes the data frames of a body as a `Stream` of `Bytes`.
#[cfg(any(feature = "stream", feature = "multipart"))]
pub(crate) struct DataStream<B>(pub(crate) B);
52
53impl Body {
54 pub fn as_bytes(&self) -> Option<&[u8]> {
58 match &self.inner {
59 Inner::Reusable(bytes) => Some(bytes.as_ref()),
60 Inner::Streaming(..) => None,
61 }
62 }
63
64 #[cfg(feature = "stream")]
88 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
89 pub fn wrap_stream<S>(stream: S) -> Body
90 where
91 S: futures_core::stream::TryStream + Send + 'static,
92 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
93 Bytes: From<S::Ok>,
94 {
95 Body::stream(stream)
96 }
97
98 #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
99 pub(crate) fn stream<S>(stream: S) -> Body
100 where
101 S: futures_core::stream::TryStream + Send + 'static,
102 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
103 Bytes: From<S::Ok>,
104 {
105 use futures_util::TryStreamExt;
106 use http_body::Frame;
107 use http_body_util::StreamBody;
108
109 let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
110 stream
111 .map_ok(|d| Frame::data(Bytes::from(d)))
112 .map_err(Into::into),
113 )));
114 Body {
115 inner: Inner::Streaming(body),
116 }
117 }
118
119 pub(crate) fn empty() -> Body {
120 Body::reusable(Bytes::new())
121 }
122
123 pub(crate) fn reusable(chunk: Bytes) -> Body {
124 Body {
125 inner: Inner::Reusable(chunk),
126 }
127 }
128
129 pub fn wrap<B>(inner: B) -> Body
143 where
144 B: HttpBody + Send + Sync + 'static,
145 B::Data: Into<Bytes>,
146 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
147 {
148 use http_body_util::BodyExt;
149
150 let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed();
151
152 Body {
153 inner: Inner::Streaming(boxed),
154 }
155 }
156
157 pub(crate) fn try_clone(&self) -> Option<Body> {
158 match self.inner {
159 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
160 Inner::Streaming { .. } => None,
161 }
162 }
163
164 #[cfg(feature = "multipart")]
165 pub(crate) fn into_stream(self) -> DataStream<Body> {
166 DataStream(self)
167 }
168
169 #[cfg(feature = "multipart")]
170 pub(crate) fn content_length(&self) -> Option<u64> {
171 match self.inner {
172 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
173 Inner::Streaming(ref body) => body.size_hint().exact(),
174 }
175 }
176}
177
178impl Default for Body {
179 #[inline]
180 fn default() -> Body {
181 Body::empty()
182 }
183}
184
185impl From<Bytes> for Body {
199 #[inline]
200 fn from(bytes: Bytes) -> Body {
201 Body::reusable(bytes)
202 }
203}
204
205impl From<Vec<u8>> for Body {
206 #[inline]
207 fn from(vec: Vec<u8>) -> Body {
208 Body::reusable(vec.into())
209 }
210}
211
212impl From<&'static [u8]> for Body {
213 #[inline]
214 fn from(s: &'static [u8]) -> Body {
215 Body::reusable(Bytes::from_static(s))
216 }
217}
218
219impl From<String> for Body {
220 #[inline]
221 fn from(s: String) -> Body {
222 Body::reusable(s.into())
223 }
224}
225
226impl From<&'static str> for Body {
227 #[inline]
228 fn from(s: &'static str) -> Body {
229 s.as_bytes().into()
230 }
231}
232
/// Streams the contents of a `tokio::fs::File` as the body, reading it through
/// a `ReaderStream`.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    #[inline]
    fn from(file: File) -> Body {
        let reader = ReaderStream::new(file);
        Self::wrap_stream(reader)
    }
}
241
242impl fmt::Debug for Body {
243 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
244 f.debug_struct("Body").finish()
245 }
246}
247
248impl HttpBody for Body {
249 type Data = Bytes;
250 type Error = crate::Error;
251
252 fn poll_frame(
253 mut self: Pin<&mut Self>,
254 cx: &mut Context,
255 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
256 match self.inner {
257 Inner::Reusable(ref mut bytes) => {
258 let out = bytes.split_off(0);
259 if out.is_empty() {
260 Poll::Ready(None)
261 } else {
262 Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
263 }
264 }
265 Inner::Streaming(ref mut body) => Poll::Ready(
266 ready!(Pin::new(body).poll_frame(cx))
267 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
268 ),
269 }
270 }
271
272 fn size_hint(&self) -> http_body::SizeHint {
273 match self.inner {
274 Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
275 Inner::Streaming(ref body) => body.size_hint(),
276 }
277 }
278
279 fn is_end_stream(&self) -> bool {
280 match self.inner {
281 Inner::Reusable(ref bytes) => bytes.is_empty(),
282 Inner::Streaming(ref body) => body.is_end_stream(),
283 }
284 }
285}
286
287pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
290 TotalTimeoutBody {
291 inner: body,
292 timeout,
293 }
294}
295
296pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
297 ReadTimeoutBody {
298 inner: body,
299 sleep: None,
300 timeout,
301 }
302}
303
304impl<B> hyper::body::Body for TotalTimeoutBody<B>
305where
306 B: hyper::body::Body,
307 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
308{
309 type Data = B::Data;
310 type Error = crate::Error;
311
312 fn poll_frame(
313 self: Pin<&mut Self>,
314 cx: &mut Context,
315 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
316 let this = self.project();
317 if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
318 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
319 }
320 Poll::Ready(
321 ready!(this.inner.poll_frame(cx))
322 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
323 )
324 }
325
326 #[inline]
327 fn size_hint(&self) -> http_body::SizeHint {
328 self.inner.size_hint()
329 }
330
331 #[inline]
332 fn is_end_stream(&self) -> bool {
333 self.inner.is_end_stream()
334 }
335}
336
337impl<B> hyper::body::Body for ReadTimeoutBody<B>
338where
339 B: hyper::body::Body,
340 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
341{
342 type Data = B::Data;
343 type Error = crate::Error;
344
345 fn poll_frame(
346 self: Pin<&mut Self>,
347 cx: &mut Context,
348 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
349 let mut this = self.project();
350
351 let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
353 some
354 } else {
355 this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
356 this.sleep.as_mut().as_pin_mut().unwrap()
357 };
358
359 if let Poll::Ready(()) = sleep_pinned.poll(cx) {
361 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
362 }
363
364 let item = ready!(this.inner.poll_frame(cx))
365 .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
366 this.sleep.set(None);
368 Poll::Ready(item)
369 }
370
371 #[inline]
372 fn size_hint(&self) -> http_body::SizeHint {
373 self.inner.size_hint()
374 }
375
376 #[inline]
377 fn is_end_stream(&self) -> bool {
378 self.inner.is_end_stream()
379 }
380}
381
382pub(crate) type ResponseBody =
383 http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
384
385pub(crate) fn boxed<B>(body: B) -> ResponseBody
386where
387 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
388 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
389{
390 use http_body_util::BodyExt;
391
392 body.map_err(box_err).boxed()
393}
394
395pub(crate) fn response<B>(
396 body: B,
397 deadline: Option<Pin<Box<Sleep>>>,
398 read_timeout: Option<Duration>,
399) -> ResponseBody
400where
401 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
402 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
403{
404 use http_body_util::BodyExt;
405
406 match (deadline, read_timeout) {
407 (Some(total), Some(read)) => {
408 let body = with_read_timeout(body, read).map_err(box_err);
409 total_timeout(body, total).map_err(box_err).boxed()
410 }
411 (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
412 (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
413 (None, None) => body.map_err(box_err).boxed(),
414 }
415}
416
/// Converts any error-like value into a boxed `dyn Error + Send + Sync`.
///
/// It exists as a named function so it can be passed directly to `map_err`.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let boxed: Box<dyn std::error::Error + Send + Sync> = Into::into(err);
    boxed
}
423
/// Yields the data of each data frame of the wrapped body; frames that do not
/// convert into data are skipped.
#[cfg(any(feature = "stream", feature = "multipart"))]
impl<B> futures_core::Stream for DataStream<B>
where
    B: HttpBody<Data = Bytes> + Unpin,
{
    type Item = Result<Bytes, B::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            match ready!(Pin::new(&mut self.0).poll_frame(cx)) {
                Some(Ok(frame)) => {
                    // A frame without data is dropped and the body is polled
                    // again for the next one.
                    if let Ok(data) = frame.into_data() {
                        return Poll::Ready(Some(Ok(data)));
                    }
                }
                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
                None => return Poll::Ready(None),
            }
        }
    }
}
450
451pin_project! {
454 struct IntoBytesBody<B> {
455 #[pin]
456 inner: B,
457 }
458}
459
460impl<B> hyper::body::Body for IntoBytesBody<B>
463where
464 B: hyper::body::Body,
465 B::Data: Into<Bytes>,
466{
467 type Data = Bytes;
468 type Error = B::Error;
469
470 fn poll_frame(
471 self: Pin<&mut Self>,
472 cx: &mut Context,
473 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
474 match ready!(self.project().inner.poll_frame(cx)) {
475 Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))),
476 Some(Err(e)) => Poll::Ready(Some(Err(e))),
477 None => Poll::Ready(None),
478 }
479 }
480
481 #[inline]
482 fn size_hint(&self) -> http_body::SizeHint {
483 self.inner.size_hint()
484 }
485
486 #[inline]
487 fn is_end_stream(&self) -> bool {
488 self.inner.is_end_stream()
489 }
490}
491
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes exactly those bytes.
    #[test]
    fn test_as_bytes() {
        const PAYLOAD: &[u8] = b"Test body";

        let body = Body::from(PAYLOAD);

        assert_eq!(body.as_bytes(), Some(PAYLOAD));
    }

    /// Buffered bodies report an exact size hint and end-of-stream state, and
    /// wrapping an empty body keeps both.
    #[test]
    fn body_exact_length() {
        // Empty buffered body: already finished, length zero.
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        // Non-empty buffered body: not finished, length is the byte count.
        let three_bytes = Body::reusable("abc".into());
        assert!(!three_bytes.is_end_stream());
        assert_eq!(three_bytes.size_hint().exact(), Some(3));

        // Wrapping goes through the streaming variant but must still forward
        // the hints of the wrapped body.
        let wrapped = Body::wrap(empty);
        assert!(wrapped.is_end_stream());
        assert_eq!(wrapped.size_hint().exact(), Some(0));
    }
}