isahc/trailer.rs
1use event_listener::Event;
2use http::HeaderMap;
3use once_cell::sync::OnceCell;
4use std::{sync::Arc, time::Duration};
5
6/// Holds the current state of a trailer for a response.
7///
8/// This object acts as a shared handle that can be cloned and polled from
9/// multiple threads to wait for and act on the response trailer.
10///
11/// There are two typical workflows for accessing trailer headers:
12///
13/// - If you are consuming the response body and then accessing the headers
14/// afterward, then all trailers are guaranteed to have arrived (if any).
15/// [`Trailer::try_get`] will allow you to access them without extra overhead.
16/// - If you are handling trailers in a separate task, callback, or thread, then
17/// either [`Trailer::wait`] or [`Trailer::wait_async`] will allow you to wait
18/// for the trailer headers to arrive and then handle them.
19///
20/// Note that in either approach, trailer headers are delivered to your
21/// application as a single [`HeaderMap`]; it is not possible to handle
22/// individual headers as they arrive.
23#[derive(Clone, Debug)]
24pub struct Trailer {
25 shared: Arc<Shared>,
26}
27
28#[derive(Debug)]
29struct Shared {
30 headers: OnceCell<HeaderMap>,
31 ready: Event,
32}
33
34impl Trailer {
35 /// Get a populated trailer handle containing no headers.
36 pub(crate) fn empty() -> &'static Self {
37 static EMPTY: OnceCell<Trailer> = OnceCell::new();
38
39 EMPTY.get_or_init(|| Self {
40 shared: Arc::new(Shared {
41 headers: OnceCell::from(HeaderMap::new()),
42 ready: Event::new(),
43 }),
44 })
45 }
46
47 /// Returns true if the trailer has been received (if any).
48 ///
49 /// The trailer will not be received until the body stream associated with
50 /// this response has been fully consumed.
51 #[inline]
52 pub fn is_ready(&self) -> bool {
53 self.try_get().is_some()
54 }
55
56 /// Attempt to get the trailer headers without blocking. Returns `None` if
57 /// the trailer has not been received yet.
58 #[inline]
59 pub fn try_get(&self) -> Option<&HeaderMap> {
60 self.shared.headers.get()
61 }
62
63 /// Block the current thread until the trailer headers arrive, and then
64 /// return them.
65 ///
66 /// This is a blocking operation! If you are writing an asynchronous
67 /// application, then you probably want to use [`Trailer::wait_async`]
68 /// instead.
69 pub fn wait(&self) -> &HeaderMap {
70 loop {
71 // Fast path: If the headers are already set, return them.
72 if let Some(headers) = self.try_get() {
73 return headers;
74 }
75
76 // Headers not set, jump into the slow path by creating a new
77 // listener for the ready event.
78 let listener = self.shared.ready.listen();
79
80 // Double-check that the headers are not set.
81 if let Some(headers) = self.try_get() {
82 return headers;
83 }
84
85 // Otherwise, block until they are set.
86 listener.wait();
87
88 // If we got the notification, then the headers are likely to be
89 // set.
90 if let Some(headers) = self.try_get() {
91 return headers;
92 }
93 }
94 }
95
96 /// Block the current thread until the trailer headers arrive or a timeout
97 /// expires.
98 ///
99 /// If the given timeout expired before the trailer arrived then `None` is
100 /// returned.
101 ///
102 /// This is a blocking operation! If you are writing an asynchronous
103 /// application, then you probably want to use [`Trailer::wait_async`]
104 /// instead.
105 pub fn wait_timeout(&self, timeout: Duration) -> Option<&HeaderMap> {
106 // Fast path: If the headers are already set, return them.
107 if let Some(headers) = self.try_get() {
108 return Some(headers);
109 }
110
111 // Headers not set, jump into the slow path by creating a new listener
112 // for the ready event.
113 let listener = self.shared.ready.listen();
114
115 // Double-check that the headers are not set.
116 if let Some(headers) = self.try_get() {
117 return Some(headers);
118 }
119
120 // Otherwise, block with a timeout.
121 if listener.wait_timeout(timeout) {
122 self.try_get()
123 } else {
124 None
125 }
126 }
127
128 /// Wait asynchronously until the trailer headers arrive, and then return
129 /// them.
130 pub async fn wait_async(&self) -> &HeaderMap {
131 loop {
132 // Fast path: If the headers are already set, return them.
133 if let Some(headers) = self.try_get() {
134 return headers;
135 }
136
137 // Headers not set, jump into the slow path by creating a new
138 // listener for the ready event.
139 let listener = self.shared.ready.listen();
140
141 // Double-check that the headers are not set.
142 if let Some(headers) = self.try_get() {
143 return headers;
144 }
145
146 // Otherwise, wait asynchronously until they are.
147 listener.await;
148
149 // If we got the notification, then the headers are likely to be
150 // set.
151 if let Some(headers) = self.try_get() {
152 return headers;
153 }
154 }
155 }
156}
157
158pub(crate) struct TrailerWriter {
159 shared: Arc<Shared>,
160 headers: Option<HeaderMap>,
161}
162
163impl TrailerWriter {
164 pub(crate) fn new() -> Self {
165 Self {
166 shared: Arc::new(Shared {
167 headers: Default::default(),
168 ready: Event::new(),
169 }),
170 headers: Some(HeaderMap::new()),
171 }
172 }
173
174 pub(crate) fn trailer(&self) -> Trailer {
175 Trailer {
176 shared: self.shared.clone(),
177 }
178 }
179
180 pub(crate) fn get_mut(&mut self) -> Option<&mut HeaderMap> {
181 self.headers.as_mut()
182 }
183
184 #[inline]
185 pub(crate) fn flush(&mut self) {
186 if !self.flush_impl() {
187 tracing::warn!("tried to flush trailer multiple times");
188 }
189 }
190
191 fn flush_impl(&mut self) -> bool {
192 if let Some(headers) = self.headers.take() {
193 let _ = self.shared.headers.set(headers);
194
195 // Wake up any calls waiting for the headers.
196 self.shared.ready.notify(usize::max_value());
197
198 true
199 } else {
200 false
201 }
202 }
203}
204
205impl Drop for TrailerWriter {
206 fn drop(&mut self) {
207 self.flush_impl();
208 }
209}