
Should MemoryChannels have their buffers be growable and shrinkable?

Open Fuyukai opened this issue 4 years ago • 7 comments

In my code, I use a channel to maintain a connection pool. The idea of being able to resize the pool by resizing the channel's buffer came to me. Would it be a good idea to have this available? It's certainly a niche design but potentially useful.

Growing would simply resize the underlying buffer and wake up any blocked senders. Shrinking would likely act the same way as CapacityLimiter: the channel blocks any new senders until the buffer drains below the new size.
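The semantics proposed above can be sketched with a toy, synchronous model (plain stdlib Python, not trio's actual internals; `ToyChannelState` and its methods are made-up names for illustration):

```python
from collections import deque

class ToyChannelState:
    """Toy, synchronous model of a memory channel buffer -- not trio's API."""

    def __init__(self, max_buffer_size):
        self.max_buffer_size = max_buffer_size
        self.data = deque()             # buffered items
        self.waiting_senders = deque()  # values parked by blocked senders

    def send(self, value):
        if len(self.data) < self.max_buffer_size:
            self.data.append(value)
        else:
            self.waiting_senders.append(value)  # a real sender would block here

    def resize(self, new_size):
        self.max_buffer_size = new_size
        # Growing: "wake" parked senders by moving their values into the buffer.
        while self.waiting_senders and len(self.data) < new_size:
            self.data.append(self.waiting_senders.popleft())
        # Shrinking: like CapacityLimiter, buffered items stay put; new
        # senders simply block until the buffer drains below new_size.
```

Under this model, growing a full channel immediately unparks waiting senders, while shrinking never discards data and only throttles future sends.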

Fuyukai avatar Jul 26 '21 06:07 Fuyukai

It's certainly a thing that we could implement. It does add some complexity to the memory channel API (+ docs, tests, etc.), so I guess it would be good to first figure out if there are use cases to justify that.

From chat, it sounds like in @Fuyukai's connection pool use case, the channel capacity is only used as a "best effort" check to catch misuse of private APIs/buggy code -- the actual pool capacity is tracked by other means, and if the code is correct then the limit is never hit (so it might as well be infinite). That's a real use case, but I'm not sure how compelling it is -- certainly less so than if this would make the actual connection tracking logic simpler :-).

njsmith avatar Jul 26 '21 06:07 njsmith

I tend not to do that, as a connection pool works better as a stack: the data of the connection that's been given back most recently are still in the cache, on both ends. A capacity-zero channel can then be used to feed new users whenever some are waiting.
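The stack-shaped pool described above can be sketched in a few lines (stdlib only; `LifoPool` is a made-up name, and a real trio version would park waiters on a zero-buffer channel rather than raising on empty):

```python
class LifoPool:
    """Toy LIFO connection pool: hand out the most recently returned
    connection, whose state is most likely still cache-warm on both ends."""

    def __init__(self, connections):
        self._stack = list(connections)

    def acquire(self):
        return self._stack.pop()       # most recently released comes back first

    def release(self, conn):
        self._stack.append(conn)
```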

smurfix avatar Jul 26 '21 07:07 smurfix

I brought up a different use case for this on Gitter a while ago. Copied for convenience:

Let's say item is a precious result that we definitely want to send to a receiver via an unbuffered memory channel. If I just put a shielded cancel scope around a MemorySendChannel.send, that'll work... unless all receivers are cancelled by the same cancel event (e.g. app shutdown), then I'll get a deadlock. This goofy buffer hack lets me force items into the channel which can be picked up by any live receivers or just dropped if the receivers all die first.

```python
async def send(item):
    try:
        await send_chan.send(item)
    except BaseException:
        # Cancelled (or any other exception) arrived mid-send: grow the
        # buffer by one slot so the item can be forced in, then re-raise.
        send_chan._state.max_buffer_size += 1
        send_chan.send_nowait(item)
        raise
```

Another way to solve this would be to make sure receivers are not cancellable and the send chan is closed upon sender cancel, but I guess I feel this should all be the responsibility of the sender.

Anyway, simply exposing _state.max_buffer_size via a property on the channels would have pretty reasonable semantics without any other behavior changes, so the main effort would be in documentation.

richardsheridan avatar Jul 26 '21 11:07 richardsheridan

Umm, thou shalt not swallow the Cancelled exception (in fact I'd ignore the internals and catch+reraise BaseException instead), but yes, that's a reasonable use case.

smurfix avatar Jul 26 '21 11:07 smurfix

> Anyway, simply exposing _state.max_buffer_size via a property on the channels would have pretty reasonable semantics without any other behavior changes

Surely this would cause blocked senders to stay blocked until an appropriate receive, rather than being put onto the buffer? It works in your case since you're forcing an item on immediately after.

Fuyukai avatar Jul 27 '21 21:07 Fuyukai

> Surely this would cause blocked senders to stay blocked until an appropriate receive, rather than being put onto the buffer? It works in your case since you're forcing an item on immediately after.

Good catch! I think the property logic would have access to all the state needed to wake up a task and put the value in the channel buffer, so it still seems doable.

richardsheridan avatar Jul 27 '21 22:07 richardsheridan

Guys, sorry to just hijack this, but @smurfix 's "hack" seemed interesting for something I'm working on at the moment. First of all, I'm just trying to fix a bug; I'm not a Python power user, or a trio one... not even a backend wizard... That said, I have an issue where a send raises an exception and the code removes the listener (please go easy on my naming...):

```python
async def dispatch_invoice_listener(payment: Payment):
    # iterate over a copy so removal from the original list is safe mid-loop
    for send_channel in list(api_invoice_listeners):
        try:
            send_channel.send_nowait(payment)
        except trio.WouldBlock:
            print("removing sse listener", send_channel)
            api_invoice_listeners.remove(send_channel)
```

From what I understand, the listener is removed but never gets "attached" again, so I never get the notification about that payment unless I restart the server and it goes through all the ones in the DB, etc...

How can I make this work so that if it hits the error (I'm guessing the buffer is full?) the listener gets restarted... if I'm making sense!

BTW, the buffer size is 5: `invoice_paid_chan_send, invoice_paid_chan_recv = trio.open_memory_channel(5)`

Any tips to make this better?
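One stdlib-only sketch of a different policy than removing the listener: drop the oldest pending item when the buffer fills. A `collections.deque` with `maxlen` models this behavior (trio channels themselves have no drop-oldest mode, so this is only an illustration of the idea):

```python
from collections import deque

# A deque with maxlen=5 mimics a 5-slot buffer that discards the oldest
# item instead of refusing new ones.
pending = deque(maxlen=5)
for payment_id in range(8):
    pending.append(payment_id)   # once full, the oldest id is silently dropped

latest = list(pending)           # the 5 most recent payments
```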

edited: three backticks please.

talvasconcelos avatar Aug 16 '21 14:08 talvasconcelos