NullPointerException on partitions revoked
I have an application deployed on a Kubernetes cluster. While the system is under load and the pod is scaled up, I've seen a NullPointerException in the logs when partitions are being revoked from an existing consumer:
java.lang.NullPointerException: Cannot invoke "io.confluent.parallelconsumer.state.WorkContainer.getRetryDueAt()" because "workContainer" is null
at i.c.parallelconsumer.state.ShardManager.lambda$new$0(ShardManager.java:81)
at java.util.Comparator.lambda$comparing$77a9974f$1(Comparator.java:473)
at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:220)
at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:220)
at java.util.TreeMap.getEntryUsingComparator(TreeMap.java:374)
at java.util.TreeMap.getEntry(TreeMap.java:344)
at java.util.TreeMap.remove(TreeMap.java:881)
at java.util.TreeSet.remove(TreeSet.java:276)
at i.c.parallelconsumer.state.ShardManager.removeWorkFromShardFor(ShardManager.java:173)
at i.c.parallelconsumer.state.ShardManager.removeAnyShardEntriesReferencedFrom(ShardManager.java:157)
at i.c.p.state.PartitionState.onPartitionsRemoved(PartitionState.java:545)
at i.c.p.state.PartitionStateManager.resetOffsetMapAndRemoveWork(PartitionStateManager.java:245)
at i.c.p.state.PartitionStateManager.onPartitionsRemoved(PartitionStateManager.java:184)
at i.c.p.state.PartitionStateManager.onPartitionsRevoked(PartitionStateManager.java:175)
at i.c.parallelconsumer.state.WorkManager.onPartitionsRevoked(WorkManager.java:109)
at i.c.p.i.AbstractParallelEoSStreamProcessor.onPartitionsRevoked(AbstractParallelEoSStreamProcessor.java:421)
... 16 common frames omitted
Wrapped by: i.c.p.internal.InternalRuntimeException: onPartitionsRevoked event error
at i.c.p.i.AbstractParallelEoSStreamProcessor.onPartitionsRevoked(AbstractParallelEoSStreamProcessor.java:423)
at o.a.k.c.c.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:340)
at o.a.k.c.c.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:830)
... 14 common frames omitted
Wrapped by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
at o.a.k.c.c.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:861)
at o.a.k.c.c.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:443)
at o.a.k.c.c.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)
at o.a.k.c.c.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:552)
at o.a.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)
at o.a.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at o.a.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)
at i.c.p.internal.ConsumerManager.poll(ConsumerManager.java:58)
at i.c.p.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:205)
at i.c.p.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:160)
at i.c.p.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:136)
at java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at j.u.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:842)
From what I've checked in the code, it looks like a null WorkContainer is passed to the retryQueue TreeSet in ShardManager, so the exception appears to be a valid one.
I'm on version 0.5.2.8.
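For context, the failure mode can be reproduced in isolation: the retryQueue is a TreeSet ordered by a comparator that dereferences its elements, so handing it a null element makes the comparator itself throw. This is a minimal sketch (the WorkContainer class here is a hypothetical stand-in for the library's, not its real definition):

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical stand-in for the library's WorkContainer, just enough
// to feed the comparator.
class WorkContainer {
    private final Instant retryDueAt;
    WorkContainer(Instant retryDueAt) { this.retryDueAt = retryDueAt; }
    Instant getRetryDueAt() { return retryDueAt; }
}

public class RetryQueueNpeRepro {
    public static void main(String[] args) {
        // Same shape as the ShardManager retryQueue: a TreeSet whose
        // comparator calls getRetryDueAt() on the elements it compares.
        TreeSet<WorkContainer> retryQueue = new TreeSet<>(
                Comparator.comparing(WorkContainer::getRetryDueAt));
        retryQueue.add(new WorkContainer(Instant.now()));

        // Removing a null element makes TreeMap invoke the comparator
        // with null as one argument, reproducing the reported NPE.
        try {
            retryQueue.remove(null);
        } catch (NullPointerException e) {
            System.out.println("Reproduced: " + e.getMessage());
        }
    }
}
```

This matches the stack trace: TreeSet.remove delegates to TreeMap, which calls the comparator via getEntryUsingComparator, and the comparator blows up on the null element.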
After checking the code, this appears to be a race condition. I am working on the fix.
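One defensive mitigation, sketched below, is to guard the removal path so a racy null can never reach the comparator. This is a hypothetical illustration only, not necessarily the actual change; the Work class and removeFromRetryQueue helper are invented for the example:

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for the library's WorkContainer.
class Work {
    private final Instant retryDueAt;
    Work(Instant retryDueAt) { this.retryDueAt = retryDueAt; }
    Instant getRetryDueAt() { return retryDueAt; }
}

public class NullSafeRemove {
    // Skip null entries before touching the comparator-backed retryQueue,
    // so the comparator is never invoked with a null element.
    static void removeFromRetryQueue(NavigableSet<Work> retryQueue, Work work) {
        if (work == null) {
            return; // nothing to remove; avoids the NPE from the stack trace
        }
        retryQueue.remove(work);
    }

    public static void main(String[] args) {
        NavigableSet<Work> retryQueue =
                new TreeSet<>(Comparator.comparing(Work::getRetryDueAt));
        Work w = new Work(Instant.now());
        retryQueue.add(w);

        removeFromRetryQueue(retryQueue, null); // no exception thrown
        removeFromRetryQueue(retryQueue, w);    // normal removal
        System.out.println("size=" + retryQueue.size());
    }
}
```

A guard like this only masks the symptom, of course; the underlying race that produces the null entry still needs fixing.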
Fix for this is merged now. @Amoratinos - can you please test on build from master?
I can give it a try, but I've only seen this error once, and only under heavy load 🤞
Closing as fixed — reopen if it reoccurs.