1use futures_lite::io::{AsyncRead, BlockOn};
4use std::{
5 borrow::Cow,
6 fmt,
7 io::{self, Cursor, Read},
8 pin::Pin,
9 str,
10 task::{Context, Poll},
11};
12
13mod sync;
14
15#[allow(unreachable_pub)]
16pub use sync::Body;
17
18pub struct AsyncBody(Inner);
32
33enum Inner {
35 Empty,
37
38 Buffer(Cursor<Cow<'static, [u8]>>),
40
41 Reader(Pin<Box<dyn AsyncRead + Send + Sync>>, Option<u64>),
43}
44
45impl AsyncBody {
46 pub const fn empty() -> Self {
51 Self(Inner::Empty)
52 }
53
54 #[inline]
74 pub fn from_bytes_static<B>(bytes: B) -> Self
75 where
76 B: AsRef<[u8]> + 'static,
77 {
78 castaway::match_type!(bytes, {
79 Cursor<Cow<'static, [u8]>> as bytes => Self(Inner::Buffer(bytes)),
80 &'static [u8] as bytes => Self::from_static_impl(bytes),
81 &'static str as bytes => Self::from_static_impl(bytes.as_bytes()),
82 Vec<u8> as bytes => Self::from(bytes),
83 String as bytes => Self::from(bytes.into_bytes()),
84 bytes => Self::from(bytes.as_ref().to_vec()),
85 })
86 }
87
88 #[inline]
89 fn from_static_impl(bytes: &'static [u8]) -> Self {
90 Self(Inner::Buffer(Cursor::new(Cow::Borrowed(bytes))))
91 }
92
93 pub fn from_reader<R>(read: R) -> Self
100 where
101 R: AsyncRead + Send + Sync + 'static,
102 {
103 Self(Inner::Reader(Box::pin(read), None))
104 }
105
106 pub fn from_reader_sized<R>(read: R, length: u64) -> Self
116 where
117 R: AsyncRead + Send + Sync + 'static,
118 {
119 Self(Inner::Reader(Box::pin(read), Some(length)))
120 }
121
122 pub fn is_empty(&self) -> bool {
129 match self.0 {
130 Inner::Empty => true,
131 _ => false,
132 }
133 }
134
135 pub fn len(&self) -> Option<u64> {
149 match &self.0 {
150 Inner::Empty => Some(0),
151 Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64),
152 Inner::Reader(_, len) => *len,
153 }
154 }
155
156 pub fn reset(&mut self) -> bool {
159 match &mut self.0 {
160 Inner::Empty => true,
161 Inner::Buffer(cursor) => {
162 cursor.set_position(0);
163 true
164 }
165 Inner::Reader(_, _) => false,
166 }
167 }
168
169 pub(crate) fn into_sync(self) -> sync::Body {
177 match self.0 {
178 Inner::Empty => sync::Body::empty(),
179 Inner::Buffer(cursor) => sync::Body::from_bytes_static(cursor.into_inner()),
180 Inner::Reader(reader, Some(len)) => {
181 sync::Body::from_reader_sized(BlockOn::new(reader), len)
182 }
183 Inner::Reader(reader, None) => sync::Body::from_reader(BlockOn::new(reader)),
184 }
185 }
186}
187
188impl AsyncRead for AsyncBody {
189 fn poll_read(
190 mut self: Pin<&mut Self>,
191 cx: &mut Context<'_>,
192 buf: &mut [u8],
193 ) -> Poll<io::Result<usize>> {
194 match &mut self.0 {
195 Inner::Empty => Poll::Ready(Ok(0)),
196 Inner::Buffer(cursor) => Poll::Ready(cursor.read(buf)),
197 Inner::Reader(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf),
198 }
199 }
200}
201
202impl Default for AsyncBody {
203 fn default() -> Self {
204 Self::empty()
205 }
206}
207
208impl From<()> for AsyncBody {
209 fn from(_: ()) -> Self {
210 Self::empty()
211 }
212}
213
214impl From<Vec<u8>> for AsyncBody {
215 fn from(body: Vec<u8>) -> Self {
216 Self(Inner::Buffer(Cursor::new(Cow::Owned(body))))
217 }
218}
219
220impl From<&'_ [u8]> for AsyncBody {
221 fn from(body: &[u8]) -> Self {
222 body.to_vec().into()
223 }
224}
225
226impl From<String> for AsyncBody {
227 fn from(body: String) -> Self {
228 body.into_bytes().into()
229 }
230}
231
232impl From<&'_ str> for AsyncBody {
233 fn from(body: &str) -> Self {
234 body.as_bytes().into()
235 }
236}
237
238impl<T: Into<Self>> From<Option<T>> for AsyncBody {
239 fn from(body: Option<T>) -> Self {
240 match body {
241 Some(body) => body.into(),
242 None => Self::empty(),
243 }
244 }
245}
246
247impl fmt::Debug for AsyncBody {
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 match self.len() {
250 Some(len) => write!(f, "AsyncBody({})", len),
251 None => write!(f, "AsyncBody(?)"),
252 }
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259 use futures_lite::{
260 future::{block_on, zip},
261 io::AsyncReadExt,
262 };
263
264 static_assertions::assert_impl_all!(AsyncBody: Send, Sync);
265
266 #[test]
267 fn empty_body() {
268 let body = AsyncBody::empty();
269
270 assert!(body.is_empty());
271 assert_eq!(body.len(), Some(0));
272 }
273
274 #[test]
275 fn zero_length_body() {
276 let body = AsyncBody::from(vec![]);
277
278 assert!(!body.is_empty());
279 assert_eq!(body.len(), Some(0));
280 }
281
282 #[test]
283 fn reader_with_unknown_length() {
284 let body = AsyncBody::from_reader(futures_lite::io::empty());
285
286 assert!(!body.is_empty());
287 assert_eq!(body.len(), None);
288 }
289
290 #[test]
291 fn reader_with_known_length() {
292 let body = AsyncBody::from_reader_sized(futures_lite::io::empty(), 0);
293
294 assert!(!body.is_empty());
295 assert_eq!(body.len(), Some(0));
296 }
297
298 #[test]
299 fn reset_memory_body() {
300 block_on(async {
301 let mut body = AsyncBody::from("hello world");
302 let mut buf = String::new();
303
304 assert_eq!(body.read_to_string(&mut buf).await.unwrap(), 11);
305 assert_eq!(buf, "hello world");
306 assert!(body.reset());
307 buf.clear(); assert_eq!(body.read_to_string(&mut buf).await.unwrap(), 11);
309 assert_eq!(buf, "hello world");
310 });
311 }
312
313 #[test]
314 fn cannot_reset_reader() {
315 let mut body = AsyncBody::from_reader(futures_lite::io::empty());
316
317 assert_eq!(body.reset(), false);
318 }
319
320 #[test]
321 fn sync_memory_into_async() {
322 let (body, writer) = Body::from("hello world").into_async();
323
324 assert!(writer.is_none());
325 assert_eq!(body.len(), Some(11));
326 }
327
328 #[test]
329 fn sync_reader_into_async() {
330 block_on(async {
331 let (mut body, writer) = Body::from_reader("hello world".as_bytes()).into_async();
332
333 assert!(writer.is_some());
334
335 zip(
337 async move {
338 writer.unwrap().write().await.unwrap();
339 },
340 async move {
341 let mut buf = String::new();
342 body.read_to_string(&mut buf).await.unwrap();
343 assert_eq!(buf, "hello world");
344 },
345 )
346 .await;
347 });
348 }
349}