parallel-consumer

Concurrent modification exception when closing

Open • krvajal opened this issue 2 years ago • 4 comments

There seems to be a race condition when closing:

Exception in thread "Thread-7" java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
	at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:557)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:531)
	at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDrainFirst(DrainingCloseable.java:40)
	at com.sinch.engage.chatlayer.ConsumerProxy.shutdown(ConsumerProxy.java:93)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
	at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
	at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:274)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:624)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$10(AbstractParallelEoSStreamProcessor.java:741)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pc-broker-poll, id: 38) otherThread(id: 27)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2551)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2532)
	at org.apache.kafka.clients.consumer.KafkaConsumer.groupMetadata(KafkaConsumer.java:2324)
	at io.confluent.parallelconsumer.internal.ConsumerManager.updateMetadataCache(ConsumerManager.java:69)
	at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.j…
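The trace is three levels deep because `close()` waits on a future whose task (`doClose`) itself waits on the broker-poll future in `BrokerPollSystem.closeAndWait`, and each `FutureTask.get` wraps the failure it receives in a new `ExecutionException`. The self-contained Java sketch below reproduces that wrapping with two plain executor tasks standing in for the close routine and the broker-poll thread, and shows one way to unwrap to the root cause. `NestedFutureDemo`, `captureNestedFailure` and `rootCause` are hypothetical names for illustration, not part of Parallel Consumer's API.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NestedFutureDemo {

    // Strips ExecutionException wrappers until the original failure is reached.
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current instanceof ExecutionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Runs an "outer" task that blocks on a failing "inner" task and returns
    // whatever the caller's get() throws (null if nothing was thrown).
    public static Throwable captureNestedFailure() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Stand-in for the broker-poll task hitting the single-thread check.
            Callable<Void> innerTask = () -> {
                throw new ConcurrentModificationException(
                        "KafkaConsumer is not safe for multi-threaded access");
            };
            Future<Void> inner = pool.submit(innerTask);

            // Stand-in for the close routine waiting on the poller: inner.get()
            // rethrows the failure wrapped in an ExecutionException.
            Callable<Void> outerTask = () -> {
                inner.get();
                return null;
            };
            Future<Void> outer = pool.submit(outerTask);

            try {
                // Wraps the outer task's ExecutionException in a second one.
                outer.get();
                return null;
            } catch (ExecutionException e) {
                return e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return e;
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Throwable top = captureNestedFailure();
        System.out.println(top);
        System.out.println("root cause: " + rootCause(top));
    }
}
```

Running `main` should print a chain shaped like the first line of the trace above (`ExecutionException: ExecutionException: ConcurrentModificationException: ...`), which is why the interesting frames only appear in the last `Caused by` section.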

krvajal • Mar 01 '24

Could you provide more details / reproducible example?

I have never run into this error, either when testing Parallel Consumer manually or in integration tests, so I'm curious whether some specific steps led to it, or an edge-case setup, etc.

And some general information as well, please:
- Version of Parallel Consumer
- Version of Kafka clients
- Configuration of Parallel Consumer (commit mode, consume only or consume + produce, transactions, etc.)
- Logs leading up to the exception
- Was it being closed cleanly or on an exception?

rkolesnev • Mar 01 '24

Here is the info requested:

Version of Parallel Consumer: io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.6
Version of Kafka clients: the one included with the above version of Parallel Consumer

Configuration of Parallel Consumer:
parallel.consumer.commit.mode=PERIODIC_CONSUMER_ASYNCHRONOUS
parallel.consumer.seconds.between.commits=5
parallel.consumer.max.concurrency=200
parallel.consumer.order=KEY

Kafka consumer config:
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Was it being closed cleanly or on an exception? Closed cleanly.

krvajal • Mar 05 '24

@krvajal Thanks for the info. Could you provide a bit more? :) Is it a one-off issue or does it happen repeatedly? Do you use the producer as well, or only the consumer? Do you have the full stack trace and maybe some logs from just before the exception?

Is there anything in the logs to indicate that the Consumer was in a bad state during shutdown, or was having partitions revoked during shutdown, or something like that? At the point indicated in the stack trace, the Consumer should be paused and then closed by the BrokerPoller, but it seems it was still being used by another thread (thread id 27) at that time.
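The check that fires here is `KafkaConsumer.acquire` (see the last `Caused by` frames): the consumer records the id of the thread currently inside a call and rejects any other thread with a `ConcurrentModificationException` instead of blocking. In the trace, `pc-broker-poll` (id 38) is the thread that was rejected, because thread 27 was still inside a consumer call. The self-contained Java sketch below models that idea; `SingleThreadGuard` and `tryFromOtherThread` are hypothetical names, and the logic is a simplified approximation of the kafka-clients check, not a copy of it.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;

    // Id of the thread currently "inside" the guarded object, or NO_CURRENT_THREAD.
    private final AtomicLong ownerThreadId = new AtomicLong(NO_CURRENT_THREAD);
    // Re-entrancy depth for the owning thread.
    private final AtomicInteger refCount = new AtomicInteger(0);

    // Claims the guard for the calling thread: re-entrant for the owner,
    // fails fast (no waiting) for any other thread.
    public void acquire() {
        Thread thread = Thread.currentThread();
        long threadId = thread.getId();
        if (threadId != ownerThreadId.get()
                && !ownerThreadId.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "not safe for multi-threaded access. currentThread(name: " + thread.getName()
                            + ", id: " + threadId + ") otherThread(id: " + ownerThreadId.get() + ")");
        }
        refCount.incrementAndGet();
    }

    // Releases one level of ownership; the guard is free again at depth zero.
    public void release() {
        if (refCount.decrementAndGet() == 0) {
            ownerThreadId.set(NO_CURRENT_THREAD);
        }
    }

    // Tries acquire/release on a new named thread; returns the rejection, or null on success.
    public static Throwable tryFromOtherThread(SingleThreadGuard guard, String name) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                failure.set(e);
            }
        }, name);
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // main thread is now "inside" a call
        System.out.println("while held: " + tryFromOtherThread(guard, "pc-broker-poll"));
        guard.release(); // main thread leaves the call
        System.out.println("after release: " + tryFromOtherThread(guard, "pc-broker-poll"));
    }
}
```

The fail-fast design surfaces accidental sharing immediately rather than hiding it behind a lock, which is why an overlapping call during shutdown shows up as an exception instead of a delay.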

I think there have also been issues using the Consumer with CooperativeStickyAssignor. I need to investigate whether that is relevant: partition revocation may be handled differently with that assignor and not handled correctly in Parallel Consumer, but I need more information and investigation to confirm that.

rkolesnev • Mar 07 '24

@krvajal Hey, can you please provide the additional information requested above?

rkolesnev • Mar 26 '24

Closing as stale. Please reopen if this recurs or if more information becomes available.

rkolesnev • May 23 '24