isahc/body/
sync.rs

1use super::AsyncBody;
2use futures_lite::{future::yield_now, io::AsyncWriteExt};
3use sluice::pipe::{pipe, PipeWriter};
4use std::{
5    borrow::Cow,
6    fmt,
7    fs::File,
8    io::{Cursor, ErrorKind, Read, Result},
9};
10
11/// Contains the body of a synchronous HTTP request or response.
12///
13/// This type is used to encapsulate the underlying stream or region of memory
14/// where the contents of the body are stored. A [`Body`] can be created from
15/// many types of sources using the [`Into`](std::convert::Into) trait or one of
16/// its constructor functions. It can also be created from anything that
17/// implements [`Read`], which [`Body`] itself also implements.
18///
19/// For asynchronous requests, use [`AsyncBody`] instead.
20pub struct Body(Inner);
21
22enum Inner {
23    Empty,
24    Buffer(Cursor<Cow<'static, [u8]>>),
25    Reader(Box<dyn Read + Send + Sync>, Option<u64>),
26}
27
28impl Body {
29    /// Create a new empty body.
30    ///
31    /// An empty body represents the *absence* of a body, which is semantically
32    /// different than the presence of a body of zero length.
33    pub const fn empty() -> Self {
34        Self(Inner::Empty)
35    }
36
37    /// Create a new body from a potentially static byte buffer.
38    ///
39    /// The body will have a known length equal to the number of bytes given.
40    ///
41    /// This will try to prevent a copy if the type passed in can be re-used,
42    /// otherwise the buffer will be copied first. This method guarantees to not
43    /// require a copy for the following types:
44    ///
45    /// - `&'static [u8]`
46    /// - `&'static str`
47    ///
48    /// # Examples
49    ///
50    /// ```
51    /// use isahc::Body;
52    ///
53    /// // Create a body from a static string.
54    /// let body = Body::from_bytes_static("hello world");
55    /// ```
56    #[inline]
57    pub fn from_bytes_static<B>(bytes: B) -> Self
58    where
59        B: AsRef<[u8]> + 'static,
60    {
61        castaway::match_type!(bytes, {
62            Cursor<Cow<'static, [u8]>> as bytes => Self(Inner::Buffer(bytes)),
63            Vec<u8> as bytes => Self::from(bytes),
64            String as bytes => Self::from(bytes.into_bytes()),
65            bytes => Self::from(bytes.as_ref().to_vec()),
66        })
67    }
68
69    /// Create a streaming body that reads from the given reader.
70    ///
71    /// The body will have an unknown length. When used as a request body,
72    /// [chunked transfer
73    /// encoding](https://tools.ietf.org/html/rfc7230#section-4.1) might be used
74    /// to send the request.
75    pub fn from_reader<R>(reader: R) -> Self
76    where
77        R: Read + Send + Sync + 'static,
78    {
79        Self(Inner::Reader(Box::new(reader), None))
80    }
81
82    /// Create a streaming body with a known length.
83    ///
84    /// If the size of the body is known in advance, such as with a file, then
85    /// this function can be used to create a body that can determine its
86    /// `Content-Length` while still reading the bytes asynchronously.
87    ///
88    /// Giving a value for `length` that doesn't actually match how much data
89    /// the reader will produce may result in errors when sending the body in a
90    /// request.
91    pub fn from_reader_sized<R>(reader: R, length: u64) -> Self
92    where
93        R: Read + Send + Sync + 'static,
94    {
95        Self(Inner::Reader(Box::new(reader), Some(length)))
96    }
97
98    /// Report if this body is empty.
99    ///
100    /// This is not necessarily the same as checking for `self.len() ==
101    /// Some(0)`. Since HTTP message bodies are optional, there is a semantic
102    /// difference between the absence of a body and the presence of a
103    /// zero-length body. This method will only return `true` for the former.
104    pub fn is_empty(&self) -> bool {
105        match self.0 {
106            Inner::Empty => true,
107            _ => false,
108        }
109    }
110
111    /// Get the size of the body, if known.
112    ///
113    /// The value reported by this method is used to set the `Content-Length`
114    /// for outgoing requests.
115    ///
116    /// When coming from a response, this method will report the value of the
117    /// `Content-Length` response header if present. If this method returns
118    /// `None` then there's a good chance that the server used something like
119    /// chunked transfer encoding to send the response body.
120    ///
121    /// Since the length may be determined totally separately from the actual
122    /// bytes, even if a value is returned it should not be relied on as always
123    /// being accurate, and should be treated as a "hint".
124    pub fn len(&self) -> Option<u64> {
125        match &self.0 {
126            Inner::Empty => Some(0),
127            Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64),
128            Inner::Reader(_, len) => *len,
129        }
130    }
131
132    /// If this body is repeatable, reset the body stream back to the start of
133    /// the content. Returns `false` if the body cannot be reset.
134    pub fn reset(&mut self) -> bool {
135        match &mut self.0 {
136            Inner::Empty => true,
137            Inner::Buffer(cursor) => {
138                cursor.set_position(0);
139                true
140            }
141            _ => false,
142        }
143    }
144
145    /// Convert this body into an asynchronous one.
146    ///
147    /// Turning a synchronous operation into an asynchronous one can be quite
148    /// the challenge, so this method is used internally only for limited
149    /// scenarios in which this can work. If this body is an in-memory buffer,
150    /// then the translation is trivial.
151    ///
152    /// If this body was created from an underlying synchronous reader, then we
153    /// create a temporary asynchronous pipe and return a [`Writer`] which will
154    /// copy the bytes from the reader to the writing half of the pipe in a
155    /// blocking fashion.
156    pub(crate) fn into_async(self) -> (AsyncBody, Option<Writer>) {
157        match self.0 {
158            Inner::Empty => (AsyncBody::empty(), None),
159            Inner::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None),
160            Inner::Reader(reader, len) => {
161                let (pipe_reader, writer) = pipe();
162
163                (
164                    if let Some(len) = len {
165                        AsyncBody::from_reader_sized(pipe_reader, len)
166                    } else {
167                        AsyncBody::from_reader(pipe_reader)
168                    },
169                    Some(Writer {
170                        reader,
171                        writer,
172                    }),
173                )
174            }
175        }
176    }
177}
178
179impl Read for Body {
180    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
181        match &mut self.0 {
182            Inner::Empty => Ok(0),
183            Inner::Buffer(cursor) => cursor.read(buf),
184            Inner::Reader(reader, _) => reader.read(buf),
185        }
186    }
187}
188
189impl Default for Body {
190    fn default() -> Self {
191        Self::empty()
192    }
193}
194
195impl From<()> for Body {
196    fn from(_: ()) -> Self {
197        Self::empty()
198    }
199}
200
201impl From<Vec<u8>> for Body {
202    fn from(body: Vec<u8>) -> Self {
203        Self(Inner::Buffer(Cursor::new(Cow::Owned(body))))
204    }
205}
206
207impl From<&'_ [u8]> for Body {
208    fn from(body: &[u8]) -> Self {
209        body.to_vec().into()
210    }
211}
212
213impl From<String> for Body {
214    fn from(body: String) -> Self {
215        body.into_bytes().into()
216    }
217}
218
219impl From<&'_ str> for Body {
220    fn from(body: &str) -> Self {
221        body.as_bytes().into()
222    }
223}
224
225impl From<File> for Body {
226    fn from(file: File) -> Self {
227        if let Ok(metadata) = file.metadata() {
228            Self::from_reader_sized(file, metadata.len())
229        } else {
230            Self::from_reader(file)
231        }
232    }
233}
234
235impl fmt::Debug for Body {
236    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237        match self.len() {
238            Some(len) => write!(f, "Body({})", len),
239            None => write!(f, "Body(?)"),
240        }
241    }
242}
243
244/// Helper struct for writing a synchronous reader into an asynchronous pipe.
245pub(crate) struct Writer {
246    reader: Box<dyn Read + Send + Sync>,
247    writer: PipeWriter,
248}
249
250impl Writer {
251    /// The size of the temporary buffer to use for writing. Larger buffers can
252    /// improve performance, but at the cost of more memory.
253    ///
254    /// Curl's internal buffer size just happens to default to 16 KiB as well,
255    /// so this is a natural choice.
256    const BUF_SIZE: usize = 16384;
257
258    /// Write the response body from the synchronous reader.
259    ///
260    /// While this function is async, it isn't a well-behaved one as it blocks
261    /// frequently while reading from the request body reader. As long as this
262    /// method is invoked in a controlled environment within a thread dedicated
263    /// to blocking operations, this is OK.
264    pub(crate) async fn write(&mut self) -> Result<()> {
265        let mut buf = [0; Self::BUF_SIZE];
266
267        loop {
268            let len = match self.reader.read(&mut buf) {
269                Ok(0) => return Ok(()),
270                Ok(len) => len,
271                Err(e) if e.kind() == ErrorKind::Interrupted => {
272                    yield_now().await;
273                    continue;
274                }
275                Err(e) => return Err(e),
276            };
277
278            self.writer.write_all(&buf[..len]).await?;
279        }
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286
287    static_assertions::assert_impl_all!(Body: Send, Sync);
288
289    #[test]
290    fn empty_body() {
291        let body = Body::empty();
292
293        assert!(body.is_empty());
294        assert_eq!(body.len(), Some(0));
295    }
296
297    #[test]
298    fn zero_length_body() {
299        let body = Body::from(vec![]);
300
301        assert!(!body.is_empty());
302        assert_eq!(body.len(), Some(0));
303    }
304
305    #[test]
306    fn reader_with_unknown_length() {
307        let body = Body::from_reader(std::io::empty());
308
309        assert!(!body.is_empty());
310        assert_eq!(body.len(), None);
311    }
312
313    #[test]
314    fn reader_with_known_length() {
315        let body = Body::from_reader_sized(std::io::empty(), 0);
316
317        assert!(!body.is_empty());
318        assert_eq!(body.len(), Some(0));
319    }
320
321    #[test]
322    fn reset_memory_body() {
323        let mut body = Body::from("hello world");
324        let mut buf = String::new();
325
326        assert_eq!(body.read_to_string(&mut buf).unwrap(), 11);
327        assert_eq!(buf, "hello world");
328        assert!(body.reset());
329        assert_eq!(body.read_to_string(&mut buf).unwrap(), 11);
330        assert_eq!(buf, "hello worldhello world");
331    }
332
333    #[test]
334    fn cannot_reset_reader() {
335        let mut body = Body::from_reader(std::io::empty());
336
337        assert_eq!(body.reset(), false);
338    }
339}