
[BUG] Consumer gets old offset when it joins consumer group

Open utahkay opened this issue 3 years ago • 5 comments

Describe the bug

I have a long-running test with three consumers in a consumer group. Every 40 minutes it stops one consumer, then 20 minutes later it adds a new consumer to the group, so a new consumer joins every 40 minutes. The new consumer often gets an old offset.

To Reproduce

Environment: pulsar-cloud:2.9.2.16, pulsar-protocol-handler-kafka-2.9.2.16.nar, sncloud-staging ('eagle' poolmember)

Steps to reproduce the behavior:

  1. Start three Kafka consumers in the same group.
     a. The consumers are configured with auto.offset.reset=earliest, enable.auto.commit=false.
     b. The consumers call commitSync() after each message is received.
  2. Start a Kafka producer and produce messages to the topic.
  3. Stop a consumer, wait a few minutes.
  4. Add a new consumer to the group, wait a few minutes.
  5. Repeat (stop a consumer, wait, add a consumer).

Expected behavior

The new consumer should always start at the last committed offset. Instead, the new consumer often starts at an older offset. Over time this causes the consumers to receive thousands of duplicate messages.
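
To make the expectation concrete, here is a minimal in-memory sketch of the commit-and-resume contract. It is an illustration only, not KoP or Kafka client code: `GroupOffsets` and `redelivered` are hypothetical names, and the model assumes the usual Kafka convention that a no-argument commitSync() stores the offset of the next message to read (last received offset + 1). The offsets 32266 and 32153 are the partition 1 values from the client logs in this issue.

```python
from typing import Dict, Tuple

Partition = Tuple[str, int]


class GroupOffsets:
    """Toy stand-in for a consumer group's committed offsets (hypothetical, not KoP code)."""

    def __init__(self) -> None:
        self._committed: Dict[Partition, int] = {}

    def commit(self, tp: Partition, last_processed: int) -> None:
        # Assumed Kafka convention: the committed value is the NEXT offset to read.
        self._committed[tp] = last_processed + 1

    def start_position(self, tp: Partition, earliest: int = 0) -> int:
        # With auto.offset.reset=earliest, a partition without a commit
        # starts at the earliest available offset.
        return self._committed.get(tp, earliest)


def redelivered(resume_at: int, last_processed: int) -> int:
    """Count already-processed messages that are read again when resuming at resume_at."""
    return max(0, last_processed - resume_at + 1)


offsets = GroupOffsets()
tp = ("public/default/partitioned-topic", 1)
offsets.commit(tp, 32266)                 # consumer processed 32266, then committed
expected = offsets.start_position(tp)     # where a new consumer should resume

print(expected)                           # 32267
print(redelivered(expected, 32266))       # 0 duplicates when the commit is honored
print(redelivered(32153, 32266))          # 114 duplicates when resuming at 32153
```

Under these assumptions, a consumer that resumes at 32153 instead of 32267 re-reads 114 messages on that one partition in a single rebalance.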

Screenshots

Consumers receive more messages than the producer sends. Many duplicate messages received.


Additional context

Client source code: https://github.com/streamnative/continuous-verification/tree/master/workers/kop-produce-consume/app/src

Running with this config:
    acceptableKafkaReceiveLatencyMs: 750
    consumerGroupRebalanceIntervalMs: 1200000
    doConsumerGroupRebalance: true
    elasticSearchUrl: null
    heartbeatServiceAccount: sa-continuous-verification@cv-pulsar.auth.test.cloud.gcp.streamnative.dev
    kafkaBootstrapServers: null
    kafkaConsumer:
      autoCommitIntervalMs: 500
      enableAutoCommit: false
      groupId: kop-produce-consume
    kafkaProducer:
      acks: all
      clientId: kop-produce-consume
    metricsPort: 8080
    produceIntervalMs: 5000
    pulsarClient:
      oauthAudience: urn:sn:pulsar:cv-pulsar:test-2-9-kop2
      oauthCredentialsUrl: file:///var/creds/service-account.json
      oauthCredentialsUrlAdmin: file:///var/admincreds/admin-service-account.json
      oauthIssuerUrl: https://auth.sncloud-stg.dev/
      serviceUrl: pulsar+ssl://test-2-9-kop2-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev:6651
      serviceUrlAdmin: https://test-2-9-kop2-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev
    target: test-2-9-kop
    targetNamespace: cv-pulsar
    topic: public/default/partitioned-topic

utahkay avatar May 17 '22 18:05 utahkay

It seems to happen in cycles. The offset is stale for a few hours, then for a while new consumers get the correct offset when they join, then the offset is stale again.

utahkay avatar May 17 '22 18:05 utahkay

From the client logs.

For partition 1, the client received the message at offset 32266 and called commitSync().

But during the rebalance, the consumer that was assigned partition 1 was given the starting offset 32153.

"May 17, 2022 @ 07:47:17.928","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:17.927 [kafka-consumer-109] INFO kop.produce.consume.KafkaConsumer - Received message 1:32266, 1652795237477 with latency of 422 ms"
"May 17, 2022 @ 07:47:22.647","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:22.646 [kafka-consumer-98] INFO kop.produce.consume.KafkaConsumer - Received message 2:32246, 1652795242477 with latency of 140 ms"
"May 17, 2022 @ 07:47:27.964","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:27.964 [kafka-consumer-109] INFO kop.produce.consume.KafkaConsumer - Received message 0:32336, 1652795247477 with latency of 457 ms"
"May 17, 2022 @ 07:47:30.401","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:30.401 [rebalancer] INFO kop.produce.consume.KafkaConsumerSupervisor - ******** Starting kafka consumer kafka-consumer-130 ********"
"May 17, 2022 @ 07:47:31.341","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:31.341 [kafka-consumer-109] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kop-produce-consume-109, groupId=kop-produce-consume] Setting offset for partition public/default/partitioned-topic-0 to the committed offset FetchPosition{offset=32232, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[test-2-9-kop2-broker-0-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev:9093 (id: 1886075926 rack: null)], epoch=absent}}"
"May 17, 2022 @ 07:47:31.341","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:31.341 [kafka-consumer-98] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kop-produce-consume-98, groupId=kop-produce-consume] Setting offset for partition public/default/partitioned-topic-2 to the committed offset FetchPosition{offset=32140, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[test-2-9-kop2-broker-0-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev:9093 (id: 1886075926 rack: null)], epoch=absent}}"
"May 17, 2022 @ 07:47:31.342","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:31.342 [kafka-consumer-130] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kop-produce-consume-130, groupId=kop-produce-consume] Setting offset for partition public/default/partitioned-topic-1 to the committed offset FetchPosition{offset=32153, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[test-2-9-kop2-broker-0-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev:9093 (id: 1886075926 rack: null)], epoch=absent}}"

utahkay avatar May 17 '22 18:05 utahkay
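
A check like the one done by hand on the client logs above can be scripted. The following is a rough sketch, not part of the test client: `find_stale_assignments` is a hypothetical helper, the regular expressions are written against the two log message shapes quoted in this issue, and it assumes the input contains the logs of every consumer in the group and that each received message was committed right away. The sample lines are abbreviated copies of the log lines above.

```python
import re
from typing import Dict, Iterable, List, Tuple

# "Received message <partition>:<offset>, ..." as logged by the test client.
RECEIVED = re.compile(r"Received message (\d+):(\d+),")
# ConsumerCoordinator line logged when a partition is assigned after a rebalance.
ASSIGNED = re.compile(
    r"Setting offset for partition \S+-(\d+) to the committed offset "
    r"FetchPosition\{offset=(\d+)"
)


def find_stale_assignments(lines: Iterable[str]) -> List[Tuple[int, int, int]]:
    """Return (partition, assigned_offset, expected_offset) for each stale assignment."""
    last_received: Dict[int, int] = {}
    stale: List[Tuple[int, int, int]] = []
    for line in lines:
        match = RECEIVED.search(line)
        if match:
            partition, offset = int(match.group(1)), int(match.group(2))
            last_received[partition] = max(offset, last_received.get(partition, -1))
            continue
        match = ASSIGNED.search(line)
        if match:
            partition, assigned = int(match.group(1)), int(match.group(2))
            if partition in last_received:
                # Assumption: every received message was committed, so the
                # committed offset should be the last received offset + 1.
                expected = last_received[partition] + 1
                if assigned < expected:
                    stale.append((partition, assigned, expected))
    return stale


# Abbreviated copies of the log lines quoted in this issue.
sample = [
    "Received message 1:32266, 1652795237477 with latency of 422 ms",
    "Received message 2:32246, 1652795242477 with latency of 140 ms",
    "Received message 0:32336, 1652795247477 with latency of 457 ms",
    "Setting offset for partition public/default/partitioned-topic-0 to the committed offset FetchPosition{offset=32232",
    "Setting offset for partition public/default/partitioned-topic-2 to the committed offset FetchPosition{offset=32140",
    "Setting offset for partition public/default/partitioned-topic-1 to the committed offset FetchPosition{offset=32153",
]

for partition, assigned, expected in find_stale_assignments(sample):
    print(f"partition {partition}: assigned {assigned}, expected {expected}, behind by {expected - assigned}")
```

On these lines the sketch flags all three partitions, not only partition 1: each assignment is roughly 100 offsets behind the last message received on that partition.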

@Demogorgon314 This seems fixed in 2.9.3.13, does that sound right? Do you know what PR fixes this?

utahkay avatar Oct 14 '22 22:10 utahkay

@utahkay PR #1501 has fixed this issue.

Demogorgon314 avatar Oct 15 '22 14:10 Demogorgon314

Here the client receives the same message 1:157137 three times

Client logs

Received message 1:157709, 1666002785014 with latency of 505 ms
Resetting offset for partition public/default/kop-topic-1 to position FetchPosition{offset=156746
Received message 1:156746, 1665988229347 with latency of 14569905 ms

It isn't happening on every consumer group rebalance like it used to. It happens about every 4-6 hours, which translates to about every 12-18 consumer group rebalances.
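
As a rough cross-check of the numbers in this comment, the arithmetic can be written out. This is a back-of-the-envelope sketch with loudly stated assumptions: it takes produceIntervalMs: 5000 and consumerGroupRebalanceIntervalMs: 1200000 from the config posted earlier, and it assumes that kop-topic has three partitions filled round-robin, which this comment does not state. The function names are hypothetical.

```python
MS_PER_HOUR = 3_600_000


def redelivered_count(last_received: int, reset_to: int) -> int:
    """Messages read again when the position is reset from last_received back to reset_to."""
    return max(0, last_received - reset_to + 1)


def estimated_span_hours(count: int, produce_interval_ms: int, partitions: int) -> float:
    """Time span covered by `count` messages on one partition.

    Assumption: messages are spread round-robin over `partitions` partitions,
    so each partition gets one message every produce_interval_ms * partitions.
    """
    return count * produce_interval_ms * partitions / MS_PER_HOUR


count = redelivered_count(last_received=157709, reset_to=156746)
estimated = estimated_span_hours(count, produce_interval_ms=5000, partitions=3)
observed = 14569905 / MS_PER_HOUR  # latency logged for message 1:156746

# Rebalances in 4 and 6 hours, at one rebalance every 1200000 ms (20 minutes).
rebalances = [hours * MS_PER_HOUR // 1_200_000 for hours in (4, 6)]

print(count)                # 964
print(round(estimated, 1))  # 4.0
print(round(observed, 1))   # 4.0
print(rebalances)           # [12, 18]
```

If the three-partition assumption holds, the reset replays about 964 messages on partition 1, which is about four hours of traffic and matches the latency logged for message 1:156746.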

utahkay avatar Oct 17 '22 16:10 utahkay