Sha256: 0f68f444ef13fe7115164be855c3b7b1d269e1119e69fcdad1706988255641f1
Contents?: true
Size: 1.9 KB
Versions: 27
Compression:
Stored size: 1.9 KB
Contents
use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_project_lite::pin_project; pin_project! { /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method. #[derive(Debug, Clone)] #[must_use = "sinks do nothing unless polled"] pub struct SinkMapErr<Si, F> { #[pin] sink: Si, f: Option<F>, } } impl<Si, F> SinkMapErr<Si, F> { pub(super) fn new(sink: Si, f: F) -> Self { Self { sink, f: Some(f) } } delegate_access_inner!(sink, Si, ()); fn take_f(self: Pin<&mut Self>) -> F { self.project().f.take().expect("polled MapErr after completion") } } impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F> where Si: Sink<Item>, F: FnOnce(Si::Error) -> E, { type Error = E; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e)) } fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e)) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e)) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e)) } } // Forwarding impl of Stream from the underlying sink impl<S: Stream, F> Stream for SinkMapErr<S, F> { type Item = S::Item; delegate_stream!(sink); } impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> { fn is_terminated(&self) -> bool { self.sink.is_terminated() } }
Version data entries
27 entries across 27 versions & 1 rubygems