使用starter的时候需要自定义全局拦截器在 listener onMessage 之前处理 MessageExt
FEATURE REQUEST
- 使用starter的时候需要自定义全局拦截器在 listener onMessage 之前处理 MessageExt
- Provide any additional detail on your proposed use case for this feature.
- 添加message处理器
package org.apache.rocketmq.spring.support;
import org.apache.rocketmq.common.message.MessageExt;
/**
* listener message customizer
*
* @author texousliu
* @since 2023-08-18
*/
public interface RocketMQListenerMessageCustomizer {
void customize(final MessageExt messageExt);
}
- ListenerContainerConfiguration 注入自定义处理器
private List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers;
public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties,
@Autowired(required = false) List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
this.rocketMQListenerMessageCustomizers = rocketMQListenerMessageCustomizers;
}
- DefaultRocketMQListenerContainer 添加
private List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers;
public List<RocketMQListenerMessageCustomizer> getRocketMQListenerMessageCustomizers() {
return rocketMQListenerMessageCustomizers;
}
public void setRocketMQListenerMessageCustomizers(List<RocketMQListenerMessageCustomizer> rocketMQListenerMessageCustomizers) {
this.rocketMQListenerMessageCustomizers = rocketMQListenerMessageCustomizers;
}
- ListenerContainerConfiguration 配置 DefaultRocketMQListenerContainer
container.setRocketMQListenerMessageCustomizers(rocketMQListenerMessageCustomizers);
- 处理消息的地方添加调用
private void handleMessage(
MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
if (rocketMQListenerMessageCustomizers != null) {
for (RocketMQListenerMessageCustomizer customizer : rocketMQListenerMessageCustomizers) {
customizer.customize(messageExt);
}
}
// ...... other code
}
- Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?
我觉得这是一个比较重要的功能,在获取消息之后能够全局拦截并预先处理。比如说:需要全链路追踪的时候,可以通过这个扩展拦截器处理额外的信息到 context 中
因为我在 starter 的源代码中没有看到相关的扩展接口,如果需要强制实现的话需要重写 ListenerContainerConfiguration,我认为这是不太友好的方式
写的不错,很有想法。
个人认为,作者没有实现扩展接口,可能另有考虑。
另外,如果要针对rocketMQ实现链路追踪,需要生产者在发送消息时,通过properties增加对应值。在消费者端,使用aop进行处理,也很方便
下面是我的代码示例
`@Slf4j @Aspect @Component public class TraceAspect {
private static final String TRACE_ID = "traceId";
@SneakyThrows
@Around("@annotation(mqTraceID)")
public Object before2(ProceedingJoinPoint joinPoint, MqTraceID mqTraceID) {
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if (arg instanceof MessageExt messageExt) {
Map<String, String> properties = messageExt.getProperties();
String traceId = properties.get(TRACE_ID);
if (StringUtils.isBlank(traceId)) {
traceId = generateTraceId();
}
MDC.put(TRACE_ID, traceId);
break;
}
}
try {
return joinPoint.proceed();
} finally {
MDC.clear();
}
}
public static String generateTraceId() {
return UUID.randomUUID().toString().replace("-", "").toLowerCase();
}
}`
消费者
Component @RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group") public class BuyRenewCloudDiskMq implements RocketMQListener<MessageExt>{ @MqTraceID @SneakyThrows @Override public void onMessage(MessageExt messageExt) {
}
}
因为我在 starter 的源代码中没有看到相关的扩展接口,如果需要强制实现的话需要重写
ListenerContainerConfiguration,我认为这是不太友好的方式
太需要这个了,不然就只能自己定制处理
可以写个父类,实现RocketMQListene<MessageExt>,可以自定义消费逻辑
ocketMQ实现链路追踪,需要生产者在发送消息时,通过
properties增加对应值。在消费者端,使
这样就需要修改旧代码了,如果从源头可扩展支持,那么我可以不用动旧代码
可以写个父类,实现RocketMQListene<MessageExt>,可以自定义消费逻辑
旧代码需要改造,新代码需要约束开发者行为,如果不知情的同事还是回使用原生注解
ref https://github.com/apache/rocketmq-spring/issues/616
只要这个方法改成public,就可以通过切面DefaultRocketMQListenerContainer 来做统一的处理了。