
Is There Any Exception Handler?

Open 10000-ki opened this issue 3 years ago • 4 comments

If an error occurs while the consumer polls messages, for example a deserialization exception:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition crash-android-21 at offset 1047530996. If needed, please seek past the record to continue consumption.
  at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
  at java.util.concurrent.FutureTask.get(FutureTask.java:205) ~[?:?]
  at io.confluent.parallelconsumer.internal.BrokerPollSystem.closeAndWait(BrokerPollSystem.java:247) ~[parallel-consumer-core-0.5.2.3.jar:?]
  at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:529) ~[parallel-consumer-core-0.5.2.3.jar:?]
  at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$5(AbstractParallelEoSStreamProcessor.java:665) ~[parallel-consumer-core-0.5.2.3.jar:?]
  at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
  at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition crash-android-21 at offset 1047530996. If needed, please seek past the record to continue consumption.
  at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1435) ~[kafka-clients-3.3.1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133) ~[kafka-clients-3.3.1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1658) ~[kafka-clients-3.3.1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1900(Fetcher.java:1494) ~[kafka-clients-3.3.1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:716) ~[kafka-clients-3.3.1.jar:?]
  at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:682) ~[kafka-clients-3.3.1.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1291) ~[kafka-clients-3.3.1.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1247) ~[kafka-clients-3.3.1.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) ~[kafka-clients-3.3.1.jar:?]
  at io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:54) ~[parallel-consumer-core-0.5.2.3.jar:?]
  at io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:183) ~[parallel-consumer-core-0.5.2.3.jar:?]
  at io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:140) ~[parallel-consumer-core-0.5.2.3.jar:?]
  at io.confluent.parallelconsumer.internal.BrokerPollSystem.controlLoop(BrokerPollSystem.java:116) ~[parallel-consumer-core-0.5.2.3.jar:?]
  ... 4 more
Caused by: java.lang.IllegalArgumentException: Location must not be null

the polling thread dies, and after that the consumer no longer works normally.

Is there any exception handler, like with Apache Kafka?

  ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  configurer.configure(factory, kafkaConsumerFactory);
  factory.setErrorHandler(new SeekToCurrentErrorHandler());

like this

10000-ki avatar Feb 20 '23 09:02 10000-ki

Hi,

As far as I know there is no additional error handling in Parallel Consumer for errors stemming from Kafka Consumer poll calls - like the deserialization exception above.

The Apache Kafka consumer doesn't have standard exception handlers for this scenario - the example above using ConcurrentKafkaListenerContainerFactory is from the Spring Framework.

rkolesnev avatar Feb 20 '23 18:02 rkolesnev

Thank you for the reply. I wonder whether there is a plan to add error handler processing, or whether we need to customize it ourselves.

10000-ki avatar Feb 21 '23 09:02 10000-ki

I believe there is - eventually - it makes sense to add it. But I don't have any specific time frame for it.

Here are a couple of related issues: https://github.com/confluentinc/parallel-consumer/issues/195 and https://github.com/confluentinc/parallel-consumer/issues/304.

#304 suggests one option to work around it: defer deserialization to the user function. Consume as byte[], byte[] and handle deserialization and its exceptions yourself - there you can either skip the failed message or implement a DLQ etc., as opposed to wrapping the Consumer and handling exceptions on poll().
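A minimal, self-contained sketch of that workaround. Everything Kafka-specific is elided: the class and method names below are hypothetical stand-ins, the "deserializer" is just UTF-8 text to Integer, and the dead-letter "queue" is a list. In real code you would configure ByteArrayDeserializer on the consumer and run this handler logic inside the Parallel Consumer's poll callback, so a poison record is skipped or dead-lettered instead of killing the broker poll thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: deserialization deferred into the user function,
// so a bad record is skipped (or dead-lettered) instead of killing poll().
public class DeferredDeserializationSketch {

    // Stand-in for a DLQ producer: failed raw payloads are collected here.
    static final List<byte[]> deadLetters = new ArrayList<>();

    // The "real" deserializer, applied inside the handler where we can
    // catch its exceptions. Here: UTF-8 text -> Integer.
    static Optional<Integer> handleRaw(byte[] rawValue) {
        try {
            return Optional.of(Integer.parseInt(new String(rawValue, StandardCharsets.UTF_8)));
        } catch (RuntimeException e) {
            // Skip the poison record: dead-letter it and keep consuming.
            deadLetters.add(rawValue);
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // With the real library this loop body would live inside the
        // processor's poll callback on a <byte[], byte[]> processor.
        for (String payload : new String[] {"42", "not-a-number", "7"}) {
            handleRaw(payload.getBytes(StandardCharsets.UTF_8))
                .ifPresent(n -> System.out.println("processed " + n));
        }
        System.out.println("dead-lettered " + deadLetters.size() + " record(s)");
    }
}
```

The key design point is that the consumer itself never throws RecordDeserializationException, because it only ever hands out raw bytes; all failure policy lives in user code.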

rkolesnev avatar Feb 21 '23 09:02 rkolesnev

https://developer.confluent.io/courses/kafka-streams/hands-on-error-handling/ Borrowing the idea from Kafka Streams' ProductionExceptionHandler also looks good. Just let users decide whether to fail the processing or ignore the problematic messages.
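As a sketch of what such an API could look like: the framework would invoke a user-supplied callback when a record fails to deserialize, and the return value decides whether processing fails or the record is skipped, mirroring the response-enum shape of the Kafka Streams handlers. All names below (interface, enum, policies) are hypothetical, not an existing Parallel Consumer API.

```java
// Hypothetical API sketch, modeled on the Kafka Streams handler contract:
// on a deserialization failure the framework would ask this callback
// whether to fail the pipeline or skip the record and continue.
public class DeserializationHandlerSketch {

    public enum HandlerResponse { CONTINUE, FAIL }

    @FunctionalInterface
    public interface PollExceptionHandler {
        HandlerResponse handle(String topic, int partition, long offset, Exception cause);
    }

    // A log-and-continue policy, analogous to Streams' LogAndContinueExceptionHandler.
    public static final PollExceptionHandler LOG_AND_CONTINUE =
        (topic, partition, offset, cause) -> {
            System.err.printf("Skipping %s-%d@%d: %s%n",
                topic, partition, offset, cause.getMessage());
            return HandlerResponse.CONTINUE;
        };

    // A fail-fast policy, analogous to Streams' LogAndFailExceptionHandler.
    public static final PollExceptionHandler LOG_AND_FAIL =
        (topic, partition, offset, cause) -> HandlerResponse.FAIL;

    public static void main(String[] args) {
        Exception cause = new IllegalArgumentException("Location must not be null");
        System.out.println(LOG_AND_CONTINUE.handle("crash-android", 21, 1047530996L, cause));
        System.out.println(LOG_AND_FAIL.handle("crash-android", 21, 1047530996L, cause));
    }
}
```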

colinkuo avatar Sep 11 '23 19:09 colinkuo