MessageRoutingCallback and routing-expression do not work for event routing
Describe the issue
MessageRoutingCallback and routing-expression do not route messages to the appropriate functions.
RoutingFunction is responsible for routing events to the desired function, and we tried two approaches:
- Implementing MessageRoutingCallback. The message is expected to go to the anotherConsumeMessage function when the headers contain the key with the desired value. Instead, it was routed to consumeMessage.
The code is below:
@Bean
public MessageRoutingCallback customRouter() {
    return new MessageRoutingCallback() {
        @Override
        public FunctionRoutingResult routingResult(Message<?> message) {
            if (!ObjectUtils.isEmpty(message.getHeaders().get("test-key")) &&
                    message.getHeaders().get("test-key").toString().equalsIgnoreCase("consume")) {
                return new FunctionRoutingResult("consumeMessage");
            } else if (!ObjectUtils.isEmpty(message.getHeaders().get("test-key")) &&
                    message.getHeaders().get("test-key").toString().equalsIgnoreCase("anotherConsume")) {
                return new FunctionRoutingResult("anotherConsumeMessage");
            }
            return MessageRoutingCallback.super.routingResult(message);
        }
    };
}
- Using a routing expression, as shown in the configuration below. Instead of anotherConsumeMessage, the message was routed to consumeMessage. See the screenshot for the content of the message and the headers picked up by the Kafka binder.
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          auto-create-topics: false
          brokers:
            - localhost:29092
          header-mapper-bean-name: kafkaheaderMapperTest
          consumer-properties:
            value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      bindings:
        consumeMessage-in-0:
          destination: test-kafka-topic
          group: groupIdConfig
        anotherConsumeMessage-in-0:
          destination: test-kafka-topic
          group: groupIdConfig
    function:
      definition: consumeMessage; anotherConsumeMessage
      routing-expression: "headers['contentType'] == 'application/json' ? 'anotherConsumeMessage' : 'consumeMessage'"
      configuration:
  application:
    name: KafkaStreams
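The header check from the MessageRoutingCallback approach can be exercised without a broker or a Spring context, which may help narrow down whether the decision logic or the wiring is at fault. Below is a minimal plain-Java sketch, assuming the headers are available as a Map. HeaderRouteDecision and decide are hypothetical names used for illustration and are not part of Spring Cloud Function.

```java
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the header check; no Spring types are involved.
// HeaderRouteDecision is a hypothetical helper, not a library class.
final class HeaderRouteDecision {

    // Header name taken from the callback in this issue.
    static final String ROUTING_HEADER = "test-key";

    private HeaderRouteDecision() {
    }

    // Returns the target function name, or empty when the header
    // is missing or carries a value that is not recognized.
    static Optional<String> decide(Map<String, ?> headers) {
        Object raw = headers.get(ROUTING_HEADER);
        if (raw == null) {
            return Optional.empty();
        }
        String value = raw.toString();
        if (value.equalsIgnoreCase("consume")) {
            return Optional.of("consumeMessage");
        }
        if (value.equalsIgnoreCase("anotherConsume")) {
            return Optional.of("anotherConsumeMessage");
        }
        return Optional.empty();
    }
}
```

The lookup is case-sensitive on the header name, so a header sent as testKey or Test-Key would fall through to the empty result in this sketch.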
Version of the framework
- Spring-Cloud-Stream - 3.2.4
- spring-cloud-stream-binder-kafka - 3.2.4
Expected behavior
RoutingFunction should be invoked and should route each event to the desired function.
Screenshots
The logs below show the activity on the consumer side and the details of the received message.
There is a lot going on in your configuration as well as in the callback sample, which makes it hard to follow, as I am not sure which configuration is current. Consider creating a small sample that reproduces the issue and pushing it to GitHub so we can take a look. Meanwhile, you can look at the test cases for RoutingFunction that cover all scenarios - https://github.com/spring-cloud/spring-cloud-stream/blob/main/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java
Hi Oleg @olegz,
Thanks for getting back to me.
We have now added the processResponse bean shown below, after which MessageRoutingCallback started working as expected.
@Bean
public Consumer<Message<?>> processResponse(final RoutingFunction customRouter) {
    return customRouter::apply;
}

@Bean
public MessageRoutingCallback customRouter() {
    return new MessageRoutingCallback() {
        @Override
        public FunctionRoutingResult routingResult(Message<?> message) {
            if (!ObjectUtils.isEmpty(message.getHeaders().get("test-key")) &&
                    message.getHeaders().get("test-key").toString().equalsIgnoreCase("consume")) {
                return new FunctionRoutingResult("consumeMessage");
            } else if (!ObjectUtils.isEmpty(message.getHeaders().get("test-key")) &&
                    message.getHeaders().get("test-key").toString().equalsIgnoreCase("anotherConsume")) {
                return new FunctionRoutingResult("anotherConsumeMessage");
            }
            return MessageRoutingCallback.super.routingResult(message);
        }
    };
}
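One way to read the working setup: every message presumably reaches a single entry point first (processResponse), which then resolves a target name and invokes the matching function. The following is a simplified plain-Java model of that dispatch step, written only to illustrate the idea. It is not how RoutingFunction is implemented, and NamedDispatcher and register are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified stand-in for a router: resolve a name, look it up, invoke it.
// NamedDispatcher is a hypothetical class, not part of any Spring library.
final class NamedDispatcher<T> implements Consumer<T> {

    private final Map<String, Consumer<T>> targets = new LinkedHashMap<>();
    private final Function<T, String> nameResolver;

    NamedDispatcher(Function<T, String> nameResolver) {
        this.nameResolver = nameResolver;
    }

    // Registers a target under the name the resolver may return.
    NamedDispatcher<T> register(String name, Consumer<T> target) {
        targets.put(name, target);
        return this;
    }

    @Override
    public void accept(T message) {
        String name = nameResolver.apply(message);
        Consumer<T> target = targets.get(name);
        if (target == null) {
            // Fail loudly instead of silently falling back to a default target.
            throw new IllegalStateException("No function registered under name: " + name);
        }
        target.accept(message);
    }
}
```

In this model, a target only receives a message if the dispatcher itself is what consumes from the source. A target that is wired directly to the source would receive every message regardless of what the resolver returns.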
However, the application configuration below still does not work. Also, I think the routing.enabled property no longer exists, so you might want to update the tests accordingly. Irrespective of the header value, the message always goes to the consume function, so I am not sure what is wrong with the configuration.
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          auto-create-topics: false
          brokers:
            - localhost:29092
      bindings:
        consumeMessage-in-0:
          destination: test-kafka-topic
          group: gdpr-orchestrator-svc
    function:
      definition: consumeMessage;anotherConsume
      routing-expression: "headers.testKey.toString().equals('anotherConsume') ? 'anotherConsume': 'consume'"
  application:
    name: KafkaSpring
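When a routing expression returns function names as string literals, it can be worth comparing those names against the function definition by hand or with a small check. Below is a minimal plain-Java sketch of such a check, assuming the definition is a ';'-separated list of names. RouteTargetCheck and its methods are hypothetical and are not part of Spring Cloud Stream.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper that compares route targets with a function definition.
final class RouteTargetCheck {

    private RouteTargetCheck() {
    }

    // Splits a ';'-separated definition string into trimmed, non-empty names.
    static Set<String> parseDefinition(String definition) {
        return Arrays.stream(definition.split(";"))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Returns the route targets that do not appear in the definition.
    static Set<String> unknownTargets(String definition, Set<String> routeTargets) {
        Set<String> unknown = new LinkedHashSet<>(routeTargets);
        unknown.removeAll(parseDefinition(definition));
        return unknown;
    }
}
```

With the values from the configuration above, unknownTargets("consumeMessage;anotherConsume", Set.of("anotherConsume", "consume")) returns a set containing only consume. Functions registered as beans but not listed in the definition may still be resolvable by name, so a non-empty result is a prompt to double-check the names rather than proof of a misconfiguration.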
I have added the Git repo spring-cloud-stream-routing and a screenshot of the output for the invoked function:

Please check and let me know your thoughts.
@olegz Did you get a chance to look at the above?
@olegz - Has this been sorted out?