broadcaster icon indicating copy to clipboard operation
broadcaster copied to clipboard

Redis PubSub Not Close Subscribing

Open vessaldaneshvar opened this issue 4 years ago • 2 comments

Hi I Using broadcaster for websocket connection and set new unique id for websocket client when new websocket connection open and subscribe on this unique id other application publish message to websocket client by this unique id. my application work properly and this is fine. but I have one issue . connection close but subscribe of redis not close until applicatin down.

async def ws_sender(websocket):
    async with broadcast.subscribe(channel=websocket.channel_name) as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)

context manager close but subscribe not close (unsubscribe)

vessaldaneshvar avatar Oct 08 '21 16:10 vessaldaneshvar

I have just tried switching to redis backend (as postgres backend seems to have memory leak issues due to asyncpg somewhere).

The issue with the subscribe not closing is due to the handling of the subscribers in _base.py


    @asynccontextmanager
    async def subscribe(self, channel: str) -> 'Subscriber':
        queue: asyncio.Queue = asyncio.Queue()

        try:
            if not self._subscribers.get(channel):
                await self._backend.subscribe(channel)
                self._subscribers[channel] = set([queue])
            else:
                self._subscribers[channel].add(queue)

            yield Subscriber(queue)

            self._subscribers[channel].remove(queue)
            if not self._subscribers.get(channel):
                del self._subscribers[channel]
                await self._backend.unsubscribe(channel)

        finally:

            await queue.put(None)

This section

            self._subscribers[channel].remove(queue)
            if not self._subscribers.get(channel):
                del self._subscribers[channel]
                await self._backend.unsubscribe(channel)

Needs to be contained in the Finally section, when the websocket ends it raises an exception which leads to the try/except breaking out and never unsubscribing.

EdgyEdgemond avatar Mar 02 '22 15:03 EdgyEdgemond

It's not nice, but you can work around broadcast and maintain existing websocket behaviour like so.

async def ws_sender(websocket):
    exc = None
    async with broadcast.subscribe(channel=websocket.channel_name) as subscriber:
        try:
            async for event in subscriber:
                await websocket.send_text(event.message)
        except Exception as e:
           exc = e
    if exc:
        raise exc

EdgyEdgemond avatar Mar 02 '22 16:03 EdgyEdgemond

Fixed in https://github.com/encode/broadcaster/pull/112

alex-oleshkevich avatar Apr 05 '24 14:04 alex-oleshkevich