tokio_util/codec/
framed_write.rs1use crate::codec::encoder::Encoder;
2use crate::codec::framed_impl::{FramedImpl, WriteFrame};
3
4use futures_core::Stream;
5use tokio::io::AsyncWrite;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
pin_project! {
    /// Pairs a wrapped value of type `T` with an encoder of type `E`.
    ///
    /// All state lives in a single [`FramedImpl`] specialised with the
    /// [`WriteFrame`] state type. When `T: AsyncWrite` and `E: Encoder<I>`,
    /// this type implements [`Sink<I>`] by forwarding to that `FramedImpl`.
    pub struct FramedWrite<T, E> {
        // Pin-projected so that the wrapped `FramedImpl` (and, through it, the
        // inner `T`) can be reached as `Pin<&mut _>` from `Pin<&mut Self>`.
        #[pin]
        inner: FramedImpl<T, E, WriteFrame>,
    }
}
35
36impl<T, E> FramedWrite<T, E>
37where
38    T: AsyncWrite,
39{
40    pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
42        FramedWrite {
43            inner: FramedImpl {
44                inner,
45                codec: encoder,
46                state: WriteFrame::default(),
47            },
48        }
49    }
50
51    pub fn with_capacity(inner: T, encoder: E, capacity: usize) -> FramedWrite<T, E> {
54        FramedWrite {
55            inner: FramedImpl {
56                inner,
57                codec: encoder,
58                state: WriteFrame {
59                    buffer: BytesMut::with_capacity(capacity),
60                    backpressure_boundary: capacity,
61                },
62            },
63        }
64    }
65}
66
67impl<T, E> FramedWrite<T, E> {
68    pub fn get_ref(&self) -> &T {
75        &self.inner.inner
76    }
77
78    pub fn get_mut(&mut self) -> &mut T {
85        &mut self.inner.inner
86    }
87
88    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
95        self.project().inner.project().inner
96    }
97
98    pub fn into_inner(self) -> T {
104        self.inner.inner
105    }
106
107    pub fn encoder(&self) -> &E {
109        &self.inner.codec
110    }
111
112    pub fn encoder_mut(&mut self) -> &mut E {
114        &mut self.inner.codec
115    }
116
117    pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
120    where
121        F: FnOnce(E) -> C,
122    {
123        let FramedImpl {
125            inner,
126            state,
127            codec,
128        } = self.inner;
129        FramedWrite {
130            inner: FramedImpl {
131                inner,
132                state,
133                codec: map(codec),
134            },
135        }
136    }
137
138    pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
140        self.project().inner.project().codec
141    }
142
143    pub fn write_buffer(&self) -> &BytesMut {
145        &self.inner.state.buffer
146    }
147
148    pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
150        &mut self.inner.state.buffer
151    }
152
153    pub fn backpressure_boundary(&self) -> usize {
155        self.inner.state.backpressure_boundary
156    }
157
158    pub fn set_backpressure_boundary(&mut self, boundary: usize) {
160        self.inner.state.backpressure_boundary = boundary;
161    }
162}
163
164impl<T, I, E> Sink<I> for FramedWrite<T, E>
166where
167    T: AsyncWrite,
168    E: Encoder<I>,
169    E::Error: From<io::Error>,
170{
171    type Error = E::Error;
172
173    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174        self.project().inner.poll_ready(cx)
175    }
176
177    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
178        self.project().inner.start_send(item)
179    }
180
181    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182        self.project().inner.poll_flush(cx)
183    }
184
185    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186        self.project().inner.poll_close(cx)
187    }
188}
189
190impl<T, D> Stream for FramedWrite<T, D>
192where
193    T: Stream,
194{
195    type Item = T::Item;
196
197    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198        self.project().inner.project().inner.poll_next(cx)
199    }
200}
201
202impl<T, U> fmt::Debug for FramedWrite<T, U>
203where
204    T: fmt::Debug,
205    U: fmt::Debug,
206{
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        f.debug_struct("FramedWrite")
209            .field("inner", &self.get_ref())
210            .field("encoder", &self.encoder())
211            .field("buffer", &self.inner.state.buffer)
212            .finish()
213    }
214}