kafka-python

After pausing and resuming a partition, its offset is reset far ahead.

Open smalyshev opened this issue 5 years ago • 7 comments

If I use code that pauses and then resumes reading from partitions, the offset for a resumed partition is set not to the last message read but to a message far ahead. Example code:

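from kafka import KafkaConsumer

# SERVER and TOPIC are placeholders for the broker address and topic name.
consumer = KafkaConsumer(bootstrap_servers=[SERVER], auto_offset_reset='earliest', max_poll_records=10)
consumer.subscribe([TOPIC])
partition_count = len(consumer.partitions_for_topic(TOPIC))
paused_tp = set()
while True:
    batch = consumer.poll(timeout_ms=1000)
    topause = None
    for tp, items in batch.items():
        for item in items:
            print(item.partition, item.offset)
        # Once all partitions but this one are paused, remember it and stop.
        if len(paused_tp) == partition_count - 1:
            topause = tp
            break
        # Pause each partition right after reading its first batch.
        consumer.pause(tp)
        paused_tp.add(tp)
    if topause is not None:
        # Swap: resume every paused partition and pause the last one.
        consumer.resume(*paused_tp)
        consumer.pause(topause)
        break

# Expected: a resumed partition continues right after its last printed offset.
batch = consumer.poll(timeout_ms=1000)
for tp, items in batch.items():
    for item in items:
        print(item.partition, item.offset)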

This code reads a batch of (at most 10) messages from each partition of the topic and then pauses that partition. Once all partitions but one have been treated this way, it pauses the last partition and resumes all the rest. One would expect the next read to resume from offset 10 in one of the resumed partitions. That is not what happens; the actual output looks something like:

6 2308
6 2309
6 2310
6 2311
6 2312
6 2313
6 2314
6 2315
6 2316
6 2317

And the debug log shows this:

DEBUG: kafka.consumer.fetcher Advance position for partition TopicPartition(topic='TOPIC', partition=6) from 0 to 2308 (last message batch location plus one) to correct for deleted compacted messages
DEBUG: kafka.consumer.fetcher Adding fetch request for partition TopicPartition(topic='TOPIC', partition=6) at offset 2308

and so on for the rest of the topic. Offset 2308 is far ahead of the last consumed message at offset 9, and there are certainly messages at offsets between 9 and 2308. Yet for some reason the consumer jumps ahead, which is certainly not what I expect when I merely pause and unpause a partition. Is there any way to make pause/resume work as expected, i.e. resume from the last consumed offset?

smalyshev, Mar 04 '20

If I add something like this after resume():

# Re-seek each resumed partition to its own current position.
offsets = {p: consumer.position(p) for p in paused_tp}
for p, off in offsets.items():
    consumer.seek(p, off)

then everything works as expected, even though, as I understand it, calling seek() with the result of position() should semantically be a no-op.
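The workaround above can be packaged as a small helper so every resume goes through it. This is a minimal sketch: `resume_and_reseek` is a hypothetical name, not part of kafka-python, and it relies only on `KafkaConsumer.resume()`, `position()` and `seek()`, following the same order as the snippet (resume first, then position and seek). Nobody in this thread explains why re-seeking helps, so treat it as a workaround rather than a fix.

```python
from typing import Any, Dict, Iterable


def resume_and_reseek(consumer: Any, partitions: Iterable[Any]) -> Dict[Any, int]:
    """Resume `partitions`, then seek each one to its own current position.

    Hypothetical helper mirroring the seek workaround from this thread.
    Returns the offset each partition was re-seeked to.
    """
    partitions = list(partitions)
    if not partitions:
        return {}
    consumer.resume(*partitions)
    # Read the consumer's current position for each resumed partition...
    offsets = {tp: consumer.position(tp) for tp in partitions}
    # ...and seek to that same offset, which commenters report avoids the jump.
    for tp, offset in offsets.items():
        consumer.seek(tp, offset)
    return offsets
```

In the reproduction, `consumer.resume(*paused_tp)` would become `resume_and_reseek(consumer, paused_tp)`.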

smalyshev, Mar 04 '20

Same problem for me. I'm going to use the seek hack.

phoenix-mstu, Jun 26 '20

Same issue here, seek works for me as well.

a-shkarupin, Sep 15 '20

Same for me. The fix above works as well, but IMHO this looks like something that should be fixed in the library.

yarik3571, Sep 15 '20

Same for me, and the workaround works. Thanks @smalyshev

This bug is nasty since some users may not even notice it as it silently corrupts the returned message stream.
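Because the skip is silent, one way to notice it is to track the last offset seen per partition and flag any step that is not exactly +1. This is a rough sketch under a stated assumption: the topic is neither compacted nor transactional, since compaction and transaction markers legitimately leave offset gaps. `find_offset_gaps` is a hypothetical helper; it takes `(partition, offset)` pairs, which can be built from the `.partition` and `.offset` fields of the records returned by `poll()` (add `.topic` to the key when consuming several topics).

```python
from typing import Dict, Hashable, Iterable, List, Tuple


def find_offset_gaps(
    records: Iterable[Tuple[Hashable, int]],
) -> List[Tuple[Hashable, int, int]]:
    """Return (partition, last_offset, next_offset) for each non-contiguous step.

    Assumes a non-compacted, non-transactional topic, where consecutive
    records in one partition normally differ in offset by exactly 1.
    """
    last_seen: Dict[Hashable, int] = {}
    gaps: List[Tuple[Hashable, int, int]] = []
    for partition, offset in records:
        prev = last_seen.get(partition)
        if prev is not None and offset != prev + 1:
            gaps.append((partition, prev, offset))
        last_seen[partition] = offset
    return gaps


# Illustrative offsets shaped like the jump reported above.
print(find_offset_gaps([(6, 8), (6, 9), (6, 2308)]))  # [(6, 9, 2308)]
```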

peter-marlowe, Oct 20 '21

Seeing this issue as well, and I'm using the same seek method to work around it.

yeungalan0, Nov 12 '21

Same workaround worked for me. Thanks @smalyshev

chubutin, Jun 21 '22