rocketmq-spring icon indicating copy to clipboard operation
rocketmq-spring copied to clipboard

single instance with multi DefaultLitePullConsumer don't poll message right

Open jiakme opened this issue 2 years ago • 0 comments

  1. Please describe the issue you observed:
  • single java springboot server instance with multi DefaultLitePullConsumer
  • invoke DefaultLitePullConsumer#poll() in loop
  • some queue don't be consumed
  1. Please tell us about your environment:
  • system: linux
  • springboot: 2.4.2
  • rocketmq-spring: 2.2.2
  1. Other information

DefaultLitePullConsumer create

` private void initRocketMQPushConsumer() throws MQClientException {

    List<DefaultLitePullConsumer> pullConsumers = new ArrayList<>();

    DefaultLitePullConsumer defaultLitePullConsumer = getDefaultLitePullConsumer("high");
    pullConsumers.add(defaultLitePullConsumer);

    DefaultLitePullConsumer defaultLitePullConsumer_1 = getDefaultLitePullConsumer("low");
    pullConsumers.add(defaultLitePullConsumer_1);

    PULL_CONSUMERS = Collections.unmodifiableList(pullConsumers);
}

private DefaultLitePullConsumer getDefaultLitePullConsumer(String tag) throws MQClientException {
    RocketMQProperties.Consumer consumerConfig = new RocketMQProperties.Consumer();
    consumerConfig.setGroup(MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER + "_priority");

    consumerConfig.setTopic(TOPIC);
    consumerConfig.setPullBatchSize(50);
    DefaultLitePullConsumer defaultLitePullConsumer;

    consumerConfig.setSelectorExpression(tag);
    defaultLitePullConsumer = buildPullConsumer(consumerConfig);
    return defaultLitePullConsumer;
}
private DefaultLitePullConsumer buildPullConsumer(RocketMQProperties.Consumer consumerConfig) throws MQClientException {
    /**
     * {@link org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultLitePullConsumer(RocketMQProperties)}
     */
    String nameServer = rocketMQProperties.getNameServer();
    String groupName = consumerConfig.getGroup();
    String topicName = consumerConfig.getTopic();
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null");
    Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null");

    String accessChannel = rocketMQProperties.getAccessChannel();
    MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
    SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
    String selectorExpression = consumerConfig.getSelectorExpression();
    String ak = consumerConfig.getAccessKey();
    String sk = consumerConfig.getSecretKey();
    int pullBatchSize = consumerConfig.getPullBatchSize();
    boolean useTLS = consumerConfig.isTlsEnable();

    DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
            groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);
    litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
    litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
    litePullConsumer.setNamespace(consumerConfig.getNamespace());
    litePullConsumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    litePullConsumer.setAutoCommit(true);
    return litePullConsumer;
}

`

invoke poll for (DefaultLitePullConsumer pullConsumer : pullConsumers) { for (;;) { List<MessageExt> messageExts = pullConsumer.poll(); if (CollUtil.isNotEmpty(messageExts)) { System.out.println(JSON.toJSONString(pullConsumer) + "::::::::::" + JSON.toJSONString(messageExts)); continue; } break; } }

rocketmq-dashboard: image

jiakme avatar Apr 11 '23 12:04 jiakme