sluice/pipe/
chunked.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
//! Generally a ring buffer is an efficient and appropriate data structure for
//! asynchronously transmitting a stream of bytes between two threads that also
//! gives you control over memory allocation to avoid consuming an unknown
//! amount of memory. Setting a fixed memory limit also gives you a degree of
//! flow control if the producer ends up being faster than the consumer.
//!
//! But for some use cases a ring buffer will not work if an application uses
//! its own internal buffer management and requires you to consume either all of
//! a "chunk" of bytes, or none of it.
//!
//! Because of these constraints, we instead use a somewhat unusual type of
//! that uses a fixed number of growable buffers that are exchanged back and
//! forth between a producer and a consumer. Since each buffer is a vector, it
//! can grow to whatever size is required of it in order to fit a single chunk.
//!
//! To avoid the constant allocation overhead of creating a new buffer for every
//! chunk, after a consumer finishes reading from a buffer, it returns the
//! buffer to the producer over a channel to be reused. The number of buffers
//! available in this system is fixed at creation time, so the only allocations
//! that happen during reads and writes are occasional reallocation for each
//! individual vector to fit larger chunks of bytes that don't already fit.

use async_channel::{bounded, Sender, Receiver};
use futures_core::{FusedStream, Stream};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use std::{
    io,
    io::{BufRead, Cursor, Write},
    pin::Pin,
    task::{Context, Poll},
};

/// Create a new chunked pipe with room for a fixed number of chunks.
///
/// The `count` parameter sets how many buffers are available in the pipe at
/// once. Smaller values reduce the number of allocations and reallocations
/// that may be required when writing, as well as overall memory usage. Larger
/// values reduce the amount of waiting done between chunks if you have a
/// producer and consumer that run at different speeds.
///
/// If `count` is set to 1, then the pipe is essentially serial, since only the
/// reader or writer can operate on the single buffer at one time and cannot be
/// run in parallel.
pub(crate) fn new(count: usize) -> (Reader, Writer) {
    // One channel returns free buffers to the writer; the other delivers
    // filled chunks to the reader. Both are bounded by the same count.
    let (pool_tx, pool_rx) = bounded(count);
    let (stream_tx, stream_rx) = bounded(count);

    // Seed the pool with `count` empty buffers so the writer can begin
    // writing immediately.
    for _ in 0..count {
        pool_tx
            .try_send(Cursor::new(Vec::new()))
            .expect("buffer pool overflow");
    }

    (
        Reader {
            buf_pool_tx: pool_tx,
            buf_stream_rx: stream_rx,
            chunk: None,
        },
        Writer {
            buf_pool_rx: pool_rx,
            buf_stream_tx: stream_tx,
        },
    )
}

/// The reading half of a chunked pipe.
pub(crate) struct Reader {
    /// A channel used to return consumed chunk buffers to the writer for
    /// reuse.
    buf_pool_tx: Sender<Cursor<Vec<u8>>>,

    /// A channel of incoming chunks delivered by the writer.
    buf_stream_rx: Receiver<Cursor<Vec<u8>>>,

    /// The chunk currently being read from, if any.
    chunk: Option<Cursor<Vec<u8>>>,
}

impl AsyncRead for Reader {
    /// Read bytes by filling the internal chunk buffer and copying as much of
    /// it as will fit into `buf`.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        mut buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Delegate to the buffered-read implementation to obtain the current
        // chunk's unread bytes.
        let chunk = match self.as_mut().poll_fill_buf(cx)? {
            Poll::Ready(chunk) => chunk,
            Poll::Pending => return Poll::Pending,
        };

        // `Write` for `&mut [u8]` copies at most `buf.len()` bytes, so this
        // never over-runs the destination.
        let copied = buf.write(chunk)?;

        // Mark however much was successfully copied as consumed.
        self.consume(copied);

        Poll::Ready(Ok(copied))
    }
}

impl AsyncBufRead for Reader {
    /// Return the unread bytes of the current chunk, fetching a new chunk
    /// from the writer if the current one has been fully consumed.
    ///
    /// Returns an empty slice to signal EOF once the writer has disconnected
    /// and all in-flight chunks have been drained.
    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        // If the current chunk is consumed, first return it to the writer for
        // reuse.
        if let Some(chunk) = self.chunk.as_ref() {
            if chunk.position() >= chunk.get_ref().len() as u64 {
                let mut chunk = self.chunk.take().unwrap();
                chunk.set_position(0);
                chunk.get_mut().clear();

                if let Err(e) = self.buf_pool_tx.try_send(chunk) {
                    // We pre-fill the buffer pool channel with an exact number
                    // of buffers, so this can never happen.
                    if e.is_full() {
                        panic!("buffer pool overflow")
                    }
                    // If the writer disconnects, then we'll just discard this
                    // buffer and any subsequent buffers until we've read
                    // everything still in the pipe.
                    else if e.is_closed() {
                        // Nothing!
                    }
                    // Some other error occurred.
                    else {
                        return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
                    }
                }
            }
        }

        // If we have no current chunk, then attempt to read one.
        if self.chunk.is_none() {
            // If the stream has terminated, then do not poll it again.
            if self.buf_stream_rx.is_terminated() {
                return Poll::Ready(Ok(&[]));
            }

            match Pin::new(&mut self.buf_stream_rx).poll_next(cx) {
                // Wait for a new chunk to be delivered.
                Poll::Pending => return Poll::Pending,

                // Pipe has closed, so return EOF.
                Poll::Ready(None) => return Poll::Ready(Ok(&[])),

                // Accept the new chunk.
                Poll::Ready(buf) => self.chunk = buf,
            }
        }

        // Return the current chunk, if any, as the buffer. Every field of
        // `Reader` is `Unpin`, so the safe `Pin::get_mut` projection suffices
        // here; no `unsafe` is required.
        Poll::Ready(match self.get_mut().chunk.as_mut() {
            Some(chunk) => chunk.fill_buf(),
            None => Ok(&[]),
        })
    }

    fn consume(mut self: Pin<&mut Self>, amt: usize) {
        if let Some(chunk) = self.chunk.as_mut() {
            // Consume the requested amount from the current chunk.
            chunk.consume(amt);
        }
    }
}

impl Drop for Reader {
    /// Close both channel halves so the writer observes the disconnect.
    fn drop(&mut self) {
        // Ensure we close the primary stream first before the pool stream so
        // that the writer knows the pipe is closed before trying to poll the
        // pool channel.
        self.buf_stream_rx.close();
        self.buf_pool_tx.close();
    }
}

/// Writing half of a chunked pipe.
pub(crate) struct Writer {
    /// A channel of free buffers, recycled by the reader, that chunks are
    /// written into.
    buf_pool_rx: Receiver<Cursor<Vec<u8>>>,

    /// A channel used to send filled chunks to the reader.
    buf_stream_tx: Sender<Cursor<Vec<u8>>>,
}

impl AsyncWrite for Writer {
    /// Write `buf` into the pipe as a single chunk, waiting for a free buffer
    /// from the reader if none is currently available.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Fail fast if the reading end has hung up; otherwise we'd spend time
        // copying an entire chunk only to discover nobody will read it.
        if self.buf_stream_tx.is_closed() {
            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
        }

        // Empty writes do not consume a buffer from the rotation.
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // Wait for a recycled buffer from the reader to write the chunk into.
        let mut chunk = match Pin::new(&mut self.buf_pool_rx).poll_next(cx) {
            Poll::Pending => return Poll::Pending,
            // The pool channel closed, meaning the reader is gone.
            Poll::Ready(None) => {
                return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
            }
            Poll::Ready(Some(chunk)) => chunk,
        };

        // Copy the caller's bytes into the (cleared) buffer.
        chunk.get_mut().extend_from_slice(buf);

        // Hand the filled chunk off to the reader.
        Poll::Ready(match self.buf_stream_tx.try_send(chunk) {
            Ok(()) => Ok(buf.len()),
            // The stream channel has the same capacity as the pool, so it can
            // never be full here.
            Err(e) if e.is_full() => panic!("buffer pool overflow"),
            // The reader disconnected.
            Err(_) => Err(io::ErrorKind::BrokenPipe.into()),
        })
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Chunks are delivered eagerly in `poll_write`; nothing is buffered
        // locally, so flushing is a no-op.
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Closing the stream channel signals EOF to the reader.
        self.buf_stream_tx.close();
        Poll::Ready(Ok(()))
    }
}