isahc/
trailer.rs

1use event_listener::Event;
2use http::HeaderMap;
3use once_cell::sync::OnceCell;
4use std::{sync::Arc, time::Duration};
5
6/// Holds the current state of a trailer for a response.
7///
8/// This object acts as a shared handle that can be cloned and polled from
9/// multiple threads to wait for and act on the response trailer.
10///
11/// There are two typical workflows for accessing trailer headers:
12///
13/// - If you are consuming the response body and then accessing the headers
14///   afterward, then all trailers are guaranteed to have arrived (if any).
15///   [`Trailer::try_get`] will allow you to access them without extra overhead.
16/// - If you are handling trailers in a separate task, callback, or thread, then
17///   either [`Trailer::wait`] or [`Trailer::wait_async`] will allow you to wait
18///   for the trailer headers to arrive and then handle them.
19///
20/// Note that in either approach, trailer headers are delivered to your
21/// application as a single [`HeaderMap`]; it is not possible to handle
22/// individual headers as they arrive.
23#[derive(Clone, Debug)]
24pub struct Trailer {
25    shared: Arc<Shared>,
26}
27
28#[derive(Debug)]
29struct Shared {
30    headers: OnceCell<HeaderMap>,
31    ready: Event,
32}
33
34impl Trailer {
35    /// Get a populated trailer handle containing no headers.
36    pub(crate) fn empty() -> &'static Self {
37        static EMPTY: OnceCell<Trailer> = OnceCell::new();
38
39        EMPTY.get_or_init(|| Self {
40            shared: Arc::new(Shared {
41                headers: OnceCell::from(HeaderMap::new()),
42                ready: Event::new(),
43            }),
44        })
45    }
46
47    /// Returns true if the trailer has been received (if any).
48    ///
49    /// The trailer will not be received until the body stream associated with
50    /// this response has been fully consumed.
51    #[inline]
52    pub fn is_ready(&self) -> bool {
53        self.try_get().is_some()
54    }
55
56    /// Attempt to get the trailer headers without blocking. Returns `None` if
57    /// the trailer has not been received yet.
58    #[inline]
59    pub fn try_get(&self) -> Option<&HeaderMap> {
60        self.shared.headers.get()
61    }
62
63    /// Block the current thread until the trailer headers arrive, and then
64    /// return them.
65    ///
66    /// This is a blocking operation! If you are writing an asynchronous
67    /// application, then you probably want to use [`Trailer::wait_async`]
68    /// instead.
69    pub fn wait(&self) -> &HeaderMap {
70        loop {
71            // Fast path: If the headers are already set, return them.
72            if let Some(headers) = self.try_get() {
73                return headers;
74            }
75
76            // Headers not set, jump into the slow path by creating a new
77            // listener for the ready event.
78            let listener = self.shared.ready.listen();
79
80            // Double-check that the headers are not set.
81            if let Some(headers) = self.try_get() {
82                return headers;
83            }
84
85            // Otherwise, block until they are set.
86            listener.wait();
87
88            // If we got the notification, then the headers are likely to be
89            // set.
90            if let Some(headers) = self.try_get() {
91                return headers;
92            }
93        }
94    }
95
96    /// Block the current thread until the trailer headers arrive or a timeout
97    /// expires.
98    ///
99    /// If the given timeout expired before the trailer arrived then `None` is
100    /// returned.
101    ///
102    /// This is a blocking operation! If you are writing an asynchronous
103    /// application, then you probably want to use [`Trailer::wait_async`]
104    /// instead.
105    pub fn wait_timeout(&self, timeout: Duration) -> Option<&HeaderMap> {
106        // Fast path: If the headers are already set, return them.
107        if let Some(headers) = self.try_get() {
108            return Some(headers);
109        }
110
111        // Headers not set, jump into the slow path by creating a new listener
112        // for the ready event.
113        let listener = self.shared.ready.listen();
114
115        // Double-check that the headers are not set.
116        if let Some(headers) = self.try_get() {
117            return Some(headers);
118        }
119
120        // Otherwise, block with a timeout.
121        if listener.wait_timeout(timeout) {
122            self.try_get()
123        } else {
124            None
125        }
126    }
127
128    /// Wait asynchronously until the trailer headers arrive, and then return
129    /// them.
130    pub async fn wait_async(&self) -> &HeaderMap {
131        loop {
132            // Fast path: If the headers are already set, return them.
133            if let Some(headers) = self.try_get() {
134                return headers;
135            }
136
137            // Headers not set, jump into the slow path by creating a new
138            // listener for the ready event.
139            let listener = self.shared.ready.listen();
140
141            // Double-check that the headers are not set.
142            if let Some(headers) = self.try_get() {
143                return headers;
144            }
145
146            // Otherwise, wait asynchronously until they are.
147            listener.await;
148
149            // If we got the notification, then the headers are likely to be
150            // set.
151            if let Some(headers) = self.try_get() {
152                return headers;
153            }
154        }
155    }
156}
157
158pub(crate) struct TrailerWriter {
159    shared: Arc<Shared>,
160    headers: Option<HeaderMap>,
161}
162
163impl TrailerWriter {
164    pub(crate) fn new() -> Self {
165        Self {
166            shared: Arc::new(Shared {
167                headers: Default::default(),
168                ready: Event::new(),
169            }),
170            headers: Some(HeaderMap::new()),
171        }
172    }
173
174    pub(crate) fn trailer(&self) -> Trailer {
175        Trailer {
176            shared: self.shared.clone(),
177        }
178    }
179
180    pub(crate) fn get_mut(&mut self) -> Option<&mut HeaderMap> {
181        self.headers.as_mut()
182    }
183
184    #[inline]
185    pub(crate) fn flush(&mut self) {
186        if !self.flush_impl() {
187            tracing::warn!("tried to flush trailer multiple times");
188        }
189    }
190
191    fn flush_impl(&mut self) -> bool {
192        if let Some(headers) = self.headers.take() {
193            let _ = self.shared.headers.set(headers);
194
195            // Wake up any calls waiting for the headers.
196            self.shared.ready.notify(usize::max_value());
197
198            true
199        } else {
200            false
201        }
202    }
203}
204
205impl Drop for TrailerWriter {
206    fn drop(&mut self) {
207        self.flush_impl();
208    }
209}