spring-cloud-stream icon indicating copy to clipboard operation
spring-cloud-stream copied to clipboard

Consumer with manual ack and transactional producer : exception on acknowledgment

Open CEDDM opened this issue 7 months ago • 0 comments

Describe the issue I'm using SCDF 2.11 with Kafka and I'm trying to implement processors in which we don't want to lose any message. So we want that our processors use transactional producer and consumer which acks messages only if they are processed OK or, when an error occurs, if a message has been put in DLQ.

But when I have an error in the processor, the acks of the consumed offset failed after creating the message in the DLQ so the offset is consumed again and again and I end up with many identical message in th DLQ.

To Reproduce Here is my configuration :

# Producer config
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=my-tx-
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=5
spring.cloud.stream.kafka.binder.transaction.producer.configuration.max.block.ms=5000
spring.cloud.stream.kafka.binder.transaction.producer.configuration.delivery.timeout.ms=4500
spring.cloud.stream.kafka.binder.transaction.producer.configuration.request.timeout.ms=2000
spring.cloud.stream.kafka.binder.transaction.producer.configuration.linger.ms=0
spring.cloud.stream.kafka.binder.transaction.producer.configuration.batch.size=0

# Consumer config
spring.cloud.stream.kafka.binder.consumer.isolation.level=read_committed
spring.cloud.stream.kafka.bindings.input.consumer.ackMode=MANUAL_IMMEDIATE
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.bindings.input.group=mygroup

Here is an extract of the stacktrace :

o.s.k.t.KafkaTransactionManager          : Participating in existing transaction
o.s.c.s.b.k.KafkaMessageChannelBinder    : Sent to DLQ  a message with key='mykey' and payload='byte[1281]' received from 0: mytopic@152
o.s.t.support.TransactionTemplate        : Initiating transaction rollback on application exception

java.lang.NullPointerException: Cannot invoke "org.springframework.kafka.support.Acknowledgment.acknowledge()" because the return value of "org.springframework.messaging.MessageHeaders.get(Object, java.lang.Class)" is null
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$DlqSender.sendToDlq(KafkaMessageChannelBinder.java:1639) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$handleRecordForDlq$10(KafkaMessageChannelBinder.java:1273) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
    at org.springframework.transaction.support.TransactionOperations.lambda$executeWithoutResult$0(TransactionOperations.java:68) ~[spring-tx-6.1.16.jar:6.1.16]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.1.16.jar:6.1.16]
    at org.springframework.transaction.support.TransactionOperations.executeWithoutResult(TransactionOperations.java:67) ~[spring-tx-6.1.16.jar:6.1.16]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.handleRecordForDlq(KafkaMessageChannelBinder.java:1272) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:4.1.5]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$6(KafkaMessageChannelBinder.java:1136) ~[spring-cloud-stream-binder-kafka-4.1.5.jar:

Version of the framework

  • Spring Boot 3.3.7
  • Spring cloud stream binder Kafka 4.1.5
  • Spring Kafka 3.2.6

Additional context It seems that here in KafkaMessageChannelBinder, the ErrorMessage is created without the originalMessage : https://github.com/spring-cloud/spring-cloud-stream/blob/3a937debfa7e05560c9cc2158cb57688098f2090/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L732 So headers of the original message cannot be retrieved here : https://github.com/spring-cloud/spring-cloud-stream/blob/3a937debfa7e05560c9cc2158cb57688098f2090/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L1265 And then a NPE is raised here : https://github.com/spring-cloud/spring-cloud-stream/blob/3a937debfa7e05560c9cc2158cb57688098f2090/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L1630

CEDDM avatar Jun 12 '25 08:06 CEDDM