[Bug] PopReviveService may revive incorrect message
Before Creating the Bug Report
- [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.
- [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
centos7
RocketMQ version
5.1.0
JDK Version
JDK 1.8
Describe the Bug
A message is popped again after being acknowledged, and it even appears twice at the same time. No offset reset or service restart occurred.
queryMsgByOffset results:
Note that the broker stored two identical messages (same UNIQ_KEY) to the pop-retry topic at the same time, even though this message had already been acknowledged.
After reviewing the code, we suspect reviveMsgFromCk: when reviving a message from a PopCheckPoint, the original message must first be read from the store. But what getBizMessage actually does is read one message starting from the specified queue offset, rather than the message at the specified queue offset. In other words, the wrong message may be read and revived in some scenarios.
(In addition, although the pop-time check may avoid the above issue to some extent, we should ensure that a message's pop time is never earlier than its store time.)
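The two conditions discussed above can be sketched as a single guard. This is only an illustration, not the actual PopReviveService code: the class `ReviveGuard`, the method `isSameMessage`, and its parameters are hypothetical names, and the assumption is that the checkpoint's queue offset and pop time can be compared with the offset and store time of the message that was read back.

```java
// Hypothetical sketch; none of these names exist in RocketMQ.
final class ReviveGuard {

    /**
     * Returns true only if the message read back from the store can be
     * the one the checkpoint refers to.
     *
     * @param ckOffset      queue offset recorded in the checkpoint
     * @param ckPopTime     pop time recorded in the checkpoint
     * @param readOffset    queue offset of the message actually read
     * @param readStoreTime store time of the message actually read
     */
    static boolean isSameMessage(long ckOffset, long ckPopTime,
                                 long readOffset, long readStoreTime) {
        // The read must return the message AT the offset, not a later one.
        if (readOffset != ckOffset) {
            return false;
        }
        // A message cannot have been popped before it was stored.
        return readStoreTime <= ckPopTime;
    }

    public static void main(String[] args) {
        // Same offset, stored before the pop: accepted.
        System.out.println(isSameMessage(0, 200, 0, 100));
        // Later offset, but stored before the pop: only the offset check rejects it.
        System.out.println(isSameMessage(0, 200, 1, 150));
        // Same offset, but stored after the pop: rejected by the time check.
        System.out.println(isSameMessage(0, 200, 0, 300));
    }
}
```

The second call is the interesting one: the time check alone would let it through, which is why the pop-time check only helps "to some extent".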
Steps to Reproduce
A possible case:
message in queue:
m0(storeTime=t0,commitlogfile0), m1(storeTime=t1,commitlogfile1)
timeline:
t2 -> Pop m0 (do not ack)
t3 -> commitlogfile0 is expired and deleted
t4 -> invisible timeout and try revive m0, but the message actually read and revived is m1
(t0<t1<t2<t3<t4)
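The timeline above can be modeled with a small in-memory sketch. It does not use the real message store: `ReviveTimeline`, `readFrom`, and `readAt` are hypothetical names, and a `TreeMap` stands in for one consume queue, under the assumption (taken from this report) that the read returns the first surviving message at or after the requested offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of one queue; not RocketMQ code.
final class ReviveTimeline {
    // queue offset -> message body
    private final TreeMap<Long, String> queue = new TreeMap<>();

    void put(long offset, String body) {
        queue.put(offset, body);
    }

    // Models the commit log file holding this offset being expired and deleted.
    void expire(long offset) {
        queue.remove(offset);
    }

    // "Starting from" semantics: first surviving message at or after the offset.
    Map.Entry<Long, String> readFrom(long offset) {
        return queue.ceilingEntry(offset);
    }

    // "At" semantics: only the message stored exactly at the offset, else null.
    String readAt(long offset) {
        return queue.get(offset);
    }

    public static void main(String[] args) {
        ReviveTimeline store = new ReviveTimeline();
        store.put(0L, "m0");   // t0
        store.put(1L, "m1");   // t1
        long ckOffset = 0L;    // t2: m0 is popped and not acked
        store.expire(0L);      // t3: commitlogfile0 is deleted

        // t4: revive reads "from" the checkpoint offset and gets a different message.
        Map.Entry<Long, String> got = store.readFrom(ckOffset);
        System.out.println("read from " + ckOffset + ": "
                + got.getValue() + " at offset " + got.getKey());
        System.out.println("read at " + ckOffset + ": " + store.readAt(ckOffset));
    }
}
```

In this model the "starting from" read hands back m1 for m0's checkpoint, while the exact read finds nothing, which is the signal a revive path could use to skip the checkpoint.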
What Did You Expect to See?
Message will not be received again after being acknowledged.
What Did You See Instead?
Message is received again after being acknowledged.
Additional Context
No response
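The description is a bit complicated. In short: the ck (checkpoint) of a pop consumption expires after the underlying data has been cleaned up; getBizMessage then returns the message at the min offset, which does not necessarily satisfy the skip condition, so the ck produces a retry.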
CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName())
I suggest adding a comment here: the Boolean indicates the consume result; when it is false, rePutCk() is needed.