1use std::{
4 convert::Infallible,
5 error::Error as StdError,
6 mem,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use bytes::{Bytes, BytesMut};
12use futures_core::ready;
13use pin_project_lite::pin_project;
14
15use super::{BodySize, BoxBody};
16
17pub trait MessageBody {
56 type Error: Into<Box<dyn StdError>>;
61
62 fn size(&self) -> BodySize;
66
67 fn poll_next(
85 self: Pin<&mut Self>,
86 cx: &mut Context<'_>,
87 ) -> Poll<Option<Result<Bytes, Self::Error>>>;
88
89 #[inline]
101 fn try_into_bytes(self) -> Result<Bytes, Self>
102 where
103 Self: Sized,
104 {
105 Err(self)
106 }
107
108 #[inline]
114 fn boxed(self) -> BoxBody
115 where
116 Self: Sized + 'static,
117 {
118 BoxBody::new(self)
119 }
120}
121
122mod foreign_impls {
123 use std::{borrow::Cow, ops::DerefMut};
124
125 use super::*;
126
127 impl<B> MessageBody for &mut B
128 where
129 B: MessageBody + Unpin + ?Sized,
130 {
131 type Error = B::Error;
132
133 fn size(&self) -> BodySize {
134 (**self).size()
135 }
136
137 fn poll_next(
138 mut self: Pin<&mut Self>,
139 cx: &mut Context<'_>,
140 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
141 Pin::new(&mut **self).poll_next(cx)
142 }
143 }
144
145 impl MessageBody for Infallible {
146 type Error = Infallible;
147
148 fn size(&self) -> BodySize {
149 match *self {}
150 }
151
152 fn poll_next(
153 self: Pin<&mut Self>,
154 _cx: &mut Context<'_>,
155 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
156 match *self {}
157 }
158 }
159
160 impl MessageBody for () {
161 type Error = Infallible;
162
163 #[inline]
164 fn size(&self) -> BodySize {
165 BodySize::Sized(0)
166 }
167
168 #[inline]
169 fn poll_next(
170 self: Pin<&mut Self>,
171 _cx: &mut Context<'_>,
172 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
173 Poll::Ready(None)
174 }
175
176 #[inline]
177 fn try_into_bytes(self) -> Result<Bytes, Self> {
178 Ok(Bytes::new())
179 }
180 }
181
182 impl<B> MessageBody for Box<B>
183 where
184 B: MessageBody + Unpin + ?Sized,
185 {
186 type Error = B::Error;
187
188 #[inline]
189 fn size(&self) -> BodySize {
190 self.as_ref().size()
191 }
192
193 #[inline]
194 fn poll_next(
195 self: Pin<&mut Self>,
196 cx: &mut Context<'_>,
197 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
198 Pin::new(self.get_mut().as_mut()).poll_next(cx)
199 }
200 }
201
202 impl<T, B> MessageBody for Pin<T>
203 where
204 T: DerefMut<Target = B> + Unpin,
205 B: MessageBody + ?Sized,
206 {
207 type Error = B::Error;
208
209 #[inline]
210 fn size(&self) -> BodySize {
211 self.as_ref().size()
212 }
213
214 #[inline]
215 fn poll_next(
216 self: Pin<&mut Self>,
217 cx: &mut Context<'_>,
218 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
219 self.get_mut().as_mut().poll_next(cx)
220 }
221 }
222
223 impl MessageBody for &'static [u8] {
224 type Error = Infallible;
225
226 #[inline]
227 fn size(&self) -> BodySize {
228 BodySize::Sized(self.len() as u64)
229 }
230
231 #[inline]
232 fn poll_next(
233 self: Pin<&mut Self>,
234 _cx: &mut Context<'_>,
235 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
236 if self.is_empty() {
237 Poll::Ready(None)
238 } else {
239 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self.get_mut())))))
240 }
241 }
242
243 #[inline]
244 fn try_into_bytes(self) -> Result<Bytes, Self> {
245 Ok(Bytes::from_static(self))
246 }
247 }
248
249 impl MessageBody for Bytes {
250 type Error = Infallible;
251
252 #[inline]
253 fn size(&self) -> BodySize {
254 BodySize::Sized(self.len() as u64)
255 }
256
257 #[inline]
258 fn poll_next(
259 self: Pin<&mut Self>,
260 _cx: &mut Context<'_>,
261 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
262 if self.is_empty() {
263 Poll::Ready(None)
264 } else {
265 Poll::Ready(Some(Ok(mem::take(self.get_mut()))))
266 }
267 }
268
269 #[inline]
270 fn try_into_bytes(self) -> Result<Bytes, Self> {
271 Ok(self)
272 }
273 }
274
275 impl MessageBody for BytesMut {
276 type Error = Infallible;
277
278 #[inline]
279 fn size(&self) -> BodySize {
280 BodySize::Sized(self.len() as u64)
281 }
282
283 #[inline]
284 fn poll_next(
285 self: Pin<&mut Self>,
286 _cx: &mut Context<'_>,
287 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
288 if self.is_empty() {
289 Poll::Ready(None)
290 } else {
291 Poll::Ready(Some(Ok(mem::take(self.get_mut()).freeze())))
292 }
293 }
294
295 #[inline]
296 fn try_into_bytes(self) -> Result<Bytes, Self> {
297 Ok(self.freeze())
298 }
299 }
300
301 impl MessageBody for Vec<u8> {
302 type Error = Infallible;
303
304 #[inline]
305 fn size(&self) -> BodySize {
306 BodySize::Sized(self.len() as u64)
307 }
308
309 #[inline]
310 fn poll_next(
311 self: Pin<&mut Self>,
312 _cx: &mut Context<'_>,
313 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
314 if self.is_empty() {
315 Poll::Ready(None)
316 } else {
317 Poll::Ready(Some(Ok(mem::take(self.get_mut()).into())))
318 }
319 }
320
321 #[inline]
322 fn try_into_bytes(self) -> Result<Bytes, Self> {
323 Ok(Bytes::from(self))
324 }
325 }
326
327 impl MessageBody for Cow<'static, [u8]> {
328 type Error = Infallible;
329
330 #[inline]
331 fn size(&self) -> BodySize {
332 BodySize::Sized(self.len() as u64)
333 }
334
335 #[inline]
336 fn poll_next(
337 self: Pin<&mut Self>,
338 _cx: &mut Context<'_>,
339 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
340 if self.is_empty() {
341 Poll::Ready(None)
342 } else {
343 let bytes = match mem::take(self.get_mut()) {
344 Cow::Borrowed(b) => Bytes::from_static(b),
345 Cow::Owned(b) => Bytes::from(b),
346 };
347 Poll::Ready(Some(Ok(bytes)))
348 }
349 }
350
351 #[inline]
352 fn try_into_bytes(self) -> Result<Bytes, Self> {
353 match self {
354 Cow::Borrowed(b) => Ok(Bytes::from_static(b)),
355 Cow::Owned(b) => Ok(Bytes::from(b)),
356 }
357 }
358 }
359
360 impl MessageBody for &'static str {
361 type Error = Infallible;
362
363 #[inline]
364 fn size(&self) -> BodySize {
365 BodySize::Sized(self.len() as u64)
366 }
367
368 #[inline]
369 fn poll_next(
370 self: Pin<&mut Self>,
371 _cx: &mut Context<'_>,
372 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
373 if self.is_empty() {
374 Poll::Ready(None)
375 } else {
376 let string = mem::take(self.get_mut());
377 let bytes = Bytes::from_static(string.as_bytes());
378 Poll::Ready(Some(Ok(bytes)))
379 }
380 }
381
382 #[inline]
383 fn try_into_bytes(self) -> Result<Bytes, Self> {
384 Ok(Bytes::from_static(self.as_bytes()))
385 }
386 }
387
388 impl MessageBody for String {
389 type Error = Infallible;
390
391 #[inline]
392 fn size(&self) -> BodySize {
393 BodySize::Sized(self.len() as u64)
394 }
395
396 #[inline]
397 fn poll_next(
398 self: Pin<&mut Self>,
399 _cx: &mut Context<'_>,
400 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
401 if self.is_empty() {
402 Poll::Ready(None)
403 } else {
404 let string = mem::take(self.get_mut());
405 Poll::Ready(Some(Ok(Bytes::from(string))))
406 }
407 }
408
409 #[inline]
410 fn try_into_bytes(self) -> Result<Bytes, Self> {
411 Ok(Bytes::from(self))
412 }
413 }
414
415 impl MessageBody for Cow<'static, str> {
416 type Error = Infallible;
417
418 #[inline]
419 fn size(&self) -> BodySize {
420 BodySize::Sized(self.len() as u64)
421 }
422
423 #[inline]
424 fn poll_next(
425 self: Pin<&mut Self>,
426 _cx: &mut Context<'_>,
427 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
428 if self.is_empty() {
429 Poll::Ready(None)
430 } else {
431 let bytes = match mem::take(self.get_mut()) {
432 Cow::Borrowed(s) => Bytes::from_static(s.as_bytes()),
433 Cow::Owned(s) => Bytes::from(s.into_bytes()),
434 };
435 Poll::Ready(Some(Ok(bytes)))
436 }
437 }
438
439 #[inline]
440 fn try_into_bytes(self) -> Result<Bytes, Self> {
441 match self {
442 Cow::Borrowed(s) => Ok(Bytes::from_static(s.as_bytes())),
443 Cow::Owned(s) => Ok(Bytes::from(s.into_bytes())),
444 }
445 }
446 }
447
448 impl MessageBody for bytestring::ByteString {
449 type Error = Infallible;
450
451 #[inline]
452 fn size(&self) -> BodySize {
453 BodySize::Sized(self.len() as u64)
454 }
455
456 #[inline]
457 fn poll_next(
458 self: Pin<&mut Self>,
459 _cx: &mut Context<'_>,
460 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
461 let string = mem::take(self.get_mut());
462 Poll::Ready(Some(Ok(string.into_bytes())))
463 }
464
465 #[inline]
466 fn try_into_bytes(self) -> Result<Bytes, Self> {
467 Ok(self.into_bytes())
468 }
469 }
470}
471
472pin_project! {
473 pub(crate) struct MessageBodyMapErr<B, F> {
474 #[pin]
475 body: B,
476 mapper: Option<F>,
477 }
478}
479
480impl<B, F, E> MessageBodyMapErr<B, F>
481where
482 B: MessageBody,
483 F: FnOnce(B::Error) -> E,
484{
485 pub(crate) fn new(body: B, mapper: F) -> Self {
486 Self {
487 body,
488 mapper: Some(mapper),
489 }
490 }
491}
492
493impl<B, F, E> MessageBody for MessageBodyMapErr<B, F>
494where
495 B: MessageBody,
496 F: FnOnce(B::Error) -> E,
497 E: Into<Box<dyn StdError>>,
498{
499 type Error = E;
500
501 #[inline]
502 fn size(&self) -> BodySize {
503 self.body.size()
504 }
505
506 fn poll_next(
507 mut self: Pin<&mut Self>,
508 cx: &mut Context<'_>,
509 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
510 let this = self.as_mut().project();
511
512 match ready!(this.body.poll_next(cx)) {
513 Some(Err(err)) => {
514 let f = self.as_mut().project().mapper.take().unwrap();
515 let mapped_err = (f)(err);
516 Poll::Ready(Some(Err(mapped_err)))
517 }
518 Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
519 None => Poll::Ready(None),
520 }
521 }
522
523 #[inline]
524 fn try_into_bytes(self) -> Result<Bytes, Self> {
525 let Self { body, mapper } = self;
526 body.try_into_bytes().map_err(|body| Self { body, mapper })
527 }
528}
529
530#[cfg(test)]
531mod tests {
532 use actix_rt::pin;
533 use actix_utils::future::poll_fn;
534 use futures_util::stream;
535
536 use super::*;
537 use crate::body::{self, EitherBody};
538
// Polls the pinned body once to completion and asserts that it produced
// `Some(Ok(chunk))` with `chunk == $exp`.
macro_rules! assert_poll_next {
    ($pin:expr, $exp:expr) => {{
        let polled = poll_fn(|cx| $pin.as_mut().poll_next(cx)).await;
        // First unwrap: the `Option` (a chunk was produced);
        // second unwrap: the `Result` (the chunk is not an error).
        let chunk = polled.unwrap().unwrap();
        assert_eq!(chunk, $exp);
    }};
}
550
// Polls the pinned body once to completion and asserts that it reported the
// end of the body (`None`).
macro_rules! assert_poll_next_none {
    ($pin:expr) => {{
        let polled = poll_fn(|cx| $pin.as_mut().poll_next(cx)).await;
        assert!(polled.is_none());
    }};
}
556
557 #[allow(unused_allocation)] #[actix_rt::test]
559 async fn boxing_equivalence() {
560 assert_eq!(().size(), BodySize::Sized(0));
561 assert_eq!(().size(), Box::new(()).size());
562 assert_eq!(().size(), Box::pin(()).size());
563
564 let pl = Box::new(());
565 pin!(pl);
566 assert_poll_next_none!(pl);
567
568 let mut pl = Box::pin(());
569 assert_poll_next_none!(pl);
570 }
571
572 #[actix_rt::test]
573 async fn mut_equivalence() {
574 assert_eq!(().size(), BodySize::Sized(0));
575 assert_eq!(().size(), (&(&mut ())).size());
576
577 let pl = &mut ();
578 pin!(pl);
579 assert_poll_next_none!(pl);
580
581 let pl = &mut Box::new(());
582 pin!(pl);
583 assert_poll_next_none!(pl);
584
585 let mut body = body::SizedStream::new(
586 8,
587 stream::iter([
588 Ok::<_, std::io::Error>(Bytes::from("1234")),
589 Ok(Bytes::from("5678")),
590 ]),
591 );
592 let body = &mut body;
593 assert_eq!(body.size(), BodySize::Sized(8));
594 pin!(body);
595 assert_poll_next!(body, Bytes::from_static(b"1234"));
596 assert_poll_next!(body, Bytes::from_static(b"5678"));
597 assert_poll_next_none!(body);
598 }
599
600 #[allow(clippy::let_unit_value)]
601 #[actix_rt::test]
602 async fn test_unit() {
603 let pl = ();
604 assert_eq!(pl.size(), BodySize::Sized(0));
605 pin!(pl);
606 assert_poll_next_none!(pl);
607 }
608
609 #[actix_rt::test]
610 async fn test_static_str() {
611 assert_eq!("".size(), BodySize::Sized(0));
612 assert_eq!("test".size(), BodySize::Sized(4));
613
614 let pl = "test";
615 pin!(pl);
616 assert_poll_next!(pl, Bytes::from("test"));
617 }
618
619 #[actix_rt::test]
620 async fn test_static_bytes() {
621 assert_eq!(b"".as_ref().size(), BodySize::Sized(0));
622 assert_eq!(b"test".as_ref().size(), BodySize::Sized(4));
623
624 let pl = b"test".as_ref();
625 pin!(pl);
626 assert_poll_next!(pl, Bytes::from("test"));
627 }
628
629 #[actix_rt::test]
630 async fn test_vec() {
631 assert_eq!(vec![0; 0].size(), BodySize::Sized(0));
632 assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
633
634 let pl = Vec::from("test");
635 pin!(pl);
636 assert_poll_next!(pl, Bytes::from("test"));
637 }
638
639 #[actix_rt::test]
640 async fn test_bytes() {
641 assert_eq!(Bytes::new().size(), BodySize::Sized(0));
642 assert_eq!(Bytes::from_static(b"test").size(), BodySize::Sized(4));
643
644 let pl = Bytes::from_static(b"test");
645 pin!(pl);
646 assert_poll_next!(pl, Bytes::from("test"));
647 }
648
649 #[actix_rt::test]
650 async fn test_bytes_mut() {
651 assert_eq!(BytesMut::new().size(), BodySize::Sized(0));
652 assert_eq!(BytesMut::from(b"test".as_ref()).size(), BodySize::Sized(4));
653
654 let pl = BytesMut::from("test");
655 pin!(pl);
656 assert_poll_next!(pl, Bytes::from("test"));
657 }
658
659 #[actix_rt::test]
660 async fn test_string() {
661 assert_eq!(String::new().size(), BodySize::Sized(0));
662 assert_eq!("test".to_owned().size(), BodySize::Sized(4));
663
664 let pl = "test".to_owned();
665 pin!(pl);
666 assert_poll_next!(pl, Bytes::from("test"));
667 }
668
669 #[actix_rt::test]
670 async fn complete_body_combinators() {
671 let body = Bytes::from_static(b"test");
672 let body = BoxBody::new(body);
673 let body = EitherBody::<_, ()>::left(body);
674 let body = EitherBody::<(), _>::right(body);
675 assert_eq!(body.try_into_bytes().unwrap(), Bytes::from("test"));
680 }
681
682 #[actix_rt::test]
683 async fn complete_body_combinators_poll() {
684 let body = Bytes::from_static(b"test");
685 let body = BoxBody::new(body);
686 let body = EitherBody::<_, ()>::left(body);
687 let body = EitherBody::<(), _>::right(body);
688 let mut body = body;
689
690 assert_eq!(body.size(), BodySize::Sized(4));
691 assert_poll_next!(Pin::new(&mut body), Bytes::from("test"));
692 assert_poll_next_none!(Pin::new(&mut body));
693 }
694
695 #[actix_rt::test]
696 async fn none_body_combinators() {
697 fn none_body() -> BoxBody {
698 let body = body::None;
699 let body = BoxBody::new(body);
700 let body = EitherBody::<_, ()>::left(body);
701 let body = EitherBody::<(), _>::right(body);
702 body.boxed()
703 }
704
705 assert_eq!(none_body().size(), BodySize::None);
706 assert_eq!(none_body().try_into_bytes().unwrap(), Bytes::new());
707 assert_poll_next_none!(Pin::new(&mut none_body()));
708 }
709
710 #[actix_rt::test]
713 async fn test_body_casting() {
714 let mut body = String::from("hello cast");
715 let resp_body: &mut dyn std::any::Any = &mut body;
717 let body = resp_body.downcast_ref::<String>().unwrap();
718 assert_eq!(body, "hello cast");
719 let body = &mut resp_body.downcast_mut::<String>().unwrap();
720 body.push('!');
721 let body = resp_body.downcast_ref::<String>().unwrap();
722 assert_eq!(body, "hello cast!");
723 let not_body = resp_body.downcast_ref::<()>();
724 assert!(not_body.is_none());
725 }
726
727 #[actix_rt::test]
728 async fn non_owning_to_bytes() {
729 let mut body = BoxBody::new(());
730 let bytes = body::to_bytes(&mut body).await.unwrap();
731 assert_eq!(bytes, Bytes::new());
732
733 let mut body = body::BodyStream::new(stream::iter([
734 Ok::<_, std::io::Error>(Bytes::from("1234")),
735 Ok(Bytes::from("5678")),
736 ]));
737 let bytes = body::to_bytes(&mut body).await.unwrap();
738 assert_eq!(bytes, Bytes::from_static(b"12345678"));
739 }
740}