Is There Any Exception Handler?
If an error occurs while the conuser polls the message example deserialize exception
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition crash-android-21 at offset 1047530996. If needed, please seek past the record to continue consumption.
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[?:?]
at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:247) ~[parallel-consumer-core-0.5.2.3.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:529) ~[parallel-consumer-core-0.5.2.3.jar:?]
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:665) ~[parallel-consumer-core-0.5.2.3.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition crash-android-21 at offset 1047530996. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1435) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1658) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1494) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:716) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:682) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1291) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1247) ~[kafka-clients-3.3.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54) ~[parallel-consumer-core-0.5.2.3.jar:?]
at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183) ~[parallel-consumer-core-0.5.2.3.jar:?]
at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140) ~[parallel-consumer-core-0.5.2.3.jar:?]
at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116) ~[parallel-consumer-core-0.5.2.3.jar:?]
... 4 more
Caused by: java.lang.IllegalArgumentException: Location must not be null
the polling thread die. And after that, it doesn't work normally.
is there any exception handler like apache kafka
ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
like this
Hi,
As far as i know there is no additional error handling in Parallel Consumer for errors stemming from Kafka Consumer poll calls - like the deserialization exception above.
The Apache Kafka consumer doesn't have standard exception handlers for this scenario - the example above using ConcurrentKafkaListenerContainerFactory is from the Spring Framework.
thank you for reply I wonder if there is a plan to add error handler processing or if we need to customize it.
I believe there is - eventually - it makes sense to add it. But i dont have any specific time frame for it.
Here are couple of related issues - https://github.com/confluentinc/parallel-consumer/issues/195 and https://github.com/confluentinc/parallel-consumer/issues/304.
#304 suggests an one of the options to get around it - defer deserialization to the user function - Consume as byte[],byte[] and handle deserialization and exceptions yourself - there you can either skip failed message or implement DLQ etc... as opposed to wrapping Consumer and handling exceptions on poll().
https://developer.confluent.io/courses/kafka-streams/hands-on-error-handling/
Borrowing the idea from ProductionExceptionHandler of Kafka Streams also looks good. Just let users to decide either fail the processing or ignore the problematic messages