broadcaster icon indicating copy to clipboard operation
broadcaster copied to clipboard

Kafka backend can't unsubscribe from individual channel

Open andrew222651 opened this issue 3 years ago • 3 comments

Currently, unsubscribing from one Kafka channel unsubscribes from all channels. Based on the aiokafka docs I'm guessing we want to do

self._consumer_channels.remove(channel)
self._consumer.subscribe(topics=self._consumer_channels)

andrew222651 avatar May 13 '22 13:05 andrew222651

Also, when we call AIOKafkaConsumer we might want to add auto_offset_reset="latest" based on https://aiokafka.readthedocs.io/en/stable/consumer.html#controlling-the-consumer-s-position. Even then, when we change the topics we're subscribed to it's not obvious to me that we won't miss events or process events multiple times.

andrew222651 avatar May 18 '22 20:05 andrew222651

Why didn't merge this PR?

tsotnesharvadze avatar Oct 23 '23 06:10 tsotnesharvadze

Currently, unsubscribing from one Kafka channel unsubscribes from all channels. Based on the aiokafka docs I'm guessing we want to do

self._consumer_channels.remove(channel)
self._consumer.subscribe(topics=self._consumer_channels)

When _counsumer_channels will be empty it raise error, so:

    async def unsubscribe(self, channel: str) -> None:
        self._consumer_channels.remove(channel)
        if self._consumer_channels:
            self._consumer.subscribe(topics=list(self._consumer_channels))
        else:
            self._consumer.unsubscribe()

tsotnesharvadze avatar Oct 23 '23 12:10 tsotnesharvadze