Error in onPartitionsAssigned in parallel consumer

Open milansanjeev opened this issue 3 years ago • 14 comments

Hi Team,

I want to use Parallel Consumer in one of our Spring services to process a Kafka stream. I am using parallel-consumer-core 0.5.1.0 but am getting the exception below. We are on a secured Kafka cluster.


2022-07-04 15:14:49.224 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [ pc-broker-poll] i.c.p.state.PartitionStateManager        : Error in onPartitionsAssigned

java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

2022-07-04 15:14:49.224 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [ pc-broker-poll] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] User provided listener io.confluent.parallelconsumer.ParallelEoSStreamProcessor failed on invocation of onPartitionsAssigned for partitions [commerce_order_eg_domain_event_v3-2, commerce_order_eg_domain_event_v3-1, commerce_order_eg_domain_event_v3-0]

java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

2022-07-04 15:14:49.224 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [ pc-broker-poll] i.c.p.internal.BrokerPollSystem          : Unknown error

org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	... 15 common frames omitted

2022-07-04 15:14:51.489 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending Heartbeat request with generation 15 and member id cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03 to coordinator kafka-1g-us-east-1.egdp-test.aws.away.black:11302 (id: 2147483615 rack: null)
2022-07-04 15:14:51.490 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=8) and timeout 30000 to node 2147483615: HeartbeatRequestData(groupId='cre_impulse', generationId=15, memberId='cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03', groupInstanceId=null)
2022-07-04 15:14:51.902 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Received HEARTBEAT response from node 2147483615 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=8): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)
2022-07-04 15:14:51.903 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Received successful Heartbeat response
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Mailbox results returned null, indicating timeToBlockFor (which was set as PT4.998615S)
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Should commit this cycle? shouldCommitNow? true : shouldDoANormalCommit? true, commitFrequencyOK? true, lingerBeneficial? false, isCommandedToCommit? false
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Committing offsets that are ready...
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Committing offsets that are ready...
2022-07-04 15:14:51.997 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.ConsumerOffsetCommitter   : Async commit to be requested
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: Error in BrokerPollSystem system.

io.confluent.parallelconsumer.internal.InternalRuntimeError: Error in BrokerPollSystem system.
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.supervise(BrokerPollSystem.java:100)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:694)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:630)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.supervise(BrokerPollSystem.java:98)
	... 7 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	... 5 common frames omitted
Caused by: java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	... 15 common frames omitted

2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Starting close process (state: running)...
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Shutting down execution pool...
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Awaiting worker pool termination...
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Still interrupted
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Worker pool terminated.
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Blocking normally until next commit time of PT4.998802S
2022-07-04 15:14:51.998 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Blocking poll on work until next scheduled offset commit attempt for PT4.998802S. active threads: 0, queue: 0
2022-07-04 15:14:54.573 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending Heartbeat request with generation 15 and member id cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03 to coordinator kafka-1g-us-east-1.egdp-test.aws.away.black:11302 (id: 2147483615 rack: null)
2022-07-04 15:14:54.574 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=9) and timeout 30000 to node 2147483615: HeartbeatRequestData(groupId='cre_impulse', generationId=15, memberId='cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03', groupInstanceId=null)
2022-07-04 15:14:54.984 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Received HEARTBEAT response from node 2147483615 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=9): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)
2022-07-04 15:14:54.985 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Received successful Heartbeat response
2022-07-04 15:14:57.001 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Mailbox results returned null, indicating timeToBlockFor (which was set as PT4.998802S)
2022-07-04 15:14:57.001 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Committing offsets that are ready...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Committing offsets that are ready...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.ConsumerOffsetCommitter   : Async commit to be requested
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Closing and waiting for broker poll system...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.BrokerPollSystem          : Requesting broker polling system to close...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.BrokerPollSystem          : Poller transitioning to closing, waking up consumer
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [     pc-control] i.c.p.internal.BrokerPollSystem          : Wait for loop to finish ending...
2022-07-04 15:14:57.002 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 39138 --- [     pc-control] i.c.p.internal.BrokerPollSystem          : Execution or timeout exception waiting for broker poller thread to finish

java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:244)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:506)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:633)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116)
	... 5 common frames omitted
Caused by: java.lang.RuntimeException: Unexpected magic: 1
	at io.confluent.parallelconsumer.offsets.OffsetEncoding.decode(OffsetEncoding.java:52)
	at io.confluent.parallelconsumer.offsets.EncodedOffsetPair.unwrap(EncodedOffsetPair.java:75)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodeCompressedOffsets(OffsetMapCodecManager.java:229)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:161)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(OffsetMapCodecManager.java:151)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.decodePartitionState(OffsetMapCodecManager.java:165)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.lambda$loadPartitionStateForAssignment$0(OffsetMapCodecManager.java:128)
	at java.base/java.util.HashMap.forEach(HashMap.java:1425)
	at io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.loadPartitionStateForAssignment(OffsetMapCodecManager.java:125)
	at io.confluent.parallelconsumer.state.PartitionStateManager.onPartitionsAssigned(PartitionStateManager.java:107)
	at io.confluent.parallelconsumer.state.WorkManager.onPartitionsAssigned(WorkManager.java:98)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.onPartitionsAssigned(AbstractParallelEoSStreamProcessor.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
	... 15 common frames omitted

2022-07-04 15:14:57.641 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending Heartbeat request with generation 15 and member id cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03 to coordinator kafka-1g-us-east-1.egdp-test.aws.away.black:11302 (id: 2147483615 rack: null)
2022-07-04 15:14:57.642 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 39138 --- [d | cre_impulse] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=cre_impulse] Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=4, clientId=cre_impulse, correlationId=10) and timeout 30000 to node 2147483615: HeartbeatRequestData(groupId='cre_impulse', generationId=15, memberId='cre_impulse-f929f15d-aab5-4aaa-a7ed-0a821c208b03', groupInstanceId=null)

milansanjeev (Jul 04 '22 10:07)

Hi there! :) Welcome to the project… Have you used this consumer group ID before? If so, with which client or framework? Can you try a fresh, unique consumer group ID?
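
A minimal sketch of what trying a fresh group ID could look like, assuming plain consumer `Properties` with the standard Kafka config keys. The class name `FreshGroupId`, the `cre_impulse_pc` prefix, and the `localhost:9092` address are placeholders for illustration and are not taken from this issue.

```java
import java.util.Properties;
import java.util.UUID;

public class FreshGroupId {

    // Builds consumer properties with a group ID that has never committed offsets,
    // so no offset metadata written by another client can be loaded on assignment.
    static Properties consumerProps(String bootstrapServers, String groupIdPrefix) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A random suffix makes the group ID unique for each trial run.
        props.setProperty("group.id", groupIdPrefix + "-" + UUID.randomUUID());
        // Parallel Consumer commits offsets itself and expects auto commit to be off.
        props.setProperty("enable.auto.commit", "false");
        // Assumption: start from the earliest retained offset for the new group.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "cre_impulse_pc");
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

A random suffix is only useful for a quick experiment; a long-running service would normally pick one stable, previously unused group ID so that committed offsets survive restarts.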

astubbs (Jul 04 '22 11:07)

Thank you so much @astubbs.

Yes, the consumer group ID was used earlier with Kafka Streams (KStream). I changed it to a new, unique ID and the error was resolved.
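
The stack trace in the first post shows the failure happening while Base64-decoding stored partition state and reading its leading byte, which fits a group whose committed metadata was written by a different client. The sketch below only illustrates that failure mode. The class `MagicByteCheck`, the set of accepted magic bytes, and the layout of the foreign payload are all assumptions made for this example, not the library's actual code or values.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Set;

public class MagicByteCheck {

    // Hypothetical magic bytes for this sketch only; not the library's real values.
    static final Set<Byte> KNOWN_MAGIC = Set.of((byte) 'b', (byte) 'r');

    // Decodes Base64 commit metadata and rejects payloads whose first byte is unknown.
    static byte readMagic(String base64Metadata) {
        byte[] raw = Base64.getDecoder().decode(base64Metadata);
        byte magic = ByteBuffer.wrap(raw).get();
        if (!KNOWN_MAGIC.contains(magic)) {
            throw new RuntimeException("Unexpected magic: " + magic);
        }
        return magic;
    }

    public static void main(String[] args) {
        // Assumed foreign payload: a version byte 1 followed by an 8-byte long.
        byte[] foreign = ByteBuffer.allocate(9).put((byte) 1).putLong(1656947689224L).array();
        String committed = Base64.getEncoder().encodeToString(foreign);
        try {
            readMagic(committed);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // prints "Unexpected magic: 1"
        }
    }
}
```

Under these assumptions, a group ID with no prior commits has no foreign metadata to decode, which is consistent with the error disappearing after the change.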

But now I am getting the error below, and after it my application shuts down.

2022-07-04 18:10:41.816 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] i.c.p.internal.BrokerPollSystem          : Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: running
2022-07-04 18:10:41.816 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] i.c.p.internal.ConsumerManager           : Poll starting with timeout: PT2S
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED ERROR 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: null

java.lang.NullPointerException: null
	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1693)
	at io.confluent.parallelconsumer.state.ShardManager.addWorkContainer(ShardManager.java:139)
	at io.confluent.parallelconsumer.state.PartitionStateManager.maybeRegisterNewRecordAsWork(PartitionStateManager.java:361)
	at io.confluent.parallelconsumer.state.PartitionStateManager.maybeRegisterNewRecordAsWork(PartitionStateManager.java:339)
	at io.confluent.parallelconsumer.state.WorkManager.registerWork(WorkManager.java:128)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.processWorkCompleteMailBox(AbstractParallelEoSStreamProcessor.java:953)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.controlLoop(AbstractParallelEoSStreamProcessor.java:671)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:630)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Starting close process (state: running)...
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Shutting down execution pool...
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Awaiting worker pool termination...
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Still interrupted
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Worker pool terminated.
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Blocking normally until next commit time of PT1.237956S
2022-07-04 18:10:41.817 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [     pc-control] c.p.i.AbstractParallelEoSStreamProcessor : Blocking poll on work until next scheduled offset commit attempt for PT1.237956S. active threads: 0, queue: 0
2022-07-04 18:10:42.618 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=cre_impulse, groupId=CRE_IMPULSE_USE] Received FETCH response from node 12 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=cre_impulse, correlationId=78): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[FetchableTopicResponse(topic='commerce_order_eg_domain_event_v3', partitionResponses=[FetchablePartitionResponse(partition=1, errorCode=0, highWatermark=514020, lastStableOffset=514020, logStartOffset=489163, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=3]))])])
2022-07-04 18:10:42.618 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=cre_impulse, groupId=CRE_IMPULSE_USE] Node 12 sent a full fetch response with 1 response partition(s)
2022-07-04 18:10:42.618 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=cre_impulse, groupId=CRE_IMPULSE_USE] Fetch READ_UNCOMMITTED at offset 514020 for partition commerce_order_eg_domain_event_v3-1 returned fetch data (error=NONE, highWaterMark=514020, lastStableOffset = 514020, logStartOffset = 489163, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch =Optional.empty, recordsSizeInBytes=0)
2022-07-04 18:10:42.618 app=opxhub-order-domain-bookings-stream environment=springEnvironment_IS_UNDEFINED DEBUG 57504 --- [ pc-broker-poll] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=cre_impulse, groupId=CRE_IMPULSE_USE] Added READ_UNCOMMITTED fetch request for partition commerce_order_eg_domain_event_v3-1 at position F

milansanjeev avatar Jul 04 '22 12:07 milansanjeev

Are you using KEY ordering and have records with NULL keys? If so:

  • #318

#318 is also fixed in:

  • #315

That one is at the top of the merge queue. I just got back from PTO and some other stuff, so there's a bit of a domino queue of new things to get merged :)

astubbs avatar Jul 04 '22 13:07 astubbs

Progress?

astubbs avatar Jul 05 '22 07:07 astubbs

Yes @astubbs

The issue was resolved after switching to UNORDERED ordering.

milansanjeev avatar Jul 05 '22 08:07 milansanjeev

Ok, good to hear! Do you know if you're using null keys? Just want to confirm it is indeed the same issue.

astubbs avatar Jul 05 '22 08:07 astubbs

@astubbs yes you were right. Key was null in my case.

milansanjeev avatar Jul 05 '22 10:07 milansanjeev

If we enable PC and reuse the existing consumer group, is it possible to avoid errors such as Unexpected magic: 2? Creating a new consumer group would require copying the offsets from the previous consumer group. Otherwise, we would lose or duplicate messages during the migration.

Any ideas or suggestions? Thanks!

colinkuo avatar Jan 12 '23 20:01 colinkuo

What's the use case for reusing the group? Is it to try to use the same offsets?

Interesting.

So we have two situations:

  1. mistaken use of same group
  2. required use of same group

How should we distinguish between the two?

Since 2 is probably the exception, how about an option to explicitly "ignore" any existing data in the metadata payload, which is off by default?

cc @nachomdo

astubbs avatar Jan 12 '23 20:01 astubbs

Thanks for the quick response @astubbs

Yes, we wanted to explore whether we can reuse the same offsets. I'd consider it similar to a completely different Kafka client application using an existing consumer group: being a different application doesn't prevent it from reusing the offsets. PC uses the Kafka client under the hood, so it could behave the same way as a native Kafka client.

My two cents

colinkuo avatar Jan 12 '23 21:01 colinkuo

It's absolutely no problem to use the same offsets as far as PC is concerned. I just had assumed it would be a mistake :)

We'll probably add an option that you have to turn on, for it to be ignored (although it'd only be encountered once).

astubbs avatar Jan 12 '23 22:01 astubbs

When you say you'd add an option for it to be ignored, does that mean we won't see the error once we turn the option on? And will PC resume consuming messages from the existing offsets of the consumer group? Thanks!

colinkuo avatar Jan 12 '23 22:01 colinkuo

we don't see the error once we turn the option on

Yes, just a warning. And it'll only show the first time; going forward it won't show once PC installs its own metadata.

Also, PC will resume consuming the messages from the existing offsets of the consumer group?

yup!

astubbs avatar Jan 13 '23 09:01 astubbs
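
For readers following along, the behaviour discussed above (fail on unrecognised commit metadata by default, or downgrade to a one-time warning when an opt-in flag is set) might look roughly like the sketch below. Everything in it is an assumption made for illustration: the class `ForeignMetadataSketch`, the `ignoreForeign` flag, and the `OUR_MAGIC` value are hypothetical and are not parallel-consumer APIs or its actual encoding.

```java
import java.util.Arrays;
import java.util.Base64;
import java.util.Optional;

/** Hypothetical sketch; none of these names are parallel-consumer APIs. */
class ForeignMetadataSketch {

    /** Assumed magic byte marking a payload written by "our" encoder. */
    static final byte OUR_MAGIC = 0x50;

    /**
     * Returns the payload that follows the magic byte, or empty when there
     * is no usable extra state. An unrecognised payload throws unless
     * ignoreForeign is true, in which case it is skipped with a warning.
     */
    static Optional<byte[]> decode(String metadata, boolean ignoreForeign) {
        if (metadata == null || metadata.isEmpty()) {
            return Optional.empty(); // plain committed offset, nothing to decode
        }
        byte[] raw;
        try {
            raw = Base64.getDecoder().decode(metadata);
        } catch (IllegalArgumentException e) {
            return foreign("not Base64", ignoreForeign);
        }
        if (raw.length == 0 || raw[0] != OUR_MAGIC) {
            return foreign("unexpected magic", ignoreForeign);
        }
        return Optional.of(Arrays.copyOfRange(raw, 1, raw.length));
    }

    private static Optional<byte[]> foreign(String reason, boolean ignoreForeign) {
        if (!ignoreForeign) {
            throw new IllegalStateException("Foreign offset metadata: " + reason);
        }
        System.err.println("WARN: ignoring foreign offset metadata (" + reason + ")");
        return Optional.empty();
    }
}
```

In this sketch the committed offset itself is untouched in both modes, so consumption would resume from the group's existing position; only the extra per-partition state carried in the metadata string is dropped when the flag is on.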

That would be great! Thanks

colinkuo avatar Jan 13 '23 19:01 colinkuo