sqlx

Using the PgListener, how to listen for events while removing/adding channels?

Open vivescere opened this issue 5 years ago • 5 comments

Hi!

I'm trying to use the PgListener interface to listen for events (using recv or try_recv), while also using listen and unlisten. I'd like to be able to do something like this (pseudo code):

let mut listener = PgListener::connect(url).await.unwrap();
listener.listen("key").await.unwrap();

tokio::spawn(async {
    listener.listen("other-key").await.unwrap();
});

loop {
    let message = listener.recv().await.unwrap();
    dbg!(message);
}

But I can't see how I'd easily (from another task) call listen or unlisten. I can't use a mutex because then I'd have to wait for a message to subscribe/unsubscribe.

I thought that using a tokio::sync::mpsc channel could work, using tokio::select to get either a command to subscribe/unsubscribe or a notification. So to start off I tried to use select with a simple timer; but I get a decode error that I assume has to do with the fact that select! does not poll futures that weren't selected:

use futures::{pin_mut, FutureExt};
use sqlx::postgres::PgListener;
use tokio::time;

let mut listener = PgListener::connect(url).await.unwrap();
let mut heartbeat = time::interval(time::Duration::from_secs(1));

listener.listen("key").await.unwrap();

loop {
    let task_listen = listener.recv().fuse();
    let task_heartbeat = heartbeat.tick().fuse();

    pin_mut!(task_listen, task_heartbeat);

    tokio::select! {
        message = task_listen => {
            dbg!(message.unwrap());
        },
        _ = task_heartbeat => {
            println!("heartbeat");
        }
    }
}

The error (full backtrace here):

heartbeat
heartbeat
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Protocol("unknown message type: \'\\u{0}\'")', src/web/new.rs:81:22

If that helps, I'm using postgres 12.4, sqlx 0.4.0-beta.1, and tokio 0.2.22.

vivescere • Aug 17 '20 18:08

While executing listen[_all](), unlisten[_all](), and any methods from Executor, the connection inside PgListener buffers all notifications for subscribed channels in the background and returns them in order the next time you call recv(); you shouldn't miss anything unless there's a bug.

abonander • Aug 18 '20 04:08

Thanks for the reply!

I fully trust that. My problem is that I can't afford to wait for another message to arrive before calling listen to subscribe to another channel. Maybe I'm missing something obvious, but since both listen and recv take &mut self, I can't call them both at the same time.

Something like a mutex forces me to wait:

let listener = Arc::new(Mutex::new(PgListener::connect(url).await.unwrap()));
let copy = listener.clone();

tokio::spawn(async move {
    // Assuming I'm receiving those channels from some stream
    loop {
        copy.lock().await.listen("some-channel").await.unwrap();
    }
});

loop {
    let mut listener = listener.lock().await;
    // This waits for at best a few seconds, at worst a few minutes, while preventing calls to listen
    let message = listener.recv().await;
    dbg!(message);
}

Maybe adding a timeout to the recv operation could help, but I can't see how I'd do that.

vivescere • Aug 18 '20 13:08

Right, so tokio::select!() breaks the connection because our futures don't support being canceled very well; it's an area we're actively refactoring for another beta release. Using tokio::time::timeout would have the same effect, unfortunately.

abonander • Aug 18 '20 21:08

Talked it over with @mehcode and we arrived at an interesting solution. We'd like to be able to attach a channel (such as futures::channel::mpsc) to PgListener for it to forward messages to, so you can keep using it normally and don't have to wait on .recv().

I'm thinking we can keep the original buffering behavior by default, but make PgListener generic over a trait MessageSink that's implemented for Vec<PgMessage>, VecDeque<PgMessage>, etc., as well as channel types from async-std and Tokio. (We can't use futures::sink::Sink, even though it would be perfect, because Tokio's channels don't implement it.)

PgListener will still deref to PgConnection so it can be used as normal, but will also have a step() function (bikeshedding) to just wait for messages so it could be spawned into a task exclusively for listening to messages.
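To make the idea concrete, here is a rough, std-only sketch of what such a design might look like. Nothing in it is sqlx API: PgMessage, MessageSink, push_message, SketchListener and forward_all are all hypothetical names, the `incoming` queue stands in for notifications read off the connection, and step() is synchronous here where the real one would presumably be async.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

// Hypothetical stand-in for a notification; not sqlx's real type.
#[derive(Debug, Clone, PartialEq)]
pub struct PgMessage {
    pub channel: String,
    pub payload: String,
}

// Hypothetical trait: anything that can accept a forwarded notification.
// On failure the message is handed back to the caller.
pub trait MessageSink {
    fn push_message(&mut self, message: PgMessage) -> Result<(), PgMessage>;
}

impl MessageSink for Vec<PgMessage> {
    fn push_message(&mut self, message: PgMessage) -> Result<(), PgMessage> {
        self.push(message);
        Ok(())
    }
}

impl MessageSink for VecDeque<PgMessage> {
    fn push_message(&mut self, message: PgMessage) -> Result<(), PgMessage> {
        self.push_back(message);
        Ok(())
    }
}

impl MessageSink for mpsc::Sender<PgMessage> {
    fn push_message(&mut self, message: PgMessage) -> Result<(), PgMessage> {
        // SendError carries the unsent message in its public field.
        self.send(message).map_err(|err| err.0)
    }
}

// Hypothetical listener, generic over its sink, buffering by default.
pub struct SketchListener<S: MessageSink = VecDeque<PgMessage>> {
    incoming: VecDeque<PgMessage>, // assumption: mock for the connection
    sink: S,
}

impl<S: MessageSink> SketchListener<S> {
    pub fn new(incoming: Vec<PgMessage>, sink: S) -> Self {
        SketchListener { incoming: incoming.into(), sink }
    }

    // Hypothetical step(): forward the next notification to the sink.
    // Returns false once the mock queue is empty or the sink rejects.
    pub fn step(&mut self) -> bool {
        match self.incoming.pop_front() {
            Some(message) => self.sink.push_message(message).is_ok(),
            None => false,
        }
    }

    pub fn sink(&self) -> &S {
        &self.sink
    }
}

// A thread that does nothing but step the listener, while the caller
// consumes from the receiving end of the channel.
pub fn forward_all(messages: Vec<PgMessage>) -> Vec<PgMessage> {
    let (tx, rx) = mpsc::channel::<PgMessage>();
    let mut listener = SketchListener::new(messages, tx);
    let worker = std::thread::spawn(move || {
        while listener.step() {}
    });
    // Iteration ends when the listener (and its sender) is dropped.
    let received: Vec<PgMessage> = rx.iter().collect();
    worker.join().expect("listener thread panicked");
    received
}
```

With a channel as the sink, the code that consumes notifications never needs &mut access to the listener, which is the point of the proposal. How listen and unlisten would reach the listener while it is being stepped is left open here, as it is in the comment above.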

abonander • Dec 21 '20 04:12

Was there any progress with this in 0.6.x?

bbqsrc • Jul 07 '22 12:07

Any update on this?

hairui19 • Dec 10 '23 05:12