1use std::{
4 error::Error as StdError,
5 future::Future,
6 io::{self, Write as _},
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use actix_rt::task::{spawn_blocking, JoinHandle};
12use bytes::Bytes;
13use derive_more::Display;
14#[cfg(feature = "compress-gzip")]
15use flate2::write::{GzEncoder, ZlibEncoder};
16use futures_core::ready;
17use pin_project_lite::pin_project;
18use tracing::trace;
19#[cfg(feature = "compress-zstd")]
20use zstd::stream::write::Encoder as ZstdEncoder;
21
22use super::Writer;
23use crate::{
24 body::{self, BodySize, MessageBody},
25 header::{self, ContentEncoding, HeaderValue, CONTENT_ENCODING},
26 ResponseHead, StatusCode,
27};
28
/// Size threshold in bytes (1 KiB) used when compressing a body chunk.
///
/// Chunks shorter than this are compressed directly inside `poll_next`; chunks of this
/// length or longer are handed to a blocking task instead.
const MAX_CHUNK_SIZE_ENCODE_IN_PLACE: usize = 1 << 10;
30
31pin_project! {
32 pub struct Encoder<B> {
33 #[pin]
34 body: EncoderBody<B>,
35 encoder: Option<ContentEncoder>,
36 fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
37 eof: bool,
38 }
39}
40
41impl<B: MessageBody> Encoder<B> {
42 fn none() -> Self {
43 Encoder {
44 body: EncoderBody::None {
45 body: body::None::new(),
46 },
47 encoder: None,
48 fut: None,
49 eof: true,
50 }
51 }
52
53 fn empty() -> Self {
54 Encoder {
55 body: EncoderBody::Full { body: Bytes::new() },
56 encoder: None,
57 fut: None,
58 eof: true,
59 }
60 }
61
62 pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self {
63 match body.size() {
65 BodySize::None => return Self::none(),
66 BodySize::Sized(0) => return Self::empty(),
67 _ => {}
68 }
69
70 let should_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
71 || head.status == StatusCode::SWITCHING_PROTOCOLS
72 || head.status == StatusCode::NO_CONTENT
73 || encoding == ContentEncoding::Identity);
74
75 let body = match body.try_into_bytes() {
76 Ok(body) => EncoderBody::Full { body },
77 Err(body) => EncoderBody::Stream { body },
78 };
79
80 if should_encode {
81 if let Some(enc) = ContentEncoder::select(encoding) {
83 update_head(encoding, head);
84
85 return Encoder {
86 body,
87 encoder: Some(enc),
88 fut: None,
89 eof: false,
90 };
91 }
92 }
93
94 Encoder {
95 body,
96 encoder: None,
97 fut: None,
98 eof: false,
99 }
100 }
101}
102
103pin_project! {
104 #[project = EncoderBodyProj]
105 enum EncoderBody<B> {
106 None { body: body::None },
107 Full { body: Bytes },
108 Stream { #[pin] body: B },
109 }
110}
111
112impl<B> MessageBody for EncoderBody<B>
113where
114 B: MessageBody,
115{
116 type Error = EncoderError;
117
118 #[inline]
119 fn size(&self) -> BodySize {
120 match self {
121 EncoderBody::None { body } => body.size(),
122 EncoderBody::Full { body } => body.size(),
123 EncoderBody::Stream { body } => body.size(),
124 }
125 }
126
127 fn poll_next(
128 self: Pin<&mut Self>,
129 cx: &mut Context<'_>,
130 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
131 match self.project() {
132 EncoderBodyProj::None { body } => {
133 Pin::new(body).poll_next(cx).map_err(|err| match err {})
134 }
135 EncoderBodyProj::Full { body } => {
136 Pin::new(body).poll_next(cx).map_err(|err| match err {})
137 }
138 EncoderBodyProj::Stream { body } => body
139 .poll_next(cx)
140 .map_err(|err| EncoderError::Body(err.into())),
141 }
142 }
143
144 #[inline]
145 fn try_into_bytes(self) -> Result<Bytes, Self>
146 where
147 Self: Sized,
148 {
149 match self {
150 EncoderBody::None { body } => Ok(body.try_into_bytes().unwrap()),
151 EncoderBody::Full { body } => Ok(body.try_into_bytes().unwrap()),
152 _ => Err(self),
153 }
154 }
155}
156
157impl<B> MessageBody for Encoder<B>
158where
159 B: MessageBody,
160{
161 type Error = EncoderError;
162
163 #[inline]
164 fn size(&self) -> BodySize {
165 if self.encoder.is_some() {
166 BodySize::Stream
167 } else {
168 self.body.size()
169 }
170 }
171
172 fn poll_next(
173 self: Pin<&mut Self>,
174 cx: &mut Context<'_>,
175 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
176 let mut this = self.project();
177
178 loop {
179 if *this.eof {
180 return Poll::Ready(None);
181 }
182
183 if let Some(ref mut fut) = this.fut {
184 let mut encoder = ready!(Pin::new(fut).poll(cx))
185 .map_err(|_| {
186 EncoderError::Io(io::Error::new(
187 io::ErrorKind::Other,
188 "Blocking task was cancelled unexpectedly",
189 ))
190 })?
191 .map_err(EncoderError::Io)?;
192
193 let chunk = encoder.take();
194 *this.encoder = Some(encoder);
195 this.fut.take();
196
197 if !chunk.is_empty() {
198 return Poll::Ready(Some(Ok(chunk)));
199 }
200 }
201
202 let result = ready!(this.body.as_mut().poll_next(cx));
203
204 match result {
205 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
206
207 Some(Ok(chunk)) => {
208 if let Some(mut encoder) = this.encoder.take() {
209 if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
210 encoder.write(&chunk).map_err(EncoderError::Io)?;
211 let chunk = encoder.take();
212 *this.encoder = Some(encoder);
213
214 if !chunk.is_empty() {
215 return Poll::Ready(Some(Ok(chunk)));
216 }
217 } else {
218 *this.fut = Some(spawn_blocking(move || {
219 encoder.write(&chunk)?;
220 Ok(encoder)
221 }));
222 }
223 } else {
224 return Poll::Ready(Some(Ok(chunk)));
225 }
226 }
227
228 None => {
229 if let Some(encoder) = this.encoder.take() {
230 let chunk = encoder.finish().map_err(EncoderError::Io)?;
231
232 if chunk.is_empty() {
233 return Poll::Ready(None);
234 } else {
235 *this.eof = true;
236 return Poll::Ready(Some(Ok(chunk)));
237 }
238 } else {
239 return Poll::Ready(None);
240 }
241 }
242 }
243 }
244 }
245
246 #[inline]
247 fn try_into_bytes(mut self) -> Result<Bytes, Self>
248 where
249 Self: Sized,
250 {
251 if self.encoder.is_some() {
252 Err(self)
253 } else {
254 match self.body.try_into_bytes() {
255 Ok(body) => Ok(body),
256 Err(body) => {
257 self.body = body;
258 Err(self)
259 }
260 }
261 }
262 }
263}
264
265fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
266 head.headers_mut()
267 .insert(header::CONTENT_ENCODING, encoding.to_header_value());
268 head.headers_mut()
269 .append(header::VARY, HeaderValue::from_static("accept-encoding"));
270
271 head.no_chunking(false);
272}
273
274enum ContentEncoder {
275 #[cfg(feature = "compress-gzip")]
276 Deflate(ZlibEncoder<Writer>),
277
278 #[cfg(feature = "compress-gzip")]
279 Gzip(GzEncoder<Writer>),
280
281 #[cfg(feature = "compress-brotli")]
282 Brotli(Box<brotli::CompressorWriter<Writer>>),
283
284 #[cfg(feature = "compress-zstd")]
287 Zstd(ZstdEncoder<'static, Writer>),
288}
289
290impl ContentEncoder {
291 fn select(encoding: ContentEncoding) -> Option<Self> {
292 match encoding {
293 #[cfg(feature = "compress-gzip")]
294 ContentEncoding::Deflate => Some(ContentEncoder::Deflate(ZlibEncoder::new(
295 Writer::new(),
296 flate2::Compression::fast(),
297 ))),
298
299 #[cfg(feature = "compress-gzip")]
300 ContentEncoding::Gzip => Some(ContentEncoder::Gzip(GzEncoder::new(
301 Writer::new(),
302 flate2::Compression::fast(),
303 ))),
304
305 #[cfg(feature = "compress-brotli")]
306 ContentEncoding::Brotli => Some(ContentEncoder::Brotli(new_brotli_compressor())),
307
308 #[cfg(feature = "compress-zstd")]
309 ContentEncoding::Zstd => {
310 let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?;
311 Some(ContentEncoder::Zstd(encoder))
312 }
313
314 _ => None,
315 }
316 }
317
318 #[inline]
319 pub(crate) fn take(&mut self) -> Bytes {
320 match *self {
321 #[cfg(feature = "compress-brotli")]
322 ContentEncoder::Brotli(ref mut encoder) => encoder.get_mut().take(),
323
324 #[cfg(feature = "compress-gzip")]
325 ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(),
326
327 #[cfg(feature = "compress-gzip")]
328 ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(),
329
330 #[cfg(feature = "compress-zstd")]
331 ContentEncoder::Zstd(ref mut encoder) => encoder.get_mut().take(),
332 }
333 }
334
335 fn finish(self) -> Result<Bytes, io::Error> {
336 match self {
337 #[cfg(feature = "compress-brotli")]
338 ContentEncoder::Brotli(mut encoder) => match encoder.flush() {
339 Ok(()) => Ok(encoder.into_inner().buf.freeze()),
340 Err(err) => Err(err),
341 },
342
343 #[cfg(feature = "compress-gzip")]
344 ContentEncoder::Gzip(encoder) => match encoder.finish() {
345 Ok(writer) => Ok(writer.buf.freeze()),
346 Err(err) => Err(err),
347 },
348
349 #[cfg(feature = "compress-gzip")]
350 ContentEncoder::Deflate(encoder) => match encoder.finish() {
351 Ok(writer) => Ok(writer.buf.freeze()),
352 Err(err) => Err(err),
353 },
354
355 #[cfg(feature = "compress-zstd")]
356 ContentEncoder::Zstd(encoder) => match encoder.finish() {
357 Ok(writer) => Ok(writer.buf.freeze()),
358 Err(err) => Err(err),
359 },
360 }
361 }
362
363 fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
364 match *self {
365 #[cfg(feature = "compress-brotli")]
366 ContentEncoder::Brotli(ref mut encoder) => match encoder.write_all(data) {
367 Ok(_) => Ok(()),
368 Err(err) => {
369 trace!("Error decoding br encoding: {}", err);
370 Err(err)
371 }
372 },
373
374 #[cfg(feature = "compress-gzip")]
375 ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) {
376 Ok(_) => Ok(()),
377 Err(err) => {
378 trace!("Error decoding gzip encoding: {}", err);
379 Err(err)
380 }
381 },
382
383 #[cfg(feature = "compress-gzip")]
384 ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) {
385 Ok(_) => Ok(()),
386 Err(err) => {
387 trace!("Error decoding deflate encoding: {}", err);
388 Err(err)
389 }
390 },
391
392 #[cfg(feature = "compress-zstd")]
393 ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) {
394 Ok(_) => Ok(()),
395 Err(err) => {
396 trace!("Error decoding ztsd encoding: {}", err);
397 Err(err)
398 }
399 },
400 }
401 }
402}
403
/// Creates a boxed brotli compressor that writes into a fresh `Writer`.
#[cfg(feature = "compress-brotli")]
fn new_brotli_compressor() -> Box<brotli::CompressorWriter<Writer>> {
    // Arguments follow `CompressorWriter::new(writer, buffer_size, q, lgwin)`.
    let buffer_size = 32 * 1024; // 32 KiB
    let quality = 3;
    let lg_window_size = 22;

    let compressor =
        brotli::CompressorWriter::new(Writer::new(), buffer_size, quality, lg_window_size);

    Box::new(compressor)
}
413
414#[derive(Debug, Display)]
415#[non_exhaustive]
416pub enum EncoderError {
417 #[display("body")]
419 Body(Box<dyn StdError>),
420
421 #[display("io")]
423 Io(io::Error),
424}
425
426impl StdError for EncoderError {
427 fn source(&self) -> Option<&(dyn StdError + 'static)> {
428 match self {
429 EncoderError::Body(err) => Some(&**err),
430 EncoderError::Io(err) => Some(err),
431 }
432 }
433}
434
435impl From<EncoderError> for crate::Error {
436 fn from(err: EncoderError) -> Self {
437 crate::Error::new_encoder().with_cause(err)
438 }
439}