warp/filters/
body.rs

//! Body filters
//!
//! Filters that extract a body for a route.
4
5use std::error::Error as StdError;
6use std::fmt;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use bytes::{Buf, Bytes};
11use futures_util::{future, ready, Stream, TryFutureExt};
12use headers::ContentLength;
13use http::header::CONTENT_TYPE;
14use hyper::Body;
15use serde::de::DeserializeOwned;
16
17use crate::filter::{filter_fn, filter_fn_one, Filter, FilterBase};
18use crate::reject::{self, Rejection};
19
20type BoxError = Box<dyn StdError + Send + Sync>;
21
22// Extracts the `Body` Stream from the route.
23//
24// Does not consume any of it.
25pub(crate) fn body() -> impl Filter<Extract = (Body,), Error = Rejection> + Copy {
26    filter_fn_one(|route| {
27        future::ready(route.take_body().ok_or_else(|| {
28            tracing::error!("request body already taken in previous filter");
29            reject::known(BodyConsumedMultipleTimes { _p: () })
30        }))
31    })
32}
33
34/// Require a `content-length` header to have a value no greater than some limit.
35///
36/// Rejects if `content-length` header is missing, is invalid, or has a number
37/// larger than the limit provided.
38///
39/// # Example
40///
41/// ```
42/// use warp::Filter;
43///
44/// // Limit the upload to 4kb...
45/// let upload = warp::body::content_length_limit(4096)
46///     .and(warp::body::aggregate());
47/// ```
48pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rejection> + Copy {
49    crate::filters::header::header2()
50        .map_err(crate::filter::Internal, |_| {
51            tracing::debug!("content-length missing");
52            reject::length_required()
53        })
54        .and_then(move |ContentLength(length)| {
55            if length <= limit {
56                future::ok(())
57            } else {
58                tracing::debug!("content-length: {} is over limit {}", length, limit);
59                future::err(reject::payload_too_large())
60            }
61        })
62        .untuple_one()
63}
64
65/// Create a `Filter` that extracts the request body as a `futures::Stream`.
66///
67/// If other filters have already extracted the body, this filter will reject
68/// with a `500 Internal Server Error`.
69///
70/// For example usage, please take a look at [examples/stream.rs](https://github.com/seanmonstar/warp/blob/master/examples/stream.rs).
71///
72/// # Warning
73///
74/// This does not have a default size limit, it would be wise to use one to
75/// prevent a overly large request from using too much memory.
76pub fn stream(
77) -> impl Filter<Extract = (impl Stream<Item = Result<impl Buf, crate::Error>>,), Error = Rejection> + Copy
78{
79    body().map(|body: Body| BodyStream { body })
80}
81
82/// Returns a `Filter` that matches any request and extracts a `Future` of a
83/// concatenated body.
84///
85/// The contents of the body will be flattened into a single contiguous
86/// `Bytes`, which may require memory copies. If you don't require a
87/// contiguous buffer, using `aggregate` can be give better performance.
88///
89/// # Warning
90///
91/// This does not have a default size limit, it would be wise to use one to
92/// prevent a overly large request from using too much memory.
93///
94/// # Example
95///
96/// ```
97/// use warp::{Buf, Filter};
98///
99/// let route = warp::body::content_length_limit(1024 * 32)
100///     .and(warp::body::bytes())
101///     .map(|bytes: bytes::Bytes| {
102///         println!("bytes = {:?}", bytes);
103///     });
104/// ```
105pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
106    body().and_then(|body: hyper::Body| {
107        hyper::body::to_bytes(body).map_err(|err| {
108            tracing::debug!("to_bytes error: {}", err);
109            reject::known(BodyReadError(err))
110        })
111    })
112}
113
114/// Returns a `Filter` that matches any request and extracts a `Future` of an
115/// aggregated body.
116///
117/// The `Buf` may contain multiple, non-contiguous buffers. This can be more
118/// performant (by reducing copies) when receiving large bodies.
119///
120/// # Warning
121///
122/// This does not have a default size limit, it would be wise to use one to
123/// prevent a overly large request from using too much memory.
124///
125/// # Example
126///
127/// ```
128/// use warp::{Buf, Filter};
129///
130/// fn full_body(mut body: impl Buf) {
131///     // It could have several non-contiguous slices of memory...
132///     while body.has_remaining() {
133///         println!("slice = {:?}", body.chunk());
134///         let cnt = body.chunk().len();
135///         body.advance(cnt);
136///     }
137/// }
138///
139/// let route = warp::body::content_length_limit(1024 * 32)
140///     .and(warp::body::aggregate())
141///     .map(full_body);
142/// ```
143pub fn aggregate() -> impl Filter<Extract = (impl Buf,), Error = Rejection> + Copy {
144    body().and_then(|body: ::hyper::Body| {
145        hyper::body::aggregate(body).map_err(|err| {
146            tracing::debug!("aggregate error: {}", err);
147            reject::known(BodyReadError(err))
148        })
149    })
150}
151
152/// Returns a `Filter` that matches any request and extracts a `Future` of a
153/// JSON-decoded body.
154///
155/// # Warning
156///
157/// This does not have a default size limit, it would be wise to use one to
158/// prevent a overly large request from using too much memory.
159///
160/// # Example
161///
162/// ```
163/// use std::collections::HashMap;
164/// use warp::Filter;
165///
166/// let route = warp::body::content_length_limit(1024 * 32)
167///     .and(warp::body::json())
168///     .map(|simple_map: HashMap<String, String>| {
169///         "Got a JSON body!"
170///     });
171/// ```
172pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
173    is_content_type::<Json>()
174        .and(bytes())
175        .and_then(|buf| async move {
176            Json::decode(buf).map_err(|err| {
177                tracing::debug!("request json body error: {}", err);
178                reject::known(BodyDeserializeError { cause: err })
179            })
180        })
181}
182
183/// Returns a `Filter` that matches any request and extracts a
184/// `Future` of a form encoded body.
185///
186/// # Note
187///
188/// This filter is for the simpler `application/x-www-form-urlencoded` format,
189/// not `multipart/form-data`.
190///
191/// # Warning
192///
193/// This does not have a default size limit, it would be wise to use one to
194/// prevent a overly large request from using too much memory.
195///
196///
197/// ```
198/// use std::collections::HashMap;
199/// use warp::Filter;
200///
201/// let route = warp::body::content_length_limit(1024 * 32)
202///     .and(warp::body::form())
203///     .map(|simple_map: HashMap<String, String>| {
204///         "Got a urlencoded body!"
205///     });
206/// ```
207pub fn form<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
208    is_content_type::<Form>()
209        .and(aggregate())
210        .and_then(|buf| async move {
211            Form::decode(buf).map_err(|err| {
212                tracing::debug!("request form body error: {}", err);
213                reject::known(BodyDeserializeError { cause: err })
214            })
215        })
216}
217
// ===== Decoders =====
219
220trait Decode {
221    const MIME: (mime::Name<'static>, mime::Name<'static>);
222    const WITH_NO_CONTENT_TYPE: bool;
223
224    fn decode<B: Buf, T: DeserializeOwned>(buf: B) -> Result<T, BoxError>;
225}
226
227struct Json;
228
229impl Decode for Json {
230    const MIME: (mime::Name<'static>, mime::Name<'static>) = (mime::APPLICATION, mime::JSON);
231    const WITH_NO_CONTENT_TYPE: bool = true;
232
233    fn decode<B: Buf, T: DeserializeOwned>(mut buf: B) -> Result<T, BoxError> {
234        serde_json::from_slice(&buf.copy_to_bytes(buf.remaining())).map_err(Into::into)
235    }
236}
237
238struct Form;
239
240impl Decode for Form {
241    const MIME: (mime::Name<'static>, mime::Name<'static>) =
242        (mime::APPLICATION, mime::WWW_FORM_URLENCODED);
243    const WITH_NO_CONTENT_TYPE: bool = true;
244
245    fn decode<B: Buf, T: DeserializeOwned>(buf: B) -> Result<T, BoxError> {
246        serde_urlencoded::from_reader(buf.reader()).map_err(Into::into)
247    }
248}
249
250// Require the `content-type` header to be this type (or, if there's no `content-type`
251// header at all, optimistically hope it's the right type).
252fn is_content_type<D: Decode>() -> impl Filter<Extract = (), Error = Rejection> + Copy {
253    filter_fn(move |route| {
254        let (type_, subtype) = D::MIME;
255        if let Some(value) = route.headers().get(CONTENT_TYPE) {
256            tracing::trace!("is_content_type {}/{}? {:?}", type_, subtype, value);
257            let ct = value
258                .to_str()
259                .ok()
260                .and_then(|s| s.parse::<mime::Mime>().ok());
261            if let Some(ct) = ct {
262                if ct.type_() == type_ && ct.subtype() == subtype {
263                    future::ok(())
264                } else {
265                    tracing::debug!(
266                        "content-type {:?} doesn't match {}/{}",
267                        value,
268                        type_,
269                        subtype
270                    );
271                    future::err(reject::unsupported_media_type())
272                }
273            } else {
274                tracing::debug!("content-type {:?} couldn't be parsed", value);
275                future::err(reject::unsupported_media_type())
276            }
277        } else if D::WITH_NO_CONTENT_TYPE {
278            // Optimistically assume its correct!
279            tracing::trace!("no content-type header, assuming {}/{}", type_, subtype);
280            future::ok(())
281        } else {
282            tracing::debug!("no content-type found");
283            future::err(reject::unsupported_media_type())
284        }
285    })
286}
287
// ===== BodyStream =====
289
290struct BodyStream {
291    body: Body,
292}
293
294impl Stream for BodyStream {
295    type Item = Result<Bytes, crate::Error>;
296
297    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298        let opt_item = ready!(Pin::new(&mut self.get_mut().body).poll_next(cx));
299
300        match opt_item {
301            None => Poll::Ready(None),
302            Some(item) => {
303                let stream_buf = item.map_err(crate::Error::new);
304
305                Poll::Ready(Some(stream_buf))
306            }
307        }
308    }
309}
310
// ===== Rejections =====
312
313/// An error used in rejections when deserializing a request body fails.
314#[derive(Debug)]
315pub struct BodyDeserializeError {
316    cause: BoxError,
317}
318
319impl fmt::Display for BodyDeserializeError {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        write!(f, "Request body deserialize error: {}", self.cause)
322    }
323}
324
325impl StdError for BodyDeserializeError {
326    fn source(&self) -> Option<&(dyn StdError + 'static)> {
327        Some(self.cause.as_ref())
328    }
329}
330
331#[derive(Debug)]
332pub(crate) struct BodyReadError(::hyper::Error);
333
334impl fmt::Display for BodyReadError {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        write!(f, "Request body read error: {}", self.0)
337    }
338}
339
340impl StdError for BodyReadError {}
341
// Rejection cause that `body()` reports when the request body was already
// taken by a previous filter.
// NOTE(review): `unit_error!` is defined outside this file; presumably it
// generates the error type together with its `Display`/`Error` impls from
// the message below — confirm against the macro definition.
unit_error! {
    pub(crate) BodyConsumedMultipleTimes: "Request body consumed multiple times"
}