rocketmq-flink

[BUG] Topic route changed, job can't recover normally

Open deemogsw opened this issue 3 years ago • 6 comments

1. Add partition. Flink checks in snapshotState whether the partitions have changed. If the route has changed, the job switches from running to failed and then recovers from a checkpoint that has no record of the new message queue. The restored offset table has no entry for the new message queue either, so an NPE is thrown in RocketMQSourceFunction.run.
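A minimal sketch of the failure in case 1, assuming the offset table behaves like a plain map from queue to offset. The class name, the string queue keys, and the initial offset of 0 are hypothetical and not taken from the connector. Unboxing a missing entry is what throws the NPE; registering unknown queues with an initial offset avoids it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RestoredOffsetLookup {
    // Hypothetical stand-in for the offset table restored from a checkpoint,
    // keyed by a queue name instead of a real MessageQueue object.
    static final Map<String, Long> restoredOffsets = new ConcurrentHashMap<>();

    // Unsafe: a queue missing from the restored state yields null,
    // and auto-unboxing null to long throws NullPointerException.
    static long unsafeOffset(String queue) {
        return restoredOffsets.get(queue);
    }

    // Safer: a queue that appeared after the checkpoint is registered
    // with an initial offset (assumption: the caller chooses that offset).
    static long safeOffset(String queue, long initialOffset) {
        return restoredOffsets.computeIfAbsent(queue, q -> initialOffset);
    }

    public static void main(String[] args) {
        // State as of the checkpoint: two queues.
        restoredOffsets.put("broker-a:0", 120L);
        restoredOffsets.put("broker-a:1", 98L);

        // Route after a partition was added: three queues.
        List<String> currentRoute = Arrays.asList("broker-a:0", "broker-a:1", "broker-a:2");
        for (String queue : currentRoute) {
            System.out.println(queue + " -> " + safeOffset(queue, 0L));
        }

        try {
            unsafeOffset("broker-a:3");
        } catch (NullPointerException e) {
            System.out.println("NPE for a queue missing from the restored state");
        }
    }
}
```

Whether a new queue should start from 0, from the earliest offset, or from the latest offset depends on the job's consumption settings; the sketch only shows the null guard.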

2. Decrease partition. If partitions are removed, a connection exception is thrown in RocketMQSourceFunction.run. After five retries, run exits normally, so the job switches from running to finished rather than failed, because the thread pool swallows the connection exception.
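A minimal sketch of how a thread pool can hide a failure like the one in case 2, using only java.util.concurrent. The class and method names are hypothetical, and this is not the connector's actual code. An exception thrown inside a submitted task is stored in its Future, so a caller that never inspects the Future sees a clean finish.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PullFailurePropagation {
    // Hypothetical pull task standing in for a pull whose retries are exhausted.
    static void failingPull() {
        throw new RuntimeException("send request to broker failed");
    }

    // The Future is never inspected, so the failure stays hidden and the
    // pool terminates as if the work had completed normally.
    static boolean runIgnoringFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable task = PullFailurePropagation::failingPull;
        pool.submit(task);
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Calling get() rethrows the task's failure wrapped in ExecutionException,
    // which lets the caller fail instead of finishing.
    static void runCheckingFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable task = PullFailurePropagation::failingPull;
            Future<?> result = pool.submit(task);
            result.get();
        } catch (ExecutionException e) {
            throw new RuntimeException("pull task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for pull task", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("finished without error: " + runIgnoringFuture());
        try {
            runCheckingFuture();
        } catch (RuntimeException e) {
            System.out.println("surfaced: " + e.getCause().getMessage());
        }
    }
}
```

In a Flink source, rethrowing from run in this way would be one option for making the job fail and restart instead of finishing.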

deemogsw • Jan 17 '23 02:01

Refer to the following commit: https://github.com/deemogsw/rocketMQ-flink-connector/commit/b070213f3a2976f691d0a7811f197b12f8cc18fb

deemogsw • Jan 17 '23 06:01

I ran into the same problem. Does the change in the commit linked above fix it? @deemogsw

SOD-DOB • Feb 01 '23 11:02

@SOD-DOB Fixed! But it only works for RocketMQSourceFunction. You can merge the above commit into your own code or use the latest code in my personal repository.

deemogsw • Feb 06 '23 03:02

@deemogsw Have you ever run into this kind of problem? It looks like the pull request to the broker timed out, but I found nothing unusual on the RocketMQ server side.

rocketmq-client version: 4.5.2

2023-02-27 08:47:14,109 WARN org.apache.rocketmq.flink.legacy.common.util.RetryUtil [] - RuntimeException, retry 5/5
java.lang.RuntimeException: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <xx:10911> failed
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:389) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.flink.legacy.common.util.RetryUtil.call(RetryUtil.java:52) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:279) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
Caused by: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <xx:10911> failed
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:429) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:375) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:737) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:691) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:199) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:249) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:529) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:364) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:288) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    ... 5 more

SOD-DOB • Feb 27 '23 08:02

@SOD-DOB This exception looks like an internal error in the RocketMQ broker. You can check the broker log on the machine named in the message (send request to xx:10911).

deemogsw • Mar 06 '23 02:03

See https://github.com/apache/rocketmq-flink/pull/96

humkum • Mar 12 '24 13:03