Concurrent modification exception when closing
There seems to be a race condition when closing:
Exception in thread "Thread-7" java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:557)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:531)
at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDrainFirst(DrainingCloseable.java:40)
at com.sinch.engage.chatlayer.ConsumerProxy.shutdown(ConsumerProxy.java:93)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:274)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:624)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$10(AbstractParallelEoSStreamProcessor.java:741)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2551)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2532)
at org.apache.kafka.clients.consumer.KafkaConsumer.groupMetadata(KafkaConsumer.java:2324)
at io.confluent.parallelconsumer.internal.ConsumerManager.updateMetadataCache(ConsumerManager.java:69)
at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.j…
Could you provide more details / reproducible example?
I have never run into this error, either when testing Parallel Consumer manually or in integration tests, so I am curious whether some specific steps or an edge-case setup lead to it.
And some general information as well, please:
Version of Parallel Consumer
Version of Kafka clients
Configuration of Parallel Consumer (commit mode, consume or consume+produce, transactions etc.)
Logs leading up to the exception
Was it being closed cleanly or on exception?
Here is the info requested:
Version of Parallel Consumer: io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.6
Version of Kafka clients: the one included in the above version of the parallel consumer
Configuration of Parallel Consumer:
parallel.consumer.commit.mode=PERIODIC_CONSUMER_ASYNCHRONOUS
parallel.consumer.seconds.between.commits=5
parallel.consumer.max.concurrency=200
parallel.consumer.order=KEY
Kafka consumer config:
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Was it being closed cleanly or on exception?: Closed cleanly
@krvajal Thanks for the info. Could you provide a bit more :) Is it a one-off issue or is it a repeating behaviour? Do you use a producer as well, or the consumer only? Do you have the full stack trace and maybe some logs from just prior to the exception?
Is there anything in the logs to indicate that the Consumer was in a bad state during shutdown, or was having partitions revoked during shutdown, or something like that? At the point indicated in the stack trace the Consumer should be paused and closed by the BrokerPoller, but it seems it was still being used by some other thread (thread id 27) at that time.
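For context on what the exception means: KafkaConsumer rejects a call from one thread while another thread is still inside a call on the same instance. The sketch below is a simplified, hypothetical model of that kind of ownership guard, not the actual Kafka or Parallel Consumer source. The names `SingleThreadGuard` and `GuardDemo` are made up for illustration.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of a single-thread ownership guard.
// It only illustrates why a second thread is rejected; it is not Kafka's code.
final class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    // Called on entry to every guarded operation.
    void acquire() {
        long me = Thread.currentThread().getId();
        // Re-entry by the owning thread is allowed; any other thread must find the guard free.
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "not safe for multi-threaded access. currentThread(id: " + me
                    + ") otherThread(id: " + owner.get() + ")");
        }
        depth.incrementAndGet();
    }

    // Called on exit; the guard becomes free once the outermost call returns.
    void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }
}

final class GuardDemo {
    // Returns true if a second thread is rejected while the first still holds the guard.
    static boolean secondThreadRejected() {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // the first thread is "inside" a guarded call
        boolean[] rejected = {false};
        Thread other = new Thread(() -> {
            try {
                guard.acquire(); // the second thread tries to use the same instance
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected[0] = true;
            }
        });
        other.start();
        try {
            other.join(); // join makes the write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            guard.release();
        }
        return rejected[0];
    }

    public static void main(String[] args) {
        System.out.println("second thread rejected: " + secondThreadRejected());
    }
}
```

In this model the failure only needs two threads to overlap for a moment, which would fit a shutdown path that touches the consumer while another thread has not yet left its call.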
I think there were issues using the Consumer with CooperativeStickyAssignor as well. I need to investigate whether that is relevant: potentially partition revocation is handled differently with that assignor and is not handled correctly in Parallel Consumer, but I need more info / investigation on that.
@krvajal - Hey - can you please provide more information as per above?
Stale, closing - reopen if it reoccurs or more information is provided.