Sha256: 143ee19056b9ee9e480903cf4a1b00da7d4e528c5804569bf8c40869e6ac6eed

Contents?: true

Size: 1.98 KB

Versions: 27

Compression:

Stored size: 1.98 KB

Contents

use futures::channel::{mpsc, oneshot};
use futures::executor::{block_on, block_on_stream};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::sync::mpsc as std_mpsc;
use std::thread;

// Checks that `buffer_unordered(N)` yields results in the order the underlying
// futures complete, not the order in which they were fed into the stream.
//
// Three threads cooperate:
//   * a producer (`t1`) that pushes oneshot receivers into a futures mpsc channel,
//   * a consumer (`t2`) that drives `rx.buffer_unordered(N)` and forwards each
//     resolved value over a std channel,
//   * the test thread, which holds the oneshot senders and decides the
//     completion order.
#[test]
#[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790
fn works() {
    // Concurrency limit passed to `buffer_unordered`.
    const N: usize = 4;

    // Futures channel (created with buffer argument 1) carrying the oneshot
    // receivers from the producer to the buffered stream.
    let (mut tx, rx) = mpsc::channel(1);

    // tx2/rx2: test thread -> producer, "go ahead with the second batch" signal.
    let (tx2, rx2) = std_mpsc::channel();
    // tx3/rx3: producer -> test thread, hands over each oneshot sender.
    let (tx3, rx3) = std_mpsc::channel();
    let t1 = thread::spawn(move || {
        // First batch: N + 1 oneshot channels (note the inclusive range).
        // The receiving half goes into the stream, the sending half goes to
        // the test thread.
        // NOTE(review): presumably the extra item is there so that one future
        // is waiting behind the N the buffer can hold — confirm against upstream.
        for _ in 0..=N {
            let (mytx, myrx) = oneshot::channel();
            block_on(tx.send(myrx)).unwrap();
            tx3.send(mytx).unwrap();
        }
        // Wait until the test thread signals before producing more.
        rx2.recv().unwrap();
        // Second batch: N more oneshot channels, for 2 * N + 1 = 9 in total.
        for _ in 0..N {
            let (mytx, myrx) = oneshot::channel();
            block_on(tx.send(myrx)).unwrap();
            tx3.send(mytx).unwrap();
        }
    });

    // tx4/rx4: consumer -> test thread, carries every value the stream yields.
    let (tx4, rx4) = std_mpsc::channel();
    let t2 = thread::spawn(move || {
        // Blocks this thread while iterating the buffered stream; each item is
        // the result of one oneshot receiver and is unwrapped before forwarding.
        for item in block_on_stream(rx.buffer_unordered(N)) {
            tx4.send(item.unwrap()).unwrap();
        }
    });

    // Collect the first four senders from the producer.
    let o1 = rx3.recv().unwrap();
    let o2 = rx3.recv().unwrap();
    let o3 = rx3.recv().unwrap();
    let o4 = rx3.recv().unwrap();
    // No oneshot has been completed yet, so nothing may have been forwarded.
    assert!(rx4.try_recv().is_err());

    // Complete the futures out of submission order (1, 3, 2, 4) and check that
    // each value is observed as soon as its sender fires.
    o1.send(1).unwrap();
    assert_eq!(rx4.recv(), Ok(1));
    o3.send(3).unwrap();
    assert_eq!(rx4.recv(), Ok(3));
    // Release the producer so it starts sending its second batch.
    tx2.send(()).unwrap();
    o2.send(2).unwrap();
    assert_eq!(rx4.recv(), Ok(2));
    o4.send(4).unwrap();
    assert_eq!(rx4.recv(), Ok(4));

    // Remaining five senders: the last one of the first batch plus the four
    // of the second batch.
    let o5 = rx3.recv().unwrap();
    let o6 = rx3.recv().unwrap();
    let o7 = rx3.recv().unwrap();
    let o8 = rx3.recv().unwrap();
    let o9 = rx3.recv().unwrap();

    // Again complete in a scrambled order (5, 8, 9, 7, 6) and expect the
    // values to come out in exactly that completion order.
    o5.send(5).unwrap();
    assert_eq!(rx4.recv(), Ok(5));
    o8.send(8).unwrap();
    assert_eq!(rx4.recv(), Ok(8));
    o9.send(9).unwrap();
    assert_eq!(rx4.recv(), Ok(9));
    o7.send(7).unwrap();
    assert_eq!(rx4.recv(), Ok(7));
    o6.send(6).unwrap();
    assert_eq!(rx4.recv(), Ok(6));

    // Both worker threads must finish without panicking.
    t1.join().unwrap();
    t2.join().unwrap();
}

Version data entries

27 entries across 27 versions & 1 rubygem

Version Path
wasmtime-30.0.2 ./ext/cargo-vendor/futures-0.3.31/tests/stream_buffer_unordered.rs
wasmtime-29.0.0 ./ext/cargo-vendor/futures-0.3.31/tests/stream_buffer_unordered.rs
wasmtime-28.0.0 ./ext/cargo-vendor/futures-0.3.31/tests/stream_buffer_unordered.rs
wasmtime-27.0.0 ./ext/cargo-vendor/futures-0.3.31/tests/stream_buffer_unordered.rs
wasmtime-26.0.0 ./ext/cargo-vendor/futures-0.3.31/tests/stream_buffer_unordered.rs
wasmtime-25.0.2 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-25.0.1 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-25.0.0 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-24.0.0 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-23.0.2 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-22.0.0 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-21.0.1 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-20.0.2 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-20.0.0 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-18.0.3 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-17.0.1 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-17.0.0 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-16.0.0 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-15.0.1 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs
wasmtime-15.0.0 ./ext/cargo-vendor/futures-0.3.30/tests/stream_buffer_unordered.rs