use crate::io::interest::Interest; use crate::runtime::io::Registration; use crate::runtime::scheduler; use mio::event::Source; use std::fmt; use std::io; use std::ops::Deref; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::task::ready; cfg_io_driver! { /// Associates an I/O resource that implements the [`std::io::Read`] and/or /// [`std::io::Write`] traits with the reactor that drives it. /// /// `PollEvented` uses [`Registration`] internally to take a type that /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or /// [`std::io::Write`] and associate it with a reactor that will drive it. /// /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be /// used from within the future's execution model. As such, the /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] /// implementations using the underlying I/O resource as well as readiness /// events provided by the reactor. /// /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is /// `Sync`), the caller must ensure that there are at most two tasks that /// use a `PollEvented` instance concurrently. One for reading and one for /// writing. While violating this requirement is "safe" from a Rust memory /// model point of view, it will result in unexpected behavior in the form /// of lost notifications and tasks hanging. /// /// ## Readiness events /// /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations, /// this type also supports access to the underlying readiness event stream. /// While similar in function to what [`Registration`] provides, the /// semantics are a bit different. /// /// Two functions are provided to access the readiness events: /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the /// current readiness state of the `PollEvented` instance. If /// [`poll_read_ready`] indicates read readiness, immediately calling /// [`poll_read_ready`] again will also indicate read readiness. /// /// When the operation is attempted and is unable to succeed due to the I/O /// resource not being ready, the caller must call [`clear_readiness`]. /// This clears the readiness state until a new readiness event is received. /// /// This allows the caller to implement additional functions. For example, /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and /// [`clear_readiness`]. /// /// ## Platform-specific events /// /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. /// These events are included as part of the read readiness event stream. The /// write readiness event stream is only for `Ready::writable()` events. /// /// [`AsyncRead`]: crate::io::AsyncRead /// [`AsyncWrite`]: crate::io::AsyncWrite /// [`TcpListener`]: crate::net::TcpListener /// [`clear_readiness`]: Registration::clear_readiness /// [`poll_read_ready`]: Registration::poll_read_ready /// [`poll_write_ready`]: Registration::poll_write_ready pub(crate) struct PollEvented { io: Option, registration: Registration, } } // ===== impl PollEvented ===== impl PollEvented { /// Creates a new `PollEvented` associated with the default reactor. /// /// The returned `PollEvented` has readable and writable interests. For more control, use /// [`Self::new_with_interest`]. /// /// # Panics /// /// This function panics if thread-local runtime is not set. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. #[track_caller] #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new(io: E) -> io::Result { PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) } /// Creates a new `PollEvented` associated with the default reactor, for /// specific `Interest` state. `new_with_interest` should be used over `new` /// when you need control over the readiness state, such as when a file /// descriptor only allows reads. This does not add `hup` or `error` so if /// you are interested in those states, you will need to add them to the /// readiness state passed to this function. /// /// # Panics /// /// This function panics if thread-local runtime is not set. /// /// The runtime is usually set implicitly when this function is called from /// a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) /// function. #[track_caller] #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result { Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current()) } #[track_caller] pub(crate) fn new_with_interest_and_handle( mut io: E, interest: Interest, handle: scheduler::Handle, ) -> io::Result { let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; Ok(Self { io: Some(io), registration, }) } /// Returns a reference to the registration. #[cfg(feature = "net")] pub(crate) fn registration(&self) -> &Registration { &self.registration } /// Deregisters the inner io from the registration and returns a Result containing the inner io. #[cfg(any(feature = "net", feature = "process"))] pub(crate) fn into_inner(mut self) -> io::Result { let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. self.registration.deregister(&mut inner)?; Ok(inner) } #[cfg(all(feature = "process", target_os = "linux"))] pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { self.registration .poll_read_ready(cx) .map_err(io::Error::from) .map_ok(|_| ()) } /// Re-register under new runtime with `interest`. #[cfg(all(feature = "process", target_os = "linux"))] pub(crate) fn reregister(&mut self, interest: Interest) -> io::Result<()> { let io = self.io.as_mut().unwrap(); // As io shouldn't ever be None, just unwrap here. let _ = self.registration.deregister(io); self.registration = Registration::new_with_interest_and_handle(io, interest, scheduler::Handle::current())?; Ok(()) } } feature! { #![any(feature = "net", all(unix, feature = "process"))] use crate::io::ReadBuf; use std::task::{Context, Poll}; impl PollEvented { // Safety: The caller must ensure that `E` can read into uninitialized memory pub(crate) unsafe fn poll_read<'a>( &'a self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> where &'a E: io::Read + 'a, { use std::io::Read; loop { let evt = ready!(self.registration.poll_read_ready(cx))?; let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); // used only when the cfgs below apply #[allow(unused_variables)] let len = b.len(); match self.io.as_ref().unwrap().read(b) { Ok(n) => { // When mio is using the epoll or kqueue selector, reading a partially full // buffer is sufficient to show that the socket buffer has been drained. // // This optimization does not work for level-triggered selectors such as // windows or when poll is used. // // Read more: // https://github.com/tokio-rs/tokio/issues/5866 #[cfg(all( not(mio_unsupported_force_poll_poll), any( // epoll target_os = "android", target_os = "illumos", target_os = "linux", target_os = "redox", // kqueue target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos", target_os = "netbsd", target_os = "openbsd", target_os = "tvos", target_os = "visionos", target_os = "watchos", ) ))] if 0 < n && n < len { self.registration.clear_readiness(evt); } // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the // buffer. buf.assume_init(n); buf.advance(n); return Poll::Ready(Ok(())); }, Err(e) if e.kind() == io::ErrorKind::WouldBlock => { self.registration.clear_readiness(evt); } Err(e) => return Poll::Ready(Err(e)), } } } pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> where &'a E: io::Write + 'a, { use std::io::Write; loop { let evt = ready!(self.registration.poll_write_ready(cx))?; match self.io.as_ref().unwrap().write(buf) { Ok(n) => { // if we write only part of our buffer, this is sufficient on unix to show // that the socket buffer is full. Unfortunately this assumption // fails for level-triggered selectors (like on Windows or poll even for // UNIX): https://github.com/tokio-rs/tokio/issues/5866 if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) { self.registration.clear_readiness(evt); } return Poll::Ready(Ok(n)); }, Err(e) if e.kind() == io::ErrorKind::WouldBlock => { self.registration.clear_readiness(evt); } Err(e) => return Poll::Ready(Err(e)), } } } #[cfg(any(feature = "net", feature = "process"))] pub(crate) fn poll_write_vectored<'a>( &'a self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> where &'a E: io::Write + 'a, { use std::io::Write; self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs)) } } } impl UnwindSafe for PollEvented {} impl RefUnwindSafe for PollEvented {} impl Deref for PollEvented { type Target = E; fn deref(&self) -> &E { self.io.as_ref().unwrap() } } impl fmt::Debug for PollEvented { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PollEvented").field("io", &self.io).finish() } } impl Drop for PollEvented { fn drop(&mut self) { if let Some(mut io) = self.io.take() { // Ignore errors let _ = self.registration.deregister(&mut io); } } }