desync icon indicating copy to clipboard operation
desync copied to clipboard

Pipe sometimes skips sent item

Open mpskowron opened this issue 5 years ago • 2 comments

Hi, I had a code like this:

let (mut sender, stream) = mpsc::channel(1000);
tokio::task::spawn(async move {
            while <some_condition> {
                let item = ...
                sender.send(item).await.unwrap();
            }
        });
let stream2 = pipe(Arc::new(Desync::new(0)), stream, move |_, item| {
            let result = ...
            futures::future::ready(result).boxed()
        });

It occurred, that sometimes an item, despite being successfully sent by sender, wasn't processed at all in pipe's closure. After rewriting the pipe to:

let (mut sender2, stream2) = mpsc::channel(1000);
tokio::task::spawn(async move {
            while let Some(item) = stream.next().await {
                sender2.send(item).await.unwrap();
            }
        });        

Everything started working correctly, thus it seems like it is a bug.

mpskowron avatar Sep 12 '20 16:09 mpskowron

Are you using v0.6 (the latest version available on crates.io) or v0.7 (the latest version in this repository) when you see this bug? I rewrote the way streams get scheduled in 0.7 so it's possible that it fixes the bug if you're using v0.6 (or introduces it if it's the other way around).

From your description, I take it the pipe isn't getting stuck and stopping processing data but rather dropping values - so you send '1, 2, 3, 4' and it only processes '1, 2, 4' or something. This would probably be in relation to the code I've rewritten if so, though it could also be somewhere in the main scheduler (in which case there should be a way to get the issue to occur without using a pipe).

I've tried expanding your example into a larger program to try to reproduce the issue and I haven't seen it yet on either version (though the v0.7 version is a lot faster) - it looks like this, in case I'm missing something (if this is a race condition, it's possible something extra is needed to make it fail):

use ::desync::*;
use futures::prelude::*;
use futures::channel::mpsc;
use futures::future::{select, Either};
use tokio;

use std::time::{Duration};
use std::sync::{Arc};

#[tokio::main]
async fn main() {
    let (mut sender, stream) = mpsc::channel(1000);
    tokio::task::spawn(async move {
        let mut counter: i64 = 0;
        loop {
            counter += 1;
            let item = counter;
            sender.send(item).await.unwrap();
        }
    });

    let mut stream2 = pipe(Arc::new(Desync::new(0)), stream, move |_, item| {
        let result = item + 1;
        futures::future::ready(result).boxed()
    });

    let mut next_expected = 2;

    println!("Running...");
    loop {
        let timeout = tokio::time::delay_for(Duration::from_millis(1000));

        let next = select(stream2.next(), timeout).await;

        if (next_expected % 100_000) == 0 {
            println!("{} iterations", next_expected);
        }

        match next {
            Either::Left((next, _)) => {
                assert!(Some(next_expected) == next);
            }

            Either::Right(_timeout) => {
                panic!("Stream timeout");
            }
        }
        next_expected += 1;
    }
}

Logicalshift avatar Sep 12 '20 21:09 Logicalshift

I am using v0.6. There was one more important thing happening in my code, which I thought doesn't matter, but it definitely does - the cycle between channels. Looks like I was mistaken and it doesn't skip the item, but it seems like sometimes the newly arrived item doesn't trigger pipe's callback and get stuck until another item arrives. I modified your code to showcase it - this program is consistently getting stuck after some time on my machine:

use futures::channel::mpsc;
use tokio;

use std::sync::{Arc};
use tokio::stream::StreamExt;
use futures::{SinkExt, FutureExt};

#[tokio::main]
async fn main() {
    let (mut sender0, mut stream0) = mpsc::channel(1000);
    sender0.send(0).await.unwrap();
    let (mut sender, stream) = mpsc::channel(1000);
    tokio::task::spawn(async move {
        while let Some(counter) = stream0.next().await {
            sender.send(counter).await.unwrap();
        }
    });

    let mut stream2 = pipe(Arc::new(Desync::new(0)), stream, move |_, item| {
        let result = item + 1;
        futures::future::ready(result).boxed()
    });

    let mut next_expected = 1i64;

    println!("Running...");
    loop {
        let next = stream2.next().await.unwrap();
        sender0.send(next).await.unwrap();
        assert_eq!(next_expected, next);
        if (next_expected % 10_000) == 0 {
            println!("{} iterations", next_expected);
        }
        next_expected += 1;
    }
}```

mpskowron avatar Sep 13 '20 08:09 mpskowron