// tokio_util/codec/framed_write.rs

1use crate::codec::encoder::Encoder;
2use crate::codec::framed_impl::{FramedImpl, WriteFrame};
3
4use futures_core::Stream;
5use tokio::io::AsyncWrite;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15pin_project! {
16    /// A [`Sink`] of frames encoded to an `AsyncWrite`.
17    ///
18    /// For examples of how to use `FramedWrite` with a codec, see the
19    /// examples on the [`codec`] module.
20    ///
21    /// # Cancellation safety
22    ///
23    /// * [`futures_util::sink::SinkExt::send`]: if send is used as the event in a
24    /// `tokio::select!` statement and some other branch completes first, then it is
25    /// guaranteed that the message was not sent, but the message itself is lost.
26    ///
27    /// [`Sink`]: futures_sink::Sink
28    /// [`codec`]: crate::codec
29    /// [`futures_util::sink::SinkExt::send`]: futures_util::sink::SinkExt::send
30    pub struct FramedWrite<T, E> {
31        #[pin]
32        inner: FramedImpl<T, E, WriteFrame>,
33    }
34}
35
36impl<T, E> FramedWrite<T, E>
37where
38    T: AsyncWrite,
39{
40    /// Creates a new `FramedWrite` with the given `encoder`.
41    pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
42        FramedWrite {
43            inner: FramedImpl {
44                inner,
45                codec: encoder,
46                state: WriteFrame::default(),
47            },
48        }
49    }
50
51    /// Creates a new `FramedWrite` with the given `encoder` and a buffer of `capacity`
52    /// initial size.
53    pub fn with_capacity(inner: T, encoder: E, capacity: usize) -> FramedWrite<T, E> {
54        FramedWrite {
55            inner: FramedImpl {
56                inner,
57                codec: encoder,
58                state: WriteFrame {
59                    buffer: BytesMut::with_capacity(capacity),
60                    backpressure_boundary: capacity,
61                },
62            },
63        }
64    }
65}
66
67impl<T, E> FramedWrite<T, E> {
68    /// Returns a reference to the underlying I/O stream wrapped by
69    /// `FramedWrite`.
70    ///
71    /// Note that care should be taken to not tamper with the underlying stream
72    /// of data coming in as it may corrupt the stream of frames otherwise
73    /// being worked with.
74    pub fn get_ref(&self) -> &T {
75        &self.inner.inner
76    }
77
78    /// Returns a mutable reference to the underlying I/O stream wrapped by
79    /// `FramedWrite`.
80    ///
81    /// Note that care should be taken to not tamper with the underlying stream
82    /// of data coming in as it may corrupt the stream of frames otherwise
83    /// being worked with.
84    pub fn get_mut(&mut self) -> &mut T {
85        &mut self.inner.inner
86    }
87
88    /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
89    /// `FramedWrite`.
90    ///
91    /// Note that care should be taken to not tamper with the underlying stream
92    /// of data coming in as it may corrupt the stream of frames otherwise
93    /// being worked with.
94    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
95        self.project().inner.project().inner
96    }
97
98    /// Consumes the `FramedWrite`, returning its underlying I/O stream.
99    ///
100    /// Note that care should be taken to not tamper with the underlying stream
101    /// of data coming in as it may corrupt the stream of frames otherwise
102    /// being worked with.
103    pub fn into_inner(self) -> T {
104        self.inner.inner
105    }
106
107    /// Returns a reference to the underlying encoder.
108    pub fn encoder(&self) -> &E {
109        &self.inner.codec
110    }
111
112    /// Returns a mutable reference to the underlying encoder.
113    pub fn encoder_mut(&mut self) -> &mut E {
114        &mut self.inner.codec
115    }
116
117    /// Maps the encoder `E` to `C`, preserving the write buffer
118    /// wrapped by `Framed`.
119    pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
120    where
121        F: FnOnce(E) -> C,
122    {
123        // This could be potentially simplified once rust-lang/rust#86555 hits stable
124        let FramedImpl {
125            inner,
126            state,
127            codec,
128        } = self.inner;
129        FramedWrite {
130            inner: FramedImpl {
131                inner,
132                state,
133                codec: map(codec),
134            },
135        }
136    }
137
138    /// Returns a mutable reference to the underlying encoder.
139    pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
140        self.project().inner.project().codec
141    }
142
143    /// Returns a reference to the write buffer.
144    pub fn write_buffer(&self) -> &BytesMut {
145        &self.inner.state.buffer
146    }
147
148    /// Returns a mutable reference to the write buffer.
149    pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
150        &mut self.inner.state.buffer
151    }
152
153    /// Returns backpressure boundary
154    pub fn backpressure_boundary(&self) -> usize {
155        self.inner.state.backpressure_boundary
156    }
157
158    /// Updates backpressure boundary
159    pub fn set_backpressure_boundary(&mut self, boundary: usize) {
160        self.inner.state.backpressure_boundary = boundary;
161    }
162}
163
164// This impl just defers to the underlying FramedImpl
165impl<T, I, E> Sink<I> for FramedWrite<T, E>
166where
167    T: AsyncWrite,
168    E: Encoder<I>,
169    E::Error: From<io::Error>,
170{
171    type Error = E::Error;
172
173    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174        self.project().inner.poll_ready(cx)
175    }
176
177    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
178        self.project().inner.start_send(item)
179    }
180
181    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182        self.project().inner.poll_flush(cx)
183    }
184
185    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186        self.project().inner.poll_close(cx)
187    }
188}
189
190// This impl just defers to the underlying T: Stream
191impl<T, D> Stream for FramedWrite<T, D>
192where
193    T: Stream,
194{
195    type Item = T::Item;
196
197    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198        self.project().inner.project().inner.poll_next(cx)
199    }
200}
201
202impl<T, U> fmt::Debug for FramedWrite<T, U>
203where
204    T: fmt::Debug,
205    U: fmt::Debug,
206{
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        f.debug_struct("FramedWrite")
209            .field("inner", &self.get_ref())
210            .field("encoder", &self.encoder())
211            .field("buffer", &self.inner.state.buffer)
212            .finish()
213    }
214}