futures-rs

Missing function to generate a TryStream from an iterator?

Open petekubiak opened this issue 1 year ago • 2 comments

I'm trying to generate a stream to work on a &mut [u8]. To do this I've tried to use:

async fn process(buffer: &mut [u8]) -> Result<(), MyError> {
    stream::iter(buffer).enumerate().try_for_each(async |(index, element)| {
        *element = fallible_read(index).await?;
        Ok(())
    })
}

but I get an error on try_for_each: "the method try_for_each exists for struct Enumerate<Iter<IterMut<'_, u8>>>, but its trait bounds were not satisfied".

There is also the following information:

note: the following trait bounds were not satisfied:
            `futures::stream::Enumerate<futures::stream::Iter<core::slice::IterMut<'_, u8>>>: TryStream`
            which is required by `futures::stream::Enumerate<futures::stream::Iter<core::slice::IterMut<'_, u8>>>: TryStreamExt`
            `&futures::stream::Enumerate<futures::stream::Iter<core::slice::IterMut<'_, u8>>>: TryStream`
            which is required by `&futures::stream::Enumerate<futures::stream::Iter<core::slice::IterMut<'_, u8>>>: TryStreamExt`
            `&mut futures::stream::Enumerate<futures::stream::Iter<core::slice::IterMut<'_, u8>>>: TryStream`
            which is required by `&mut futures::stream::Enumerate<futures::stream::Iter<core::slice::IterMut<'_, u8>>>: TryStreamExt`

Is there a missing function which generates a TryStream from an iterator rather than a Stream? E.g. stream::try_iter or something? Or am I just using this wrong?

petekubiak avatar Feb 13 '25 15:02 petekubiak

Maybe you want something like:

use futures::{StreamExt as _, TryStreamExt as _, stream};

struct MyError;
async fn process(buffer: &mut [u8]) -> Result<(), MyError> {
    stream::iter(buffer).enumerate().map(Ok).try_for_each(async |_| {
        Ok(())
    }).await
}

fn main() {}


traviscross avatar Feb 17 '25 11:02 traviscross

Looks like that works, thanks!

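So, to dig into this a bit deeper: try_for_each comes from TryStreamExt, which is only available for streams that implement TryStream, i.e. streams whose items are Result values. stream::iter(buffer).enumerate() yields plain (usize, &mut u8) tuples, which is why the map(Ok) is required.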

As far as I'm aware, though, Iterator::try_for_each() has no such restriction: the items can be of any type, and only the closure has to return a Result (or another Try type). Since Streams are essentially async Iterators, it seems to me that their APIs should be equivalent. Most of the API is the same, so is there a reason this part differs?
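To illustrate what I mean, here is a sketch of the synchronous equivalent using only the standard library. fallible_read_sync and process_sync are hypothetical names made up for this comparison. Note that no map(Ok) is needed, because the iterator's items are plain tuples and only the closure returns a Result:

```rust
#[derive(Debug, PartialEq)]
struct MyError;

// Hypothetical synchronous stand-in for fallible_read:
// returns the index as a byte, or an error if it does not fit in a u8.
fn fallible_read_sync(index: usize) -> Result<u8, MyError> {
    if index <= u8::MAX as usize {
        Ok(index as u8)
    } else {
        Err(MyError)
    }
}

fn process_sync(buffer: &mut [u8]) -> Result<(), MyError> {
    // Items here are plain (usize, &mut u8) tuples, not Results.
    // Iterator::try_for_each only requires the closure to return a Result.
    buffer.iter_mut().enumerate().try_for_each(|(index, element)| {
        *element = fallible_read_sync(index)?;
        Ok(())
    })
}
```

This stops at the first Err, just as the stream version does, but without having to wrap the items first.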

petekubiak avatar Feb 19 '25 15:02 petekubiak