actix_http/encoding/
encoder.rs

1//! Stream encoders.
2
3use std::{
4    error::Error as StdError,
5    future::Future,
6    io::{self, Write as _},
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use actix_rt::task::{spawn_blocking, JoinHandle};
12use bytes::Bytes;
13use derive_more::Display;
14#[cfg(feature = "compress-gzip")]
15use flate2::write::{GzEncoder, ZlibEncoder};
16use futures_core::ready;
17use pin_project_lite::pin_project;
18use tracing::trace;
19#[cfg(feature = "compress-zstd")]
20use zstd::stream::write::Encoder as ZstdEncoder;
21
22use super::Writer;
23use crate::{
24    body::{self, BodySize, MessageBody},
25    header::{self, ContentEncoding, HeaderValue, CONTENT_ENCODING},
26    ResponseHead, StatusCode,
27};
28
29const MAX_CHUNK_SIZE_ENCODE_IN_PLACE: usize = 1024;
30
31pin_project! {
32    pub struct Encoder<B> {
33        #[pin]
34        body: EncoderBody<B>,
35        encoder: Option<ContentEncoder>,
36        fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
37        eof: bool,
38    }
39}
40
41impl<B: MessageBody> Encoder<B> {
42    fn none() -> Self {
43        Encoder {
44            body: EncoderBody::None {
45                body: body::None::new(),
46            },
47            encoder: None,
48            fut: None,
49            eof: true,
50        }
51    }
52
53    fn empty() -> Self {
54        Encoder {
55            body: EncoderBody::Full { body: Bytes::new() },
56            encoder: None,
57            fut: None,
58            eof: true,
59        }
60    }
61
62    pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self {
63        // no need to compress empty bodies
64        match body.size() {
65            BodySize::None => return Self::none(),
66            BodySize::Sized(0) => return Self::empty(),
67            _ => {}
68        }
69
70        let should_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
71            || head.status == StatusCode::SWITCHING_PROTOCOLS
72            || head.status == StatusCode::NO_CONTENT
73            || encoding == ContentEncoding::Identity);
74
75        let body = match body.try_into_bytes() {
76            Ok(body) => EncoderBody::Full { body },
77            Err(body) => EncoderBody::Stream { body },
78        };
79
80        if should_encode {
81            // wrap body only if encoder is feature-enabled
82            if let Some(enc) = ContentEncoder::select(encoding) {
83                update_head(encoding, head);
84
85                return Encoder {
86                    body,
87                    encoder: Some(enc),
88                    fut: None,
89                    eof: false,
90                };
91            }
92        }
93
94        Encoder {
95            body,
96            encoder: None,
97            fut: None,
98            eof: false,
99        }
100    }
101}
102
103pin_project! {
104    #[project = EncoderBodyProj]
105    enum EncoderBody<B> {
106        None { body: body::None },
107        Full { body: Bytes },
108        Stream { #[pin] body: B },
109    }
110}
111
112impl<B> MessageBody for EncoderBody<B>
113where
114    B: MessageBody,
115{
116    type Error = EncoderError;
117
118    #[inline]
119    fn size(&self) -> BodySize {
120        match self {
121            EncoderBody::None { body } => body.size(),
122            EncoderBody::Full { body } => body.size(),
123            EncoderBody::Stream { body } => body.size(),
124        }
125    }
126
127    fn poll_next(
128        self: Pin<&mut Self>,
129        cx: &mut Context<'_>,
130    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
131        match self.project() {
132            EncoderBodyProj::None { body } => {
133                Pin::new(body).poll_next(cx).map_err(|err| match err {})
134            }
135            EncoderBodyProj::Full { body } => {
136                Pin::new(body).poll_next(cx).map_err(|err| match err {})
137            }
138            EncoderBodyProj::Stream { body } => body
139                .poll_next(cx)
140                .map_err(|err| EncoderError::Body(err.into())),
141        }
142    }
143
144    #[inline]
145    fn try_into_bytes(self) -> Result<Bytes, Self>
146    where
147        Self: Sized,
148    {
149        match self {
150            EncoderBody::None { body } => Ok(body.try_into_bytes().unwrap()),
151            EncoderBody::Full { body } => Ok(body.try_into_bytes().unwrap()),
152            _ => Err(self),
153        }
154    }
155}
156
157impl<B> MessageBody for Encoder<B>
158where
159    B: MessageBody,
160{
161    type Error = EncoderError;
162
163    #[inline]
164    fn size(&self) -> BodySize {
165        if self.encoder.is_some() {
166            BodySize::Stream
167        } else {
168            self.body.size()
169        }
170    }
171
172    fn poll_next(
173        self: Pin<&mut Self>,
174        cx: &mut Context<'_>,
175    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
176        let mut this = self.project();
177
178        loop {
179            if *this.eof {
180                return Poll::Ready(None);
181            }
182
183            if let Some(ref mut fut) = this.fut {
184                let mut encoder = ready!(Pin::new(fut).poll(cx))
185                    .map_err(|_| {
186                        EncoderError::Io(io::Error::new(
187                            io::ErrorKind::Other,
188                            "Blocking task was cancelled unexpectedly",
189                        ))
190                    })?
191                    .map_err(EncoderError::Io)?;
192
193                let chunk = encoder.take();
194                *this.encoder = Some(encoder);
195                this.fut.take();
196
197                if !chunk.is_empty() {
198                    return Poll::Ready(Some(Ok(chunk)));
199                }
200            }
201
202            let result = ready!(this.body.as_mut().poll_next(cx));
203
204            match result {
205                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
206
207                Some(Ok(chunk)) => {
208                    if let Some(mut encoder) = this.encoder.take() {
209                        if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
210                            encoder.write(&chunk).map_err(EncoderError::Io)?;
211                            let chunk = encoder.take();
212                            *this.encoder = Some(encoder);
213
214                            if !chunk.is_empty() {
215                                return Poll::Ready(Some(Ok(chunk)));
216                            }
217                        } else {
218                            *this.fut = Some(spawn_blocking(move || {
219                                encoder.write(&chunk)?;
220                                Ok(encoder)
221                            }));
222                        }
223                    } else {
224                        return Poll::Ready(Some(Ok(chunk)));
225                    }
226                }
227
228                None => {
229                    if let Some(encoder) = this.encoder.take() {
230                        let chunk = encoder.finish().map_err(EncoderError::Io)?;
231
232                        if chunk.is_empty() {
233                            return Poll::Ready(None);
234                        } else {
235                            *this.eof = true;
236                            return Poll::Ready(Some(Ok(chunk)));
237                        }
238                    } else {
239                        return Poll::Ready(None);
240                    }
241                }
242            }
243        }
244    }
245
246    #[inline]
247    fn try_into_bytes(mut self) -> Result<Bytes, Self>
248    where
249        Self: Sized,
250    {
251        if self.encoder.is_some() {
252            Err(self)
253        } else {
254            match self.body.try_into_bytes() {
255                Ok(body) => Ok(body),
256                Err(body) => {
257                    self.body = body;
258                    Err(self)
259                }
260            }
261        }
262    }
263}
264
265fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
266    head.headers_mut()
267        .insert(header::CONTENT_ENCODING, encoding.to_header_value());
268    head.headers_mut()
269        .append(header::VARY, HeaderValue::from_static("accept-encoding"));
270
271    head.no_chunking(false);
272}
273
/// A compressor for one of the feature-enabled content encodings, writing into a [`Writer`].
enum ContentEncoder {
    /// Zlib-based encoder used for the `deflate` content encoding.
    #[cfg(feature = "compress-gzip")]
    Deflate(ZlibEncoder<Writer>),

    /// Encoder used for the `gzip` content encoding.
    #[cfg(feature = "compress-gzip")]
    Gzip(GzEncoder<Writer>),

    /// Boxed encoder used for the `br` content encoding.
    #[cfg(feature = "compress-brotli")]
    Brotli(Box<brotli::CompressorWriter<Writer>>),

    // We need an explicit 'static lifetime here because ZstdEncoder takes a lifetime argument and
    // `Encoder::poll_next` moves the encoder into `spawn_blocking`, which requires
    // `FnOnce() -> R + Send + 'static`.
    /// Encoder used for the `zstd` content encoding.
    #[cfg(feature = "compress-zstd")]
    Zstd(ZstdEncoder<'static, Writer>),
}
289
290impl ContentEncoder {
291    fn select(encoding: ContentEncoding) -> Option<Self> {
292        match encoding {
293            #[cfg(feature = "compress-gzip")]
294            ContentEncoding::Deflate => Some(ContentEncoder::Deflate(ZlibEncoder::new(
295                Writer::new(),
296                flate2::Compression::fast(),
297            ))),
298
299            #[cfg(feature = "compress-gzip")]
300            ContentEncoding::Gzip => Some(ContentEncoder::Gzip(GzEncoder::new(
301                Writer::new(),
302                flate2::Compression::fast(),
303            ))),
304
305            #[cfg(feature = "compress-brotli")]
306            ContentEncoding::Brotli => Some(ContentEncoder::Brotli(new_brotli_compressor())),
307
308            #[cfg(feature = "compress-zstd")]
309            ContentEncoding::Zstd => {
310                let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?;
311                Some(ContentEncoder::Zstd(encoder))
312            }
313
314            _ => None,
315        }
316    }
317
318    #[inline]
319    pub(crate) fn take(&mut self) -> Bytes {
320        match *self {
321            #[cfg(feature = "compress-brotli")]
322            ContentEncoder::Brotli(ref mut encoder) => encoder.get_mut().take(),
323
324            #[cfg(feature = "compress-gzip")]
325            ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(),
326
327            #[cfg(feature = "compress-gzip")]
328            ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(),
329
330            #[cfg(feature = "compress-zstd")]
331            ContentEncoder::Zstd(ref mut encoder) => encoder.get_mut().take(),
332        }
333    }
334
335    fn finish(self) -> Result<Bytes, io::Error> {
336        match self {
337            #[cfg(feature = "compress-brotli")]
338            ContentEncoder::Brotli(mut encoder) => match encoder.flush() {
339                Ok(()) => Ok(encoder.into_inner().buf.freeze()),
340                Err(err) => Err(err),
341            },
342
343            #[cfg(feature = "compress-gzip")]
344            ContentEncoder::Gzip(encoder) => match encoder.finish() {
345                Ok(writer) => Ok(writer.buf.freeze()),
346                Err(err) => Err(err),
347            },
348
349            #[cfg(feature = "compress-gzip")]
350            ContentEncoder::Deflate(encoder) => match encoder.finish() {
351                Ok(writer) => Ok(writer.buf.freeze()),
352                Err(err) => Err(err),
353            },
354
355            #[cfg(feature = "compress-zstd")]
356            ContentEncoder::Zstd(encoder) => match encoder.finish() {
357                Ok(writer) => Ok(writer.buf.freeze()),
358                Err(err) => Err(err),
359            },
360        }
361    }
362
363    fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
364        match *self {
365            #[cfg(feature = "compress-brotli")]
366            ContentEncoder::Brotli(ref mut encoder) => match encoder.write_all(data) {
367                Ok(_) => Ok(()),
368                Err(err) => {
369                    trace!("Error decoding br encoding: {}", err);
370                    Err(err)
371                }
372            },
373
374            #[cfg(feature = "compress-gzip")]
375            ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) {
376                Ok(_) => Ok(()),
377                Err(err) => {
378                    trace!("Error decoding gzip encoding: {}", err);
379                    Err(err)
380                }
381            },
382
383            #[cfg(feature = "compress-gzip")]
384            ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) {
385                Ok(_) => Ok(()),
386                Err(err) => {
387                    trace!("Error decoding deflate encoding: {}", err);
388                    Err(err)
389                }
390            },
391
392            #[cfg(feature = "compress-zstd")]
393            ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) {
394                Ok(_) => Ok(()),
395                Err(err) => {
396                    trace!("Error decoding ztsd encoding: {}", err);
397                    Err(err)
398                }
399            },
400        }
401    }
402}
403
/// Builds the boxed brotli compressor used for the `br` content encoding, writing into a fresh
/// [`Writer`].
#[cfg(feature = "compress-brotli")]
fn new_brotli_compressor() -> Box<brotli::CompressorWriter<Writer>> {
    let buffer_size = 32 * 1024; // 32 KiB buffer
    let quality = 3; // BROTLI_PARAM_QUALITY
    let lgwin = 22; // BROTLI_PARAM_LGWIN

    let compressor = brotli::CompressorWriter::new(Writer::new(), buffer_size, quality, lgwin);
    Box::new(compressor)
}
413
414#[derive(Debug, Display)]
415#[non_exhaustive]
416pub enum EncoderError {
417    /// Wrapped body stream error.
418    #[display("body")]
419    Body(Box<dyn StdError>),
420
421    /// Generic I/O error.
422    #[display("io")]
423    Io(io::Error),
424}
425
426impl StdError for EncoderError {
427    fn source(&self) -> Option<&(dyn StdError + 'static)> {
428        match self {
429            EncoderError::Body(err) => Some(&**err),
430            EncoderError::Io(err) => Some(err),
431        }
432    }
433}
434
435impl From<EncoderError> for crate::Error {
436    fn from(err: EncoderError) -> Self {
437        crate::Error::new_encoder().with_cause(err)
438    }
439}