use core::marker::PhantomData; use core::pin::Pin; use futures_core::ready; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; use crate::future::Either; use crate::stream::stream::flatten_unordered::{ FlattenUnorderedWithFlowController, FlowController, FlowStep, }; use crate::stream::IntoStream; use crate::TryStreamExt; delegate_all!( /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. TryFlattenUnordered( FlattenUnorderedWithFlowController, PropagateBaseStreamError> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[ |stream: St, limit: impl Into>| FlattenUnorderedWithFlowController::new( NestedTryStreamIntoEitherTryStream::new(stream), limit.into() ) ] where St: TryStream, St::Ok: TryStream, St::Ok: Unpin, ::Error: From ); pin_project! { /// Emits either successful streams or single-item streams containing the underlying errors. /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct NestedTryStreamIntoEitherTryStream where St: TryStream, St::Ok: TryStream, St::Ok: Unpin, ::Error: From { #[pin] stream: St } } impl NestedTryStreamIntoEitherTryStream where St: TryStream, St::Ok: TryStream + Unpin, ::Error: From, { fn new(stream: St) -> Self { Self { stream } } delegate_access_inner!(stream, St, ()); } /// Emits a single item immediately, then stream will be terminated. #[derive(Debug, Clone)] pub struct Single(Option); impl Single { /// Constructs new `Single` with the given value. fn new(val: T) -> Self { Self(Some(val)) } /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated. fn next_immediate(&mut self) -> Option { self.0.take() } } impl Unpin for Single {} impl Stream for Single { type Item = T; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(self.0.take()) } fn size_hint(&self) -> (usize, Option) { self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) } } /// Immediately propagates errors occurred in the base stream. #[derive(Debug, Clone, Copy)] pub struct PropagateBaseStreamError(PhantomData); type BaseStreamItem = as Stream>::Item; type InnerStreamItem = as Stream>::Item; impl FlowController, InnerStreamItem> for PropagateBaseStreamError where St: TryStream, St::Ok: TryStream + Unpin, ::Error: From, { fn next_step(item: BaseStreamItem) -> FlowStep, InnerStreamItem> { match item { // A new successful inner stream received st @ Either::Left(_) => FlowStep::Continue(st), // An error encountered Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()), } } } type SingleStreamResult = Single::Ok, ::Error>>; impl Stream for NestedTryStreamIntoEitherTryStream where St: TryStream, St::Ok: TryStream + Unpin, ::Error: From, { // Item is either an inner stream or a stream containing a single error. // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. type Item = Either, SingleStreamResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let item = ready!(self.project().stream.try_poll_next(cx)); let out = match item { Some(res) => match res { // Emit successful inner stream as is Ok(stream) => Either::Left(stream.into_stream()), // Wrap an error into a stream containing a single item err @ Err(_) => { let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); Either::Right(Single::new(res)) } }, None => return Poll::Ready(None), }; Poll::Ready(Some(out)) } } impl FusedStream for NestedTryStreamIntoEitherTryStream where St: TryStream + FusedStream, St::Ok: TryStream + Unpin, ::Error: From, { fn is_terminated(&self) -> bool { self.stream.is_terminated() } } // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for NestedTryStreamIntoEitherTryStream where St: TryStream + Sink, St::Ok: TryStream + Unpin, ::Error: From<::Error>, { type Error = >::Error; delegate_sink!(stream, Item); }