Redis PubSub subscription not closed
Hi, I am using broadcaster for websocket connections. When a new websocket connection opens, I assign the client a new unique ID and subscribe to a channel named after that ID; another application publishes messages to the websocket client through this unique ID. The application works properly, but I have one issue: when the connection closes, the Redis subscription is not closed until the application goes down.
async def ws_sender(websocket):
    async with broadcast.subscribe(channel=websocket.channel_name) as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)
The context manager exits, but the subscription is not closed (no unsubscribe happens).
I have just tried switching to the Redis backend (as the Postgres backend seems to have memory leak issues due to asyncpg somewhere).
The subscription not closing is due to the handling of the subscribers in _base.py:
@asynccontextmanager
async def subscribe(self, channel: str) -> 'Subscriber':
    queue: asyncio.Queue = asyncio.Queue()
    try:
        if not self._subscribers.get(channel):
            await self._backend.subscribe(channel)
            self._subscribers[channel] = set([queue])
        else:
            self._subscribers[channel].add(queue)
        yield Subscriber(queue)
        self._subscribers[channel].remove(queue)
        if not self._subscribers.get(channel):
            del self._subscribers[channel]
            await self._backend.unsubscribe(channel)
    finally:
        await queue.put(None)
This section
self._subscribers[channel].remove(queue)
if not self._subscribers.get(channel):
    del self._subscribers[channel]
    await self._backend.unsubscribe(channel)
needs to be inside the finally block. When the websocket ends, an exception is raised inside the async with body and thrown into the generator at the yield, so execution jumps straight to finally and the unsubscribe code is never reached.
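To illustrate the point, here is a minimal sketch of the cleanup moved into the finally block. `FakeBackend` and this stripped-down `Broadcast` are hypothetical stand-ins written for this example, not the library's actual classes, and the real fix may look different.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeBackend:
    """Hypothetical stand-in for a broadcaster backend; it only records calls."""

    def __init__(self):
        self.calls = []

    async def subscribe(self, channel):
        self.calls.append(("subscribe", channel))

    async def unsubscribe(self, channel):
        self.calls.append(("unsubscribe", channel))


class Broadcast:
    """Simplified sketch: only the subscriber bookkeeping is modelled."""

    def __init__(self, backend):
        self._backend = backend
        self._subscribers = {}

    @asynccontextmanager
    async def subscribe(self, channel):
        queue = asyncio.Queue()
        try:
            if not self._subscribers.get(channel):
                await self._backend.subscribe(channel)
                self._subscribers[channel] = {queue}
            else:
                self._subscribers[channel].add(queue)
            yield queue
        finally:
            # Cleanup now runs even when the body of the `async with` raises.
            subscribers = self._subscribers.get(channel)
            if subscribers is not None:
                subscribers.discard(queue)
                if not subscribers:
                    del self._subscribers[channel]
                    await self._backend.unsubscribe(channel)
            await queue.put(None)


async def demo():
    backend = FakeBackend()
    broadcast = Broadcast(backend)
    try:
        async with broadcast.subscribe("client-1"):
            # Simulate the websocket dropping while subscribed.
            raise RuntimeError("websocket disconnected")
    except RuntimeError:
        pass
    return backend.calls, broadcast._subscribers


calls, remaining = asyncio.run(demo())
```

With the cleanup in finally, the recorded calls include the unsubscribe even though the body raised, and no subscriber entry is left behind for the channel.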
It's not nice, but you can work around the issue in broadcast and keep the existing websocket behaviour like so:
async def ws_sender(websocket):
    exc = None
    async with broadcast.subscribe(channel=websocket.channel_name) as subscriber:
        try:
            async for event in subscriber:
                await websocket.send_text(event.message)
        except Exception as e:
            exc = e
    if exc:
        raise exc
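If the same workaround is needed in several handlers, it could be factored into a small wrapper. The sketch below is only an illustration: `defer_errors` and `leaky_subscribe` are hypothetical names, and `leaky_subscribe` is a stand-in that mimics the bug (cleanup placed after the yield, outside finally) rather than the real broadcast.subscribe.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def defer_errors(cm):
    """Enter `cm`, let it exit cleanly, then re-raise any error from the body."""
    exc = None
    async with cm as value:
        try:
            yield value
        except Exception as e:
            # Hold the error so `cm` sees a normal exit and runs its cleanup.
            exc = e
    if exc is not None:
        raise exc


log = []


@asynccontextmanager
async def leaky_subscribe(channel):
    # Stand-in that reproduces the bug: cleanup is not protected by finally.
    log.append(("subscribe", channel))
    yield channel
    log.append(("unsubscribe", channel))  # skipped if the body raises


async def demo():
    try:
        async with defer_errors(leaky_subscribe("client-1")):
            raise ConnectionError("websocket closed")
    except ConnectionError as e:
        return str(e)


result = asyncio.run(demo())
```

Under these assumptions, usage would look like `async with defer_errors(broadcast.subscribe(channel=websocket.channel_name)) as subscriber:`. Like the workaround above, this only catches `Exception`; since Python 3.8 `asyncio.CancelledError` derives from `BaseException`, so task cancellation is not covered.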
Fixed in https://github.com/encode/broadcaster/pull/112