After pausing and resuming partition, the offset is reset far ahead.
If I use code that pauses and then resumes reading from partitions, the offset for a partition is set not to the last message read but to a message far ahead. Example code:
```python
from kafka import KafkaConsumer

# SERVER and TOPIC are placeholders for the broker address and topic name.
consumer = KafkaConsumer(bootstrap_servers=[SERVER], auto_offset_reset='earliest', max_poll_records=10)
consumer.subscribe([TOPIC])
partition_count = len(consumer.partitions_for_topic(TOPIC))
paused_tp = set()
while True:
    batch = consumer.poll(timeout_ms=1000)
    topause = None
    for tp, items in batch.items():
        for item in items:
            print(item.partition, item.offset)
        # Once every other partition is paused, remember this last one and stop.
        if len(paused_tp) == partition_count-1:
            topause = tp
            break
        consumer.pause(tp)
        paused_tp.add(tp)
    if topause:
        # Swap: resume everything paused so far, pause the last partition.
        consumer.resume(*paused_tp)
        consumer.pause(topause)
        break

# Next read after resume; expected to continue from offset 10.
batch = consumer.poll(timeout_ms=1000)
for tp, items in batch.items():
    for item in items:
        print(item.partition, item.offset)
```
This code reads 10 messages from each partition of the topic and then pauses that partition. Once all partitions but one have been handled this way, it pauses the last partition and resumes all the rest. One would expect the next read to resume from offset 10 in one of the partitions. But this is not what happens; the real result looks something like:
```
6 2308
6 2309
6 2310
6 2311
6 2312
6 2313
6 2314
6 2315
6 2316
6 2317
```
And if we look at the debug messages, there's this:
```
DEBUG: kafka.consumer.fetcher Advance position for partition TopicPartition(topic='TOPIC', partition=6) from 0 to 2308 (last message batch location plus one) to correct for deleted compacted messages
DEBUG: kafka.consumer.fetcher Adding fetch request for partition TopicPartition(topic='TOPIC', partition=6) at offset 2308
```
and so on for the rest of the topic. Offset 2308 is far ahead of the last consumed message at offset 9, and there are certainly messages at offsets between 9 and 2308. But for some reason Kafka jumps ahead, which is certainly not what I expect when I just pause and unpause it. Is there any way to make pause/resume work as expected, i.e. resume from the last consumed offset?
If I add something like this after resume():
```python
offsets = {p: consumer.position(p) for p in paused_tp}
for p, off in offsets.items():
    consumer.seek(p, off)
```
then everything works as expected, even though seek() to the result of position() should be a semantic no-op, as I understand it?
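The workaround above can be wrapped in a small helper. This is only a sketch: `resume_and_reseek` is a hypothetical name, not part of kafka-python, and it relies only on the `resume()`, `position()`, and `seek()` calls already used in this report. It does not explain or fix the underlying behaviour.

```python
def resume_and_reseek(consumer, partitions):
    """Resume paused partitions, then seek each one to its reported position.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer;
    `partitions` is an iterable of TopicPartition objects.
    Returns the {partition: offset} mapping that was re-applied.
    """
    partitions = list(partitions)
    if not partitions:
        # Nothing to resume; avoid calling resume() with no arguments.
        return {}
    consumer.resume(*partitions)
    # Read every position first, then seek, mirroring the snippet above.
    positions = {tp: consumer.position(tp) for tp in partitions}
    for tp, offset in positions.items():
        consumer.seek(tp, offset)
    return positions
```

In the example code, this would replace the `consumer.resume(*paused_tp)` call with `resume_and_reseek(consumer, paused_tp)`.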
Same problem for me. I'm going to use the hack with seek
Same issue here, seek works for me as well.
The same for me. The fix above works as well. But IMHO, this looks like something that should be fixed in the library.
Same for me, and the workaround works. Thanks @smalyshev
This bug is nasty since some users may not even notice it as it silently corrupts the returned message stream.
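One way to notice the jump is to check that offsets within each partition stay contiguous across polls. A rough sketch, assuming a non-compacted topic (on a compacted topic, gaps can be legitimate); `find_offset_gaps` and `last_seen` are hypothetical names, and `batch` is assumed to have the shape returned by `consumer.poll()`:

```python
def find_offset_gaps(batch, last_seen):
    """Report non-contiguous offsets per partition.

    batch:     mapping of partition -> list of records with an `.offset`
               attribute (the shape returned by KafkaConsumer.poll()).
    last_seen: dict of partition -> last offset seen so far; updated in place.
    Returns a list of (partition, expected_offset, actual_offset) tuples.
    """
    gaps = []
    for tp, records in batch.items():
        for record in records:
            previous = last_seen.get(tp)
            # The first record seen for a partition has nothing to compare to.
            if previous is not None and record.offset != previous + 1:
                gaps.append((tp, previous + 1, record.offset))
            last_seen[tp] = record.offset
    return gaps
```

Calling it on every batch with the same `last_seen` dict would flag a jump such as the one from offset 9 to 2308 shown above.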
Seeing this issue as well, and am using the same seek method to work around it.
Same workaround worked for me. Thanks @smalyshev