rocketmq-spring
rocketmq-spring copied to clipboard
A single instance with multiple DefaultLitePullConsumers does not poll messages correctly
- Please describe the issue you observed:
- A single Java Spring Boot server instance creates multiple DefaultLitePullConsumer instances.
- DefaultLitePullConsumer#poll() is invoked in a loop.
- Some queues are never consumed.
- Please tell us about your environment:
- system: linux
- springboot: 2.4.2
- rocketmq-spring: 2.2.2
- Other information
Creating the DefaultLitePullConsumer instances:
` private void initRocketMQPushConsumer() throws MQClientException {
    // Despite the "Push" in its name, this method builds DefaultLitePullConsumer instances:
    // one for the "high" tag and then one for the "low" tag, in that order.
    List<DefaultLitePullConsumer> consumersByTag = new ArrayList<>();
    consumersByTag.add(getDefaultLitePullConsumer("high"));
    consumersByTag.add(getDefaultLitePullConsumer("low"));

    // Expose the result through an unmodifiable view so later code cannot add or remove consumers.
    PULL_CONSUMERS = Collections.unmodifiableList(consumersByTag);
}
/**
 * Creates a lite pull consumer for {@code TOPIC} whose selector expression is the given tag.
 *
 * <p>The consumer group is always {@code MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER + "_priority"},
 * independent of the tag, and the pull batch size is fixed at 50.
 *
 * <p>NOTE(review): callers that pass different tags end up with consumers that share one group but
 * carry different selector expressions. RocketMQ's documentation asks for identical subscriptions
 * across all consumers of a group, so this may explain queues that are never consumed; a separate
 * group per tag would avoid it — confirm against the broker's behaviour.
 *
 * @param tag selector expression stored in the consumer configuration
 * @return the consumer returned by {@code buildPullConsumer}
 * @throws MQClientException propagated from {@code buildPullConsumer}
 */
private DefaultLitePullConsumer getDefaultLitePullConsumer(String tag) throws MQClientException {
    RocketMQProperties.Consumer settings = new RocketMQProperties.Consumer();
    settings.setGroup(MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER + "_priority");
    settings.setTopic(TOPIC);
    settings.setPullBatchSize(50);
    settings.setSelectorExpression(tag);
    return buildPullConsumer(settings);
}
/**
 * Builds a {@code DefaultLitePullConsumer} from the shared {@code rocketMQProperties} and the
 * supplied per-consumer configuration.
 *
 * <p>The name server, group and topic are checked with {@code Assert.hasText} before anything is
 * created. The consumer itself comes from {@code RocketMQUtil.createDefaultLitePullConsumer}; the
 * trace settings and namespace are then copied from {@code consumerConfig}, the allocation strategy
 * is set to a new {@code AllocateMessageQueueAveragely}, and auto-commit is switched on.
 *
 * <p>This method itself does not call {@code start()} on the returned consumer.
 *
 * @param consumerConfig group, topic, selector, credentials, batch size, TLS, trace and namespace
 *     settings for the consumer
 * @return the configured consumer
 * @throws MQClientException propagated from the RocketMQ calls made here
 * @see org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultLitePullConsumer(RocketMQProperties)
 */
private DefaultLitePullConsumer buildPullConsumer(RocketMQProperties.Consumer consumerConfig) throws MQClientException {
    final String nameServer = rocketMQProperties.getNameServer();
    final String group = consumerConfig.getGroup();
    final String topic = consumerConfig.getTopic();

    // Fail fast on the three mandatory settings before parsing anything else.
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(group, "[rocketmq.pull-consumer.group] must not be null");
    Assert.hasText(topic, "[rocketmq.pull-consumer.topic] must not be null");

    final String accessChannel = rocketMQProperties.getAccessChannel();
    final MessageModel model = MessageModel.valueOf(consumerConfig.getMessageModel());
    final SelectorType selector = SelectorType.valueOf(consumerConfig.getSelectorType());

    DefaultLitePullConsumer consumer = RocketMQUtil.createDefaultLitePullConsumer(
            nameServer,
            accessChannel,
            group,
            topic,
            model,
            selector,
            consumerConfig.getSelectorExpression(),
            consumerConfig.getAccessKey(),
            consumerConfig.getSecretKey(),
            consumerConfig.getPullBatchSize(),
            consumerConfig.isTlsEnable());

    // Settings that the factory call above does not take as arguments.
    consumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
    consumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
    consumer.setNamespace(consumerConfig.getNamespace());
    consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    consumer.setAutoCommit(true);
    return consumer;
}
`
Invoking poll():
// Visit the consumers in list order and drain each one before moving on to the next.
for (DefaultLitePullConsumer consumer : pullConsumers) {
    // Keep polling this consumer until a poll returns a batch that CollUtil reports as empty.
    List<MessageExt> batch = consumer.poll();
    while (CollUtil.isNotEmpty(batch)) {
        System.out.println(JSON.toJSONString(consumer) + "::::::::::" + JSON.toJSONString(batch));
        batch = consumer.poll();
    }
}
rocketmq-dashboard:
