Add `use_stream` hook
Specific Demand
Like use_future, except that instead of taking a Future<Output = T>, it takes a futures::Stream<Item = T>.
Hi, thanks for the suggestion!
What would the API look like? use_future returns a UseFuture struct, which you can use to query the current state & resolved value of the future.
But if I understand correctly, Stream asynchronously provides the values of an iterator one by one. How would you expect to use those in the context of a Dioxus component?
I'm guessing there could probably be a callback for consuming the values? Then you would be able to push them into some state, and render all the values yielded so far. But I don't know a whole lot about async Rust, so not sure if that makes sense.
Ah, yes. My use case would be replacing the state with each new value, basically the same API as use_future except it gets updated as new values come in. You’re right that appending each new value is also a valid use case. Hmm…
Maybe it keeps an internal buffer, and has both a method for getting the entire buffer, and a method for getting the last item and clearing all previous items? (retain_last()? That’s the best name I can come up with right now!😅)
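To make the buffer idea concrete, here is a minimal std-only sketch. StreamBuffer, push, and all are hypothetical names invented for illustration (only retain_last comes from the suggestion above), and none of this is an existing Dioxus API.

```rust
/// Hypothetical buffer for values yielded by a stream (not a Dioxus type).
#[derive(Debug)]
struct StreamBuffer<T> {
    items: Vec<T>,
}

impl<T> StreamBuffer<T> {
    fn new() -> Self {
        Self { items: Vec::new() }
    }

    /// Record a newly yielded value.
    fn push(&mut self, value: T) {
        self.items.push(value);
    }

    /// Every value buffered so far, oldest first.
    fn all(&self) -> &[T] {
        &self.items
    }

    /// Drop everything except the most recent value and return a reference to it.
    fn retain_last(&mut self) -> Option<&T> {
        if let Some(last) = self.items.pop() {
            self.items.clear();
            self.items.push(last);
        }
        self.items.last()
    }
}

fn main() {
    let mut buf = StreamBuffer::new();
    for v in [1, 2, 3] {
        buf.push(v);
    }
    println!("all: {:?}", buf.all());
    println!("latest: {:?}", buf.retain_last());
    println!("after retain_last: {:?}", buf.all());
}
```

A hook built this way would have to decide when the buffer gets trimmed, which is one reason a caller-supplied callback may end up simpler.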
My use case would be replacing the state with each new value
Ah, makes sense! I think with a callback API, we can support both use cases. Collection:
// pseudocode
let stuff = use_state(cx, || Vec::new()); // can be any data structure!
let _ = use_stream(cx, || make_stream(), |value| stuff.write().push(value));
Latest-only:
// pseudocode
let latest = use_state(cx, || None::<ValueType>);
let _ = use_stream(cx, || make_stream(), |value| *latest.write() = Some(value));
I like it!
I'm interested, too.
We have a background job that refreshes the state every N seconds. However, UseFuture (or its set method) is not 'static, so we cannot use it inside cx.spawn. For now, we have forked use_future, renamed it to use_stream, and made it accept a Stream.
If use_stream is going to be offered directly, please make it work like use_future as much as possible, especially the dependencies argument. Thanks!
It looks like for ergonomics, it might make sense to include the use_state in the hook, otherwise a bunch of lifetime issues come up.
Here's an attempt:
pub fn use_stream<R, S, T, U>(
    cx: &ScopeState,
    stream: impl FnOnce() -> S + 'static,
    initial: impl FnOnce() -> U,
    cb: impl Fn(&UseState<U>, Option<T>) -> R + 'static,
) -> &UseState<U>
where
    S: futures_util::Stream<Item = T>,
    U: 'static, // use_state requires its value type to be 'static
{
    let state = use_state(cx, initial);
    let state_cloned = state.clone();
    let _: &CoroutineHandle<()> = use_coroutine(cx, |_| async move {
        let stream = stream();
        futures_util::pin_mut!(stream);
        // Forward every item to the callback as Some(item)
        while let Some(val) = futures_util::StreamExt::next(&mut stream).await {
            cb(&state_cloned, Some(val));
        }
        // Signal the end of the stream with None
        cb(&state_cloned, None);
    });
    state
}
Usage:
let _ = use_stream(&cx, make_stream, || None, |s, v| v.map(|v| s.set(Some(v))));
let _ = use_stream(&cx, make_stream, Vec::new, |s, v| v.map(|v| s.make_mut().push(v)));
The dependencies part is a bit intense...!
Here's an attempt:
Hmm, API looks pretty good to me! (Though I'm not the maintainer)
The dependencies part is a bit intense...!
True, 3 parameters can get hard to follow. Perhaps we can use Default to avoid the initializer? Works for all std collections (including Vec) and Option too. Not sure if there's a use case it doesn't cover.
Perhaps, for maximum flexibility, we could have use_stream_with_init which takes an init callback, and a use_stream which just uses Default.
I did try making it use Default, but then it required an explicit &UseState<Option<_>> in the usage, which seemed a bit much.
Hi again, I took a closer look at your code.
I did try making it use Default, but then it required an explicit &UseState<Option<_>> in the usage, which seemed a bit much.
Indeed, it appears that type inference doesn't work in this situation. However, if we let the closure take a &mut instead of the UseState, inference works for some reason! Also, I think &mut is more ergonomic, since the closure doesn't need to know anything about a UseState.
btw I changed the UseState to a UseRef because we can't afford cloning the entire thing (say a huge Vec) if it happens to be a collection.
Also, I removed the R generic, and the update closure now returns nothing. Not sure why that was there.
Anyway, this compiles and seems to work:
#![allow(non_snake_case)]
use dioxus::prelude::*;
use futures::stream::{iter, StreamExt};
use futures::Stream;

fn main() {
    dioxus::desktop::launch(app);
}

fn make_stream() -> impl Stream<Item = i64> {
    iter(0..42)
}

pub fn app(cx: Scope) -> Element {
    let latest = use_stream(&cx, make_stream, |s, v| {
        if let Some(v) = v {
            *s = Some(v)
        }
    });
    let vec = use_stream(&cx, make_stream, |s: &mut Vec<_>, v| {
        if let Some(v) = v {
            s.push(v)
        }
    });
    let current = format!("{:?}", latest.read());
    let all = format!("{:?}", vec.read());
    cx.render(rsx! {
        div {
            "Current: {current}"
        },
        div {
            "All: {all}"
        }
    })
}

pub fn use_stream<C, T, S>(
    cx: &ScopeState,
    stream: impl FnOnce() -> S + 'static,
    cb: impl Fn(&mut C, Option<T>) + 'static,
) -> &UseRef<C>
where
    C: Default + 'static,
    S: Stream<Item = T>,
{
    let state = use_ref(cx, <C as Default>::default);
    let state_cloned = state.clone();
    let _ = use_future(cx, (), |_| async move {
        let stream = stream();
        futures_util::pin_mut!(stream);
        while let Some(val) = StreamExt::next(&mut stream).await {
            let mut container = state_cloned.write();
            cb(&mut *container, Some(val));
        }
        let mut container = state_cloned.write();
        cb(&mut *container, None);
    });
    state
}
Let me know what you think!
Ah, good call on the &mut, use_ref, and use_future!
The R generic parameter is there so that we can make the usage a one-liner with map, which actually returns an Option that just gets dropped:
let latest = use_stream(&cx, make_stream, |s, v| v.map(|v| *s = Some(v)));
Haha, or even more aggressively:
let latest = use_stream(&cx, make_stream, |s, v| v.map(|_| *s = v));
Now for adding the dependencies tuple, could that just be passed through to use_future?...
The R generic parameter is there so that we can make the usage a one-liner with map, which actually returns an Option that just gets dropped
idk, this smells like an anti-pattern to me. reasons why i don't like this:
- Meaningless type parameter; confusing signature. Makes it seem like the return value does something – users may be mistaken into thinking e.g. that they should return the new value
- Messes with Rust's must_use lints. E.g. a user may not notice that they are accidentally ignoring Result because it is considered used.
I've seen this pattern in typescript (: any), but I don't think it applies well to Rust.
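Here is a small std-only sketch of the must_use concern. The helper names (for_each_generic, for_each_unit, parse) are made up for illustration and stand in for the hook's callback parameter; this is not the hook itself.

```rust
use std::cell::Cell;
use std::num::ParseIntError;

/// Callback with a throwaway return type, like the R parameter discussed above.
fn for_each_generic<T, R>(items: Vec<T>, cb: impl Fn(T) -> R) {
    for item in items {
        // R is opaque here, so the compiler has no way to warn if the
        // caller's closure happens to return a Result.
        cb(item);
    }
}

/// Callback that must return unit.
fn for_each_unit<T>(items: Vec<T>, cb: impl Fn(T)) {
    for item in items {
        cb(item);
    }
}

fn parse(s: &str) -> Result<i32, ParseIntError> {
    s.parse()
}

fn main() {
    let inputs = vec!["1", "oops", "3"];

    // Compiles without any warning: the Err for "oops" is silently dropped.
    for_each_generic(inputs.clone(), |s| parse(s));

    // With the unit version, `|s| parse(s)` would be a type error, so the
    // caller has to decide what to do with the Result.
    let errors = Cell::new(0);
    for_each_unit(inputs, |s| {
        if parse(s).is_err() {
            errors.set(errors.get() + 1);
        }
    });
    println!("errors seen: {}", errors.get()); // prints "errors seen: 1"
}
```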
Also, this reads pretty weird to me:
use_stream(&cx, make_stream, |s, v| v.map(|v| *s = Some(v)));
map is explicitly for producing a new value. Producing an Option<()> and then ignoring it feels hacky.
btw, if you really want a one-liner with the current signature, you can technically do this:
v.into_iter().for_each(|v| *s = Some(v))
It is semantically correct, and it's clear what it does. Personally, I prefer the if let version, which doesn't sacrifice readability for saving 2 lines.
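For comparison, the three spellings side by side as plain functions (std only; the function names are just labels for this sketch). All three leave the slot untouched on None and overwrite it on Some.

```rust
/// The map version: builds an Option<()> that is thrown away.
fn with_map(slot: &mut Option<i64>, v: Option<i64>) {
    let _ = v.map(|v| *slot = Some(v));
}

/// The one-liner that runs a side effect for the zero or one contained value.
fn with_for_each(slot: &mut Option<i64>, v: Option<i64>) {
    v.into_iter().for_each(|v| *slot = Some(v));
}

/// The if let version.
fn with_if_let(slot: &mut Option<i64>, v: Option<i64>) {
    if let Some(v) = v {
        *slot = Some(v);
    }
}

fn main() {
    let updates: [fn(&mut Option<i64>, Option<i64>); 3] =
        [with_map, with_for_each, with_if_let];
    for update in updates {
        let mut slot = None;
        update(&mut slot, Some(1));
        update(&mut slot, None); // None keeps the previous value
        assert_eq!(slot, Some(1));
        update(&mut slot, Some(7));
        assert_eq!(slot, Some(7));
    }
    println!("all three variants behave the same");
}
```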
What if we get rid of the Option?
From the examples we have so far, it seems like we can replace Option<T> with a plain T. Is there a use case that will need to handle the None case (end of iteration)? I can't think of one at least. So we could simplify the API a bit yet again:
pub fn use_stream<C, T, S>(
    cx: &ScopeState,
    stream: impl FnOnce() -> S + 'static,
    callback: impl Fn(&mut C, T) + 'static,
) -> &UseRef<C>
where
    C: Default + 'static,
    S: Stream<Item = T>,
{
    let state = use_ref(cx, <C as Default>::default);
    let state_cloned = state.clone();
    let _ = use_future(cx, (), |_| async move {
        let stream = stream();
        futures_util::pin_mut!(stream);
        while let Some(value) = StreamExt::next(&mut stream).await {
            let mut container = state_cloned.write();
            callback(&mut *container, value);
        }
    });
    state
}
And then we can write:
let latest = use_stream(&cx, make_stream, |s, v| *s = Some(v));
let vec = use_stream(&cx, make_stream, |s: &mut Vec<_>, v| s.push(v));
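To try the callback shape without pulling in Dioxus, here is a synchronous std-only model. The function name drive is hypothetical, and a plain iterator stands in for the stream, so this only illustrates the Default + &mut C + T signature and how the container type gets inferred, not the async behavior.

```rust
/// Synchronous stand-in for the hook: folds every yielded item into a
/// Default-initialized container through the callback.
fn drive<C, T>(items: impl IntoIterator<Item = T>, callback: impl Fn(&mut C, T)) -> C
where
    C: Default,
{
    let mut container = C::default();
    for value in items {
        callback(&mut container, value);
    }
    container
}

fn main() {
    // Latest-only: the container type is inferred from the assignment.
    let latest = drive(0..5_i64, |s, v| *s = Some(v));
    // Collection: the method call needs the container type spelled out.
    let all = drive(0..5_i64, |s: &mut Vec<_>, v| s.push(v));
    println!("latest: {:?}, all: {:?}", latest, all);
}
```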
Dependencies
Now for adding the dependencies tuple, could that just be passed through to use_future?...
Yes, I suppose we can just expose that.
What about the returned UseFuture? It has useful stuff like restart, cancel, etc. But we're already returning the UseRef. I suspect a good thing to return would be a UseStream struct, which would have the behavior of UseFuture as well as UseRef.
"Stream is finished" is definitely a semantically useful thing, signifying a "final" result, which might have a UI representation.
That said, in the cases where it's important, it could be handled in a stream combinator, and removing it does make the ergonomics of the common use-cases much nicer.
(And it also sidesteps the map thing, which I suspect you're right about...😉)
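A sketch of the combinator approach, again using a plain iterator as a synchronous stand-in for the stream. Event and with_done_marker are hypothetical names; I'd expect the same shape to be expressible with the map and chain combinators on a futures stream, but treat that as an assumption.

```rust
use std::iter;

/// Hypothetical wrapper that makes "the stream ended" an ordinary item.
#[derive(Debug, PartialEq)]
enum Event<T> {
    Item(T),
    Done,
}

/// Wrap every item and append one final Done marker.
fn with_done_marker<T>(items: impl Iterator<Item = T>) -> impl Iterator<Item = Event<T>> {
    items.map(Event::Item).chain(iter::once(Event::Done))
}

fn main() {
    let mut latest = None;
    let mut finished = false;
    // The callback only ever sees a plain item type, yet can still react
    // to the end of the sequence.
    for event in with_done_marker(0..3) {
        match event {
            Event::Item(v) => latest = Some(v),
            Event::Done => finished = true,
        }
    }
    println!("latest: {:?}, finished: {}", latest, finished);
}
```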
I suspect a good thing to return would be a UseStream struct, which would have the behavior of UseFuture as well as UseRef.
This makes sense.
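One possible shape for such a handle, as a std-only sketch. Everything here is hypothetical: UseStreamSketch, StreamStatus, feed, and finish are invented for illustration, and having restart reset the container is an assumption rather than anything decided in this thread. A real version would wrap the actual UseRef and UseFuture handles instead of Rc cells.

```rust
use std::cell::{Cell, Ref, RefCell};
use std::rc::Rc;

#[derive(Clone, Copy, Debug, PartialEq)]
enum StreamStatus {
    Running,
    Finished,
    Cancelled,
}

/// Hypothetical handle combining read access (UseRef-like) with
/// lifecycle control (UseFuture-like).
struct UseStreamSketch<C> {
    value: Rc<RefCell<C>>,
    status: Rc<Cell<StreamStatus>>,
}

impl<C: Default> UseStreamSketch<C> {
    fn new() -> Self {
        Self {
            value: Rc::new(RefCell::new(C::default())),
            status: Rc::new(Cell::new(StreamStatus::Running)),
        }
    }

    /// Read the container, like UseRef::read.
    fn read(&self) -> Ref<'_, C> {
        self.value.borrow()
    }

    fn status(&self) -> StreamStatus {
        self.status.get()
    }

    /// Stop accepting items.
    fn cancel(&self) {
        self.status.set(StreamStatus::Cancelled);
    }

    /// Assumption: restarting resets the container to its default.
    fn restart(&self) {
        *self.value.borrow_mut() = C::default();
        self.status.set(StreamStatus::Running);
    }

    /// What the driving task would call per item; ignored unless running.
    fn feed<T>(&self, item: T, callback: impl Fn(&mut C, T)) {
        if self.status.get() == StreamStatus::Running {
            callback(&mut *self.value.borrow_mut(), item);
        }
    }

    /// What the driving task would call when the stream ends.
    fn finish(&self) {
        if self.status.get() == StreamStatus::Running {
            self.status.set(StreamStatus::Finished);
        }
    }
}

fn main() {
    let handle = UseStreamSketch::<Vec<i32>>::new();
    handle.feed(1, |c, v| c.push(v));
    handle.feed(2, |c, v| c.push(v));
    handle.cancel();
    handle.feed(3, |c, v| c.push(v)); // ignored after cancel
    println!("{:?} {:?}", *handle.read(), handle.status());
}
```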