isahc/body/sync.rs
1use super::AsyncBody;
2use futures_lite::{future::yield_now, io::AsyncWriteExt};
3use sluice::pipe::{pipe, PipeWriter};
4use std::{
5 borrow::Cow,
6 fmt,
7 fs::File,
8 io::{Cursor, ErrorKind, Read, Result},
9};
10
/// Contains the body of a synchronous HTTP request or response.
///
/// This type is used to encapsulate the underlying stream or region of memory
/// where the contents of the body are stored. A [`Body`] can be created from
/// many types of sources using the [`Into`](std::convert::Into) trait or one of
/// its constructor functions. It can also be created from anything that
/// implements [`Read`], which [`Body`] itself also implements.
///
/// A body is always in exactly one of three states: absent (see
/// [`Body::empty`]), an in-memory buffer, or a streaming reader with an
/// optional length. Only the first two can be rewound with [`Body::reset`].
///
/// For asynchronous requests, use [`AsyncBody`] instead.
pub struct Body(Inner);
21
/// Internal representation of where the bytes of a [`Body`] come from.
enum Inner {
    /// No body at all. This is distinct from a zero-length buffer: it is the
    /// only variant for which `Body::is_empty` returns `true`.
    Empty,

    /// Bytes held in memory, either borrowed with a `'static` lifetime or
    /// owned. The cursor tracks the current read position, which allows the
    /// body to be rewound to the start.
    Buffer(Cursor<Cow<'static, [u8]>>),

    /// An arbitrary synchronous reader, paired with the length of the stream
    /// if one was supplied when the body was created.
    Reader(Box<dyn Read + Send + Sync>, Option<u64>),
}
27
28impl Body {
29 /// Create a new empty body.
30 ///
31 /// An empty body represents the *absence* of a body, which is semantically
32 /// different than the presence of a body of zero length.
33 pub const fn empty() -> Self {
34 Self(Inner::Empty)
35 }
36
37 /// Create a new body from a potentially static byte buffer.
38 ///
39 /// The body will have a known length equal to the number of bytes given.
40 ///
41 /// This will try to prevent a copy if the type passed in can be re-used,
42 /// otherwise the buffer will be copied first. This method guarantees to not
43 /// require a copy for the following types:
44 ///
45 /// - `&'static [u8]`
46 /// - `&'static str`
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// use isahc::Body;
52 ///
53 /// // Create a body from a static string.
54 /// let body = Body::from_bytes_static("hello world");
55 /// ```
56 #[inline]
57 pub fn from_bytes_static<B>(bytes: B) -> Self
58 where
59 B: AsRef<[u8]> + 'static,
60 {
61 castaway::match_type!(bytes, {
62 Cursor<Cow<'static, [u8]>> as bytes => Self(Inner::Buffer(bytes)),
63 Vec<u8> as bytes => Self::from(bytes),
64 String as bytes => Self::from(bytes.into_bytes()),
65 bytes => Self::from(bytes.as_ref().to_vec()),
66 })
67 }
68
69 /// Create a streaming body that reads from the given reader.
70 ///
71 /// The body will have an unknown length. When used as a request body,
72 /// [chunked transfer
73 /// encoding](https://tools.ietf.org/html/rfc7230#section-4.1) might be used
74 /// to send the request.
75 pub fn from_reader<R>(reader: R) -> Self
76 where
77 R: Read + Send + Sync + 'static,
78 {
79 Self(Inner::Reader(Box::new(reader), None))
80 }
81
82 /// Create a streaming body with a known length.
83 ///
84 /// If the size of the body is known in advance, such as with a file, then
85 /// this function can be used to create a body that can determine its
86 /// `Content-Length` while still reading the bytes asynchronously.
87 ///
88 /// Giving a value for `length` that doesn't actually match how much data
89 /// the reader will produce may result in errors when sending the body in a
90 /// request.
91 pub fn from_reader_sized<R>(reader: R, length: u64) -> Self
92 where
93 R: Read + Send + Sync + 'static,
94 {
95 Self(Inner::Reader(Box::new(reader), Some(length)))
96 }
97
98 /// Report if this body is empty.
99 ///
100 /// This is not necessarily the same as checking for `self.len() ==
101 /// Some(0)`. Since HTTP message bodies are optional, there is a semantic
102 /// difference between the absence of a body and the presence of a
103 /// zero-length body. This method will only return `true` for the former.
104 pub fn is_empty(&self) -> bool {
105 match self.0 {
106 Inner::Empty => true,
107 _ => false,
108 }
109 }
110
111 /// Get the size of the body, if known.
112 ///
113 /// The value reported by this method is used to set the `Content-Length`
114 /// for outgoing requests.
115 ///
116 /// When coming from a response, this method will report the value of the
117 /// `Content-Length` response header if present. If this method returns
118 /// `None` then there's a good chance that the server used something like
119 /// chunked transfer encoding to send the response body.
120 ///
121 /// Since the length may be determined totally separately from the actual
122 /// bytes, even if a value is returned it should not be relied on as always
123 /// being accurate, and should be treated as a "hint".
124 pub fn len(&self) -> Option<u64> {
125 match &self.0 {
126 Inner::Empty => Some(0),
127 Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64),
128 Inner::Reader(_, len) => *len,
129 }
130 }
131
132 /// If this body is repeatable, reset the body stream back to the start of
133 /// the content. Returns `false` if the body cannot be reset.
134 pub fn reset(&mut self) -> bool {
135 match &mut self.0 {
136 Inner::Empty => true,
137 Inner::Buffer(cursor) => {
138 cursor.set_position(0);
139 true
140 }
141 _ => false,
142 }
143 }
144
145 /// Convert this body into an asynchronous one.
146 ///
147 /// Turning a synchronous operation into an asynchronous one can be quite
148 /// the challenge, so this method is used internally only for limited
149 /// scenarios in which this can work. If this body is an in-memory buffer,
150 /// then the translation is trivial.
151 ///
152 /// If this body was created from an underlying synchronous reader, then we
153 /// create a temporary asynchronous pipe and return a [`Writer`] which will
154 /// copy the bytes from the reader to the writing half of the pipe in a
155 /// blocking fashion.
156 pub(crate) fn into_async(self) -> (AsyncBody, Option<Writer>) {
157 match self.0 {
158 Inner::Empty => (AsyncBody::empty(), None),
159 Inner::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None),
160 Inner::Reader(reader, len) => {
161 let (pipe_reader, writer) = pipe();
162
163 (
164 if let Some(len) = len {
165 AsyncBody::from_reader_sized(pipe_reader, len)
166 } else {
167 AsyncBody::from_reader(pipe_reader)
168 },
169 Some(Writer {
170 reader,
171 writer,
172 }),
173 )
174 }
175 }
176 }
177}
178
179impl Read for Body {
180 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
181 match &mut self.0 {
182 Inner::Empty => Ok(0),
183 Inner::Buffer(cursor) => cursor.read(buf),
184 Inner::Reader(reader, _) => reader.read(buf),
185 }
186 }
187}
188
189impl Default for Body {
190 fn default() -> Self {
191 Self::empty()
192 }
193}
194
195impl From<()> for Body {
196 fn from(_: ()) -> Self {
197 Self::empty()
198 }
199}
200
201impl From<Vec<u8>> for Body {
202 fn from(body: Vec<u8>) -> Self {
203 Self(Inner::Buffer(Cursor::new(Cow::Owned(body))))
204 }
205}
206
207impl From<&'_ [u8]> for Body {
208 fn from(body: &[u8]) -> Self {
209 body.to_vec().into()
210 }
211}
212
213impl From<String> for Body {
214 fn from(body: String) -> Self {
215 body.into_bytes().into()
216 }
217}
218
219impl From<&'_ str> for Body {
220 fn from(body: &str) -> Self {
221 body.as_bytes().into()
222 }
223}
224
225impl From<File> for Body {
226 fn from(file: File) -> Self {
227 if let Ok(metadata) = file.metadata() {
228 Self::from_reader_sized(file, metadata.len())
229 } else {
230 Self::from_reader(file)
231 }
232 }
233}
234
235impl fmt::Debug for Body {
236 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237 match self.len() {
238 Some(len) => write!(f, "Body({})", len),
239 None => write!(f, "Body(?)"),
240 }
241 }
242}
243
/// Helper struct for writing a synchronous reader into an asynchronous pipe.
///
/// Created by `Body::into_async` when the body is backed by a reader.
pub(crate) struct Writer {
    /// Synchronous source that the body bytes are read from.
    reader: Box<dyn Read + Send + Sync>,

    /// Writing half of the pipe. The reading half backs the [`AsyncBody`]
    /// that `Body::into_async` returns alongside this writer.
    writer: PipeWriter,
}
249
250impl Writer {
251 /// The size of the temporary buffer to use for writing. Larger buffers can
252 /// improve performance, but at the cost of more memory.
253 ///
254 /// Curl's internal buffer size just happens to default to 16 KiB as well,
255 /// so this is a natural choice.
256 const BUF_SIZE: usize = 16384;
257
258 /// Write the response body from the synchronous reader.
259 ///
260 /// While this function is async, it isn't a well-behaved one as it blocks
261 /// frequently while reading from the request body reader. As long as this
262 /// method is invoked in a controlled environment within a thread dedicated
263 /// to blocking operations, this is OK.
264 pub(crate) async fn write(&mut self) -> Result<()> {
265 let mut buf = [0; Self::BUF_SIZE];
266
267 loop {
268 let len = match self.reader.read(&mut buf) {
269 Ok(0) => return Ok(()),
270 Ok(len) => len,
271 Err(e) if e.kind() == ErrorKind::Interrupted => {
272 yield_now().await;
273 continue;
274 }
275 Err(e) => return Err(e),
276 };
277
278 self.writer.write_all(&buf[..len]).await?;
279 }
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time check that a body can be moved to and shared between
    // threads.
    static_assertions::assert_impl_all!(Body: Send, Sync);

    /// An absent body is "empty" and reports a length of zero.
    #[test]
    fn empty_body() {
        let body = Body::empty();

        assert!(body.is_empty());
        assert_eq!(body.len(), Some(0));
    }

    /// A present body with zero bytes is not "empty".
    #[test]
    fn zero_length_body() {
        let body = Body::from(vec![]);

        assert!(!body.is_empty());
        assert_eq!(body.len(), Some(0));
    }

    #[test]
    fn reader_with_unknown_length() {
        let body = Body::from_reader(std::io::empty());

        assert!(!body.is_empty());
        assert_eq!(body.len(), None);
    }

    #[test]
    fn reader_with_known_length() {
        let body = Body::from_reader_sized(std::io::empty(), 0);

        assert!(!body.is_empty());
        assert_eq!(body.len(), Some(0));
    }

    /// Resetting an in-memory body allows the same bytes to be read again.
    #[test]
    fn reset_memory_body() {
        let mut body = Body::from("hello world");
        let mut buf = String::new();

        assert_eq!(body.read_to_string(&mut buf).unwrap(), 11);
        assert_eq!(buf, "hello world");
        assert!(body.reset());
        assert_eq!(body.read_to_string(&mut buf).unwrap(), 11);
        assert_eq!(buf, "hello worldhello world");
    }

    /// Resetting an absent body trivially succeeds and leaves it absent.
    #[test]
    fn reset_empty_body() {
        let mut body = Body::empty();

        assert!(body.reset());
        assert!(body.is_empty());
    }

    #[test]
    fn cannot_reset_reader() {
        let mut body = Body::from_reader(std::io::empty());

        assert!(!body.reset());
    }

    /// The debug output shows the length when known and `?` otherwise.
    #[test]
    fn debug_shows_length_if_known() {
        assert_eq!(format!("{:?}", Body::from("hello")), "Body(5)");
        assert_eq!(
            format!("{:?}", Body::from_reader(std::io::empty())),
            "Body(?)"
        );
    }
}
339}