use core::fmt; use core::pin::Pin; use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use crate::lock::BiLock; /// A `Stream` part of the split pair #[derive(Debug)] #[must_use = "streams do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub struct SplitStream(BiLock); impl Unpin for SplitStream {} impl SplitStream { /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. pub fn is_pair_of(&self, other: &SplitSink) -> bool { other.is_pair_of(&self) } } impl SplitStream { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite(self, other: SplitSink) -> Result> where S: Sink, { other.reunite(self) } } impl Stream for SplitStream { type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx) } } #[allow(non_snake_case)] fn SplitSink, Item>(lock: BiLock) -> SplitSink { SplitSink { lock, slot: None } } /// A `Sink` part of the split pair #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub struct SplitSink { lock: BiLock, slot: Option, } impl Unpin for SplitSink {} impl + Unpin, Item> SplitSink { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite(self, other: SplitStream) -> Result> { self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1))) } } impl SplitSink { /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. pub fn is_pair_of(&self, other: &SplitStream) -> bool { self.lock.is_pair_of(&other.0) } } impl, Item> SplitSink { fn poll_flush_slot( mut inner: Pin<&mut S>, slot: &mut Option, cx: &mut Context<'_>, ) -> Poll> { if slot.is_some() { ready!(inner.as_mut().poll_ready(cx))?; Poll::Ready(inner.start_send(slot.take().unwrap())) } else { Poll::Ready(Ok(())) } } fn poll_lock_and_flush_slot( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx) } } impl, Item> Sink for SplitSink { type Error = S::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if self.slot.is_none() { return Poll::Ready(Ok(())); } ready!(self.as_mut().poll_lock_and_flush_slot(cx))?; } } fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> { self.slot = Some(item); Ok(()) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?; inner.as_pin_mut().poll_flush(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?; inner.as_pin_mut().poll_close(cx) } } pub(super) fn split, Item>(s: S) -> (SplitSink, SplitStream) { let (a, b) = BiLock::new(s); let read = SplitStream(a); let write = SplitSink(b); (write, read) } /// Error indicating a `SplitSink` and `SplitStream` were not two halves /// of a `Stream + Split`, and thus could not be `reunite`d. #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub struct ReuniteError(pub SplitSink, pub SplitStream); impl fmt::Debug for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("ReuniteError").field(&"...").finish() } } impl fmt::Display for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "tried to reunite a SplitStream and SplitSink that don't form a pair") } } #[cfg(feature = "std")] impl std::error::Error for ReuniteError {} #[cfg(test)] mod tests { use super::*; use crate::stream::StreamExt; use core::marker::PhantomData; struct NopStream { phantom: PhantomData, } impl Stream for NopStream { type Item = Item; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { todo!() } } impl Sink for NopStream { type Error = (); fn poll_ready( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll> { todo!() } fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> { todo!() } fn poll_flush( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll> { todo!() } fn poll_close( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll> { todo!() } } #[test] fn test_pairing() { let s1 = NopStream::<()> { phantom: PhantomData }; let (sink1, stream1) = s1.split(); assert!(sink1.is_pair_of(&stream1)); assert!(stream1.is_pair_of(&sink1)); let s2 = NopStream::<()> { phantom: PhantomData }; let (sink2, stream2) = s2.split(); assert!(sink2.is_pair_of(&stream2)); assert!(stream2.is_pair_of(&sink2)); assert!(!sink1.is_pair_of(&stream2)); assert!(!stream1.is_pair_of(&sink2)); assert!(!sink2.is_pair_of(&stream1)); assert!(!stream2.is_pair_of(&sink1)); } }