isahc/agent/selector.rs
use curl::multi::Socket;
use polling::{Event, Poller};
use std::{
collections::{HashMap, HashSet},
hash::{BuildHasherDefault, Hasher},
io,
sync::Arc,
task::Waker,
time::Duration,
};
/// Asynchronous I/O selector for sockets. Used by the agent to wait for network
/// activity asynchronously, as directed by curl.
///
/// Provides an abstraction layer over a bare poller that handles the
/// bookkeeping of socket registration and translates the poller's oneshot
/// registrations into persistent ones.
///
/// Events are level-triggered, since that is what curl wants.
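///
/// The intended flow is roughly: `register` the sockets curl hands us,
/// `poll` with curl's suggested timeout, then translate `events` back into
/// curl socket actions and `deregister` sockets curl no longer cares about.
/// A minimal sketch of that loop (illustrative only; `sock` stands in for a
/// socket provided by curl):
///
/// ```ignore
/// let mut selector = Selector::new()?;
///
/// // Watch the socket for both read and write readiness.
/// selector.register(sock, true, true)?;
///
/// // Wait up to one second for activity.
/// if selector.poll(Duration::from_secs(1))? {
///     for (socket, readable, writable) in selector.events() {
///         // ...feed the readiness information back to curl...
///     }
/// }
///
/// selector.deregister(sock)?;
/// ```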
pub(crate) struct Selector {
/// This is the poller we use to poll for socket activity!
poller: Arc<Poller>,
/// All of the sockets that we have been asked to keep track of.
sockets: HashMap<Socket, Registration, BuildHasherDefault<IntHasher>>,
/// If a socket is currently invalid when it is registered, we put it in this
/// set and try to register it again later.
bad_sockets: HashSet<Socket, BuildHasherDefault<IntHasher>>,
/// Socket events that have occurred. We re-use this vec every call for
/// efficiency.
events: Vec<Event>,
/// Incrementing counter used to deduplicate registration operations.
tick: usize,
}
/// Information stored about each registered socket.
struct Registration {
readable: bool,
writable: bool,
tick: usize,
}
impl Selector {
/// Create a new socket selector.
pub(crate) fn new() -> io::Result<Self> {
Ok(Self {
poller: Arc::new(Poller::new()?),
sockets: HashMap::with_hasher(Default::default()),
bad_sockets: HashSet::with_hasher(Default::default()),
events: Vec::new(),
tick: 0,
})
}
/// Get a task waker that will interrupt this selector whenever it is
/// waiting for activity.
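///
/// Illustrative sketch of how such a waker might be used (the real caller
/// is the agent's handle):
///
/// ```ignore
/// let waker = selector.waker();
///
/// // From another thread, interrupt a blocking call to `poll`.
/// std::thread::spawn(move || waker.wake());
/// ```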
pub(crate) fn waker(&self) -> Waker {
waker_fn::waker_fn({
let poller_ref = self.poller.clone();
move || {
let _ = poller_ref.notify();
}
})
}
/// Register a socket with the selector to begin receiving readiness events
/// for it.
///
/// This method can also be used to update/modify the readiness events you
/// are interested in for a previously registered socket.
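///
/// For example (sketch; `sock` stands in for a socket provided by curl):
///
/// ```ignore
/// // Start watching for readability only...
/// selector.register(sock, true, false)?;
///
/// // ...then later update the same registration to write-only.
/// selector.register(sock, false, true)?;
/// ```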
pub(crate) fn register(
&mut self,
socket: Socket,
readable: bool,
writable: bool,
) -> io::Result<()> {
let previous = self.sockets.insert(socket, Registration {
readable,
writable,
tick: self.tick,
});
let result = if previous.is_some() {
poller_modify(&self.poller, socket, readable, writable)
} else {
poller_add(&self.poller, socket, readable, writable)
};
match result {
Err(e) if is_bad_socket_error(&e) => {
// We've been asked to monitor a socket, but the poller thinks
// that the socket is invalid or closed. On occasion, curl will
// hand us such sockets with the assumption that we will monitor
// them until curl tells us to stop. With stateless pollers such
// as `select(2)` this is not a problem, but stateful,
// high-performance pollers such as epoll and kqueue need a
// socket to be valid in order to monitor it.
//
// To get around this problem, we return `Ok` to the caller and
// hold onto this currently invalid socket for later. Whenever
// `poll` is called, we retry registering these sockets in the
// hope that they will eventually become valid.
tracing::debug!(socket, error = ?e, "bad socket registered, will try again later");
self.bad_sockets.insert(socket);
Ok(())
}
result => result,
}
}
/// Remove a socket from the selector and stop receiving events for it.
pub(crate) fn deregister(&mut self, socket: Socket) -> io::Result<()> {
// Remove this socket from our bookkeeping. If we recognize it, also
// remove it from the underlying poller.
if self.sockets.remove(&socket).is_some() {
self.bad_sockets.remove(&socket);
// There's a good chance that the socket has already been closed.
// Depending on the poller implementation, it may have already
// forgotten about this socket (e.g. epoll). Therefore if we get an
// error back complaining that the socket is invalid, or a permission
// error (epoll rejects descriptors it cannot watch, such as regular
// files, with EPERM), we can safely ignore it.
if let Err(e) = self.poller.delete(socket) {
if !is_bad_socket_error(&e) && e.kind() != io::ErrorKind::PermissionDenied {
return Err(e);
}
}
}
Ok(())
}
/// Block until socket activity is detected or a timeout passes.
///
/// Returns `true` if one or more socket events occurred.
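///
/// Sketch of a single wait (illustrative; the timeout normally comes from
/// curl):
///
/// ```ignore
/// if selector.poll(Duration::from_millis(500))? {
///     // At least one socket event occurred; inspect `events()`.
/// } else {
///     // Timed out, or the waker interrupted us, with no socket events.
/// }
/// ```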
pub(crate) fn poll(&mut self, timeout: Duration) -> io::Result<bool> {
// Since the poller delivers I/O events in oneshot mode, re-register any
// sockets with the poller that were triggered the last time we polled.
//
// We don't do this immediately after polling because the caller may
// choose to de-register a socket before the next call. That's why we
// wait until the last minute.
for event in self.events.drain(..) {
let socket = event.key as Socket;
if let Some(registration) = self.sockets.get_mut(&socket) {
// If the socket was already re-registered this tick, then we
// don't need to do this.
if registration.tick != self.tick {
poller_modify(
&self.poller,
socket,
registration.readable,
registration.writable,
)?;
registration.tick = self.tick;
}
}
}
// Iterate over sockets that were registered with us, but could not be
// added to the underlying poller at the time, and retry adding them.
self.bad_sockets.retain({
let sockets = &mut self.sockets;
let poller = &self.poller;
let tick = self.tick;
move |&socket| {
if let Some(registration) = sockets.get_mut(&socket) {
if registration.tick != tick {
registration.tick = tick;
poller_add(poller, socket, registration.readable, registration.writable)
.is_err()
} else {
true
}
} else {
false
}
}
});
self.tick = self.tick.wrapping_add(1);
// Block until either an I/O event occurs on a socket, the timeout is
// reached, or the agent handle interrupts us.
match self.poller.wait(&mut self.events, Some(timeout)) {
Ok(0) => Ok(false),
Ok(_) => Ok(true),
Err(e) if e.kind() == io::ErrorKind::Interrupted => Ok(false),
Err(e) => Err(e),
}
}
/// Get an iterator over the socket events that occurred during the most
/// recent call to `poll`.
pub(crate) fn events(&self) -> impl Iterator<Item = (Socket, bool, bool)> + '_ {
self.events
.iter()
.map(|event| (event.key as Socket, event.readable, event.writable))
}
}
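/// Add interest in `socket` to the given poller, falling back to a modify if
/// the add is rejected.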
fn poller_add(poller: &Poller, socket: Socket, readable: bool, writable: bool) -> io::Result<()> {
// If this errors, we retry the operation as a modification instead, since
// the new socket might be re-using a file descriptor that was previously
// closed but is still registered with the poller. Retrying the operation
// as a modification is sufficient to handle this.
//
// This is especially common with the epoll backend.
if let Err(e) = poller.add(socket, Event {
key: socket as usize,
readable,
writable,
}) {
tracing::debug!(
"failed to add interest for socket {}, retrying as a modify: {}",
socket,
e
);
poller.modify(socket, Event {
key: socket as usize,
readable,
writable,
})?;
}
Ok(())
}
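/// Update interest in `socket` with the given poller, falling back to an add
/// if the poller does not currently have the socket registered.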
fn poller_modify(
poller: &Poller,
socket: Socket,
readable: bool,
writable: bool,
) -> io::Result<()> {
// If this errors, we retry the operation as an add instead. This can
// happen with epoll, which silently drops a socket from its interest set
// when the socket is closed; if the descriptor has since been re-created,
// the poller no longer knows about it and a modify fails.
if let Err(e) = poller.modify(socket, Event {
key: socket as usize,
readable,
writable,
}) {
tracing::debug!(
"failed to modify interest for socket {}, retrying as an add: {}",
socket,
e
);
poller.add(socket, Event {
key: socket as usize,
readable,
writable,
})?;
}
Ok(())
}
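/// Check whether an I/O error indicates that the socket in question is
/// invalid, closed, or not known to the poller.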
fn is_bad_socket_error(error: &io::Error) -> bool {
// OS-specific error codes that aren't mapped to an `std::io::ErrorKind`.
const EBADF: i32 = 9;
const ERROR_INVALID_HANDLE: i32 = 6;
const ERROR_NOT_FOUND: i32 = 1168;
match error.kind() {
// Common error codes std understands.
io::ErrorKind::NotFound | io::ErrorKind::InvalidInput => true,
// Check for OS-specific error codes.
_ => match error.raw_os_error() {
// kqueue likes to return EBADF, especially on removal, since it
// automatically removes sockets when they are closed.
Some(EBADF) if cfg!(unix) => true,
// IOCP can return these in rare circumstances. Typically these just
// indicate that the socket is no longer registered with the
// completion port or was already closed.
Some(ERROR_INVALID_HANDLE) | Some(ERROR_NOT_FOUND) if cfg!(windows) => true,
_ => false,
},
}
}
/// Trivial hash function to use for our maps and sets that use file descriptors
/// as keys.
#[derive(Default)]
struct IntHasher([u8; 8], #[cfg(debug_assertions)] bool);
impl Hasher for IntHasher {
fn write(&mut self, bytes: &[u8]) {
#[cfg(debug_assertions)]
{
if self.1 {
panic!("socket hash function can only be written to once");
} else {
self.1 = true;
}
if bytes.len() > 8 {
panic!("only a maximum of 8 bytes can be hashed");
}
}
(&mut self.0[..bytes.len()]).copy_from_slice(bytes);
}
#[inline]
fn finish(&self) -> u64 {
u64::from_ne_bytes(self.0)
}
}
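// A minimal smoke test sketch for the selector. This is an illustrative
// addition rather than part of the original module: it assumes a Unix
// platform, where curl's `Socket` is a raw file descriptor, and uses a UDP
// socket, which should report writability almost immediately.
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use std::{net::UdpSocket, os::unix::io::AsRawFd, time::Duration};

    #[test]
    fn udp_socket_reports_writable() {
        let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
        let fd = socket.as_raw_fd();

        let mut selector = Selector::new().unwrap();
        selector.register(fd, false, true).unwrap();

        // A freshly bound UDP socket has free send buffer space, so it
        // should become writable right away.
        assert!(selector.poll(Duration::from_secs(1)).unwrap());
        assert!(selector
            .events()
            .any(|(s, _readable, writable)| s == fd && writable));

        selector.deregister(fd).unwrap();
    }
}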