
Consumption with poll loses some (~30%) of messages forever

Open indywidualny opened this issue 5 years ago • 4 comments

We noticed a critical bug (or perhaps expected behavior, you tell me, but it is totally unexpected IMHO) with Kafka poll(). I believe it is very easy to reproduce.

We initialize Kafka Consumer with the following code:

self.consumer = KafkaConsumer(*kafka_topics, group_id=self.group_id, bootstrap_servers=BOOTSTRAP_SERVERS,
    fetch_max_wait_ms=500, reconnect_backoff_ms=100,
    reconnect_backoff_max_ms=1000, connections_max_idle_ms=540000,
    request_timeout_ms=15000, heartbeat_interval_ms=2000,
    auto_offset_reset=self.auto_offset_reset, enable_auto_commit=False,
    metadata_max_age_ms=1500,
    partition_assignment_strategy=[StickyPartitionAssignor])

Variables:

self.auto_offset_reset = 'earliest'

Then we consume messages in an infinite loop, calling the function below constantly. When we want to consume something, we ask for max_records=N (N is between 1 and 25 in our case) and commit afterwards. When we don't want any messages, we simply poll with max_records=1 and don't commit. This keeps Kafka polled all the time, so heartbeats are sent and the consumer stays active. We take care of committing ourselves.

    def get_messages(self, max_records=1):
        data = self.consumer.poll(timeout_ms=self.timeout, max_records=max_records)
        ready_messages = []
        for tp, messages in data.items():
            for message in messages:
                # Lazy formatting: args are only interpolated if DEBUG is enabled
                logger.debug("Consumed %s:%d:%d: key=%s value=%s",
                             tp.topic, tp.partition, message.offset,
                             message.key, message.value)
            ready_messages.extend(messages)
        return ready_messages

Here's the most interesting part. Let's say we post 1000 messages to an empty topic. We count all the messages consumed since the consumer was started, and we noticed a horrible, unforgivable behavior: although all 1000 messages are marked as consumed (the offset moved), we receive only about 700 of them via get_messages()!

What happens to the hundreds of messages that are lost forever? I cannot believe nobody has noticed this, but it is a critical bug worth reporting, one that prevents us from using the library at all.

indywidualny avatar Nov 26 '20 20:11 indywidualny

OK. Figured out that poll() without a commit acts as if it had been committed anyway: the consumer's in-memory fetch position advances on every poll, so uncommitted messages are never delivered again to the same live consumer.

EDIT: If you want to "undo" a poll that was never committed, you need to unsubscribe from the topics and subscribe again later. Re-subscribing resets the fetch position back to the committed offset, so the uncommitted poll is effectively cancelled (even though no commit ever happened).
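A minimal sketch of why this happens, using a toy single-partition model (not kafka-python itself): the in-memory fetch position advances on every poll(), independently of the committed offset, so a "heartbeat" poll whose result is discarded silently skips those messages:

```python
class ToyConsumer:
    """Toy model of fetch position vs. committed offset (single partition)."""

    def __init__(self, messages):
        self._messages = messages
        self.position = 0    # in-memory fetch position, advanced by poll()
        self.committed = 0   # broker-side offset, moved only by commit()

    def poll(self, max_records=1):
        batch = self._messages[self.position:self.position + max_records]
        self.position += len(batch)  # advances even if we never commit
        return batch

    def commit(self):
        self.committed = self.position


c = ToyConsumer(["m0", "m1", "m2", "m3"])
c.poll(max_records=2)          # "heartbeat" poll, result discarded, no commit
batch = c.poll(max_records=2)  # next poll resumes at the fetch position...
# batch is ["m2", "m3"]: m0 and m1 were skipped, matching the ~30% loss observed
```

The committed offset only matters after a restart or rebalance; while the same consumer instance stays alive, the fetch position alone decides what poll() returns next.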

indywidualny avatar Nov 27 '20 11:11 indywidualny

What I did in my case was to do a poll and then seek back to the original offset from before the poll whenever the messages were not meant to be consumed and the poll was only used as a heartbeat. This also avoids the rebalance that a re-subscription would trigger.
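The seek-back approach can be sketched as follows. The stand-in consumer below is a toy (no broker involved); with a real kafka-python consumer the same three calls exist with these names: `assignment()`, `position(tp)`, and `seek(tp, offset)`:

```python
from collections import namedtuple

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class FakeConsumer:
    """Tiny in-memory stand-in that tracks per-partition positions."""

    def __init__(self, records):
        self._records = records                  # {tp: [values]}
        self._pos = {tp: 0 for tp in records}    # next offset per partition

    def assignment(self):
        return set(self._records)

    def position(self, tp):
        return self._pos[tp]

    def seek(self, tp, offset):
        self._pos[tp] = offset

    def poll(self, max_records=1):
        out = {}
        for tp, values in self._records.items():
            batch = values[self._pos[tp]:self._pos[tp] + max_records]
            if batch:
                out[tp] = batch
                self._pos[tp] += len(batch)
        return out


def heartbeat_poll(consumer):
    """Poll to keep the consumer alive, then rewind so nothing is skipped."""
    positions = {tp: consumer.position(tp) for tp in consumer.assignment()}
    consumer.poll(max_records=1)          # result is deliberately discarded
    for tp, offset in positions.items():
        consumer.seek(tp, offset)         # restore the pre-poll offset
```

As noted in the next comment, this is only safe as long as the partition assignment does not change between recording the positions and seeking back.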

abhaygupta3390 avatar Dec 15 '20 06:12 abhaygupta3390

@abhaygupta3390 That is a nice idea, but keep in mind that the partition assignment can change at any moment. You therefore cannot be sure you will never consume from a newly assigned partition, whose pre-poll offset you never recorded and so cannot restore. That makes it an unreliable solution.

We also tried an internal poll flag called update_offsets, but apparently it is not meant for public use; it did strange things.

So far there is no solution to the issue other than redesigning the application that uses Kafka so that you always consume (and keep) whatever you poll, and deal with the messages later, even when that is inconvenient at the moment.

indywidualny avatar Dec 15 '20 13:12 indywidualny

Hello, I'm not a developer of this project; I did not try your code and I cannot share ours. I just wanted to note here that I could not reproduce this issue with a single-partition topic.

tonkolviktor avatar Jan 11 '22 08:01 tonkolviktor