spring-cloud-stream icon indicating copy to clipboard operation
spring-cloud-stream copied to clipboard

MessageRoutingCallback and Routing-expression not worked for event routing

Open amitchidrewar1301 opened this issue 3 years ago • 4 comments

Describe the issue MessageRoutingCallback or Routing-expression not routing the message to appropriate functions.

Routingfunction responsible for routing the events to the desired function and there were two ways that we tried

  1. By implementing MessageRoutingCallback. It's expected to go to anotherConsumeMessage function if headers consist of the key with the desired value. But it didn't route to the expected function rather it's routed to 'consumeMessage'.

PFB the code

 @Bean
  public MessageRoutingCallback customRouter() {
    return new MessageRoutingCallback() {
      @Override
      public FunctionRoutingResult routingResult(Message<?> message) {
        if (!ObjectUtils.isEmpty(message.getHeaders().get("test-key")) &&
            message.getHeaders().get("test-key").toString().equalsIgnoreCase("consume")) {
          return new FunctionRoutingResult("consumeMessage");
        } else if (!ObjectUtils.isEmpty(message.getHeaders().get("test-key")) &&
            message.getHeaders().get("test-key").toString().equalsIgnoreCase("anotherConsume")) {
          return new FunctionRoutingResult("anotherConsumeMessage");
        }
        return MessageRoutingCallback.super.routingResult(message);
      }
    };
  }
  1. We also tried routing expression as can be seen in the below configuration but instead of anotherConsumeMessage, the message routed to ConsumeMessage. PFB the screenshot to check the content of the message and headers picked up by Kafka binder.
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          auto-create-topics: false
          brokers:
            - localhost:29092
          header-mapper-bean-name: kafkaheaderMapperTest
          consumer-properties:
            value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      bindings:
        consumeMessage-in-0:
          destination: test-kafka-topic
          group: groupIdConfig
        anotherConsumeMessage-in-0:
          destination: test-kafka-topic
          group: groupIdConfig
    function:
      definition: consumeMessage; anotherConsumeMessage
      routing-expression: "headers['contentType'] == 'application/json' ? 'anotherConsumeMessage' : 'consumeMessage'"
      configuration:
  application:
    name: KafkaStreams

Version of the framework

  1. Spring-Cloud-Stream - 3.2.4
  2. spring-cloud-stream-binder-kafka - 3.2.4 Expected behavior Routingfunction should be picked and should decide the routing of the event to the desired function. Screenshots You can see the below logs to check the activity that happened on the consumer side and the message details received. image

amitchidrewar1301 avatar Nov 02 '22 12:11 amitchidrewar1301

There is a lot going on in your configuration as well as callback sample. . makes it hard to follow as I am not sure which configuration is current. Consider creating a small sample that reproduces the issue and pushing it to github so we can take a look. Meanwhile you can look at the test cases for RoutingFunction that cover all scenarios - https://github.com/spring-cloud/spring-cloud-stream/blob/main/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/RoutingFunctionTests.java

olegz avatar Jan 04 '23 18:01 olegz

Hi Oleg @olegz,

Thanks for reverting.

Currently, We have added the processResponse as below after which MessageRoutingCallback started working as expected.


 @Bean
  public Consumer<Message<?>> processResponse(final RoutingFunction customRouter) {
    return customRouter::apply;
  }
  
@Bean
  public MessageRoutingCallback customRouter() {
    return new MessageRoutingCallback() {
      @Override
      public FunctionRoutingResult routingResult(Message<?> message) {
        if (!ObjectUtils.isEmpty(message.getHeaders().get("test-key")) &&
            message.getHeaders().get("test-key").toString().equalsIgnoreCase("consume")) {
          return new FunctionRoutingResult("consumeMessage");
        } else if (!ObjectUtils.isEmpty(message.getHeaders().get("test-key")) &&
            message.getHeaders().get("test-key").toString().equalsIgnoreCase("anotherConsume")) {
          return new FunctionRoutingResult("anotherConsumeMessage");
        }
        return MessageRoutingCallback.super.routingResult(message);
      }
    };
  }

But The application configuration below still has not worked. And I think the routing.enabled property does not exist any longer you might want to update the tests accordingly. Irrespective of the header value it's always going to consume function so not sure what's going wrong with the configuration.

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          auto-create-topics: false
          brokers:
            - localhost:29092
      bindings:
        consumeMessage-in-0:
          destination: test-kafka-topic
          group: gdpr-orchestrator-svc
    function:
      definition: consumeMessage;anotherConsume
       routing-expression: "headers.testKey.toString().equals('anotherConsume') ? 'anotherConsume': 'consume'"
  application:
    name: KafkaSpring

I have added the git repo spring-cloud-stream-routing and the screenshot of the output for invoked function :- image

Please check and let me know what are your thoughts.

amitchidrewar1301 avatar Jan 05 '23 16:01 amitchidrewar1301

@olegz Did you get a chance to look at the above?

amitchidrewar1301 avatar Mar 09 '23 17:03 amitchidrewar1301

@olegz - Is this sorted out ?

javaHelper avatar Jul 30 '23 17:07 javaHelper