rocketmq-client-go icon indicating copy to clipboard operation
rocketmq-client-go copied to clipboard

still consumption blocked after issue #944

Open hxzqlh opened this issue 3 years ago • 5 comments

I used the commit: f3349bd which says it fixed the push consumer blocked issue #944. But in my experience, consumer is still blocked while rocket mq actually has newest msg produced.

some logs like this:

time="2022-10-14T15:28:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=PULL_RT
time="2022-10-14T15:28:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=PULL_RT
time="2022-10-14T15:28:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=CONSUME_RT
time="2022-10-14T15:28:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=CONSUME_RT
time="2022-10-14T15:28:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=CONSUME_OK_TPS
time="2022-10-14T15:28:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=CONSUME_OK_TPS
time="2022-10-14T15:28:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=PULL_TPS
time="2022-10-14T15:28:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=PULL_TPS
time="2022-10-14T15:28:10Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=0, nextOffset=2035]" flowControlTimes=0 maxOffset=2034 maxSpan=1030 minOffset=1004
time="2022-10-14T15:28:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:23Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=0, nextOffset=2035]" flowControlTimes=0 maxOffset=2034 maxSpan=1030 minOffset=1004
time="2022-10-14T15:28:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:36Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=0, nextOffset=2035]" flowControlTimes=0 maxOffset=2034 maxSpan=1030 minOffset=1004
time="2022-10-14T15:28:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:48Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=0, nextOffset=2035]" flowControlTimes=0 maxOffset=2034 maxSpan=1030 minOffset=1004
time="2022-10-14T15:28:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:28:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:28:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:28:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:28:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:28:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:01Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=2, nextOffset=2099]" flowControlTimes=0 maxOffset=2098 maxSpan=1142 minOffset=956
time="2022-10-14T15:29:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=CONSUME_OK_TPS
time="2022-10-14T15:29:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=CONSUME_OK_TPS
time="2022-10-14T15:29:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=PULL_RT
time="2022-10-14T15:29:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=PULL_RT
time="2022-10-14T15:29:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=PULL_TPS
time="2022-10-14T15:29:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=PULL_TPS
time="2022-10-14T15:29:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=CONSUME_RT
time="2022-10-14T15:29:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=CONSUME_RT
time="2022-10-14T15:29:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:14Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=2, nextOffset=2099]" flowControlTimes=0 maxOffset=2098 maxSpan=1142 minOffset=956
time="2022-10-14T15:29:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:27Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=2, nextOffset=2099]" flowControlTimes=0 maxOffset=2098 maxSpan=1142 minOffset=956
time="2022-10-14T15:29:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:39Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=2, nextOffset=2099]" flowControlTimes=0 maxOffset=2098 maxSpan=1142 minOffset=956
time="2022-10-14T15:29:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:52Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=2, nextOffset=2099]" flowControlTimes=0 maxOffset=2098 maxSpan=1142 minOffset=956
time="2022-10-14T15:29:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:29:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:54Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:29:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:29:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:29:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:29:59Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:04Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:05Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=1, nextOffset=2836]" flowControlTimes=0 maxOffset=2835 maxSpan=1852 minOffset=983
time="2022-10-14T15:30:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=PULL_RT
time="2022-10-14T15:30:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=CONSUME_OK_TPS
time="2022-10-14T15:30:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=PULL_TPS
time="2022-10-14T15:30:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=PULL_TPS
time="2022-10-14T15:30:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=CONSUME_OK_TPS
time="2022-10-14T15:30:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=PULL_RT
time="2022-10-14T15:30:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=rocket_mq_test@spot_quote_kline_test statsName=CONSUME_RT
time="2022-10-14T15:30:09Z" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey="%RETRY%spot_quote_kline_test@spot_quote_kline_test" statsName=CONSUME_RT
time="2022-10-14T15:30:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:09Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:14Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:17Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=2, nextOffset=2099]" flowControlTimes=0 maxOffset=2098 maxSpan=1142 minOffset=956
time="2022-10-14T15:30:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:19Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:24Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:29Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:30Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=0, nextOffset=2035]" flowControlTimes=0 maxOffset=2034 maxSpan=1030 minOffset=1004
time="2022-10-14T15:30:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:34Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:39Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:43Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=0, nextOffset=2035]" flowControlTimes=0 maxOffset=2034 maxSpan=1030 minOffset=1004
time="2022-10-14T15:30:43Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=1, nextOffset=2836]" flowControlTimes=0 maxOffset=2835 maxSpan=1852 minOffset=983
time="2022-10-14T15:30:43Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=3, nextOffset=2407]" flowControlTimes=0 maxOffset=2406 maxSpan=1405 minOffset=1001
time="2022-10-14T15:30:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:44Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221
time="2022-10-14T15:30:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=1004
time="2022-10-14T15:30:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=1]" consumerGroup=spot_quote_kline_test offset=983
time="2022-10-14T15:30:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=2]" consumerGroup=spot_quote_kline_test offset=956
time="2022-10-14T15:30:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=rocket_mq_test, brokerName=rocketmq-internal-0, queueId=3]" consumerGroup=spot_quote_kline_test offset=1001
time="2022-10-14T15:30:49Z" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%spot_quote_kline_test, brokerName=rocketmq-internal-0, queueId=0]" consumerGroup=spot_quote_kline_test offset=221

then, after some minutes, the client continue consumed out new msg.

For short, the client do the loop: consumed some msg ---> blocked for a period of time ----> continue consumed out msg ----> blocked for a period of time ---> continue consumed out msg...

PS:

  1. rocket mq always has new msg produced.
  2. consumer just print the msg received, no other logics.

consumer code like this:

c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(group),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
		consumer.WithRetry(3),
		consumer.WithNsResolver(primitive.NewEnvResolver()))
	if err != nil {
		panic(err)
	}

	if err := c.Subscribe(topic, consumer.MessageSelector{}, h.OnConsumeMessage); err != nil {
		panic(err)
	}

	// Note: start after subscribe
	if err := c.Start(); err != nil {
		panic(err)
	}

UPDATED: when produced 1 msg per second, the block issue didn't happen. when produced 10 msg per second, the block issue always happen.

WHY:

  1. v2.1.1 has a bug which was fixed by commit ffd72416 in issue #886, and push consumer can normally consume msg in commit ffd72416 .
  2. the bug sources from ffd72416 to f3349bdb

hxzqlh avatar Oct 14 '22 15:10 hxzqlh

see #927 #928, you can try it

0daypwn avatar Oct 17 '22 01:10 0daypwn

This problem has not been solved? I had the same problem in v.2.1.1

INFO[1665] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=2]" consumerGroup=GID_Test offset=3 INFO[1665] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=3]" consumerGroup=GID_Test offset=2 INFO[1665] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=0]" consumerGroup=GID_Test offset=2 INFO[1670] update offset to broker success MessageQueue="MessageQueue [topic=%RETRY%GID_Test, brokerName=broker-a, queueId=0]" consumerGroup=GID_Test offset=0 INFO[1670] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=1]" consumerGroup=GID_Test offset=4 INFO[1670] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=2]" consumerGroup=GID_Test offset=3 INFO[1670] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=3]" consumerGroup=GID_Test offset=2 INFO[1670] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=0]" consumerGroup=GID_Test offset=2 INFO[1675] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=3]" consumerGroup=GID_Test offset=2 INFO[1675] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=0]" consumerGroup=GID_Test offset=2 INFO[1675] update offset to broker success MessageQueue="MessageQueue [topic=%RETRY%GID_Test, brokerName=broker-a, queueId=0]" consumerGroup=GID_Test offset=0 INFO[1675] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=1]" consumerGroup=GID_Test offset=4 INFO[1675] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=2]" consumerGroup=GID_Test offset=3 INFO[1680] update offset to broker success MessageQueue="MessageQueue [topic=%RETRY%GID_Test, brokerName=broker-a, queueId=0]" consumerGroup=GID_Test offset=0 INFO[1680] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=1]" consumerGroup=GID_Test offset=4 INFO[1680] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=2]" consumerGroup=GID_Test offset=3 INFO[1680] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=3]" consumerGroup=GID_Test offset=2 INFO[1680] update offset to broker success MessageQueue="MessageQueue [topic=test, brokerName=broker-a, queueId=0]" consumerGroup=GID_Test offset=2

xuehuizhang avatar Jan 05 '23 09:01 xuehuizhang

the queue's messages span too long, so do flow control"

一般的, 有这种日志

time="2022-10-14T15:30:30Z" level=warning msg="the queue's messages span too long, so do flow control" ConsumeConcurrentlyMaxSpan=1000 PullRequest="[ConsumerGroup: spot_quote_kline_test, Topic: rocket_mq_test, MessageQueue: brokerName=rocketmq-internal-0, queueId=0, nextOffset=2035]" flowControlTimes=0 maxOffset=2034 maxSpan=1030 minOffset=1004

表示消息消费消费太慢,本地缓存消息没有被及时消费,引起client的pull线程挂起,这个也可以成为 “背压机制” 代码如下:https://github.com/apache/rocketmq-client-go/blob/26142da1b88ef22ac018138c724f5d4f48059da3/consumer/push_consumer.go#L715-L729

建议记录下本地消费耗时,以此来确认下是客户端背压机制有问题, 还是消费慢。

francisoliverlee avatar Feb 22 '23 03:02 francisoliverlee

@francisoliverlee 没有报错确实没有必要打印出来,或者你设计一个关闭的参数吧,不断打印确实不太好

nelsonkti avatar Mar 30 '23 12:03 nelsonkti

@francisoliverlee 没有报错确实没有必要打印出来,或者你设计一个关闭的参数吧,不断打印确实不太好

@nelsonkti 这个是客户端的消费被压机制,需要日志来提醒使用者消费太慢,不打日志的话,用户如何去知道自己消费堆积的原因呢? 简单的关闭可能不是一个好选择,当然如果有更好的办法的话,可以直接通过 issue 描述解决 :-D

cserwen avatar Apr 28 '23 02:04 cserwen