WinRT Futures should update their inner Waker
Opening this issue as a continuation of #322. While it is now possible to call poll on winrt futures more than once, consecutive poll calls will not update the Waker. As a result, these futures will hang forever if a future is transferred between tasks after being polled. A simple example of this behavior:
mod bindings {
winrt::include_bindings!();
}
use bindings::*;
use crate::test_component::TestRunner;
use futures::FutureExt;
fn main() {
let mut tokio_rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
tokio_rt.block_on(async {
let fut_1 = TestRunner::create_async_action(1000).unwrap();
let fut_2 = TestRunner::create_async_action(2000).unwrap();
let mut fut_1_fuse = fut_1.fuse();
let mut fut_2_fuse = fut_2.fuse();
let incomplete_future = futures::select! {
res_1 = fut_1_fuse => fut_2_fuse,
res_2 = fut_2_fuse => fut_1_fuse,
};
println!("1 future complete, finishing other on different task...");
// Work-around !Send future, this example still works with Send winrt futures and tokio::spawn.
let local = tokio::task::LocalSet::new();
local.spawn_local(async move {
incomplete_future.await.unwrap();
});
local.await;
println!("Both futures complete!");
});
println!("Done!");
}
This example will hang indefinitely and never complete. Unfortunately, I don't this its possible to fix this while maintaining the Future impl on IAsyncxxx as some extra memory is needed to store the shared reference to the Waker to update if needed.
In my opinion, IntoFuture is probably the best way forward, however it is currently unstable. Implementing this trait will allow await-ing on that value by implicity calling into_future, just as the IntoIterator trait allows iterating over a value that doesn't explicitly implement Iterator. In the meantime, maybe we could remove the Future impl from IAsyncxxx, replacing it with a method to get a wrapper struct implementing Future?
Thanks for the repro, Nathaniel! That's super helpful. A wrapper may be the way to go, but I really don't enjoy all the verbiage Rust requires with all the unwrapping so I'll think about generating the projection to possibly return the wrapper directly so that it doesn't require another method call.
I was thinking that some of the verbiage could be avoided by making the conversion method not return a result, like this:
// Work-around being unable to implement new functions on foreign types
trait IAsyncActionExt {
fn to_future(self) -> IAsyncActionFuture;
}
impl IAsyncActionExt for IAsyncAction {
fn to_future(self) -> IAsyncActionFuture {
IAsyncActionFuture {
inner: self,
shared_waker: Arc::new(Mutex::new(None)),
}
}
}
struct IAsyncActionFuture {
inner: IAsyncAction,
shared_waker: Arc<Mutex<Option<Waker>>>,
}
impl Future for IAsyncActionFuture {
type Output = winrt::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.inner.status()? == AsyncStatus::Started {
let mut shared_waker = self.shared_waker.lock().unwrap();
let shared_waker_is_none = shared_waker.is_none();
*shared_waker = Some(cx.waker().clone());
if shared_waker_is_none {
let shared_waker = self.shared_waker.clone();
self.inner.set_completed(AsyncActionCompletedHandler::new(
move |_sender, _args| {
// The waker will always be some here
shared_waker.lock().unwrap().take().unwrap().wake();
Ok(())
},
))?;
}
Poll::Pending
} else {
Poll::Ready(self.inner.get_results())
}
}
}
fn main() {
let mut tokio_rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
tokio_rt.block_on(async {
let fut_1 = TestRunner::create_async_action(1000).unwrap().to_future();
let fut_2 = TestRunner::create_async_action(2000).unwrap().to_future();
let mut fut_1_fuse = fut_1.fuse();
let mut fut_2_fuse = fut_2.fuse();
let incomplete_future = futures::select! {
res_1 = fut_1_fuse => fut_2_fuse,
res_2 = fut_2_fuse => fut_1_fuse,
};
println!("1 future complete, finishing other on background task...");
// Work-around !Send future, this example still works with Send winrt futures and tokio::spawn.
let local = tokio::task::LocalSet::new();
local.spawn_local(async move {
incomplete_future.await.unwrap();
});
local.await;
println!("Both futures complete!");
});
println!("Done!");
}
The only job of the conversion method is to essentially create a wrapper around an IAsyncxxx and a shared waker implementation, which is infallible. As a result, I think the only increased verbosity here is in the extra to_future method call, which we can partially hide when IntoFuture stabilizes. In this case though, i think the into_future verbosity is needed in order to call fuse on the created futures as FutureExt is only implemented for futures.
IntoFuture is stabilizing now, so this seems worth doing. I would also suggest implementing Drop for the wrapper future and having it call Cancel() on the IAsyncInfo, since that's consistent with semantics of other Rust futures.
Thanks for the reminder. Yes, now that IntoFuture is in 1.64 I'd love to see whether it can help to improve the integration of WinRT async with Rust async/await.