isahc/
response.rs

1use crate::{metrics::Metrics, redirect::EffectiveUri, trailer::Trailer};
2use futures_lite::io::{copy as copy_async, AsyncRead, AsyncWrite};
3use http::{Response, Uri};
4use std::{
5    fs::File,
6    io::{self, Read, Write},
7    net::SocketAddr,
8    path::Path,
9};
10
/// Provides extension methods for working with HTTP responses.
///
/// The implementation for [`Response`] in this module looks each value up in
/// the response's extensions, so a value is only available if something
/// stored it there before the response was handed to the caller.
pub trait ResponseExt<T> {
    /// Get the trailer of the response containing headers that were received
    /// after the response body.
    ///
    /// See the documentation for [`Trailer`] for more details on how to handle
    /// trailing headers.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// let mut response = isahc::get("https://my-site-with-trailers.com")?;
    ///
    /// println!("Status: {}", response.status());
    /// println!("Headers: {:#?}", response.headers());
    ///
    /// // Read and discard the response body until the end.
    /// response.consume()?;
    ///
    /// // Now the trailer will be available as well.
    /// println!("Trailing headers: {:#?}", response.trailer().try_get().unwrap());
    /// # Ok::<(), isahc::Error>(())
    /// ```
    fn trailer(&self) -> &Trailer;

    /// Get the effective URI of this response. This value differs from the
    /// original URI provided when making the request if at least one redirect
    /// was followed.
    ///
    /// This information is only available if populated by the HTTP client that
    /// produced the response; otherwise `None` is returned.
    fn effective_uri(&self) -> Option<&Uri>;

    /// Get the local socket address of the last-used connection involved in
    /// this request, if known.
    ///
    /// Multiple connections may be involved in a request, such as with
    /// redirects.
    ///
    /// This method only makes sense with a normal Internet request. If some
    /// other kind of transport is used to perform the request, such as a Unix
    /// socket, then this method will return `None`.
    fn local_addr(&self) -> Option<SocketAddr>;

    /// Get the remote socket address of the last-used connection involved in
    /// this request, if known.
    ///
    /// Multiple connections may be involved in a request, such as with
    /// redirects.
    ///
    /// This method only makes sense with a normal Internet request. If some
    /// other kind of transport is used to perform the request, such as a Unix
    /// socket, then this method will return `None`.
    ///
    /// # Addresses and proxies
    ///
    /// The address returned by this method is the IP address and port that the
    /// client _connected to_ and not necessarily the real address of the origin
    /// server. When forward or reverse proxies sit between the caller and the
    /// server, the returned address reflects the nearest proxy rather than the
    /// server itself.
    fn remote_addr(&self) -> Option<SocketAddr>;

    /// Get the configured cookie jar used for persisting cookies from this
    /// response, if any.
    ///
    /// # Availability
    ///
    /// This method is only available when the [`cookies`](index.html#cookies)
    /// feature is enabled.
    #[cfg(feature = "cookies")]
    fn cookie_jar(&self) -> Option<&crate::cookies::CookieJar>;

    /// If request metrics are enabled for this particular transfer, return a
    /// metrics object containing a live view of currently available data.
    ///
    /// By default metrics are disabled and `None` will be returned. To enable
    /// metrics you can use
    /// [`Configurable::metrics`](crate::config::Configurable::metrics).
    fn metrics(&self) -> Option<&Metrics>;
}
94
95impl<T> ResponseExt<T> for Response<T> {
96    #[allow(clippy::redundant_closure)]
97    fn trailer(&self) -> &Trailer {
98        // Return a static empty trailer if the extension does not exist. This
99        // offers a more convenient API so that users do not have to unwrap the
100        // trailer from an extra Option.
101        self.extensions().get().unwrap_or_else(|| Trailer::empty())
102    }
103
104    fn effective_uri(&self) -> Option<&Uri> {
105        self.extensions().get::<EffectiveUri>().map(|v| &v.0)
106    }
107
108    fn local_addr(&self) -> Option<SocketAddr> {
109        self.extensions().get::<LocalAddr>().map(|v| v.0)
110    }
111
112    fn remote_addr(&self) -> Option<SocketAddr> {
113        self.extensions().get::<RemoteAddr>().map(|v| v.0)
114    }
115
116    #[cfg(feature = "cookies")]
117    fn cookie_jar(&self) -> Option<&crate::cookies::CookieJar> {
118        self.extensions().get()
119    }
120
121    fn metrics(&self) -> Option<&Metrics> {
122        self.extensions().get()
123    }
124}
125
/// Provides extension methods for consuming HTTP response streams.
pub trait ReadResponseExt<R: Read> {
    /// Drain whatever is left of the response body stream, throwing the bytes
    /// away until the end of the stream is reached. Calling this before
    /// dropping a response is usually a good idea whenever you know the body
    /// has not been read in full.
    ///
    /// # Background
    ///
    /// By default, if a response stream is dropped before it has been
    /// completely read from, then that HTTP connection will be terminated.
    /// Depending on which version of HTTP is being used, this may require
    /// closing the network connection to the server entirely. This can result
    /// in sub-optimal performance for making multiple requests, as it prevents
    /// Isahc from keeping the connection alive to be reused for subsequent
    /// requests.
    ///
    /// If you are downloading a file on behalf of a user and have been
    /// requested to cancel the operation, then this is probably what you want.
    /// But if you are making many small API calls to a known server, then you
    /// may want to call `consume()` before dropping the response, as reading a
    /// few megabytes off a socket is usually more efficient in the long run
    /// than taking a hit on connection reuse, and opening new connections can
    /// be expensive.
    ///
    /// Note that in HTTP/2 and newer, it is not necessary to close the network
    /// connection in order to interrupt the transfer of a particular response.
    /// If you know that you will be using only HTTP/2 or newer, then calling
    /// this method is probably unnecessary.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// let mut response = isahc::get("https://example.org")?;
    ///
    /// println!("Status: {}", response.status());
    /// println!("Headers: {:#?}", response.headers());
    ///
    /// // Read and discard the response body until the end.
    /// response.consume()?;
    /// # Ok::<(), isahc::Error>(())
    /// ```
    fn consume(&mut self) -> io::Result<()> {
        // Draining is just a copy into a sink; only success or failure is
        // reported, the byte count is dropped.
        self.copy_to(io::sink()).map(|_discarded| ())
    }

    /// Copy the response body into a writer.
    ///
    /// On success, returns the number of bytes that were written.
    ///
    /// # Examples
    ///
    /// Copying the response into an in-memory buffer:
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// let mut buf = vec![];
    /// isahc::get("https://example.org")?.copy_to(&mut buf)?;
    /// println!("Read {} bytes", buf.len());
    /// # Ok::<(), isahc::Error>(())
    /// ```
    fn copy_to<W: Write>(&mut self, writer: W) -> io::Result<u64>;

    /// Write the response body to a file.
    ///
    /// This is a convenience for downloading something with a GET request and
    /// storing it on disk synchronously in a single chain of calls. The file
    /// is opened with [`File::create`] before any of the body is copied.
    ///
    /// On success, returns the number of bytes that were written.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// isahc::get("https://httpbin.org/image/jpeg")?
    ///     .copy_to_file("myimage.jpg")?;
    /// # Ok::<(), isahc::Error>(())
    /// ```
    fn copy_to_file<P: AsRef<Path>>(&mut self, path: P) -> io::Result<u64> {
        let file = File::create(path)?;

        self.copy_to(file)
    }

    /// Read the entire response body into memory.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// let image_bytes = isahc::get("https://httpbin.org/image/jpeg")?.bytes()?;
    /// # Ok::<(), isahc::Error>(())
    /// ```
    fn bytes(&mut self) -> io::Result<Vec<u8>>;

    /// Read the response body as a string.
    ///
    /// The encoding used to decode the response body into a string depends on
    /// the response. If the body begins with a [Byte Order Mark
    /// (BOM)](https://en.wikipedia.org/wiki/Byte_order_mark), then UTF-8,
    /// UTF-16LE or UTF-16BE is used as indicated by the BOM. If no BOM is
    /// present, the encoding specified in the `charset` parameter of the
    /// `Content-Type` header is used if present. Otherwise UTF-8 is assumed.
    ///
    /// If the response body contains any malformed characters or characters not
    /// representable in UTF-8, the offending bytes will be replaced with
    /// `U+FFFD REPLACEMENT CHARACTER`, which looks like this: �.
    ///
    /// This method consumes the entire response body stream and can only be
    /// called once.
    ///
    /// # Availability
    ///
    /// This method is only available when the
    /// [`text-decoding`](index.html#text-decoding) feature is enabled, which it
    /// is by default.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// let text = isahc::get("https://example.org")?.text()?;
    /// println!("{}", text);
    /// # Ok::<(), isahc::Error>(())
    /// ```
    #[cfg(feature = "text-decoding")]
    fn text(&mut self) -> io::Result<String>;

    /// Deserialize the response body as JSON into a given type.
    ///
    /// # Availability
    ///
    /// This method is only available when the [`json`](index.html#json) feature
    /// is enabled.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    /// use serde_json::Value;
    ///
    /// let json: Value = isahc::get("https://httpbin.org/json")?.json()?;
    /// println!("author: {}", json["slideshow"]["author"]);
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    #[cfg(feature = "json")]
    fn json<T>(&mut self) -> Result<T, serde_json::Error>
    where
        T: serde::de::DeserializeOwned;
}
282
283impl<R: Read> ReadResponseExt<R> for Response<R> {
284    fn copy_to<W: Write>(&mut self, mut writer: W) -> io::Result<u64> {
285        io::copy(self.body_mut(), &mut writer)
286    }
287
288    fn bytes(&mut self) -> io::Result<Vec<u8>> {
289        let mut buf = allocate_buffer(self);
290
291        self.copy_to(&mut buf)?;
292
293        Ok(buf)
294    }
295
296    #[cfg(feature = "text-decoding")]
297    fn text(&mut self) -> io::Result<String> {
298        crate::text::Decoder::for_response(self).decode_reader(self.body_mut())
299    }
300
301    #[cfg(feature = "json")]
302    fn json<D>(&mut self) -> Result<D, serde_json::Error>
303    where
304        D: serde::de::DeserializeOwned,
305    {
306        serde_json::from_reader(self.body_mut())
307    }
308}
309
/// Provides extension methods for consuming asynchronous HTTP response streams.
///
/// Every method returns a future that mutably borrows the response; nothing
/// is read from the body until that future is awaited.
pub trait AsyncReadResponseExt<R: AsyncRead + Unpin> {
    /// Read any remaining bytes from the response body stream and discard them
    /// until the end of the stream is reached. It is usually a good idea to
    /// call this method before dropping a response if you know you haven't read
    /// the entire response body.
    ///
    /// # Background
    ///
    /// By default, if a response stream is dropped before it has been
    /// completely read from, then that HTTP connection will be terminated.
    /// Depending on which version of HTTP is being used, this may require
    /// closing the network connection to the server entirely. This can result
    /// in sub-optimal performance for making multiple requests, as it prevents
    /// Isahc from keeping the connection alive to be reused for subsequent
    /// requests.
    ///
    /// If you are downloading a file on behalf of a user and have been
    /// requested to cancel the operation, then this is probably what you want.
    /// But if you are making many small API calls to a known server, then you
    /// may want to call `consume()` before dropping the response, as reading a
    /// few megabytes off a socket is usually more efficient in the long run
    /// than taking a hit on connection reuse, and opening new connections can
    /// be expensive.
    ///
    /// Note that in HTTP/2 and newer, it is not necessary to close the network
    /// connection in order to interrupt the transfer of a particular response.
    /// If you know that you will be using only HTTP/2 or newer, then calling
    /// this method is probably unnecessary.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// # async fn run() -> Result<(), isahc::Error> {
    /// let mut response = isahc::get_async("https://example.org").await?;
    ///
    /// println!("Status: {}", response.status());
    /// println!("Headers: {:#?}", response.headers());
    ///
    /// // Read and discard the response body until the end.
    /// response.consume().await?;
    /// # Ok(()) }
    /// ```
    fn consume(&mut self) -> ConsumeFuture<'_, R>;

    /// Copy the response body into a writer asynchronously.
    ///
    /// The returned future resolves to the number of bytes that were written.
    ///
    /// # Examples
    ///
    /// Copying the response into an in-memory buffer:
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// # async fn run() -> Result<(), isahc::Error> {
    /// let mut buf = vec![];
    /// isahc::get_async("https://example.org").await?
    ///     .copy_to(&mut buf).await?;
    /// println!("Read {} bytes", buf.len());
    /// # Ok(()) }
    /// ```
    fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, R, W>
    where
        W: AsyncWrite + Unpin + 'a;

    /// Read the entire response body into memory.
    ///
    /// The returned future resolves to a byte vector holding the body.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// # async fn run() -> Result<(), isahc::Error> {
    /// let image_bytes = isahc::get_async("https://httpbin.org/image/jpeg")
    ///     .await?
    ///     .bytes()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    fn bytes(&mut self) -> BytesFuture<'_, &mut R>;

    /// Read the response body as a string asynchronously.
    ///
    /// This method consumes the entire response body stream and can only be
    /// called once.
    ///
    /// See [`ReadResponseExt::text`] for a description of how the text
    /// encoding is chosen.
    ///
    /// # Availability
    ///
    /// This method is only available when the
    /// [`text-decoding`](index.html#text-decoding) feature is enabled, which it
    /// is by default.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    ///
    /// # async fn run() -> Result<(), isahc::Error> {
    /// let text = isahc::get_async("https://example.org").await?
    ///     .text().await?;
    /// println!("{}", text);
    /// # Ok(()) }
    /// ```
    #[cfg(feature = "text-decoding")]
    fn text(&mut self) -> crate::text::TextFuture<'_, &mut R>;

    /// Deserialize the response body as JSON into a given type.
    ///
    /// # Caveats
    ///
    /// Unlike its [synchronous equivalent](ReadResponseExt::json), this method
    /// reads the entire response body into memory before attempting
    /// deserialization. This is due to a Serde limitation since incremental
    /// partial deserializing is not supported.
    ///
    /// # Availability
    ///
    /// This method is only available when the [`json`](index.html#json) feature
    /// is enabled.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use isahc::prelude::*;
    /// use serde_json::Value;
    ///
    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /// let json: Value = isahc::get_async("https://httpbin.org/json").await?
    ///     .json().await?;
    /// println!("author: {}", json["slideshow"]["author"]);
    /// # Ok(()) }
    /// ```
    #[cfg(feature = "json")]
    fn json<T>(&mut self) -> JsonFuture<'_, R, T>
    where
        T: serde::de::DeserializeOwned;
}
451
452impl<R: AsyncRead + Unpin> AsyncReadResponseExt<R> for Response<R> {
453    fn consume(&mut self) -> ConsumeFuture<'_, R> {
454        ConsumeFuture::new(async move {
455            copy_async(self.body_mut(), futures_lite::io::sink()).await?;
456
457            Ok(())
458        })
459    }
460
461    fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a, R, W>
462    where
463        W: AsyncWrite + Unpin + 'a,
464    {
465        CopyFuture::new(async move { copy_async(self.body_mut(), writer).await })
466    }
467
468    fn bytes(&mut self) -> BytesFuture<'_, &mut R> {
469        BytesFuture::new(async move {
470            let mut buf = allocate_buffer(self);
471
472            copy_async(self.body_mut(), &mut buf).await?;
473
474            Ok(buf)
475        })
476    }
477
478    #[cfg(feature = "text-decoding")]
479    fn text(&mut self) -> crate::text::TextFuture<'_, &mut R> {
480        crate::text::Decoder::for_response(self).decode_reader_async(self.body_mut())
481    }
482
483    #[cfg(feature = "json")]
484    fn json<T>(&mut self) -> JsonFuture<'_, R, T>
485    where
486        T: serde::de::DeserializeOwned,
487    {
488        JsonFuture::new(async move {
489            let mut buf = allocate_buffer(self);
490
491            // Serde does not support incremental parsing, so we have to resort
492            // to reading the entire response into memory first and then
493            // deserializing.
494            if let Err(e) = copy_async(self.body_mut(), &mut buf).await {
495                struct ErrorReader(Option<io::Error>);
496
497                impl Read for ErrorReader {
498                    fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
499                        Err(self.0.take().unwrap())
500                    }
501                }
502
503                // Serde offers no public way to directly create an error from
504                // an I/O error, but we can do so in a roundabout way by parsing
505                // a reader that always returns the desired error.
506                serde_json::from_reader(ErrorReader(Some(e)))
507            } else {
508                serde_json::from_slice(&buf)
509            }
510        })
511    }
512}
513
514fn allocate_buffer<T>(response: &Response<T>) -> Vec<u8> {
515    if let Some(length) = get_content_length(response) {
516        Vec::with_capacity(length as usize)
517    } else {
518        Vec::new()
519    }
520}
521
522fn get_content_length<T>(response: &Response<T>) -> Option<u64> {
523    response.headers()
524        .get(http::header::CONTENT_LENGTH)?
525        .to_str()
526        .ok()?
527        .parse()
528        .ok()
529}
530
// Named future types returned by the `AsyncReadResponseExt` methods.
decl_future! {
    /// A future which reads any remaining bytes from the response body stream
    /// and discards them.
    pub type ConsumeFuture<R> = impl Future<Output = io::Result<()>> + SendIf<R>;

    /// A future which copies all the response body bytes into a sink.
    pub type CopyFuture<R, W> = impl Future<Output = io::Result<u64>> + SendIf<R, W>;

    /// A future which reads the entire response body into memory.
    pub type BytesFuture<R> = impl Future<Output = io::Result<Vec<u8>>> + SendIf<R>;

    /// A future which deserializes the response body as JSON.
    #[cfg(feature = "json")]
    pub type JsonFuture<R, T> = impl Future<Output = Result<T, serde_json::Error>> + SendIf<R, T>;
}
546
/// Newtype around the [`SocketAddr`] that [`ResponseExt::local_addr`] looks up
/// in a response's extensions.
pub(crate) struct LocalAddr(pub(crate) SocketAddr);
548
/// Newtype around the [`SocketAddr`] that [`ResponseExt::remote_addr`] looks up
/// in a response's extensions.
pub(crate) struct RemoteAddr(pub(crate) SocketAddr);
550
#[cfg(test)]
mod tests {
    use super::*;

    // These are type-level assertions about when `CopyFuture` is `Send`.

    // With a `Send` reader and a `Send` writer the future must be `Send`.
    static_assertions::assert_impl_all!(CopyFuture<'static, Vec<u8>, Vec<u8>>: Send);

    // *mut T is !Send, so using it for the reader, the writer, or both must
    // make the future !Send as well.
    static_assertions::assert_not_impl_any!(CopyFuture<'static, *mut Vec<u8>, Vec<u8>>: Send);
    static_assertions::assert_not_impl_any!(CopyFuture<'static, Vec<u8>, *mut Vec<u8>>: Send);
    static_assertions::assert_not_impl_any!(CopyFuture<'static, *mut Vec<u8>, *mut Vec<u8>>: Send);
}