spring-cloud-stream

How can multiple consumers monitor one topic? Adding a routing-expression doesn't work.

Open weiro-9-w7 opened this issue 2 years ago • 4 comments

Maven:
Spring Boot version is 3.0.5; Spring Cloud version is 2022.0.4.

Dependency: org.springframework.cloud:spring-cloud-stream-binder-rabbit

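Code:
`import java.util.function.Consumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RequiredArgsConstructor
public class EventConsumer {

    private final StreamBridge streamBridge;

    // Publishes the event with an "eventType" header for the routing-expression to read.
    // SCHEDULE_EVENT is a constant defined elsewhere (not shown in the issue).
    public void sendScheduleEvent(ScheduleEvent event) {
        log.info("sent: {}", event);
        Message<ScheduleEvent> message = MessageBuilder.withPayload(event)
                .setHeader("eventType", event.getEventType())
                .build();
        streamBridge.send(SCHEDULE_EVENT, message);
    }

    @Bean
    public Consumer<ScheduleEvent> orderConsumer() {
        return request -> log.info("order received: {}", request);
    }

    @Bean
    public Consumer<ScheduleEvent> productConsumer() {
        return request -> log.info("product received: {}", request);
    }
}`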

application.yml:
`cloud:
  stream:
    bindings:
      sendScheduleEvent-out-0:
        destination: schedule-event-topic
        group: schedule
        binder: rabbit-epan
      orderConsumer-in-0:
        destination: schedule-event-topic
        content-type: application/json
        group: schedule
        binder: rabbit-epan
      productConsumer-in-0:
        destination: schedule-event-topic
        content-type: application/json
        group: schedule
        binder: rabbit-epan
      # Add coupon - Producer
      addCoupon-out-0:
        destination: request-coupon-topic
        content-type: application/json
        binder: rabbit-core
      # Add coupon - Consumer
      addCoupon-in-0:
        destination: request-coupon-topic
        content-type: application/json
        # Consumer group: a message is consumed only once within the same group
        group: add-coupon-group
        binder: rabbit-epan
        consumer:
          max-attempts: 5
          back-off-initial-interval: 2000
          back-off-max-interval: 10000
          back-off-multiplier: 2
          retryable-exceptions:
            java.lang.IllegalArgumentException: false
      # Delete coupon - Producer
      deleteCoupon-out-0:
        destination: delete-coupon-topic
        content-type: application/json
        binder: rabbit-epan
      # Delete coupon - Consumer
      deleteCoupon-in-0:
        destination: delete-coupon-topic
        content-type: application/json
        group: delete-coupon-group
        binder: rabbit-epan
    binders:
      rabbit-epan:
        type: rabbit # message middleware type
        environment: # connection info
          spring:
            rabbitmq:
              host: localhost
              port: 5672
              username: guest
              password: guest
      rabbit-core:
        type: rabbit # message middleware type
        environment: # connection info
          spring:
            rabbitmq:
              host: localhost
              port: 5672
              username: guest
              password: guest
    default-binder: rabbit-epan
  function:
    routing:
      enabled: true
    definition: addCoupon;deleteCoupon;orderConsumer;productConsumer;sendScheduleEvent
    routing-expression: "headers.eventType.toString().equals('order') ? 'orderConsumer' : 'productConsumer'"`

Now, when I send a message to schedule-event-topic, orderConsumer and productConsumer consume it at random instead of according to the eventType header.

weiro-9-w7 avatar Nov 01 '23 08:11 weiro-9-w7
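
For readers following the thread, the decision that the routing-expression in the configuration above is meant to make can be written out as plain Java. This is only a framework-free sketch: `RoutingSketch` and `route` are hypothetical names, the headers are modelled as a simple `Map`, and no Spring or SpEL evaluation is involved. It shows the intended per-message choice, not how Spring Cloud Stream applies it.

```java
import java.util.Map;

public class RoutingSketch {

    // Hypothetical stand-in for the SpEL routing-expression: returns the name
    // of the function that should receive a message with the given headers.
    public static String route(Map<String, Object> headers) {
        Object eventType = headers.get("eventType");
        // A missing header falls through to productConsumer; that is a choice
        // made by this sketch, not behaviour confirmed for the real expression.
        return "order".equals(String.valueOf(eventType)) ? "orderConsumer" : "productConsumer";
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("eventType", "order")));   // prints "orderConsumer"
        System.out.println(route(Map.of("eventType", "product"))); // prints "productConsumer"
    }
}
```

Under this reading, every message with eventType "order" should reach orderConsumer and everything else productConsumer, which is the opposite of the random delivery reported above.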

Could you provide more demos that cover this scenario? Resources on the latest Spring Cloud Stream are very scarce at the moment. Thank you.

weiro-9-w7 avatar Nov 01 '23 08:11 weiro-9-w7

@weiro-9-w7 Here is a sample application that demonstrates message routing callback. Although written for Kafka, this is applicable in a Rabbit scenario also. This is something that you might want to look at. Regarding your specific issue of routing-expression not working, could you create a minimal sample where we can reproduce the issue? That way, we can triage it faster.

sobychacko avatar Nov 01 '23 18:11 sobychacko

@sobychacko Thanks for your reply. I will try the message routing callback to test it. I would also like to know whether the message header can be used to implement this scenario. Thanks.

weiro-9-w7 avatar Nov 02 '23 00:11 weiro-9-w7
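
The idea being asked about here, choosing one of several consumers from a message header through a callback, can be illustrated without any framework. The sketch below is an assumption-laden stand-in, not the Spring Cloud Function API: `HeaderDispatchSketch`, `register`, and `dispatch` are hypothetical names, the callback is a plain `Function` from headers to a function name, and payloads are plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class HeaderDispatchSketch {

    // Hypothetical registry of consumers keyed by function name.
    private final Map<String, Consumer<String>> consumers = new HashMap<>();
    // Hypothetical routing callback: maps message headers to a function name.
    private final Function<Map<String, Object>, String> routingCallback;

    public HeaderDispatchSketch(Function<Map<String, Object>, String> routingCallback) {
        this.routingCallback = routingCallback;
    }

    public void register(String name, Consumer<String> consumer) {
        consumers.put(name, consumer);
    }

    // Delivers the payload to exactly one consumer, chosen from the headers.
    public void dispatch(Map<String, Object> headers, String payload) {
        String target = routingCallback.apply(headers);
        Consumer<String> consumer = consumers.get(target);
        if (consumer == null) {
            throw new IllegalStateException("No consumer registered as " + target);
        }
        consumer.accept(payload);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        HeaderDispatchSketch router = new HeaderDispatchSketch(
                headers -> "order".equals(headers.get("eventType")) ? "orderConsumer" : "productConsumer");
        router.register("orderConsumer", p -> log.add("order received: " + p));
        router.register("productConsumer", p -> log.add("product received: " + p));
        router.dispatch(Map.of("eventType", "order"), "event-1");
        router.dispatch(Map.of("eventType", "product"), "event-2");
        System.out.println(log); // prints "[order received: event-1, product received: event-2]"
    }
}
```

The point of the design is that there is a single entry point receiving every message, and the header only selects which registered consumer runs. Whether and how this maps onto the Rabbit binder setup in this issue is not settled in the thread.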

@weiro-9-w7 Are there any latest updates on this issue?

sobychacko avatar Feb 21 '24 22:02 sobychacko

Closing the issue due to no activity. Feel free to re-open it if needed.

sobychacko avatar Jun 05 '24 20:06 sobychacko