
Add `use_stream` hook

Open trevyn opened this issue 3 years ago • 13 comments

Specific Demand

Like use_future, except that instead of taking a Future<Output = T>, it takes a futures::Stream<Item = T>.

trevyn avatar Jun 06 '22 14:06 trevyn

Hi, thanks for the suggestion!

What would the API look like? use_future returns a UseFuture struct, which you can use to query the current state & resolved value of the future.

But if I understand correctly, Stream asynchronously provides the values of an iterator one by one. How would you expect to use those in the context of a Dioxus component?

I'm guessing there could probably be a callback for consuming the values? Then you would be able to push them into some state, and render all the values yielded so far. But I don't know a whole lot about async Rust, so not sure if that makes sense.

rMazeiks avatar Jun 06 '22 14:06 rMazeiks

Ah, yes. My use case would be replacing the state with each new value, basically the same API as use_future except it gets updated as new values come in. You’re right that appending each new value is also a valid use case. Hmm…

trevyn avatar Jun 06 '22 14:06 trevyn

Maybe it keeps an internal buffer, and has both a method for getting the entire buffer, and a method for getting the last item and clearing all previous items? (retain_last()? That’s the best name I can come up with right now!😅)
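To make the buffer idea concrete, here is a minimal std-only sketch. The `StreamBuffer` type and its `push` and `all` methods are hypothetical names invented for illustration; only `retain_last` comes from the comment above, and none of this is an existing Dioxus API.

```rust
/// Hypothetical buffer for values yielded by a stream (not a Dioxus type).
#[derive(Debug)]
pub struct StreamBuffer<T> {
    items: Vec<T>,
}

impl<T> StreamBuffer<T> {
    /// Start with an empty buffer.
    pub fn new() -> Self {
        StreamBuffer { items: Vec::new() }
    }

    /// Record a newly yielded value.
    pub fn push(&mut self, value: T) {
        self.items.push(value);
    }

    /// Everything yielded so far, oldest first.
    pub fn all(&self) -> &[T] {
        &self.items
    }

    /// Drop every item except the most recent one and return a reference to it.
    pub fn retain_last(&mut self) -> Option<&T> {
        let keep_from = self.items.len().saturating_sub(1);
        drop(self.items.drain(..keep_from));
        self.items.last()
    }
}

/// Push three values, read the whole buffer, then keep only the newest.
pub fn buffer_demo() -> (Vec<i32>, Option<i32>, usize) {
    let mut buf = StreamBuffer::new();
    for v in [1, 2, 3] {
        buf.push(v);
    }
    let all = buf.all().to_vec();
    let last = buf.retain_last().copied();
    (all, last, buf.all().len())
}
```

One method covers the "collect everything" use case and the other covers "latest only", at the cost of the hook owning the storage policy.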

trevyn avatar Jun 06 '22 14:06 trevyn

> My use case would be replacing the state with each new value

Ah, makes sense! I think with a callback API, we can support both use cases. Collection:

// pseudocode

let stuff = use_state(cx, || Vec::new()); // can be any data structure!
let _ = use_stream(cx, || make_stream(), |value| stuff.write().push(value));

Latest-only:

let latest: Option<ValueType> = use_state(cx, || None);
let _ = use_stream(cx, || make_stream(), |value| *latest.write() = Some(value));

rMazeiks avatar Jun 06 '22 15:06 rMazeiks

I like it!

trevyn avatar Jun 06 '22 15:06 trevyn

I'm interested, too.

We have a background job that refreshes the state every N seconds. However, neither UseFuture nor its set is 'static, so we cannot use it inside a cx.spawn. For now, we have forked use_future, renamed it to use_stream, and made it accept a Stream.

If use_stream is going to be offered directly, please make it work like use_future as much as possible, especially the dependencies argument. Thanks!

flisky avatar Jun 07 '22 02:06 flisky

It looks like for ergonomics, it might make sense to include the use_state in the hook, otherwise a bunch of lifetime issues come up.

Here's an attempt:

pub fn use_stream<R, S, T, U>(
    cx: &ScopeState,
    stream: impl FnOnce() -> S + 'static,
    initial: impl FnOnce() -> U,
    cb: impl Fn(&UseState<U>, Option<T>) -> R + 'static,
) -> &UseState<U>
where
    S: futures_util::Stream<Item = T>,
{
    let state = use_state(cx, initial);
    let state_cloned = state.clone();
    let _: &CoroutineHandle<()> = use_coroutine(cx, |_| async move {
        let stream = stream();
        futures_util::pin_mut!(stream);
        while let Some(val) = futures_util::StreamExt::next(&mut stream).await {
            cb(&state_cloned, Some(val));
        }
        cb(&state_cloned, None);
    });
    state
}

Usage:

let _ = use_stream(&cx, make_stream, || None, |s, v| v.map(|v| s.set(Some(v))));

let _ = use_stream(&cx, make_stream, Vec::new, |s, v| v.map(|v| s.make_mut().push(v)));

The dependencies part is a bit intense...!

trevyn avatar Jun 07 '22 19:06 trevyn

> Here's an attempt:

Hmm, API looks pretty good to me! (Though I'm not the maintainer)

> The dependencies part is a bit intense...!

True, 3 parameters can get hard to follow. Perhaps we can use Default to avoid the initializer? It works for all std collections (including Vec) and for Option too. Not sure if there's a use case it doesn't cover.

Perhaps, for maximum flexibility, we could have use_stream_with_init which takes an init callback, and a use_stream which just uses Default.
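The split between the two functions could look roughly like the std-only sketch below. It is a model rather than a hook: a plain iterator stands in for the stream, the callback receives `&mut C` instead of a `UseState` (an assumption made to keep the example dependency-free), and `drive_with_init`, `drive`, and `init_demo` are hypothetical names.

```rust
/// Hypothetical stand-in for `use_stream_with_init`: the caller supplies the
/// initial container. A plain iterator plays the role of the stream.
pub fn drive_with_init<C, T>(
    items: impl IntoIterator<Item = T>,
    init: impl FnOnce() -> C,
    cb: impl Fn(&mut C, Option<T>),
) -> C {
    let mut state = init();
    for item in items {
        cb(&mut state, Some(item));
    }
    // Signal end-of-stream with `None`, mirroring the attempt above.
    cb(&mut state, None);
    state
}

/// Hypothetical stand-in for the `Default`-based `use_stream`: it only
/// forwards to the `_with_init` variant with `C::default` as initializer.
pub fn drive<C: Default, T>(
    items: impl IntoIterator<Item = T>,
    cb: impl Fn(&mut C, Option<T>),
) -> C {
    drive_with_init(items, C::default, cb)
}

/// One call per variant: `Default` gives `None`, the explicit initializer
/// seeds the vector with a starting element.
pub fn init_demo() -> (Option<i64>, Vec<i64>) {
    let latest = drive(0..42i64, |s: &mut Option<i64>, v| {
        if v.is_some() {
            *s = v;
        }
    });
    let seeded = drive_with_init(1..4i64, || vec![0], |s: &mut Vec<i64>, v| {
        if let Some(v) = v {
            s.push(v);
        }
    });
    (latest, seeded)
}
```

The `Default` variant needs no extra logic of its own, so offering both costs little; the explicit initializer only matters when the starting state is not the empty one.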

rMazeiks avatar Jun 07 '22 20:06 rMazeiks

I did try making it use Default, but then it required an explicit &UseState<Option<_>> in the usage, which seemed a bit much.

trevyn avatar Jun 07 '22 22:06 trevyn

Hi again, I took a closer look at your code.

> I did try making it use Default, but then it required an explicit &UseState<Option<_>> in the usage, which seemed a bit much.

Indeed, it appears that type inference doesn't work in this situation. However, if we let the closure take a &mut instead of the UseState, inference works for some reason! Also, I think &mut is more ergonomic, since the closure doesn't need to know anything about a UseState.

btw I changed the UseState to a UseRef because we can't afford cloning the entire thing (say a huge Vec) if it happens to be a collection.

Also, I removed the R generic, and the update closure now returns nothing. Not sure why that was there.

Anyway, this compiles and seems to work:

#![allow(non_snake_case)]

use dioxus::prelude::*;
use futures::stream::{iter, StreamExt};
use futures::Stream;

fn main() {
    dioxus::desktop::launch(app);
}

fn make_stream() -> impl Stream<Item = i64> {
    iter(0..42)
}

pub fn app(cx: Scope) -> Element {
    let latest = use_stream(&cx, make_stream, |s, v| {
        if let Some(v) = v {
            *s = Some(v)
        }
    });

    let vec = use_stream(&cx, make_stream, |s: &mut Vec<_>, v| {
        if let Some(v) = v {
            s.push(v)
        }
    });

    let current = format!("{:?}", latest.read());
    let all = format!("{:?}", vec.read());

    cx.render(rsx! {
        div {
            "Current: {current}"
        },
        div {
            "All: {all}"
        }
    })
}

pub fn use_stream<C, T, S>(
    cx: &ScopeState,
    stream: impl FnOnce() -> S + 'static,
    cb: impl Fn(&mut C, Option<T>) + 'static,
) -> &UseRef<C>
where
    C: Default + 'static,
    S: Stream<Item = T>,
{
    let state = use_ref(cx, <C as Default>::default);

    let state_cloned = state.clone();
    let _ = use_future(cx, (), |_| async move {
        let stream = stream();
        futures_util::pin_mut!(stream);

        while let Some(val) = StreamExt::next(&mut stream).await {
            let mut container = state_cloned.write();
            cb(&mut *container, Some(val));
        }

        let mut container = state_cloned.write();
        cb(&mut *container, None);
    });

    state
}

Let me know what you think!

rMazeiks avatar Jun 08 '22 08:06 rMazeiks

Ah, good call on the &mut, use_ref, and use_future!

The R generic parameter is there so that we can make the usage a one-liner with map, which actually returns an Option that just gets dropped:

let latest = use_stream(&cx, make_stream, |s, v| v.map(|v| *s = Some(v)));

Haha, or even more aggressively:

let latest = use_stream(&cx, make_stream, |s, v| v.map(|_| *s = v));

Now for adding the dependencies tuple, could that just be passed through to use_future?...

trevyn avatar Jun 08 '22 13:06 trevyn

> The R generic parameter is there so that we can make the usage a one-liner with map, which actually returns an Option that just gets dropped

idk, this smells like an anti-pattern to me. reasons why i don't like this:

  • Meaningless type parameter; confusing signature. Makes it seem like the return value does something – users may be misled into thinking e.g. that they should return the new value
  • Messes with Rust's must_use lints. E.g. a user may not notice that they are accidentally ignoring Result because it is considered used.

I've seen this pattern in typescript (: any), but I don't think it applies well to Rust.

Also, this reads pretty weird to me:

use_stream(&cx, make_stream, |s, v| v.map(|v| *s = Some(v)));

map is explicitly for producing a new value. Producing an Option<()> and then ignoring it feels hacky.

btw, if you really want a one-liner with the current signature, you can technically do this:

v.into_iter().for_each(|v| *s = Some(v))

It is semantically correct, and it's clear what it does. Personally, I prefer the if let version, which doesn't sacrifice readability for saving 2 lines.
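For comparison, here are the three spellings side by side as a std-only sketch, applied to a plain `&mut Option<i64>` rather than to hook state. The function names are made up for illustration.

```rust
/// `map` spelling: builds an `Option<()>` whose only purpose is to be dropped.
pub fn with_map(s: &mut Option<i64>, v: Option<i64>) {
    let _unused: Option<()> = v.map(|v| *s = Some(v));
}

/// `for_each` spelling: an `Option` iterates over zero or one items,
/// and `for_each` returns `()`, so nothing is silently discarded.
pub fn with_for_each(s: &mut Option<i64>, v: Option<i64>) {
    v.into_iter().for_each(|v| *s = Some(v));
}

/// `if let` spelling: two more lines, no intermediate value at all.
pub fn with_if_let(s: &mut Option<i64>, v: Option<i64>) {
    if let Some(v) = v {
        *s = Some(v);
    }
}

/// Feed the same sequence through all three and collect the final states.
pub fn spelling_demo() -> [Option<i64>; 3] {
    let (mut a, mut b, mut c) = (None, None, None);
    for v in [Some(1), None, Some(7), None] {
        with_map(&mut a, v);
        with_for_each(&mut b, v);
        with_if_let(&mut c, v);
    }
    [a, b, c]
}
```

All three leave the state identical, so the choice is purely about what the signature and the call site communicate.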

What if we get rid of the Option?

From the examples we have so far, it seems like we can replace Option<T> with a plain T. Is there a use case that will need to handle the None case (end of iteration)? I can't think of one at least. So we could simplify the API a bit yet again:

pub fn use_stream<C, T, S>(
    cx: &ScopeState,
    stream: impl FnOnce() -> S + 'static,
    callback: impl Fn(&mut C, T) + 'static,
) -> &UseRef<C>
where
    C: Default + 'static,
    S: Stream<Item = T>,
{
    let state = use_ref(cx, <C as Default>::default);

    let state_cloned = state.clone();
    let _ = use_future(cx, (), |_| async move {
        let stream = stream();
        futures_util::pin_mut!(stream);

        while let Some(value) = StreamExt::next(&mut stream).await {
            let mut container = state_cloned.write();
            callback(&mut *container, value);
        }
    });

    state
}

And then we can write:

    let latest = use_stream(&cx, make_stream, |s, v| *s = Some(v));

    let vec = use_stream(&cx, make_stream, |s: &mut Vec<_>, v| s.push(v));

Dependencies

> Now for adding the dependencies tuple, could that just be passed through to use_future?...

Yes, I suppose we can just expose that.

What about the returned UseFuture? It has useful stuff like restart, cancel, etc. But we're already returning the UseRef. I suspect a good thing to return would be a UseStream struct, which would have the behavior of UseFuture as well as UseRef.

rMazeiks avatar Jun 08 '22 14:06 rMazeiks

"Stream is finished" is definitely a semantically useful thing, signifying a "final" result, which might have a UI representation.

That said, in the cases where it's important, it could be handled in a stream combinator, and removing it does make the ergonomics of the common use-cases much nicer.
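A combinator of that kind might look like the sketch below. To stay dependency-free it uses std iterators as a stand-in; with a real `futures` stream, `StreamExt::map` and `StreamExt::chain` would presumably play the same roles. `Event`, `with_done`, `Progress`, and `update` are hypothetical names.

```rust
/// Hypothetical wrapper that turns end-of-stream into an ordinary item.
#[derive(Debug, Clone, PartialEq)]
pub enum Event<T> {
    Item(T),
    Done,
}

/// Wrap every item and append a final `Done` marker.
pub fn with_done<T>(items: impl IntoIterator<Item = T>) -> impl Iterator<Item = Event<T>> {
    items
        .into_iter()
        .map(Event::Item)
        .chain(std::iter::once(Event::Done))
}

/// State a component might render: the latest value plus a "finished" flag.
#[derive(Debug, Default, PartialEq)]
pub struct Progress {
    pub latest: Option<i64>,
    pub finished: bool,
}

/// Callback in the simplified `Fn(&mut C, T)` shape, with `T = Event<i64>`.
pub fn update(state: &mut Progress, event: Event<i64>) {
    match event {
        Event::Item(v) => state.latest = Some(v),
        Event::Done => state.finished = true,
    }
}

/// Drive the callback over a short sequence and return the final state.
pub fn done_demo() -> Progress {
    let mut state = Progress::default();
    for event in with_done(0..3i64) {
        update(&mut state, event);
    }
    state
}
```

Only callers that care about completion pay for the extra enum; everyone else keeps the plain `T` callback.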

(And it also sidesteps the map thing, which I suspect you're right about...😉)

> I suspect a good thing to return would be a UseStream struct, which would have the behavior of UseFuture as well as UseRef.

This makes sense.

trevyn avatar Jun 08 '22 15:06 trevyn