Using the PgListener, how to listen for events while removing/adding channels?
Hi!
I'm trying to use the PgListener interface to listen for events (using recv or try_recv), while also using listen and unlisten. I'd like to be able to do something like this (pseudo code):
let mut listener = PgListener::connect(url).await.unwrap();
listener.listen("key").await.unwrap();
tokio::spawn(async {
    listener.listen("other-key").await.unwrap();
});
loop {
    let message = listener.recv().await.unwrap();
    dbg!(message);
}
But I can't see how I'd easily (from another task) call listen or unlisten. I can't use a mutex because then I'd have to wait for a message to subscribe/unsubscribe.
I thought that using a tokio::sync::mpsc channel could work, using tokio::select to get either a command to subscribe/unsubscribe or a notification. So to start off I tried to use select with a simple timer; but I get a decode error that I assume has to do with the fact that select! does not poll futures that weren't selected:
let mut listener = PgListener::connect(url).await.unwrap();
let mut heartbeat = time::interval(time::Duration::from_secs(1));
listener.listen("key").await.unwrap();
loop {
    let task_listen = listener.recv().fuse();
    let task_heartbeat = heartbeat.tick().fuse();
    pin_mut!(task_listen, task_heartbeat);
    tokio::select! {
        message = task_listen => {
            dbg!(message.unwrap());
        },
        _ = task_heartbeat => {
            println!("heartbeat");
        }
    }
}
The error (full backtrace here):
heartbeat
heartbeat
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Protocol("unknown message type: \'\\u{0}\'")', src/web/new.rs:81:22
If that helps, I'm using postgres 12.4, sqlx 0.4.0-beta.1, and tokio 0.2.22.
While executing listen[_all](), unlisten[_all]() and any methods from Executor, the connection inside PgListener buffers all subscribed messages in the background and returns them in order the next time you call recv(); you shouldn't miss anything unless there's a bug.
Thanks for the reply!
I fully trust that. My problem is that I can't afford to wait for another message to arrive before calling listen to subscribe to another channel. Maybe I'm missing something obvious, but since both listen and recv take &mut self, I can't call them both at the same time.
Something like a mutex forces me to wait:
let listener = Arc::new(Mutex::new(PgListener::connect(url).await.unwrap()));
let copy = listener.clone();
tokio::spawn(async move {
    // Assuming I'm receiving those channels from some stream
    loop {
        copy.lock().await.listen("some-channel").await.unwrap();
    }
});
loop {
    let mut listener = listener.lock().await;
    // This waits for at best a few seconds, at worst a few minutes, while preventing calls to listen
    let message = listener.recv().await;
    dbg!(message);
}
Maybe adding a timeout to the recv operation could help, but I can't see how I'd do that.
Right, so tokio::select!() breaks the connection because our futures don't support being canceled very well; it's an area we're actively refactoring for another beta release. Using tokio::time::timeout would have the same effect, unfortunately.
Talked it over with @mehcode and we arrived at an interesting solution. We'd like to be able to attach a channel (such as futures::channel::mpsc) to PgListener for it to forward messages to, so you can still keep using it normally and don't have to wait on .recv().
I'm thinking we can keep the original buffering behavior by default, but make PgListener generic over a trait MessageSink that's implemented for Vec<PgMessage>, VecDeque<PgMessage>, etc., as well as channel types from async-std and Tokio. (We can't use futures::sink::Sink, even though it would be perfect, because Tokio's channels don't implement it.)
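To make the shape of that idea concrete, here is a rough, std-only sketch of what such a sink trait might look like. Nothing here is sqlx API: `Message`, `MessageSink`, `push_message` and `deliver` are hypothetical names made up for illustration, and `std::sync::mpsc::Sender` stands in for the async-std/Tokio channel types.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

// Hypothetical stand-in for a notification; not sqlx's actual message type.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub channel: String,
    pub payload: String,
}

// Hypothetical trait: anything the listener could push notifications into.
pub trait MessageSink {
    fn push_message(&mut self, message: Message);
}

// Buffering sinks keep messages until the caller drains them.
impl MessageSink for Vec<Message> {
    fn push_message(&mut self, message: Message) {
        self.push(message);
    }
}

impl MessageSink for VecDeque<Message> {
    fn push_message(&mut self, message: Message) {
        self.push_back(message);
    }
}

// A channel sink forwards messages to whoever holds the receiver.
impl MessageSink for mpsc::Sender<Message> {
    fn push_message(&mut self, message: Message) {
        // In this sketch a notification is silently dropped if the receiver is gone.
        let _ = self.send(message);
    }
}

// Stand-in for the listener's internal dispatch: it only sees the trait.
pub fn deliver<S: MessageSink>(sink: &mut S, channel: &str, payload: &str) {
    sink.push_message(Message {
        channel: channel.to_owned(),
        payload: payload.to_owned(),
    });
}
```

With a `VecDeque<Message>` as the sink this behaves like the current buffering; with a channel sender, the receiving half can live in another task and never has to touch the listener.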
PgListener will still deref to PgConnection so it can be used as normal, but will also have a step() function (bikeshedding) to just wait for messages so it could be spawned into a task exclusively for listening to messages.
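As a rough illustration of the "spawn it into a task exclusively for listening" part, the sketch below drives a `step()` loop on a dedicated worker and hands notifications to the rest of the program through a channel. It is only a model of the idea: `FakeListener`, `step` and `spawn_listener` are hypothetical, the listener replays canned payloads instead of talking to Postgres, and a std thread is used in place of an async task to keep the example dependency-free.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the listener: replays canned payloads
// instead of reading notifications from a real connection.
pub struct FakeListener {
    pending: Vec<String>,
    sink: mpsc::Sender<String>,
}

impl FakeListener {
    pub fn new(mut pending: Vec<String>, sink: mpsc::Sender<String>) -> Self {
        // Reverse once so that pop() yields payloads in their original order.
        pending.reverse();
        FakeListener { pending, sink }
    }

    // Hypothetical `step()`: take the next notification and forward it to the sink.
    // Returns false once the source is exhausted or the receiver has been dropped.
    pub fn step(&mut self) -> bool {
        match self.pending.pop() {
            Some(payload) => self.sink.send(payload).is_ok(),
            None => false,
        }
    }
}

// Spawns a worker that does nothing but drive step(), and returns the
// receiving end of the channel for the rest of the program to consume.
pub fn spawn_listener(pending: Vec<String>) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut listener = FakeListener::new(pending, tx);
        while listener.step() {}
        // The sender is dropped here, which closes the channel for the receiver.
    });
    rx
}
```

The consumer only ever waits on the receiver, so it no longer needs exclusive access to the listener just to read notifications.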
Was there any progress with this in 0.6.x?
Any update on this?