RecursionError while polling inside custom ConsumerRebalanceListener.on_partitions_assigned
- Python version: 3.8
- kafka-python version: 2.0.2
- Kafka broker version: strimzi/kafka:0.19.0-kafka-2.5.0 (docker image provided by strimzi)
In my application, I pass a regex pattern to the consumer's subscribe() method at runtime; the pattern is simply an "or" of several topic names. I need to force seeking to the latest offset for each of those topics, and since partition assignment via subscribe() is lazy, I first issue a dummy poll() to trigger a metadata refresh, so that the consumer knows exactly which partitions it has been assigned, and then call seek_to_end() on those partitions. Both the dummy poll() and the seek_to_end() are executed inside a custom callback overriding ConsumerRebalanceListener.on_partitions_assigned(). Usually this works as expected, but sometimes the dummy poll() (with timeout_ms=0 and max_records=1) raises the following exception:
```
  File "/usr/local/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/usr/local/lib/python3.8/site-packages/kafka/consumer/group.py", line 675, in _poll_once
    self._coordinator.poll()
  File "/usr/local/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 287, in poll
    self._client.poll(future=metadata_update)
  File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 602, in poll
    self._poll(timeout / 1000)
  File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 687, in _poll
    self._pending_completion.extend(conn.recv())
  File "/usr/local/lib/python3.8/site-packages/kafka/conn.py", line 1053, in recv
    responses = self._recv()
  File "/usr/local/lib/python3.8/site-packages/kafka/conn.py", line 1127, in _recv
    return self._protocol.receive_bytes(recvd_data)
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/parser.py", line 132, in receive_bytes
    resp = self._process_response(self._rbuffer)
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/parser.py", line 168, in _process_response
    response = request.RESPONSE_TYPE.decode(read_buffer)
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/struct.py", line 50, in decode
    return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/struct.py", line 50, in <listcomp>
    return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 193, in decode
    return [self.array_of.decode(data) for _ in range(length)]
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 193, in <listcomp>
    return [self.array_of.decode(data) for _ in range(length)]
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 152, in decode
    return tuple([field.decode(data) for field in self.fields])
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 152, in <listcomp>
    return tuple([field.decode(data) for field in self.fields])
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 193, in decode
    return [self.array_of.decode(data) for _ in range(length)]
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 193, in <listcomp>
    return [self.array_of.decode(data) for _ in range(length)]
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 152, in decode
    return tuple([field.decode(data) for field in self.fields])
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 152, in <listcomp>
    return tuple([field.decode(data) for field in self.fields])
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 190, in decode
    length = Int32.decode(data)
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/types.py", line 64, in decode
    return _unpack(cls._unpack, data.read(4))
  File "/usr/local/lib/python3.8/site-packages/kafka/protocol/frame.py", line 11, in read
    if self._idx > len(self):
RecursionError: maximum recursion depth exceeded while calling a Python object
```
So, to recap, the flow of operations is as follows:
- I subscribe to a topic pattern.
- In my custom on_partitions_assigned callback, I call a dummy poll(timeout_ms=0, max_records=1) if the partitions argument provided by the callback contains topics on which I have to force a seek_to_end() call.
- The dummy poll() itself raises the exception above, so the code after it that would execute seek_to_end() is never reached.
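For reference, the flow above can be sketched roughly as follows. This is a minimal reconstruction, not the real application code: the class and variable names (SeekToEndListener, topics_to_seek) and the broker/topic/group names in the usage comment are placeholders of mine. The import fallback is only there so the sketch stays readable without kafka-python installed.

```python
try:
    from kafka import KafkaConsumer, ConsumerRebalanceListener
except ImportError:  # stand-in base class so the sketch runs without kafka-python
    class ConsumerRebalanceListener:
        pass


class SeekToEndListener(ConsumerRebalanceListener):
    """Hypothetical listener reproducing the reported flow."""

    def __init__(self, consumer, topics_to_seek):
        self.consumer = consumer
        # Topics whose offsets must be forced to the latest position.
        self.topics_to_seek = set(topics_to_seek)

    def on_partitions_revoked(self, revoked):
        pass  # nothing to do on revocation in this sketch

    def on_partitions_assigned(self, assigned):
        # Keep only the assigned partitions belonging to topics we must fast-forward.
        to_seek = [tp for tp in assigned if tp.topic in self.topics_to_seek]
        if to_seek:
            # Dummy poll to trigger a metadata refresh; this is the call
            # that intermittently raises RecursionError in the report above.
            self.consumer.poll(timeout_ms=0, max_records=1)
            self.consumer.seek_to_end(*to_seek)


# Usage (requires a reachable broker; addresses/names are placeholders):
# consumer = KafkaConsumer(bootstrap_servers="localhost:9092", group_id="my-group")
# consumer.subscribe(pattern="topic-a|topic-b",
#                    listener=SeekToEndListener(consumer, {"topic-a", "topic-b"}))
```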
Could you please check whether this is a bug in the library itself or whether I'm just doing something wrong? Thank you.