How multi consumers to montor one topic, add routing-expression don't work.
maven :
Spring boot version is 3.0.5
spring cloud version is 2022.0.4
Code: `@Component @Slf4j @RequiredArgsConstructor public class EventConsumer {
private final StreamBridge streamBridge;
public void sendScheduleEvent(ScheduleEvent event) { log.info("sent: {}", event); Message<ScheduleEvent> message = MessageBuilder.withPayload(event) .setHeader("eventType", event.getEventType()).build();
streamBridge.send(SCHEDULE_EVENT, message);
}
@Bean public Consumer<ScheduleEvent> orderConsumer() { return request -> { log.info("order received: {}", request); }; }
@Bean public Consumer<ScheduleEvent> productConsumer() { return request -> { log.info("product received: {}", request); }; }
}`
application.yml: ` cloud: stream: bindings: sendScheduleEvent-out-0: destination: schedule-event-topic group: schedule binder: rabbit-epan orderConsumer-in-0: destination: schedule-event-topic content-type: application/json group: schedule binder: rabbit-epan productConsumer-in-0: destination: schedule-event-topic content-type: application/json group: schedule binder: rabbit-epan # 添加coupon - Producer addCoupon-out-0: destination: request-coupon-topic content-type: application/json binder: rabbit-core # 添加coupon - Consumer addCoupon-in-0: destination: request-coupon-topic content-type: application/json # 消费组,同一个组内只能被消费一次 group: add-coupon-group binder: rabbit-epan consumer: max-attempts: 5 back-off-initial-interval: 2000 back-off-max-interval: 10000 back-off-multiplier: 2 retryable-exceptions: java.lang.IllegalArgumentException: false
# 删除coupon - Producer
deleteCoupon-out-0:
destination: delete-coupon-topic
content-type: application/json
binder: rabbit-epan
# 删除coupon - Consumer
deleteCoupon-in-0:
destination: delete-coupon-topic
content-type: application/json
group: delete-coupon-group
binder: rabbit-epan
binders:
rabbit-epan:
type: rabbit # 消息中间件类型
environment: # 连接信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
rabbit-core:
type: rabbit # 消息中间件类型
environment: # 连接信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
default-binder: rabbit-epan
function:
routing:
enabled: true
definition: addCoupon;deleteCoupon;orderConsumer;productConsumer;sendScheduleEvent
routing-expression: "headers.eventType.toString().equals('order') ? 'orderConsumer' : 'productConsumer'"`
Now I send message to schedule-event-topic, orderConsumer & productConsumer consume the message at random.
Can you provide more demos to showcase the scene ? now about the latest spring stream resource are very scarce, thank you
@weiro-9-w7 Here is a sample application that demonstrates message routing callback. Although written for Kafka, this is applicable in a Rabbit scenario also. This is something that you might want to look at. Regarding your specific issue of routing-expression not working, could you create a minimal sample where we can reproduce the issue? That way, we can triage it faster.
@sobychacko thanks for your reply. I try to use message routing callback to test it. and i want to know if use the message header can implement the scenaia? thanks
@weiro-9-w7 Are there any latest updates on this issue?
Closing the issue due to no activity. Feel free to re-open it if needed.