tokio_util/codec/
framed_write.rs1use crate::codec::encoder::Encoder;
2use crate::codec::framed_impl::{FramedImpl, WriteFrame};
3
4use futures_core::Stream;
5use tokio::io::AsyncWrite;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15pin_project! {
16 pub struct FramedWrite<T, E> {
31 #[pin]
32 inner: FramedImpl<T, E, WriteFrame>,
33 }
34}
35
36impl<T, E> FramedWrite<T, E>
37where
38 T: AsyncWrite,
39{
40 pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
42 FramedWrite {
43 inner: FramedImpl {
44 inner,
45 codec: encoder,
46 state: WriteFrame::default(),
47 },
48 }
49 }
50
51 pub fn with_capacity(inner: T, encoder: E, capacity: usize) -> FramedWrite<T, E> {
54 FramedWrite {
55 inner: FramedImpl {
56 inner,
57 codec: encoder,
58 state: WriteFrame {
59 buffer: BytesMut::with_capacity(capacity),
60 backpressure_boundary: capacity,
61 },
62 },
63 }
64 }
65}
66
67impl<T, E> FramedWrite<T, E> {
68 pub fn get_ref(&self) -> &T {
75 &self.inner.inner
76 }
77
78 pub fn get_mut(&mut self) -> &mut T {
85 &mut self.inner.inner
86 }
87
88 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
95 self.project().inner.project().inner
96 }
97
98 pub fn into_inner(self) -> T {
104 self.inner.inner
105 }
106
107 pub fn encoder(&self) -> &E {
109 &self.inner.codec
110 }
111
112 pub fn encoder_mut(&mut self) -> &mut E {
114 &mut self.inner.codec
115 }
116
117 pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
120 where
121 F: FnOnce(E) -> C,
122 {
123 let FramedImpl {
125 inner,
126 state,
127 codec,
128 } = self.inner;
129 FramedWrite {
130 inner: FramedImpl {
131 inner,
132 state,
133 codec: map(codec),
134 },
135 }
136 }
137
138 pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
140 self.project().inner.project().codec
141 }
142
143 pub fn write_buffer(&self) -> &BytesMut {
145 &self.inner.state.buffer
146 }
147
148 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
150 &mut self.inner.state.buffer
151 }
152
153 pub fn backpressure_boundary(&self) -> usize {
155 self.inner.state.backpressure_boundary
156 }
157
158 pub fn set_backpressure_boundary(&mut self, boundary: usize) {
160 self.inner.state.backpressure_boundary = boundary;
161 }
162}
163
164impl<T, I, E> Sink<I> for FramedWrite<T, E>
166where
167 T: AsyncWrite,
168 E: Encoder<I>,
169 E::Error: From<io::Error>,
170{
171 type Error = E::Error;
172
173 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174 self.project().inner.poll_ready(cx)
175 }
176
177 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
178 self.project().inner.start_send(item)
179 }
180
181 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182 self.project().inner.poll_flush(cx)
183 }
184
185 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186 self.project().inner.poll_close(cx)
187 }
188}
189
190impl<T, D> Stream for FramedWrite<T, D>
192where
193 T: Stream,
194{
195 type Item = T::Item;
196
197 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198 self.project().inner.project().inner.poll_next(cx)
199 }
200}
201
202impl<T, U> fmt::Debug for FramedWrite<T, U>
203where
204 T: fmt::Debug,
205 U: fmt::Debug,
206{
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 f.debug_struct("FramedWrite")
209 .field("inner", &self.get_ref())
210 .field("encoder", &self.encoder())
211 .field("buffer", &self.inner.state.buffer)
212 .finish()
213 }
214}