Sha256: 00d48122fa2ccbf1fe0b110ce3cf22590eda54b3ddec0134b1f9376eb1169645
Contents?: true
Size: 1.07 KB
Versions: 39
Compression:
Stored size: 1.07 KB
Contents
#![allow(dead_code)] use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; use tokio_stream::Stream; struct UnboundedStream<T> { recv: UnboundedReceiver<T>, } impl<T> Stream for UnboundedStream<T> { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { Pin::into_inner(self).recv.poll_recv(cx) } } pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) { let (tx, rx) = mpsc::unbounded_channel(); let stream = UnboundedStream { recv: rx }; (tx, stream) } struct BoundedStream<T> { recv: Receiver<T>, } impl<T> Stream for BoundedStream<T> { type Item = T; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { Pin::into_inner(self).recv.poll_recv(cx) } } pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) { let (tx, rx) = mpsc::channel(size); let stream = BoundedStream { recv: rx }; (tx, stream) }
Version data entries
39 entries across 39 versions & 1 rubygems