
Paused consumption across multiple consumers

rbokade-rbk opened this issue 10 months ago • 26 comments

We have observed that consumption is frequently halted or paused indefinitely until we restart the pod associated with the partition.

In production, we observe that message consumption is being paused in a partition because the PartitionStateManager marks the partition as lost and skips message processing, assuming it has been assigned to a different consumer.

Rebalance triggered: P12 assigned to the same pod

2025-03-03 07:30:15.219 INFO pc-broker-poll io.confluent.parallelconsumer.state.PartitionStateManager Partitions revoked: [app-xxx.consent.consent.receipts-11]
2025-03-03 07:30:15.232 INFO pc-broker-poll io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor Assigned 1 total (1 new) partition(s) [app-xxx.consent.consent.receipts-12]

PartitionStateManager declared the partition Lost:

2025-03-03 12:07:42.765 INFO pc-broker-poll io.confluent.parallelconsumer.state.PartitionStateManager Lost partitions: [app-xxx.consent.consent.receipts-12]

No work in the mailbox even though there is enough lag to process, and no in-flight messages. The last commit was on the 23rd, and consumption remained paused until the 24th.


Logs: the partition is assigned to this pod, but the logs claim it is assigned to a different consumer.

2025-03-03 12:07:42.765 INFO pc-broker-poll io.confluent.parallelconsumer.state.PartitionStateManager Lost partitions: [app-xxx.consent.consent.receipts-12]
2025-03-03 12:07:43.003 INFO (tenant-config-input-consumer,app-xxx.ds-preference-tenant-config.retry).retry-worker-4 com.onetrust.messaging.consumer.KafkaConsumer Partitions assigned: []
2025-03-03 12:07:44.303 INFO pc-broker-poll io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor Assigned 1 total (1 new) partition(s) [app-xxx.consent.consent.receipts-12]
2025-03-04 00:00:19.334 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.ConsumerManager Poll completed normally (after timeout of PT2S) and returned 0...
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Poll completed
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Got 0 records in poll result
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Subscriptions are paused: true
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: RUNNING
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.ConsumerManager Poll starting with timeout: PT2S
2025-03-04 00:00:21.336 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.ConsumerManager Poll completed normally (after timeout of PT2S) and returned 0...
2025-03-04 00:00:21.336 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Poll completed
2025-03-04 00:00:21.336 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Got 0 records in poll result
2025-03-04 00:00:21.337 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Subscriptions are paused: true
2025-03-04 00:00:21.337 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: RUNNING
2025-03-04 00:00:21.337 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.ConsumerManager Poll starting with timeout: PT2S
2025-03-04 00:00:22.440 DEBUG pc-control io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor Mailbox results returned null, indicating timeToBlockFor elapsed (which was set as PT4.99999484S)
2025-03-04 00:00:22.440 DEBUG pc-control io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor isPoolQueueLow()? workAmountBelowTarget true 0 vs 10;
2025-03-04 00:00:22.440 DEBUG pc-control io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor Will try to get work - target: 20, current queue size: 0, requesting: 20, loading factor: 2
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771160:k:16ffceee). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771160:k:16ffceee)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771169:k:3a1af6a2). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771169:k:3a1af6a2)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771170:k:fc145cb0). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771170:k:fc145cb0)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771215:k:a7bbbfff). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771215:k:a7bbbfff)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771189:k:3c5ce0b3). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771189:k:3c5ce0b3)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771243:k:8e803207). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771243:k:8e803207)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771168:k:d50689cd). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771168:k:d50689cd)
2025-03-04 00:00:22.442 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771171:k:20d5aeca). Skipping message - it's partition has already assigned to a different consumer.
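For context on the epoch check these log lines refer to: parallel-consumer tags each queued work container with the partition's assignment epoch, and drops queued work whose epoch no longer matches the partition's current epoch. The following is a deliberately simplified illustrative sketch of that mechanism, not the actual PartitionState implementation; the class, field, and method names here are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the epoch check visible in the logs above.
// Real class and member names in parallel-consumer differ.
class EpochCheckSketch {
    record Work(long offset, long epoch) {}

    long currentEpoch = 0;            // bumped on every (re)assignment of the partition
    Queue<Work> mailbox = new ArrayDeque<>();

    void onPartitionAssigned() {
        currentEpoch++;               // revoke + reassign to the same consumer bumps 0 -> 1 -> 2
    }

    void enqueue(long offset) {
        mailbox.add(new Work(offset, currentEpoch));
    }

    // Mirrors "Epoch mismatch 0 vs 2 ... Skipping message": work queued under an
    // older epoch is silently dropped. If the subscription is also paused so no
    // fresh records are polled, the partition appears permanently stuck.
    int drainAndProcess() {
        int processed = 0;
        while (!mailbox.isEmpty()) {
            Work w = mailbox.poll();
            if (w.epoch() != currentEpoch) continue;  // stale: skipped, never committed
            processed++;
        }
        return processed;
    }
}
```

In the logs above the queued work carries epoch 0 while the partition is on epoch 2, so every container is skipped and the offset never advances.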

On reviewing a thread dump, the PC threads are in a WAITING state:

"(dsCacheSync-in-0,ds-preference-cache-sync).pc-pool-13-thread-1" - Thread t@139 java.lang.Thread.State: WAITING at [email protected]/jdk.internal.misc.Unsafe.park(Native Method) - parking to wait for <70bbade3> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at [email protected]/java.util.concurrent.locks.LockSupport.park(Unknown Source) at [email protected]/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(Unknown Source) at [email protected]/java.util.concurrent.ForkJoinPool.unmanagedBlock(Unknown Source) at [email protected]/java.util.concurrent.ForkJoinPool.managedBlock(Unknown Source) at [email protected]/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source) at [email protected]/java.util.concurrent.LinkedBlockingQueue.take(Unknown Source) at [email protected]/java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source) at [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at [email protected]/java.lang.Thread.run(Unknown Source)

PC version in production: 0.5.2.8
Upgrade in progress: 0.5.3.2

rbokade-rbk avatar Mar 31 '25 13:03 rbokade-rbk

@rbokade-rbk, I think the partition subscription is paused according to the log, and that is why you do not get any new records even though the partition is assigned. I highly suspect this bug is due to a failure to clear the stale container, which I fixed in 2023, but somehow the fix doesn't work here.

I will check on this one. But if you can reproduce it, could you please tune the log level to TRACE and capture it again, so I can get more information to confirm?

sangreal avatar Apr 23 '25 00:04 sangreal

We've stumbled upon what is probably the same kind of issue. Sometimes when there is a redeployment and Kafka rebalancing kicks in (under heavy load), we end up in a situation where the consumer appears to stop processing messages, even though unprocessed messages remain, which is visible as lag.

I've also tried Kafka Incremental Cooperative Rebalancing, and with it the issue is even more visible.

@sangreal any news about potential fix?

netroute-js avatar Jun 19 '25 09:06 netroute-js

@sangreal, could you please see if this can be prioritized? We are noticing this issue widely.

rbokade-rbk avatar Jun 20 '25 03:06 rbokade-rbk

@sangreal or @astubbs, any ETA on when this issue could be prioritized and fixed? It's really something that impacts our flow.

netroute-js avatar Jun 24 '25 10:06 netroute-js

@netroute-js @rbokade-rbk please provide a trace level log when this issue happens, I will take a look.

sangreal avatar Jun 24 '25 10:06 sangreal

It cannot be reproduced in non-production environments, but it does halt in production. We enabled the logs after the fact and attached them to the ticket (https://github.com/confluentinc/parallel-consumer/issues/857#issuecomment-2766945013), but we cannot enable the logs before the fact, as we are unsure how long it would take for the issue to reoccur in a given environment/consumer.

rbokade-rbk avatar Jun 26 '25 12:06 rbokade-rbk

@rbokade-rbk, the log is too short; I checked again and I don't have enough information. My fix was introduced in 0.5.2.7, so supposedly you should not have the issue, because the issue happens when a partition is revoked and then assigned to the same consumer again (a high possibility if you enable Kafka Incremental Cooperative Rebalancing). After my fix, our system never encountered this issue again, so this is weird. What I need is:

  1. a log that contains the moment when rebalancing happens
  2. it would be very helpful if you enable TRACE level logging for parallel-consumer

And please make sure that you are using 0.5.2.8.

Another thought: you may try removing the CooperativeStickyAssignor to check whether the issue disappears or occurs much less often.
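For anyone trying that experiment: the assignor is controlled by the standard Kafka consumer property `partition.assignment.strategy`. A plain-Properties sketch of the swap (the helper name here is hypothetical, not part of any library API):

```java
import java.util.Properties;

// Sketch: replace the cooperative assignor with Kafka's default eager RangeAssignor
// to test whether the stalls correlate with cooperative rebalancing.
class AssignorConfigSketch {
    static Properties withoutCooperativeAssignor(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }
}
```

Note that, per the Kafka upgrade documentation, switching a live consumer group between cooperative and eager protocols safely generally requires a rolling restart procedure, so this is best tried as a controlled experiment rather than a hot config flip.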

sangreal avatar Jun 27 '25 07:06 sangreal

We've stumbled upon a lag of more than 700,000 records in one topic, and the consumer was completely stuck on it until the consumer was terminated.

We are using 0.5.3.2. I will try to enable TRACE logging tomorrow and reproduce it in a testing environment.

netroute-js avatar Jul 03 '25 15:07 netroute-js

@netroute-js please set TRACE level logging limited to the io.confluent package, to avoid too much irrelevant logging from elsewhere.
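For example, with Logback that scoping is a single logger element (a config sketch assuming a standard logback.xml with a STDOUT appender already defined; adjust for your logging framework):

```xml
<!-- TRACE only for parallel-consumer packages; everything else stays at INFO -->
<configuration>
    <logger name="io.confluent" level="TRACE"/>
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
```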

sangreal avatar Jul 04 '25 08:07 sangreal

@sangreal, I was able to reproduce the issue in our testing environment. Below you can find the information about the environment:

  • PC version: 0.5.3.2
  • The Kafka topic has 30 partitions
  • Traffic: a couple of thousand events per second
  • Kafka Incremental Rebalancing enabled. However, we've stumbled upon this issue in production as well (in prod, we don't have Incremental Rebalancing)
  • During the test, I was basically adding/removing k8s pods (2-3 in a single go) to force rebalancing, and at some point the issue kicked in and made our consumer stop processing events. In that case the lag grows very quickly
  • The only way to move forward is to re-assign the impacted partition to a different consumer

In the attachment you can find logs from the test. There are quite a lot of them, and our service consumes from multiple different topics, so I've replaced the topic names in the output file:

  • LAG_TOPIC, that one is of the interest here,
  • NOT_INTERESTING_TOPIC, you can skip that one.

I've also gathered some PC metrics for that specific consumer and partition:

  • In AKHQ, I see that this partition has its offset set to 8037030 and lag set to 4989
  • Highest completed offset is 8042018
  • Highest seen offset is 8042018
  • Sequentially succeeded offset is 8042018
  • Incomplete offsets is 0
  • Latest committed offset is 8042019
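These numbers are at least internally consistent with standard commit semantics: when there are no incomplete offsets, the offset to commit is the highest sequentially succeeded offset plus one (the next offset to consume), which matches 8042018 succeeded vs 8042019 committed. A sketch of that invariant (a hypothetical helper, not PC's actual API):

```java
// Hypothetical helper mirroring the relationship visible in the metrics above:
// with zero incomplete offsets, committed = highestSequentiallySucceeded + 1.
class OffsetInvariantSketch {
    static long offsetToCommit(long highestSequentiallySucceeded, long[] incompleteOffsets) {
        if (incompleteOffsets.length == 0) {
            return highestSequentiallySucceeded + 1;  // next offset to consume
        }
        long lowest = Long.MAX_VALUE;
        for (long o : incompleteOffsets) {
            lowest = Math.min(lowest, o);             // cannot commit past the first gap
        }
        return lowest;
    }
}
```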

It's important to note that at 10:34:00 the issue seems to have already happened, because the lag is there but consumption has more or less stopped.

If you need any more information, let me know please.

pc_logs.txt

netroute-js avatar Jul 09 '25 13:07 netroute-js

@sangreal, please let us know whether the issue has been identified.

rbokade-rbk avatar Jul 16 '25 15:07 rbokade-rbk

@sangreal any news? We've stumbled upon this issue in production again. The bad news is that redeploying doesn't help now.

UPDATE: We had a lag of 5 messages in a retry topic. Once I restarted the pods, 3 of those messages were consumed, but two remain as lag no matter how many times I redeploy. I checked the logs and found a log line that says:

Fetch position FetchPosition{offset=8, ....} is out of range for partition
Resetting offset for partition to position FetchPosition{offset=10}

There are no new messages, but somehow the consumer is stuck at committed offset 8 as seen in AKHQ, and at offset 7 in the PC metrics. Once I redeploy this pod, it starts from offset 8 and the loop continues. I think this is a different issue, but maybe it will help you troubleshoot what's going on.
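For readers unfamiliar with that log: the "out of range" reset is standard Kafka client behavior. When the fetch offset falls outside the broker's valid range (e.g. below the log start offset after retention deleted records), the `auto.offset.reset` policy decides where to resume. An illustrative model of that clamping, not the client's actual code:

```java
// Illustrative model of the reset described above: the committed offset 8 is below
// the log start offset 10, so each restart resets the fetch position to 10; but if
// the app's own tracked offset never advances, the loop repeats on every redeploy.
class OffsetResetSketch {
    enum Reset { EARLIEST, LATEST }

    static long resolveFetchPosition(long requested, long logStart, long logEnd, Reset policy) {
        if (requested >= logStart && requested <= logEnd) {
            return requested;                                 // in range: fetch as-is
        }
        return policy == Reset.EARLIEST ? logStart : logEnd;  // out of range: reset
    }
}
```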

netroute-js avatar Jul 21 '25 10:07 netroute-js

@netroute-js, I was super busy; as you know, I am not a Confluent employee, so I have my own work to do. Anyway, I took some time to look at the new log (pc_log). The behavior is different from the April log: (1) in the April log, there are too many stale messages, then the subscription is paused and lag builds up; (2) in pc_log, the subscription is actually not paused, but somehow polls return 0 records, which is weird, and I cannot find anything suspicious. Since this is already TRACE, there should be fishy logging if something were wrong, but there is not.

Do you have more logs to share? DEBUG is also fine. I want to confirm it is caused by stale messages; if so, I will find a way to improve the stale-message exclusion logic.

sangreal avatar Jul 28 '25 11:07 sangreal

@sangreal thanks for the update. Unfortunately, it's all that I was able to extract. What do you mean by stale messages?

netroute-js avatar Jul 28 '25 17:07 netroute-js

@netroute-js @rbokade-rbk I drafted a PR with a possible fix. I will ask the Confluent folks to review: https://github.com/confluentinc/parallel-consumer/pull/882

sangreal avatar Aug 02 '25 02:08 sangreal

Could you please confirm when the 0.5.3.3 jar will be available to adopt? I see the latest is 0.5.3.2: https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-core/

rbokade11 avatar Aug 14 '25 15:08 rbokade11

We are also awaiting this change. Any news? @sangreal

netroute-js avatar Aug 25 '25 08:08 netroute-js

@netroute-js please ask for help in the Slack channel, since I am not a member of Confluent and don't have permission to release a new version.

sangreal avatar Aug 26 '25 13:08 sangreal

@astubbs any idea how we can proceed with the new version?

netroute-js avatar Aug 27 '25 12:08 netroute-js

If I forked this project, and made it completely a community thing, would you still use it?

astubbs avatar Aug 27 '25 12:08 astubbs

@netroute-js @rbokade-rbk @rbokade11 The new version has been released; please integrate it and let me know if you are still having the issue.

sangreal avatar Sep 03 '25 07:09 sangreal

I see core is not released yet: https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-core/


but the parent is: https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/

rbokade11 avatar Sep 03 '25 17:09 rbokade11

Hey @sangreal,

Unfortunately, I have to report that after upgrading to the newest version the problem is still there. The probability seems to be reduced, because it happened to us after 2 months of continuous, uninterrupted traffic. The consumer was once again stuck, and this time it was triggered by a new leader election on our side. Could you try to verify that scenario on your side, please?

netroute-js avatar Nov 24 '25 08:11 netroute-js

Hey @sangreal

I've spent a bit more time trying to debug the issue carefully and below you can find the details:

  • it started to happen during every redeployment of our services (during traffic hours),
  • during redeployments, it fails to handle some records, so they are sent to a retry topic,
  • even though consumer lag is visible, it seems the records were actually handled by the retry-topic handler; however, the lag does not change.

To visualize it further, let's say we've got an active ConsumerA and we trigger a redeployment. ConsumerA is terminated and a new ConsumerB is bootstrapped. ConsumerA tries to shut down gracefully, but some connections are already closed, so it cannot finish some in-flight requests; it therefore fails, sends the failed records to the retry topic, and at some point is terminated completely. ConsumerB, on the other hand, is ready and consumes the failed records from the retry topic. I can confirm that those records were handled successfully. However, the lag is still visible in Kafka.

According to my metrics and AKHQ, the current offset of the retry topic is 108 and the consumer lag is 39. Below you can find the PC metrics for this specific partition:

  • pc_partition_highest_completed_offset: 107 (Consumer A), 146 (Consumer B)
  • pc_partition_highest_seen_offset: 107 (Consumer A), 146 (Consumer B)
  • pc_partition_highest_sequential_succeeded_offset: 107 (Consumer A), 146 (Consumer B)
  • pc_partition_incomplete_offsets: 0 (Consumer A), 0 (Consumer B)
  • pc_partition_latest_committed_offset: 0 (Consumer A), 147 (Consumer B)

I wonder why some metrics show offset 146 while pc_partition_latest_committed_offset is set to 147. Moreover, according to AKHQ, the offset is 108 and the lag is 39, and 108 + 39 = 147.
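The 146 vs 147 part of this is expected Kafka semantics rather than a discrepancy: the committed offset is the next offset to consume, i.e. last processed + 1, and lag is the end offset minus the current offset. A sketch with hypothetical helper names:

```java
// Kafka offset accounting: committed = lastProcessed + 1 (the *next* offset to
// consume), lag = endOffset - currentOffset. Matches 146 succeeded -> 147
// committed, and 108 + 39 = 147 from AKHQ above.
class LagSketch {
    static long committedFor(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    static long lag(long endOffset, long currentOffset) {
        return endOffset - currentOffset;
    }
}
```

So the open question is not 146 vs 147, but why AKHQ still reports the group's current offset as 108 while PC reports 147 as committed.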

What's the source of those discrepancies?

netroute-js avatar Dec 08 '25 12:12 netroute-js

Thanks, I am hectically busy, but I will try my best to spare some time to investigate. Meanwhile, could you please share your log here? @netroute-js

sangreal avatar Dec 09 '25 13:12 sangreal