1use std::error::Error as StdError;
6use std::fmt;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use bytes::{Buf, Bytes};
11use futures_util::{future, ready, Stream, TryFutureExt};
12use headers::ContentLength;
13use http::header::CONTENT_TYPE;
14use hyper::Body;
15use serde::de::DeserializeOwned;
16
17use crate::filter::{filter_fn, filter_fn_one, Filter, FilterBase};
18use crate::reject::{self, Rejection};
19
20type BoxError = Box<dyn StdError + Send + Sync>;
21
22pub(crate) fn body() -> impl Filter<Extract = (Body,), Error = Rejection> + Copy {
26 filter_fn_one(|route| {
27 future::ready(route.take_body().ok_or_else(|| {
28 tracing::error!("request body already taken in previous filter");
29 reject::known(BodyConsumedMultipleTimes { _p: () })
30 }))
31 })
32}
33
34pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rejection> + Copy {
49 crate::filters::header::header2()
50 .map_err(crate::filter::Internal, |_| {
51 tracing::debug!("content-length missing");
52 reject::length_required()
53 })
54 .and_then(move |ContentLength(length)| {
55 if length <= limit {
56 future::ok(())
57 } else {
58 tracing::debug!("content-length: {} is over limit {}", length, limit);
59 future::err(reject::payload_too_large())
60 }
61 })
62 .untuple_one()
63}
64
65pub fn stream(
77) -> impl Filter<Extract = (impl Stream<Item = Result<impl Buf, crate::Error>>,), Error = Rejection> + Copy
78{
79 body().map(|body: Body| BodyStream { body })
80}
81
82pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
106 body().and_then(|body: hyper::Body| {
107 hyper::body::to_bytes(body).map_err(|err| {
108 tracing::debug!("to_bytes error: {}", err);
109 reject::known(BodyReadError(err))
110 })
111 })
112}
113
114pub fn aggregate() -> impl Filter<Extract = (impl Buf,), Error = Rejection> + Copy {
144 body().and_then(|body: ::hyper::Body| {
145 hyper::body::aggregate(body).map_err(|err| {
146 tracing::debug!("aggregate error: {}", err);
147 reject::known(BodyReadError(err))
148 })
149 })
150}
151
152pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
173 is_content_type::<Json>()
174 .and(bytes())
175 .and_then(|buf| async move {
176 Json::decode(buf).map_err(|err| {
177 tracing::debug!("request json body error: {}", err);
178 reject::known(BodyDeserializeError { cause: err })
179 })
180 })
181}
182
183pub fn form<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
208 is_content_type::<Form>()
209 .and(aggregate())
210 .and_then(|buf| async move {
211 Form::decode(buf).map_err(|err| {
212 tracing::debug!("request form body error: {}", err);
213 reject::known(BodyDeserializeError { cause: err })
214 })
215 })
216}
217
218trait Decode {
221 const MIME: (mime::Name<'static>, mime::Name<'static>);
222 const WITH_NO_CONTENT_TYPE: bool;
223
224 fn decode<B: Buf, T: DeserializeOwned>(buf: B) -> Result<T, BoxError>;
225}
226
227struct Json;
228
229impl Decode for Json {
230 const MIME: (mime::Name<'static>, mime::Name<'static>) = (mime::APPLICATION, mime::JSON);
231 const WITH_NO_CONTENT_TYPE: bool = true;
232
233 fn decode<B: Buf, T: DeserializeOwned>(mut buf: B) -> Result<T, BoxError> {
234 serde_json::from_slice(&buf.copy_to_bytes(buf.remaining())).map_err(Into::into)
235 }
236}
237
238struct Form;
239
240impl Decode for Form {
241 const MIME: (mime::Name<'static>, mime::Name<'static>) =
242 (mime::APPLICATION, mime::WWW_FORM_URLENCODED);
243 const WITH_NO_CONTENT_TYPE: bool = true;
244
245 fn decode<B: Buf, T: DeserializeOwned>(buf: B) -> Result<T, BoxError> {
246 serde_urlencoded::from_reader(buf.reader()).map_err(Into::into)
247 }
248}
249
250fn is_content_type<D: Decode>() -> impl Filter<Extract = (), Error = Rejection> + Copy {
253 filter_fn(move |route| {
254 let (type_, subtype) = D::MIME;
255 if let Some(value) = route.headers().get(CONTENT_TYPE) {
256 tracing::trace!("is_content_type {}/{}? {:?}", type_, subtype, value);
257 let ct = value
258 .to_str()
259 .ok()
260 .and_then(|s| s.parse::<mime::Mime>().ok());
261 if let Some(ct) = ct {
262 if ct.type_() == type_ && ct.subtype() == subtype {
263 future::ok(())
264 } else {
265 tracing::debug!(
266 "content-type {:?} doesn't match {}/{}",
267 value,
268 type_,
269 subtype
270 );
271 future::err(reject::unsupported_media_type())
272 }
273 } else {
274 tracing::debug!("content-type {:?} couldn't be parsed", value);
275 future::err(reject::unsupported_media_type())
276 }
277 } else if D::WITH_NO_CONTENT_TYPE {
278 tracing::trace!("no content-type header, assuming {}/{}", type_, subtype);
280 future::ok(())
281 } else {
282 tracing::debug!("no content-type found");
283 future::err(reject::unsupported_media_type())
284 }
285 })
286}
287
288struct BodyStream {
291 body: Body,
292}
293
294impl Stream for BodyStream {
295 type Item = Result<Bytes, crate::Error>;
296
297 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298 let opt_item = ready!(Pin::new(&mut self.get_mut().body).poll_next(cx));
299
300 match opt_item {
301 None => Poll::Ready(None),
302 Some(item) => {
303 let stream_buf = item.map_err(crate::Error::new);
304
305 Poll::Ready(Some(stream_buf))
306 }
307 }
308 }
309}
310
311#[derive(Debug)]
315pub struct BodyDeserializeError {
316 cause: BoxError,
317}
318
319impl fmt::Display for BodyDeserializeError {
320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321 write!(f, "Request body deserialize error: {}", self.cause)
322 }
323}
324
325impl StdError for BodyDeserializeError {
326 fn source(&self) -> Option<&(dyn StdError + 'static)> {
327 Some(self.cause.as_ref())
328 }
329}
330
331#[derive(Debug)]
332pub(crate) struct BodyReadError(::hyper::Error);
333
334impl fmt::Display for BodyReadError {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 write!(f, "Request body read error: {}", self.0)
337 }
338}
339
340impl StdError for BodyReadError {}
341
342unit_error! {
343 pub(crate) BodyConsumedMultipleTimes: "Request body consumed multiple times"
344}