// This implementation is based on the one in the `polling` crate.
// Thanks to https://github.com/Kestrer for the original implementation!
// Permission to use this code has been granted by original author:
// https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
#[cfg(not(target_os = "hermit"))]
use std::os::fd::{AsRawFd, RawFd};
// TODO: once the upstream issue is fixed this
// can use `std::os::fd` and be merged with the above.
#[cfg(target_os = "hermit")]
use std::os::hermit::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use std::{cmp, fmt, io};

use crate::sys::unix::waker::Waker as WakerInternal;
use crate::{Interest, Token};

/// Unique id for use as `SelectorId`.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);

#[derive(Debug)]
pub struct Selector {
    state: Arc<SelectorState>,
}

impl Selector {
    pub fn new() -> io::Result<Selector> {
        let state = SelectorState::new()?;

        Ok(Selector {
            state: Arc::new(state),
        })
    }

    pub fn try_clone(&self) -> io::Result<Selector> {
        let state = self.state.clone();

        Ok(Selector { state })
    }

    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        self.state.select(events, timeout)
    }

    pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
        self.state.register(fd, token, interests)
    }

    #[allow(dead_code)]
    pub(crate) fn register_internal(
        &self,
        fd: RawFd,
        token: Token,
        interests: Interest,
    ) -> io::Result<Arc<RegistrationRecord>> {
        self.state.register_internal(fd, token, interests)
    }

    pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
        self.state.reregister(fd, token, interests)
    }

    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
        self.state.deregister(fd)
    }

    pub fn wake(&self, token: Token) -> io::Result<()> {
        self.state.wake(token)
    }

    cfg_io_source! {
        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.state.id
        }
    }
}

/// Interface to poll.
#[derive(Debug)]
struct SelectorState {
    /// File descriptors to poll.
    fds: Mutex<Fds>,
    /// File descriptors which will be removed before the next poll call.
    ///
    /// When a file descriptor is deregistered while a poll is running, we need to filter
    /// out all removed descriptors after that poll is finished running.
    pending_removal: Mutex<Vec<RawFd>>,
    /// Token associated with a Waker that has recently asked to wake. This will
    /// cause a synthetic behaviour where on any wakeup we add all pending tokens
    /// to the list of emitted events.
    pending_wake_token: Mutex<Option<Token>>,
    /// Data is written to this to wake up the current instance of `wait`, which can occur when the
    /// user notifies it (in which case `notified` would have been set) or when an operation needs
    /// to occur (in which case `waiting_operations` would have been incremented).
    notify_waker: WakerInternal,
    /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the
    /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero
    /// again.
    waiting_operations: AtomicUsize,
    /// The condition variable that gets notified when `waiting_operations` reaches zero or
    /// `notified` becomes true.
    ///
    /// This is used with the `fds` mutex.
    operations_complete: Condvar,
    /// This selector's id.
    #[cfg(debug_assertions)]
    #[allow(dead_code)]
    id: usize,
}
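// The last three fields above implement a handshake that lets registration
// changes interrupt a concurrent `select`. A rough timeline of that handshake
// (an editorial sketch, not part of the original implementation; the names
// match the fields of `SelectorState`):
//
//   modify_fds()                            select()
//   ------------                            --------
//   waiting_operations += 1                 blocked in poll(2), holding `fds`
//   notify_waker.wake()              --->   poll(2) returns (waker fd readable)
//   lock(fds), apply the change             waits on `operations_complete`
//   waiting_operations -= 1                 while `waiting_operations != 0`,
//   operations_complete.notify_one() --->   releasing `fds`; then re-runs
//                                           poll(2) with the updated fd set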
/// The file descriptors to poll in a `Poller`.
#[derive(Debug, Clone)]
struct Fds {
    /// The list of `pollfds` taken by poll.
    ///
    /// The first file descriptor is always present and is used to notify the poller.
    poll_fds: Vec<PollFd>,
    /// The map of each file descriptor to data associated with it. This does not include the file
    /// descriptors created by the internal notify waker.
    fd_data: HashMap<RawFd, FdData>,
}

/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the
/// `extra_traits` feature of `libc`.
#[repr(transparent)]
#[derive(Clone)]
struct PollFd(libc::pollfd);

impl Debug for PollFd {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("pollfd")
            .field("fd", &self.0.fd)
            .field("events", &self.0.events)
            .field("revents", &self.0.revents)
            .finish()
    }
}

/// Data associated with a file descriptor in a poller.
#[derive(Debug, Clone)]
struct FdData {
    /// The index into `poll_fds` this file descriptor is.
    poll_fds_index: usize,
    /// The key of the `Event` associated with this file descriptor.
    token: Token,
    /// Used to communicate with `IoSourceState` when we need to internally deregister
    /// based on a closed fd.
    shared_record: Arc<RegistrationRecord>,
}

impl SelectorState {
    pub fn new() -> io::Result<SelectorState> {
        let notify_waker = WakerInternal::new_unregistered()?;

        Ok(Self {
            fds: Mutex::new(Fds {
                poll_fds: vec![PollFd(libc::pollfd {
                    fd: notify_waker.as_raw_fd(),
                    events: libc::POLLIN,
                    revents: 0,
                })],
                fd_data: HashMap::new(),
            }),
            pending_removal: Mutex::new(Vec::new()),
            pending_wake_token: Mutex::new(None),
            notify_waker,
            waiting_operations: AtomicUsize::new(0),
            operations_complete: Condvar::new(),
            #[cfg(debug_assertions)]
            id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
        })
    }
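    // Overview of `select` below (an editorial summary of its inline comments,
    // not part of the original implementation). Each iteration of the outer
    // loop has three phases:
    //   1. wait until no `modify_fds` caller is waiting for the `fds` lock,
    //   2. run `poll(2)` over the full `poll_fds` list (index 0 is the waker),
    //   3. translate non-zero `revents` into `Event`s, skipping any fd that
    //      was queued for removal while the poll was running.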
    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        events.clear();

        let mut fds = self.fds.lock().unwrap();

        // Keep track of fds that receive POLLHUP or POLLERR (i.e. won't receive further
        // events) and internally deregister them before they are externally deregistered. See
        // IoSourceState below to track how the external deregister call will be handled
        // when this state occurs.
        let mut closed_raw_fds = Vec::new();

        loop {
            // Complete all current operations.
            loop {
                if self.waiting_operations.load(Ordering::SeqCst) == 0 {
                    break;
                }

                fds = self.operations_complete.wait(fds).unwrap();
            }

            // Perform the poll.
            trace!("Polling on {:?}", &fds);
            let num_events = poll(&mut fds.poll_fds, timeout)?;
            trace!("Poll finished: {:?}", &fds);

            if num_events == 0 {
                return Ok(());
            }

            let waker_events = fds.poll_fds[0].0.revents;
            let notified = waker_events != 0;
            let mut num_fd_events = if notified { num_events - 1 } else { num_events };

            let pending_wake_token = self.pending_wake_token.lock().unwrap().take();

            if notified {
                self.notify_waker.ack_and_reset();
                if pending_wake_token.is_some() {
                    num_fd_events += 1;
                }
            }

            // We now check whether this poll was performed with descriptors which were pending
            // for removal and filter out any matching.
            let mut pending_removal_guard = self.pending_removal.lock().unwrap();
            let mut pending_removal =
                std::mem::replace(pending_removal_guard.as_mut(), Vec::new());
            drop(pending_removal_guard);

            // Store the events if there were any.
            if num_fd_events > 0 {
                let fds = &mut *fds;

                events.reserve(num_fd_events);

                // Add synthetic events we picked up from calls to wake().
                if let Some(pending_wake_token) = pending_wake_token {
                    events.push(Event {
                        token: pending_wake_token,
                        events: waker_events,
                    });
                }

                for fd_data in fds.fd_data.values_mut() {
                    let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index];

                    if pending_removal.contains(&poll_fd.fd) {
                        // Fd was removed while poll was running.
                        continue;
                    }

                    if poll_fd.revents != 0 {
                        // Store event.
                        events.push(Event {
                            token: fd_data.token,
                            events: poll_fd.revents,
                        });

                        if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 {
                            pending_removal.push(poll_fd.fd);
                            closed_raw_fds.push(poll_fd.fd);
                        }

                        // Remove the interest which just got triggered; the IoSourceState's do_io
                        // wrapper used with this selector will add back the interest using
                        // reregister.
                        poll_fd.events &= !poll_fd.revents;

                        // Minor optimization to potentially avoid looping n times where n is the
                        // number of input fds (i.e. we might loop between m and n times where m is
                        // the number of fds with revents != 0).
                        if events.len() == num_fd_events {
                            break;
                        }
                    }
                }

                break; // No more polling.
            }

            // If we didn't break above it means we got woken up internally (for
            // example for adding an fd), so we poll again.
        }

        drop(fds);
        let _ = self.deregister_all(&closed_raw_fds);

        Ok(())
    }

    pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
        self.register_internal(fd, token, interests).map(|_| ())
    }

    pub fn register_internal(
        &self,
        fd: RawFd,
        token: Token,
        interests: Interest,
    ) -> io::Result<Arc<RegistrationRecord>> {
        #[cfg(debug_assertions)]
        if fd == self.notify_waker.as_raw_fd() {
            return Err(io::Error::from(io::ErrorKind::InvalidInput));
        }

        // We must handle the unlikely case that the following order of operations happens:
        //
        // register(1 as RawFd)
        // deregister(1 as RawFd)
        // register(1 as RawFd)
        // <poll happens>
        //
        // Fds pending removal are only cleared when poll has been run. It is possible that
        // between registering and deregistering and then _again_ registering the file descriptor
        // poll never gets called, thus the fd stays stuck in the pending removal list.
        //
        // To avoid this scenario we remove an fd from pending removals when registering it.
        let mut pending_removal = self.pending_removal.lock().unwrap();
        if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) {
            pending_removal.swap_remove(idx);
        }
        drop(pending_removal);

        self.modify_fds(|fds| {
            if fds.fd_data.contains_key(&fd) {
                return Err(io::Error::new(
                    io::ErrorKind::AlreadyExists,
                    "I/O source already registered with this `Registry` \
                    (an old file descriptor might have been closed without deregistration)",
                ));
            }

            let poll_fds_index = fds.poll_fds.len();
            let record = Arc::new(RegistrationRecord::new());
            fds.fd_data.insert(
                fd,
                FdData {
                    poll_fds_index,
                    token,
                    shared_record: record.clone(),
                },
            );

            fds.poll_fds.push(PollFd(libc::pollfd {
                fd,
                events: interests_to_poll(interests),
                revents: 0,
            }));

            Ok(record)
        })
    }

    pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
        self.modify_fds(|fds| {
            let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?;
            data.token = token;
            let poll_fds_index = data.poll_fds_index;
            fds.poll_fds[poll_fds_index].0.events = interests_to_poll(interests);

            Ok(())
        })
    }

    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
        self.deregister_all(&[fd])
            .map_err(|_| io::ErrorKind::NotFound)?;
        Ok(())
    }
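    // Worked example of the index fixup performed by `deregister_all` below
    // (illustrative values, an editorial addition rather than part of the
    // original implementation): with `poll_fds = [waker, A, B, C]`, removing
    // `B` calls `swap_remove(2)`, which moves `C` into index 2; the entry
    // `fd_data[C].poll_fds_index` must then be rewritten from 3 to 2 so the
    // map stays in sync with the vector.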
    /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running.
    fn modify_fds<T>(&self, f: impl FnOnce(&mut Fds) -> T) -> T {
        self.waiting_operations.fetch_add(1, Ordering::SeqCst);

        // Wake up the current caller of `wait` if there is one.
        let sent_notification = self.notify_waker.wake().is_ok();

        let mut fds = self.fds.lock().unwrap();

        // If there was no caller of `wait` our notification was not removed from the pipe.
        if sent_notification {
            self.notify_waker.ack_and_reset();
        }

        let res = f(&mut *fds);

        if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.operations_complete.notify_one();
        }

        res
    }

    /// Special optimized version of [`Self::deregister`] which handles multiple removals
    /// at once. Returns `Ok` if all removals were performed, and `Err` if any entries
    /// were not found.
    fn deregister_all(&self, targets: &[RawFd]) -> Result<(), ()> {
        if targets.is_empty() {
            return Ok(());
        }

        let mut pending_removal = self.pending_removal.lock().unwrap();
        pending_removal.extend(targets);
        drop(pending_removal);

        self.modify_fds(|fds| {
            let mut all_successful = true;

            for target in targets {
                match fds.fd_data.remove(target).ok_or(()) {
                    Ok(data) => {
                        data.shared_record.mark_unregistered();
                        fds.poll_fds.swap_remove(data.poll_fds_index);
                        if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) {
                            fds.fd_data
                                .get_mut(&swapped_pollfd.0.fd)
                                .unwrap()
                                .poll_fds_index = data.poll_fds_index;
                        }
                    }
                    Err(_) => all_successful = false,
                }
            }

            if all_successful {
                Ok(())
            } else {
                Err(())
            }
        })
    }

    pub fn wake(&self, token: Token) -> io::Result<()> {
        self.pending_wake_token.lock().unwrap().replace(token);
        self.notify_waker.wake()
    }
}

/// Shared record between IoSourceState and SelectorState that allows us to internally
/// deregister partially or fully closed fds (i.e. when we get POLLHUP or POLLERR) without
/// confusing IoSourceState by trying to deregister twice. This isn't strictly
/// required as technically deregister is idempotent, but it is confusing
/// when trying to debug behaviour as we get imbalanced calls to register/deregister and
/// superfluous NotFound errors.
#[derive(Debug)]
pub(crate) struct RegistrationRecord {
    is_unregistered: AtomicBool,
}

impl RegistrationRecord {
    pub fn new() -> Self {
        Self {
            is_unregistered: AtomicBool::new(false),
        }
    }

    pub fn mark_unregistered(&self) {
        self.is_unregistered.store(true, Ordering::Relaxed);
    }

    #[allow(dead_code)]
    pub fn is_registered(&self) -> bool {
        !self.is_unregistered.load(Ordering::Relaxed)
    }
}

#[cfg(target_os = "linux")]
const POLLRDHUP: libc::c_short = libc::POLLRDHUP;
#[cfg(not(target_os = "linux"))]
const POLLRDHUP: libc::c_short = 0;

const READ_EVENTS: libc::c_short = libc::POLLIN | POLLRDHUP;
const WRITE_EVENTS: libc::c_short = libc::POLLOUT;
const PRIORITY_EVENTS: libc::c_short = libc::POLLPRI;

/// Get the input poll events for the given event.
fn interests_to_poll(interest: Interest) -> libc::c_short {
    let mut kind = 0;

    if interest.is_readable() {
        kind |= READ_EVENTS;
    }

    if interest.is_writable() {
        kind |= WRITE_EVENTS;
    }

    if interest.is_priority() {
        kind |= PRIORITY_EVENTS;
    }

    kind
}
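// A quick sanity check of the mapping above. This test module is an editorial
// addition (a sketch assuming the usual cargo test setup for this crate), not
// part of the original file.
#[cfg(test)]
mod interests_to_poll_tests {
    use super::*;

    #[test]
    fn readable_and_writable_map_to_poll_flags() {
        let events = interests_to_poll(Interest::READABLE | Interest::WRITABLE);
        assert_ne!(events & libc::POLLIN, 0);
        assert_ne!(events & libc::POLLOUT, 0);
        // On Linux, readable interest also subscribes to POLLRDHUP so that a
        // peer shutting down its write half can be detected.
        #[cfg(target_os = "linux")]
        assert_ne!(events & libc::POLLRDHUP, 0);
    }
}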
/// Helper function to call poll.
fn poll(fds: &mut [PollFd], timeout: Option<Duration>) -> io::Result<usize> {
    loop {
        // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
        // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32-bit
        // architectures. The magic number is the same constant used by libuv:
        // 1789569 = 2147483647 / 1200 = LONG_MAX / CONFIG_HZ, read as
        // milliseconds (roughly 29.8 minutes).
        #[cfg(target_pointer_width = "32")]
        const MAX_SAFE_TIMEOUT: u128 = 1789569;
        #[cfg(not(target_pointer_width = "32"))]
        const MAX_SAFE_TIMEOUT: u128 = libc::c_int::MAX as u128;

        let timeout = timeout
            .map(|to| {
                // `Duration::as_millis` truncates, so round up. This avoids
                // turning sub-millisecond timeouts into a zero timeout, unless
                // the caller explicitly requests that by specifying a zero
                // timeout.
                let to_ms = to
                    .checked_add(Duration::from_nanos(999_999))
                    .unwrap_or(to)
                    .as_millis();
                cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int
            })
            .unwrap_or(-1);

        let res = syscall!(poll(
            fds.as_mut_ptr() as *mut libc::pollfd,
            fds.len() as libc::nfds_t,
            timeout,
        ));

        match res {
            Ok(num_events) => break Ok(num_events as usize),
            // poll returns EAGAIN if we can retry it.
            Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue,
            Err(e) => return Err(e),
        }
    }
}

#[derive(Debug, Clone)]
pub struct Event {
    token: Token,
    events: libc::c_short,
}

pub type Events = Vec<Event>;

pub mod event {
    use std::fmt;

    use crate::sys::Event;
    use crate::Token;

    use super::POLLRDHUP;

    pub fn token(event: &Event) -> Token {
        event.token
    }

    pub fn is_readable(event: &Event) -> bool {
        (event.events & libc::POLLIN) != 0 || (event.events & libc::POLLPRI) != 0
    }

    pub fn is_writable(event: &Event) -> bool {
        (event.events & libc::POLLOUT) != 0
    }

    pub fn is_error(event: &Event) -> bool {
        (event.events & libc::POLLERR) != 0
    }

    pub fn is_read_closed(event: &Event) -> bool {
        // Both halves of the socket have closed.
        (event.events & libc::POLLHUP) != 0
            // Socket has received FIN or called shutdown(SHUT_RD).
            || (event.events & POLLRDHUP) != 0
    }

    pub fn is_write_closed(event: &Event) -> bool {
        // Both halves of the socket have closed.
        (event.events & libc::POLLHUP) != 0
            // Unix pipe write end has closed.
            || ((event.events & libc::POLLOUT) != 0 && (event.events & libc::POLLERR) != 0)
            // The other side (read end) of a Unix pipe has closed.
            || (event.events == libc::POLLERR)
    }

    pub fn is_priority(event: &Event) -> bool {
        (event.events & libc::POLLPRI) != 0
    }

    pub fn is_aio(_: &Event) -> bool {
        // Not supported in the kernel, only in libc.
        false
    }

    pub fn is_lio(_: &Event) -> bool {
        // Not supported.
        false
    }

    pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
        #[allow(clippy::trivially_copy_pass_by_ref)]
        fn check_events(got: &libc::c_short, want: &libc::c_short) -> bool {
            (*got & want) != 0
        }
        debug_detail!(
            EventsDetails(libc::c_short),
            check_events,
            libc::POLLIN,
            libc::POLLPRI,
            libc::POLLOUT,
            libc::POLLRDNORM,
            libc::POLLRDBAND,
            libc::POLLWRNORM,
            libc::POLLWRBAND,
            libc::POLLERR,
            libc::POLLHUP,
        );

        f.debug_struct("poll_event")
            .field("token", &event.token)
            .field("events", &EventsDetails(event.events))
            .finish()
    }
}

#[derive(Debug)]
pub(crate) struct Waker {
    selector: Selector,
    token: Token,
}

impl Waker {
    pub(crate) fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
        Ok(Waker {
            selector: selector.try_clone()?,
            token,
        })
    }

    pub(crate) fn wake(&self) -> io::Result<()> {
        self.selector.wake(self.token)
    }
}
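// A minimal end-to-end sketch of the selector above: register the read end of
// a pipe, make it readable, and observe the event. This test module is an
// editorial addition (assuming a Unix target and the usual cargo test setup
// for this crate), not part of the original file.
#[cfg(test)]
mod selector_tests {
    use super::*;

    #[test]
    fn pipe_read_end_becomes_readable() {
        // Create a pipe; `fds[0]` is the read end, `fds[1]` the write end.
        let mut fds = [0 as libc::c_int; 2];
        assert_eq!(unsafe { libc::pipe(fds.as_mut_ptr()) }, 0);
        let (read_fd, write_fd) = (fds[0], fds[1]);

        let selector = Selector::new().unwrap();
        selector
            .register(read_fd, Token(1), Interest::READABLE)
            .unwrap();

        // Write one byte so the read end has data pending.
        let byte = [0u8; 1];
        assert_eq!(unsafe { libc::write(write_fd, byte.as_ptr().cast(), 1) }, 1);

        let mut events = Events::with_capacity(8);
        selector
            .select(&mut events, Some(Duration::from_secs(1)))
            .unwrap();

        assert!(events
            .iter()
            .any(|e| event::token(e) == Token(1) && event::is_readable(e)));

        selector.deregister(read_fd).unwrap();
        unsafe {
            libc::close(read_fd);
            libc::close(write_fd);
        }
    }
}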
cfg_io_source! {
    use crate::Registry;

    struct InternalState {
        selector: Selector,
        token: Token,
        interests: Interest,
        fd: RawFd,
        shared_record: Arc<RegistrationRecord>,
    }

    impl Drop for InternalState {
        fn drop(&mut self) {
            if self.shared_record.is_registered() {
                let _ = self.selector.deregister(self.fd);
            }
        }
    }

    pub(crate) struct IoSourceState {
        inner: Option<Box<InternalState>>,
    }

    impl IoSourceState {
        pub fn new() -> IoSourceState {
            IoSourceState { inner: None }
        }

        pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
        where
            F: FnOnce(&T) -> io::Result<R>,
        {
            let result = f(io);

            if let Err(err) = &result {
                if err.kind() == io::ErrorKind::WouldBlock {
                    self.inner.as_ref().map_or(Ok(()), |state| {
                        state
                            .selector
                            .reregister(state.fd, state.token, state.interests)
                    })?;
                }
            }

            result
        }

        pub fn register(
            &mut self,
            registry: &Registry,
            token: Token,
            interests: Interest,
            fd: RawFd,
        ) -> io::Result<()> {
            if self.inner.is_some() {
                Err(io::ErrorKind::AlreadyExists.into())
            } else {
                let selector = registry.selector().try_clone()?;

                selector
                    .register_internal(fd, token, interests)
                    .map(move |shared_record| {
                        let state = InternalState {
                            selector,
                            token,
                            interests,
                            fd,
                            shared_record,
                        };

                        self.inner = Some(Box::new(state));
                    })
            }
        }

        pub fn reregister(
            &mut self,
            registry: &Registry,
            token: Token,
            interests: Interest,
            fd: RawFd,
        ) -> io::Result<()> {
            match self.inner.as_mut() {
                Some(state) => registry
                    .selector()
                    .reregister(fd, token, interests)
                    .map(|()| {
                        state.token = token;
                        state.interests = interests;
                    }),
                None => Err(io::ErrorKind::NotFound.into()),
            }
        }

        pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> {
            if let Some(state) = self.inner.take() {
                // Marking unregistered will short-circuit the drop behaviour of calling
                // deregister, so the call to deregister below is strictly required.
                state.shared_record.mark_unregistered();
            }

            registry.selector().deregister(fd)
        }
    }
}
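// How the pieces above fit together to emulate one-shot events (an editorial
// summary, not part of the original file):
//
// 1. `SelectorState::select` sees non-zero `revents` for an fd, pushes an
//    `Event`, and clears the triggered bits (`poll_fd.events &= !poll_fd.revents`),
//    effectively disarming the fd.
// 2. The user handles the event, performing I/O through `IoSourceState::do_io`.
// 3. Once that I/O returns `WouldBlock`, `do_io` re-registers the original
//    interests via `Selector::reregister`, re-arming the fd for the next
//    `poll(2)` call.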