rocketmq-flink

RocketMQ cannot consume a topic in the cluster, but works in IDEA

Open luckydarnell opened this issue 2 years ago • 4 comments

In a standalone cluster:

  • environment:

java version "1.8.0_271" + flink-1.14.6 + rocketmq 4.9.2

  • code
   // use
    dataStream = env.addSource(createMqsource(cfg, dataSource));
   // create RocketMQSource
    public static RocketMQSource<TransData> createMqsource(SourceCfg cfg, DataSourceCfg dataSource) {
        // Build the MQ name server address
        String[] hostNameSplit = dataSource.getHostname().split(SymbolConstant.SEPARATOR_SEMI_COLON);
        String addr = null;
        for (String s : hostNameSplit) {
            String joinStr = s + SymbolConstant.SEPARATOR_COLON + dataSource.getPort();
            addr = StringUtils.isEmpty(addr) ? joinStr : addr + SymbolConstant.SEPARATOR_SEMI_COLON + joinStr;
        }
        DeserializationSchema<List<MessageExt>, TransData> deserializationSchema = MqDeserializationSchemaFactory.create(cfg);
        // Determine the startup mode
        OffsetResetStrategy startUpMode = cfg.getStartupMode() == null || cfg.getStartupMode().equals(StartupModeEnum.LATEST) ?
                OffsetResetStrategy.LATEST : OffsetResetStrategy.EARLIEST;

        RocketMQSourceBuilder<TransData> builder = new RocketMQSourceBuilder<TransData>()
                .setNameServerAddress(addr)
                .setConsumerGroup(cfg.getConsumerGroup())
                .setTopic(cfg.getTopic())
                .setTag(cfg.getTag())
                .setStartFromGroupOffsets(startUpMode)
                .setDeserializer(new RocketMQValueOnlyDeserializationSchemaWrapper<>(deserializationSchema));
        log.info("addr:{}, group:{}, tag:{}", addr, cfg.getConsumerGroup(), cfg.getTag());
        return builder.build();
    }
  • exception info:

2024-01-05 13:48:01,579 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator [] - Starting the RocketMQSourceEnumerator for consumer group X without periodic partition discovery.
2024-01-05 13:48:01,589 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: T_AC_RE_WZ_JJJZ(MQ)- registering reader for parallel task 0 @ 127.0.0.1
2024-01-05 13:48:01,886 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,887 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,889 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,889 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,890 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,887 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: S(MQ)-. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
    at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.handlePartitionSplitChanges(RocketMQSourceEnumerator.java:279) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) ~[flink-runtime-1.14.6.jar:1.14.6]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.14.6.jar:1.14.6]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
    at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
Caused by: org.apache.rocketmq.client.exception.MQClientException: Can not find Message Queue for this topic, FPC_SYNC_FI_AC_REPTILE_1 See http://rocketmq.apache.org/docs/faq/ for further details.
    at org.apache.rocketmq.client.impl.MQAdminImpl.fetchSubscribeMessageQueues(MQAdminImpl.java:177) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.fetchSubscribeMessageQueues(DefaultMQPullConsumerImpl.java:147) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.fetchSubscribeMessageQueues(DefaultMQPullConsumer.java:290) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.discoverAndInitializePartitionSplit(RocketMQSourceEnumerator.java:248) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-runtime-1.14.6.jar:1.14.6]
    ... 7 more
Caused by: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <10.200.38.118:9876> failed
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:440) ~[rocketmq-remoting-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:377) ~[rocketmq-remoting-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1367) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1357) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.MQAdminImpl.fetchSubscribeMessageQueues(MQAdminImpl.java:166) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.fetchSubscribeMessageQueues(DefaultMQPullConsumerImpl.java:147) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.fetchSubscribeMessageQueues(DefaultMQPullConsumer.java:290) ~[rocketmq-client-4.9.2.jar:4.9.2]
    at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.discoverAndInitializePartitionSplit(RocketMQSourceEnumerator.java:248) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-runtime-1.14.6.jar:1.14.6]
    ... 7 more
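
The innermost cause in the trace is a failed request to the name server at 10.200.38.118:9876, so one thing that may be worth ruling out is plain network reachability from the machine that runs the Flink JobManager. The following is only a minimal sketch of such a check, not part of the connector: the class name `NameServerProbe` and the helper `canConnect` are hypothetical, and the default host and port are simply the values that appear in the log. A successful TCP connect does not prove that the RocketMQ remoting protocol works; it only excludes firewall or routing problems.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper; not part of rocketmq-flink or rocketmq-client.
public class NameServerProbe {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the name server address taken from the log above.
        String host = args.length > 0 ? args[0] : "10.200.38.118";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9876;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Running this on the cluster node and on the development machine would show whether the two environments differ at the network level. If both report the port as reachable, the difference is more likely in the classpath or configuration of the cluster.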

luckydarnell commented on Jan 05 '24 05:01

In IntelliJ, everything works fine. The log:

24/01/05 10:07:16 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator: Starting the RocketMQSourceEnumerator for consumer group X without periodic partition discovery.
24/01/05 10:07:16 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator: Source Source: s(MQ)- registering reader for parallel task 0 @
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=0] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-0 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=2] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-2 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=1] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-1 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=3] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-3 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator: Assigning splits to readers {0=[[Topic: topicx, Broker: broker-a, Partition: 1, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 0, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 3, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 2, StartingOffset: 0, StoppingTimestamp: 9223372036854775807]]}
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase: Adding split(s) to reader: [[Topic: topicx, Broker: broker-a, Partition: 1, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 0, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 3, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 2, StartingOffset: 0, StoppingTimestamp: 9223372036854775807]]
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase: Reader received NoMoreSplits event.
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher: Starting split fetcher 0

Please help me.

luckydarnell commented on Jan 05 '24 06:01

Change the dead-letter queue's perm value to 6 so that its messages become readable.

wsczm commented on Jan 05 '24 08:01

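"Change the dead-letter queue's perm value to 6 so that its messages become readable."

1. No dead-letter queue has been created.
2. The queue's own perm value is already 6.

The problem now is that the job runs normally in IDEA, but fails once I start the cluster and run it there.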

luckydarnell commented on Jan 05 '24 08:01

I have tried several times and found that the RocketMQSourceFunction interface runs normally, so I suspect there is a bug in the RocketMQSource interface.

  • code
    public static RocketMQSourceFunction<TransData> createMqsource2(SourceCfg cfg, DataSourceCfg dataSource) {
        // Build the MQ name server address
        String[] hostNameSplit = dataSource.getHostname().split(SymbolConstant.SEPARATOR_SEMI_COLON);
        String addr = null;
        for (String s : hostNameSplit) {
            String joinStr = s + SymbolConstant.SEPARATOR_COLON + dataSource.getPort();
            addr = StringUtils.isEmpty(addr) ? joinStr : addr + SymbolConstant.SEPARATOR_SEMI_COLON + joinStr;
        }
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, addr);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, cfg.getTag());
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, cfg.getTopic());
        KeyValueDeserializationSchema<TransData> deserializationSchema = MqDeserializationSchemaFactory.crateMqDeserializationSchema(cfg);
        // Determine the startup mode
        OffsetResetStrategy startUpMode = cfg.getStartupMode() == null || cfg.getStartupMode().equals(StartupModeEnum.LATEST) ?
                OffsetResetStrategy.LATEST : OffsetResetStrategy.EARLIEST;

        RocketMQSourceFunction<TransData> source = new RocketMQSourceFunction<>(deserializationSchema, consumerProps);
        // Use group offsets.
        // If there is no committed offset, the consumer falls back to startUpMode (LATEST unless configured otherwise).
        source.setStartFromGroupOffsets(startUpMode);
        log.info("addr:{}, group:{}, tag:{}", addr, cfg.getConsumerGroup(), cfg.getTag());
        return source;
    }
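
Both snippets in this thread build the name server address with the same loop, so it may help to see in isolation what string ends up in `setNameServerAddress` or `NAME_SERVER_ADDR`. The following is a minimal sketch under assumptions: `SymbolConstant.SEPARATOR_SEMI_COLON` is taken to be ";" and `SymbolConstant.SEPARATOR_COLON` to be ":", the port is assumed to be an `int`, and the class `NameServerAddress` and its `join` method are hypothetical names used only for illustration. Unlike the original loop, this version also trims whitespace and skips blank entries.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the address format; not taken from the reporter's project.
public class NameServerAddress {

    /** Turns "host1;host2" plus a port into "host1:port;host2:port". */
    static String join(String hostnames, int port) {
        return Arrays.stream(hostnames.split(";"))
                .map(String::trim)
                .filter(host -> !host.isEmpty())   // assumption: blank entries are ignored
                .map(host -> host + ":" + port)
                .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        // Example hosts are made up; only the ";"-separated host:port format matters.
        System.out.println(join("10.0.0.1;10.0.0.2", 9876));
    }
}
```

Logging the resulting string on the cluster, as the `log.info` call in the snippets already does, and comparing it with the value seen in IDEA would confirm whether both environments hand the same address to the client.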

luckydarnell commented on Jan 09 '24 03:01