Spring Cloud Stream: routing to a channel (using spring.cloud.stream.sendto.destination) gives ClassCastException
Describe the issue
Publishing messages using .setHeader("spring.cloud.stream.sendto.destination", "output binding name here") gives me the following exception:
Caused by: java.lang.ClassCastException: class com.example.airplane.FlightEvent cannot be cast to class [B (com.example.airplane.FlightEvent is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-3.3.2.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.3.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005) ~[kafka-clients-3.3.2.jar:na]
... 59 common frames omitted
More details:
I have a functionRouter that routes messages to different functions based on a header. These functions in turn process the message and return a result, which I need to publish to another topic.
application.properties
spring.application.name=airplane
server.port=8095
spring.cloud.stream.bindings.planeEventProducer.destination=plane-events
spring.cloud.stream.bindings.flightEventProducer.destination=flight-events
spring.cloud.stream.bindings.landEventProducer.destination=land-events
spring.cloud.stream.bindings.arrivalEventProducer.destination=arrival-events
spring.cloud.stream.output-bindings=replayProducer;planeEventProducer;flightEventProducer;landEventProducer;arrivalEventProducer
spring.cloud.stream.bindings.functionRouter-in-0.destination=plane-events,flight-events,land-events,arrival-events
spring.cloud.stream.bindings.functionRouter-in-0.group=airplane
spring.cloud.stream.bindings.functionRouter-in-0.consumer.concurrency=8
spring.cloud.stream.function.definition=functionRouter;
spring.cloud.stream.function.routing.enabled=true
spring.cloud.function.routing-expression=headers['Type']
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.replication-factor=1
spring.cloud.stream.kafka.binder.min-partition-count=8
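For readers unfamiliar with the router, the effect of routing-expression=headers['Type'] in the properties above can be pictured in plain Java: the header value is treated as the name of the function bean to invoke. This is only a minimal sketch and not the framework's routing implementation; HeaderRouterSketch, register, route, and the string payloads are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for header-based routing; not the framework's implementation.
final class HeaderRouterSketch {

    // Registry keyed by bean name, mirroring @Bean("PlaneEvent"), @Bean("FlightEvent"), ...
    private final Map<String, Function<String, String>> functions = new HashMap<>();

    void register(String beanName, Function<String, String> function) {
        functions.put(beanName, function);
    }

    // Looks up the target function by the "Type" header and applies it to the payload.
    String route(Map<String, String> headers, String payload) {
        String type = headers.get("Type");
        Function<String, String> target = functions.get(type);
        if (target == null) {
            throw new IllegalArgumentException("No function registered for Type=" + type);
        }
        return target.apply(payload);
    }

    public static void main(String[] args) {
        HeaderRouterSketch router = new HeaderRouterSketch();
        router.register("PlaneEvent", payload -> "FlightEvent for " + payload);
        router.register("FlightEvent", payload -> "LandEvent for " + payload);

        Map<String, String> headers = new HashMap<>();
        headers.put("Type", "PlaneEvent");
        System.out.println(router.route(headers, "plane-1"));
    }
}
```

In this picture, a message whose Type header names no registered function fails at lookup time, which is why each processor sets the Type header on the message it returns.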
AirplaneApplication.java
package com.example.airplane;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.function.Consumer;
import java.util.function.Function;

@SpringBootApplication
public class AirplaneApplication {

    public static void main(String[] args) {
        SpringApplication.run(AirplaneApplication.class, args);
    }

    Logger logger = LoggerFactory.getLogger(AirplaneApplication.class);

    @Bean("ArrivalEvent")
    public Consumer<Message<ArrivalEvent>> arrivalEventConsumer() {
        return msg -> {
            logger.info("ArrivalEvent consumed: {}", msg.getPayload());
        };
    }

    @Bean("LandEvent")
    public Function<Message<LandEvent>, Message<ArrivalEvent>> landEventProcessor() {
        return landEventMessage -> {
            ArrivalEvent arrivalEvent = new ArrivalEvent(landEventMessage.getPayload().getFlightId()+"-flight", landEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing ArrivalEvent: {}", arrivalEvent);
            return MessageBuilder.withPayload(arrivalEvent)
                    .setHeader("Type", ArrivalEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "arrivalEventProducer")
                    .build();
        };
    }

    @Bean("FlightEvent")
    public Function<Message<FlightEvent>, Message<LandEvent>> flightEventProcessor() {
        return flightEventMessage -> {
            LandEvent landEvent = new LandEvent(flightEventMessage.getPayload().getFlightId()+"-flight", flightEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing LandEvent: {}", landEvent);
            return MessageBuilder.withPayload(landEvent)
                    .setHeader("Type", LandEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "landEventProducer")
                    .build();
        };
    }

    @Bean("PlaneEvent")
    public Function<Message<PlaneEvent>, Message<FlightEvent>> planeEventProcessor() {
        return planeEventMessage -> {
            FlightEvent flightEvent = new FlightEvent(planeEventMessage.getPayload().getPlaneId()+"-flight", planeEventMessage.getPayload().getCurrentAirport());
            logger.info("Publishing Flight: {}", flightEvent);
            return MessageBuilder.withPayload(flightEvent)
                    .setHeader("Type", FlightEvent.class.getSimpleName())
                    .setHeader("spring.cloud.stream.sendto.destination", "flightEventProducer")
                    .build();
        };
    }
}
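AirplaneController.java
package com.example.airplane;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;

@RestController
public class AirplaneController {
    @Autowired
    StreamBridge streamBridge;

    @GetMapping("publishPlane")
    public void publishPlane() {
        PlaneEvent planeEvent = new PlaneEvent(UUID.randomUUID().toString(), "CITY");
        Message<PlaneEvent> message = MessageBuilder.withPayload(planeEvent)
                .setHeader("Type", PlaneEvent.class.getSimpleName())
                .build();
        streamBridge.send("planeEventProducer", message);
    }
}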
On calling the GET endpoint (publish plane) I get a ClassCastException when planeEventProcessor tries to publish to the flightEventProducer binding:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2953) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2894) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2860) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2783) ~[spring-kafka-3.0.4.jar:3.0.4]
at io.micrometer.observation.Observation.observe(Observation.java:559) ~[micrometer-observation-1.10.4.jar:1.10.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2781) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2633) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2519) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2171) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1526) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1490) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1365) ~[spring-kafka-3.0.4.jar:3.0.4]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.kafka.KafkaException: Failed to execute runnable
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:75) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:461) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:425) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2880) ~[spring-kafka-3.0.4.jar:3.0.4]
... 13 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@24f4760]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1118) ~[spring-cloud-stream-4.0.1.jar:4.0.1]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:648) ~[spring-cloud-stream-4.0.1.jar:4.0.1]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:637) ~[spring-cloud-stream-4.0.1.jar:4.0.1]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:327) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.0.6.jar:6.0.6]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.0.6.jar:6.0.6]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.0.6.jar:6.0.6]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.0.6.jar:6.0.6]
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.3.jar:6.0.3]
at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.10.4.jar:1.10.4]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:394) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:464) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-2.0.0.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-2.0.0.jar:na]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:66) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
... 16 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.airplane.FlightEvent to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1008) ~[kafka-clients-3.3.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:952) ~[kafka-clients-3.3.2.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1022) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:729) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:700) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:510) ~[spring-kafka-3.0.4.jar:3.0.4]
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:528) ~[spring-integration-kafka-6.0.3.jar:6.0.3]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.0.3.jar:6.0.3]
... 51 common frames omitted
Caused by: java.lang.ClassCastException: class com.example.airplane.FlightEvent cannot be cast to class [B (com.example.airplane.FlightEvent is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-3.3.2.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.3.2.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005) ~[kafka-clients-3.3.2.jar:na]
... 59 common frames omitted
Publishing messages using output-bindings and streamBridge works as expected, but using the same output-binding with .setHeader("spring.cloud.stream.sendto.destination", "output binding name here") doesn't.
But, instead of using one of the bindings specified in spring.cloud.stream.output-bindings when I try to use the function's output binding PlaneEvent-out-0, and add PlaneEvent to spring.cloud.stream.function.definition, it works as expected.
On debugging a little I can see that streamBridge sets the outputContentType correctly (application/json).
public boolean send(String bindingName, Object data) {
BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(bindingName);
MimeType contentType = StringUtils.hasText(bindingProperties.getContentType()) ? MimeType.valueOf(bindingProperties.getContentType()) : MimeTypeUtils.APPLICATION_JSON;
return this.send(bindingName, data, contentType);
}
However, this doesn't seem to happen with spring.cloud.stream.sendto.destination for bindings that are not specified in spring.cloud.stream.function.definition. The correct outputContentType is present in the BindingProperties, but it isn't being used.
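
The symptom is consistent with the payload reaching the Kafka producer unconverted. Below is a minimal plain-Java sketch of that failure mode, assuming the serializer simply casts its input to byte[] (which is what the `[B` in the ClassCastException suggests). ContentTypeSketch, castingSerialize, toJsonBytes, and the simplified FlightEvent are hypothetical stand-ins, not Kafka or Spring code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins; this is not Kafka's or Spring's code.
final class ContentTypeSketch {

    // Simplified event holding the two values the processors pass to its constructor.
    static final class FlightEvent {
        final String flightId;
        final String currentAirport;

        FlightEvent(String flightId, String currentAirport) {
            this.flightId = flightId;
            this.currentAirport = currentAirport;
        }
    }

    // Mimics a serializer that only accepts byte[]: anything else fails the cast.
    static byte[] castingSerialize(Object value) {
        return (byte[]) value;
    }

    // Hand-rolled JSON conversion standing in for an application/json converter.
    // It does no escaping, so it assumes the field values contain no quotes.
    static byte[] toJsonBytes(FlightEvent event) {
        String json = "{\"flightId\":\"" + event.flightId
                + "\",\"currentAirport\":\"" + event.currentAirport + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        FlightEvent event = new FlightEvent("42-flight", "CITY");

        // With conversion applied first, the serializer receives byte[] and succeeds.
        byte[] bytes = castingSerialize(toJsonBytes(event));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));

        // Without conversion, the raw object reaches the serializer and the cast fails.
        try {
            castingSerialize(event);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: payload was not converted to byte[]");
        }
    }
}
```

If this reading is right, the StreamBridge path works because a content type is always resolved before the send, whereas the sendto.destination path hands the FlightEvent object to the producer as-is.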

To Reproduce
Steps to reproduce the behavior:
- Clone this repo: https://github.com/KarthikRIyer/spring-cloud-stream-function-routing
- Set the Kafka brokers with this property: spring.cloud.stream.kafka.binder.brokers
- Run the app. Execute this GET endpoint: localhost:8095/publishPlane. The sample app has swagger for convenience.
- See the error in the logs.
Version of the framework
Spring Boot 3.0.4
Java 17
Spring Cloud 2022.0.1
Expected behavior
Publishing messages with the spring.cloud.stream.sendto.destination header, with its value set to an output binding listed in spring.cloud.stream.output-bindings, should take the output content type from BindingProperties into account.
Screenshots
When publishing with Stream Bridge (Debug point here):

When publishing with spring.cloud.stream.sendto.destination (same debug point as above):

Update:
The exception when using spring.cloud.stream.sendto.destination from a Function<> does not depend on whether I use a separate output binding or the function's own output binding. It works as long as the Function<>'s bean name is specified in spring.cloud.function.definition.
Many thanks @KarthikRIyer, your last comment saved me hours of debugging.
@KarthikRIyer Is the original issue still there? Or can we close this issue?
Yeah, the original issue is still there.
I am confused. What does it have to do with the routing function? I see you have a REST controller from which you are using StreamBridge to always send to planeEventProducer, which itself always routes to flightEventProducer, etc.
RouterFunction has nothing to do with spring.cloud.stream.sendto.destination
I am confused
> But, instead of using one of the bindings specified in spring.cloud.stream.output-bindings when I try to use the function's output binding PlaneEvent-out-0, and add PlaneEvent to spring.cloud.stream.function.definition, it works as expected.
You have multiple functions. None of them will turn into a binding by default. We only auto-bind if the user has a single function, so specifying all these functions in the function.definition property is the correct and expected behaviour.
Also, you seem to be using all three routing techniques s-c-stream has to offer... seems a bit too much.
If you are using StreamBridge to effectively connect a REST call with the stream, why not determine right there where exactly you want to send the message and just do it with bridge.send(..)? You would save tons of configuration, and the app would be much more readable and performant.
If you still insist on using the routing function, then make sure it has all the information to route the message to the proper function.
So I really do not see a case where spring.cloud.stream.sendto.destination would make any sense in your application.
Keep in mind that spring.cloud.stream.sendto.destination will send the message to a destination. This means that an extra queue and an extra binding must be there. So it's a network call. And all that you are calling is in the same application. So you are leaving your app just to come back to it. Why?
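The point about leaving the app just to come back to it can be illustrated with plain java.util.function.Function composition: when every step lives in the same application, the steps can be chained in-process instead of publishing each intermediate event to a topic. This is a sketch under that assumption only. InProcessChainSketch, the string payloads, and the "-land" and "-arrival" suffixes are hypothetical, and chaining this way gives up having each intermediate event on its own topic.

```java
import java.util.function.Function;

// Hypothetical sketch: chains three processing steps in-process with Function.andThen.
final class InProcessChainSketch {

    // Each step stands in for one of the processors, using strings instead of event classes.
    static final Function<String, String> PLANE_TO_FLIGHT = planeId -> planeId + "-flight";
    static final Function<String, String> FLIGHT_TO_LAND = flightId -> flightId + "-land";
    static final Function<String, String> LAND_TO_ARRIVAL = landId -> landId + "-arrival";

    // The composed pipeline runs all steps in one call, with no broker round trip in between.
    static final Function<String, String> PIPELINE =
            PLANE_TO_FLIGHT.andThen(FLIGHT_TO_LAND).andThen(LAND_TO_ARRIVAL);

    public static void main(String[] args) {
        System.out.println(PIPELINE.apply("plane-1"));
    }
}
```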