tokio_util/codec/framed.rs
1use crate::codec::decoder::Decoder;
2use crate::codec::encoder::Encoder;
3use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4
5use futures_core::Stream;
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use bytes::BytesMut;
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::io;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pin_project! {
17 /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 ///
20 /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21 /// by using the `new` function seen below.
22 ///
23 /// # Cancellation safety
24 ///
25 /// * [`futures_util::sink::SinkExt::send`]: if send is used as the event in a
26 /// `tokio::select!` statement and some other branch completes first, then it is
27 /// guaranteed that the message was not sent, but the message itself is lost.
28 /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned
29 /// future only holds onto a reference to the underlying stream, so dropping it will
30 /// never lose a value.
31 ///
32 /// [`Stream`]: futures_core::Stream
33 /// [`Sink`]: futures_sink::Sink
34 /// [`AsyncRead`]: tokio::io::AsyncRead
35 /// [`Decoder::framed`]: crate::codec::Decoder::framed()
36 /// [`futures_util::sink::SinkExt::send`]: futures_util::sink::SinkExt::send
37 /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next
38 pub struct Framed<T, U> {
39 #[pin]
40 inner: FramedImpl<T, U, RWFrames>
41 }
42}
43
44impl<T, U> Framed<T, U>
45where
46 T: AsyncRead + AsyncWrite,
47{
48 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
49 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
50 ///
51 /// Raw I/O objects work with byte sequences, but higher-level code usually
52 /// wants to batch these into meaningful chunks, called "frames". This
53 /// method layers framing on top of an I/O object, by using the codec
54 /// traits to handle encoding and decoding of messages frames. Note that
55 /// the incoming and outgoing frame types may be distinct.
56 ///
57 /// This function returns a *single* object that is both [`Stream`] and
58 /// [`Sink`]; grouping this into a single object is often useful for layering
59 /// things like gzip or TLS, which require both read and write access to the
60 /// underlying object.
61 ///
62 /// If you want to work more directly with the streams and sink, consider
63 /// calling [`split`] on the `Framed` returned by this method, which will
64 /// break them into separate objects, allowing them to interact more easily.
65 ///
66 /// Note that, for some byte sources, the stream can be resumed after an EOF
67 /// by reading from it, even after it has returned `None`. Repeated attempts
68 /// to do so, without new data available, continue to return `None` without
69 /// creating more (closing) frames.
70 ///
71 /// [`Stream`]: futures_core::Stream
72 /// [`Sink`]: futures_sink::Sink
73 /// [`Decode`]: crate::codec::Decoder
74 /// [`Encoder`]: crate::codec::Encoder
75 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
76 pub fn new(inner: T, codec: U) -> Framed<T, U> {
77 Framed {
78 inner: FramedImpl {
79 inner,
80 codec,
81 state: Default::default(),
82 },
83 }
84 }
85
86 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
87 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
88 /// with a specific read buffer initial capacity.
89 ///
90 /// Raw I/O objects work with byte sequences, but higher-level code usually
91 /// wants to batch these into meaningful chunks, called "frames". This
92 /// method layers framing on top of an I/O object, by using the codec
93 /// traits to handle encoding and decoding of messages frames. Note that
94 /// the incoming and outgoing frame types may be distinct.
95 ///
96 /// This function returns a *single* object that is both [`Stream`] and
97 /// [`Sink`]; grouping this into a single object is often useful for layering
98 /// things like gzip or TLS, which require both read and write access to the
99 /// underlying object.
100 ///
101 /// If you want to work more directly with the streams and sink, consider
102 /// calling [`split`] on the `Framed` returned by this method, which will
103 /// break them into separate objects, allowing them to interact more easily.
104 ///
105 /// [`Stream`]: futures_core::Stream
106 /// [`Sink`]: futures_sink::Sink
107 /// [`Decode`]: crate::codec::Decoder
108 /// [`Encoder`]: crate::codec::Encoder
109 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
110 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
111 Framed {
112 inner: FramedImpl {
113 inner,
114 codec,
115 state: RWFrames {
116 read: ReadFrame {
117 eof: false,
118 is_readable: false,
119 buffer: BytesMut::with_capacity(capacity),
120 has_errored: false,
121 },
122 write: WriteFrame {
123 buffer: BytesMut::with_capacity(capacity),
124 backpressure_boundary: capacity,
125 },
126 },
127 },
128 }
129 }
130}
131
132impl<T, U> Framed<T, U> {
133 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
134 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
135 ///
136 /// Raw I/O objects work with byte sequences, but higher-level code usually
137 /// wants to batch these into meaningful chunks, called "frames". This
138 /// method layers framing on top of an I/O object, by using the `Codec`
139 /// traits to handle encoding and decoding of messages frames. Note that
140 /// the incoming and outgoing frame types may be distinct.
141 ///
142 /// This function returns a *single* object that is both [`Stream`] and
143 /// [`Sink`]; grouping this into a single object is often useful for layering
144 /// things like gzip or TLS, which require both read and write access to the
145 /// underlying object.
146 ///
147 /// This objects takes a stream and a `readbuffer` and a `writebuffer`. These field
148 /// can be obtained from an existing `Framed` with the [`into_parts`] method.
149 ///
150 /// If you want to work more directly with the streams and sink, consider
151 /// calling [`split`] on the `Framed` returned by this method, which will
152 /// break them into separate objects, allowing them to interact more easily.
153 ///
154 /// [`Stream`]: futures_core::Stream
155 /// [`Sink`]: futures_sink::Sink
156 /// [`Decoder`]: crate::codec::Decoder
157 /// [`Encoder`]: crate::codec::Encoder
158 /// [`into_parts`]: crate::codec::Framed::into_parts()
159 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
160 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
161 Framed {
162 inner: FramedImpl {
163 inner: parts.io,
164 codec: parts.codec,
165 state: RWFrames {
166 read: parts.read_buf.into(),
167 write: parts.write_buf.into(),
168 },
169 },
170 }
171 }
172
173 /// Returns a reference to the underlying I/O stream wrapped by
174 /// `Framed`.
175 ///
176 /// Note that care should be taken to not tamper with the underlying stream
177 /// of data coming in as it may corrupt the stream of frames otherwise
178 /// being worked with.
179 pub fn get_ref(&self) -> &T {
180 &self.inner.inner
181 }
182
183 /// Returns a mutable reference to the underlying I/O stream wrapped by
184 /// `Framed`.
185 ///
186 /// Note that care should be taken to not tamper with the underlying stream
187 /// of data coming in as it may corrupt the stream of frames otherwise
188 /// being worked with.
189 pub fn get_mut(&mut self) -> &mut T {
190 &mut self.inner.inner
191 }
192
193 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
194 /// `Framed`.
195 ///
196 /// Note that care should be taken to not tamper with the underlying stream
197 /// of data coming in as it may corrupt the stream of frames otherwise
198 /// being worked with.
199 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
200 self.project().inner.project().inner
201 }
202
203 /// Returns a reference to the underlying codec wrapped by
204 /// `Framed`.
205 ///
206 /// Note that care should be taken to not tamper with the underlying codec
207 /// as it may corrupt the stream of frames otherwise being worked with.
208 pub fn codec(&self) -> &U {
209 &self.inner.codec
210 }
211
212 /// Returns a mutable reference to the underlying codec wrapped by
213 /// `Framed`.
214 ///
215 /// Note that care should be taken to not tamper with the underlying codec
216 /// as it may corrupt the stream of frames otherwise being worked with.
217 pub fn codec_mut(&mut self) -> &mut U {
218 &mut self.inner.codec
219 }
220
221 /// Maps the codec `U` to `C`, preserving the read and write buffers
222 /// wrapped by `Framed`.
223 ///
224 /// Note that care should be taken to not tamper with the underlying codec
225 /// as it may corrupt the stream of frames otherwise being worked with.
226 pub fn map_codec<C, F>(self, map: F) -> Framed<T, C>
227 where
228 F: FnOnce(U) -> C,
229 {
230 // This could be potentially simplified once rust-lang/rust#86555 hits stable
231 let parts = self.into_parts();
232 Framed::from_parts(FramedParts {
233 io: parts.io,
234 codec: map(parts.codec),
235 read_buf: parts.read_buf,
236 write_buf: parts.write_buf,
237 _priv: (),
238 })
239 }
240
241 /// Returns a mutable reference to the underlying codec wrapped by
242 /// `Framed`.
243 ///
244 /// Note that care should be taken to not tamper with the underlying codec
245 /// as it may corrupt the stream of frames otherwise being worked with.
246 pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U {
247 self.project().inner.project().codec
248 }
249
250 /// Returns a reference to the read buffer.
251 pub fn read_buffer(&self) -> &BytesMut {
252 &self.inner.state.read.buffer
253 }
254
255 /// Returns a mutable reference to the read buffer.
256 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
257 &mut self.inner.state.read.buffer
258 }
259
260 /// Returns a reference to the write buffer.
261 pub fn write_buffer(&self) -> &BytesMut {
262 &self.inner.state.write.buffer
263 }
264
265 /// Returns a mutable reference to the write buffer.
266 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
267 &mut self.inner.state.write.buffer
268 }
269
270 /// Returns backpressure boundary
271 pub fn backpressure_boundary(&self) -> usize {
272 self.inner.state.write.backpressure_boundary
273 }
274
275 /// Updates backpressure boundary
276 pub fn set_backpressure_boundary(&mut self, boundary: usize) {
277 self.inner.state.write.backpressure_boundary = boundary;
278 }
279
280 /// Consumes the `Framed`, returning its underlying I/O stream.
281 ///
282 /// Note that care should be taken to not tamper with the underlying stream
283 /// of data coming in as it may corrupt the stream of frames otherwise
284 /// being worked with.
285 pub fn into_inner(self) -> T {
286 self.inner.inner
287 }
288
289 /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
290 /// with unprocessed data, and the codec.
291 ///
292 /// Note that care should be taken to not tamper with the underlying stream
293 /// of data coming in as it may corrupt the stream of frames otherwise
294 /// being worked with.
295 pub fn into_parts(self) -> FramedParts<T, U> {
296 FramedParts {
297 io: self.inner.inner,
298 codec: self.inner.codec,
299 read_buf: self.inner.state.read.buffer,
300 write_buf: self.inner.state.write.buffer,
301 _priv: (),
302 }
303 }
304}
305
306// This impl just defers to the underlying FramedImpl
307impl<T, U> Stream for Framed<T, U>
308where
309 T: AsyncRead,
310 U: Decoder,
311{
312 type Item = Result<U::Item, U::Error>;
313
314 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
315 self.project().inner.poll_next(cx)
316 }
317}
318
319// This impl just defers to the underlying FramedImpl
320impl<T, I, U> Sink<I> for Framed<T, U>
321where
322 T: AsyncWrite,
323 U: Encoder<I>,
324 U::Error: From<io::Error>,
325{
326 type Error = U::Error;
327
328 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
329 self.project().inner.poll_ready(cx)
330 }
331
332 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
333 self.project().inner.start_send(item)
334 }
335
336 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
337 self.project().inner.poll_flush(cx)
338 }
339
340 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
341 self.project().inner.poll_close(cx)
342 }
343}
344
345impl<T, U> fmt::Debug for Framed<T, U>
346where
347 T: fmt::Debug,
348 U: fmt::Debug,
349{
350 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351 f.debug_struct("Framed")
352 .field("io", self.get_ref())
353 .field("codec", self.codec())
354 .finish()
355 }
356}
357
358/// `FramedParts` contains an export of the data of a Framed transport.
359/// It can be used to construct a new [`Framed`] with a different codec.
360/// It contains all current buffers and the inner transport.
361///
362/// [`Framed`]: crate::codec::Framed
363#[derive(Debug)]
364#[allow(clippy::manual_non_exhaustive)]
365pub struct FramedParts<T, U> {
366 /// The inner transport used to read bytes to and write bytes to
367 pub io: T,
368
369 /// The codec
370 pub codec: U,
371
372 /// The buffer with read but unprocessed data.
373 pub read_buf: BytesMut,
374
375 /// A buffer with unprocessed data which are not written yet.
376 pub write_buf: BytesMut,
377
378 /// This private field allows us to add additional fields in the future in a
379 /// backwards compatible way.
380 _priv: (),
381}
382
383impl<T, U> FramedParts<T, U> {
384 /// Create a new, default, `FramedParts`
385 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
386 where
387 U: Encoder<I>,
388 {
389 FramedParts {
390 io,
391 codec,
392 read_buf: BytesMut::new(),
393 write_buf: BytesMut::new(),
394 _priv: (),
395 }
396 }
397}