[BUG] Consumer gets old offset when it joins consumer group
Describe the bug
I have a long-running test that runs three consumers in one consumer group. Every 40 minutes it stops one consumer, and 20 minutes later it adds a new consumer to the group, so a new consumer joins every 40 minutes. The new consumer often gets an old (stale) offset.
To Reproduce
Environment: pulsar-cloud:2.9.2.16, pulsar-protocol-handler-kafka-2.9.2.16.nar, sncloud-staging ('eagle' poolmember)
Steps to reproduce the behavior:
- Start three Kafka consumers in the same group.
  a. The consumers are configured with auto.offset.reset=earliest and enable.auto.commit=false.
  b. The consumers call commitSync() after each message is received.
- Start a Kafka producer and produce messages to the topic.
- Stop a consumer, wait a few minutes.
- Add a new consumer to the group, wait a few minutes.
- Repeat (stop a consumer, wait, add a consumer).
Expected behavior
The new consumer should always start at the last committed offset. Instead, it often starts at an older offset. Over time this causes the consumers to receive thousands of duplicate messages.
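To make the expected behavior concrete, here is a minimal, hypothetical single-partition model in plain Java (no Kafka client; the class and method names are made up for illustration). It assumes the committed offset is the next offset to read, and counts how many already-consumed records a joining consumer receives again when it resumes from the correct committed offset versus a stale one.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model, not the test's actual code: one partition with offsets 0..end-1.
 * Consumer A consumes up to lastConsumed, committing after each record, so the
 * correct committed offset is lastConsumed + 1. Consumer B then joins and resumes
 * from whatever offset it is handed during the rebalance.
 */
public class DuplicateModel {

    /** Returns how many records consumer B receives that consumer A already consumed. */
    public static int duplicatesAfterRejoin(long lastConsumed, long startOffsetReturned, long end) {
        Set<Long> seen = new HashSet<>();
        // Consumer A: records 0..lastConsumed (inclusive).
        for (long off = 0; off <= lastConsumed; off++) {
            seen.add(off);
        }
        int duplicates = 0;
        // Consumer B: resumes at the offset returned during the rebalance.
        for (long off = startOffsetReturned; off < end; off++) {
            if (!seen.add(off)) {
                duplicates++;
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        long lastConsumed = 99; // illustrative values only
        long end = 150;
        // Expected behavior: resume at the committed offset lastConsumed + 1.
        System.out.println(duplicatesAfterRejoin(lastConsumed, lastConsumed + 1, end)); // prints 0
        // Reported behavior: resume at an older offset.
        System.out.println(duplicatesAfterRejoin(lastConsumed, 60, end)); // prints 40
    }
}
```

Each stale rejoin replays the whole gap between the stale offset and the correct one, which is how repeated rebalances can add up to thousands of duplicates.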
Screenshots
Consumers receive more messages than the producer sends; many of the received messages are duplicates.

Additional context
Client source code: https://github.com/streamnative/continuous-verification/tree/master/workers/kop-produce-consume/app/src
Running with this config:
acceptableKafkaReceiveLatencyMs: 750
consumerGroupRebalanceIntervalMs: 1200000
doConsumerGroupRebalance: true
elasticSearchUrl: null
heartbeatServiceAccount: sa-continuous-verification@cv-pulsar.auth.test.cloud.gcp.streamnative.dev
kafkaBootstrapServers: null
kafkaConsumer:
  autoCommitIntervalMs: 500
  enableAutoCommit: false
  groupId: kop-produce-consume
kafkaProducer:
  acks: all
  clientId: kop-produce-consume
metricsPort: 8080
produceIntervalMs: 5000
pulsarClient:
  oauthAudience: urn:sn:pulsar:cv-pulsar:test-2-9-kop2
  oauthCredentialsUrl: file:///var/creds/service-account.json
  oauthCredentialsUrlAdmin: file:///var/admincreds/admin-service-account.json
  oauthIssuerUrl: https://auth.sncloud-stg.dev/
  serviceUrl: pulsar+ssl://test-2-9-kop2-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev:6651
  serviceUrlAdmin: https://test-2-9-kop2-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev
target: test-2-9-kop
targetNamespace: cv-pulsar
topic: public/default/partitioned-topic
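For reference, the consumer-side settings above correspond roughly to the following standard Kafka client properties. This is a sketch: the bootstrap address and the String deserializers are assumptions (the config shows kafkaBootstrapServers: null), and the ConsumerProps class is hypothetical rather than taken from the linked client code.

```java
import java.util.Properties;

/** Hypothetical helper: the consumer settings from this issue as Kafka client property keys. */
public class ConsumerProps {

    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);  // assumption: not given in the config
        props.setProperty("group.id", "kop-produce-consume");      // kafkaConsumer.groupId
        props.setProperty("enable.auto.commit", "false");          // kafkaConsumer.enableAutoCommit
        props.setProperty("auto.commit.interval.ms", "500");       // kafkaConsumer.autoCommitIntervalMs (unused while auto-commit is off)
        props.setProperty("auto.offset.reset", "earliest");        // from the reproduction steps
        // Assumption: the real client may use different (de)serializers.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092"); // hypothetical address
        // These properties would be passed to new org.apache.kafka.clients.consumer.KafkaConsumer<>(props).
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

Because enable.auto.commit is false, the Kafka client does not use auto.commit.interval.ms, so every commit in this test comes from the explicit commitSync() calls.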
It seems to happen in cycles: the committed offset is stale for a few hours, then for a while new consumers get the correct offset when they join, and then the offset goes stale again.

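From the client logs: for partition 1, the client received the message at offset 32266 and called commitSync(), which should leave a committed offset of at least 32267 (commitSync() commits the position after the last record returned by poll()).
But during the rebalance, the new consumer (kafka-consumer-130) is assigned the starting offset 32153.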
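The gap can be checked with a little arithmetic, using values from the log excerpt that follows. A minimal sketch (the class name is hypothetical), assuming the no-arg commitSync() semantics above, so the expected committed offset is at least the last received offset + 1 and the computed replay count is a lower bound. Partitions 0 and 2 are included only because the same excerpt shows their last received message and their committed offset.

```java
/** Hypothetical check: expected vs. reported committed offset per partition. */
public class StaleOffsetCheck {

    /** Lower bound on already-consumed records that will be delivered again. */
    public static long replayed(long lastReceived, long committedOnRebalance) {
        long expectedCommitted = lastReceived + 1; // next offset to read after lastReceived
        return Math.max(0, expectedCommitted - committedOnRebalance);
    }

    public static void main(String[] args) {
        // {partition, last offset received before the rebalance, committed offset set during it}
        long[][] rows = {
            {0, 32336, 32232},
            {1, 32266, 32153},
            {2, 32246, 32140},
        };
        for (long[] r : rows) {
            System.out.printf("partition %d: expected >= %d, got %d, replaying >= %d records%n",
                    r[0], r[1] + 1, r[2], replayed(r[1], r[2]));
        }
    }
}
```

With these values it reports at least 105, 114 and 107 replayed records for partitions 0, 1 and 2.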
"May 17, 2022 @ 07:47:17.928","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:17.927 [kafka-consumer-109] INFO kop.produce.consume.KafkaConsumer - Received message 1:32266, 1652795237477 with latency of 422 ms"
"May 17, 2022 @ 07:47:22.647","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:22.646 [kafka-consumer-98] INFO kop.produce.consume.KafkaConsumer - Received message 2:32246, 1652795242477 with latency of 140 ms"
"May 17, 2022 @ 07:47:27.964","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:27.964 [kafka-consumer-109] INFO kop.produce.consume.KafkaConsumer - Received message 0:32336, 1652795247477 with latency of 457 ms"
"May 17, 2022 @ 07:47:30.401","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:30.401 [rebalancer] INFO kop.produce.consume.KafkaConsumerSupervisor - ******** Starting kafka consumer kafka-consumer-130 ********"
"May 17, 2022 @ 07:47:31.341","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:31.341 [kafka-consumer-109] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kop-produce-consume-109, groupId=kop-produce-consume] Setting offset for partition public/default/partitioned-topic-0 to the committed offset FetchPosition{offset=32232, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[test-2-9-kop2-broker-0-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev:9093 (id: 1886075926 rack: null)], epoch=absent}}"
"May 17, 2022 @ 07:47:31.341","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:31.341 [kafka-consumer-98] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kop-produce-consume-98, groupId=kop-produce-consume] Setting offset for partition public/default/partitioned-topic-2 to the committed offset FetchPosition{offset=32140, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[test-2-9-kop2-broker-0-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev:9093 (id: 1886075926 rack: null)], epoch=absent}}"
"May 17, 2022 @ 07:47:31.342","kop-produce-consume2-test-2-9-kop2-75c774df49-mk746","13:47:31.342 [kafka-consumer-130] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-kop-produce-consume-130, groupId=kop-produce-consume] Setting offset for partition public/default/partitioned-topic-1 to the committed offset FetchPosition{offset=32153, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[test-2-9-kop2-broker-0-49eea6ed-86f7-472a-8ac5-29787c6ee3cd.gcp-shared-gcp-usce1-eagle.streamnative.g.sn3.dev:9093 (id: 1886075926 rack: null)], epoch=absent}}"
@Demogorgon314 This seems to be fixed in 2.9.3.13. Does that sound right? Do you know which PR fixed it?
@utahkay PR #1501 has fixed this issue.
Here the client receives the same message, 1:157137, three times:
Received message 1:157709, 1666002785014 with latency of 505 ms
Resetting offset for partition public/default/kop-topic-1 to position FetchPosition{offset=156746
Received message 1:156746, 1665988229347 with latency of 14569905 ms
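The size and age of this rewind can be read off the three lines above. A minimal sketch (the class name is hypothetical), assuming every offset from 156746 through 157709 had already been delivered once before the reset, and that the second number on each "Received message" line is the produce timestamp in epoch milliseconds.

```java
import java.time.Duration;

/** Hypothetical helper for sizing an offset rewind from client log values. */
public class RewindCheck {

    /** Offsets resetOffset..lastDelivered (inclusive) are delivered again. */
    public static long rewoundRecords(long lastDelivered, long resetOffset) {
        return Math.max(0, lastDelivered - resetOffset + 1);
    }

    /** Time between two produce timestamps (epoch milliseconds). */
    public static Duration ageGap(long newerTimestampMs, long olderTimestampMs) {
        return Duration.ofMillis(newerTimestampMs - olderTimestampMs);
    }

    public static void main(String[] args) {
        System.out.println(rewoundRecords(157709, 156746)); // prints 964
        Duration gap = ageGap(1666002785014L, 1665988229347L);
        System.out.println(gap.toMinutes()); // prints 242
    }
}
```

The reset offset belongs to a message produced about 242 minutes (roughly four hours) before message 157709, which is in line with the 4-6 hour cycle reported in this thread.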
It isn't happening on every consumer group rebalance like it used to. It happens about every 4-6 hours, which translates to about every 12-18 consumer group rebalances.
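Where the 12-18 figure comes from, assuming one group membership change (a consumer stopped or added) every consumerGroupRebalanceIntervalMs = 1200000 ms (20 minutes) from the config above:

```latex
\frac{4\ \text{h}}{20\ \text{min}} = \frac{14\,400\,000\ \text{ms}}{1\,200\,000\ \text{ms}} = 12,
\qquad
\frac{6\ \text{h}}{20\ \text{min}} = \frac{21\,600\,000\ \text{ms}}{1\,200\,000\ \text{ms}} = 18
```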