Consumer with manual ack and transactional producer : exception on acknowledgment
Describe the issue
I'm using SCDF 2.11 with Kafka and I'm trying to implement processors in which we don't want to lose any message.
So we want that our processors use transactional producer and consumer which acks messages only if they are processed OK or, when an error occurs, if a message has been put in DLQ.
But when I have an error in the processor, the acks of the consumed offset failed after creating the message in the DLQ so the offset is consumed again and again and I end up with many identical message in th DLQ.
To Reproduce Here is my configuration :
# Producer config
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=my-tx-
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=5
spring.cloud.stream.kafka.binder.transaction.producer.configuration.max.block.ms=5000
spring.cloud.stream.kafka.binder.transaction.producer.configuration.delivery.timeout.ms=4500
spring.cloud.stream.kafka.binder.transaction.producer.configuration.request.timeout.ms=2000
spring.cloud.stream.kafka.binder.transaction.producer.configuration.linger.ms=0
spring.cloud.stream.kafka.binder.transaction.producer.configuration.batch.size=0
# Consumer config
spring.cloud.stream.kafka.binder.consumer.isolation.level=read_committed
spring.cloud.stream.kafka.bindings.input.consumer.ackMode=MANUAL_IMMEDIATE
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.bindings.input.group=mygroup
Here is an extract of the stacktrace :
o.s.k.t.KafkaTransactionManager : Participating in existing transaction
o.s.c.s.b.k.KafkaMessageChannelBinder : Sent to DLQ a message with key='mykey' and payload='byte[1281]' received from 0: mytopic@152
o.s.t.support.TransactionTemplate : Initiating transaction rollback on application exception
java.lang.NullPointerException: Cannot invoke "org.springframework.kafka.support.Acknowledgment.acknowledge()" because the return value of "org.springframework.messaging.MessageHeaders.get(Object, java.lang.Class)" is null
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$DlqSender.sendToDlq(KafkaMessageChannelBinder.java:1639) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$handleRecordForDlq$10(KafkaMessageChannelBinder.java:1273) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
at org.springframework.transaction.support.TransactionOperations.lambda$executeWithoutResult$0(TransactionOperations.java:68) ~[spring-tx-6.1.16.jar:6.1.16]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.1.16.jar:6.1.16]
at org.springframework.transaction.support.TransactionOperations.executeWithoutResult(TransactionOperations.java:67) ~[spring-tx-6.1.16.jar:6.1.16]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.handleRecordForDlq(KafkaMessageChannelBinder.java:1272) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$6(KafkaMessageChannelBinder.java:1136) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:
Version of the framework
- Spring Boot 3.3.7
- Spring cloud stream binder Kafka 4.1.5
- Spring Kafka 3.2.6
Additional context It seems that here in KafkaMessageChannelBinder, the ErrorMessage is created without the originalMessage : https://github.com/spring-cloud/spring-cloud-stream/blob/3a937debfa7e05560c9cc2158cb57688098f2090/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L732 So headers of the original message cannot be retrieved here : https://github.com/spring-cloud/spring-cloud-stream/blob/3a937debfa7e05560c9cc2158cb57688098f2090/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L1265 And then a NPE is raised here : https://github.com/spring-cloud/spring-cloud-stream/blob/3a937debfa7e05560c9cc2158cb57688098f2090/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L1630