[Bug] 分层存储激活时,当向新创建的 Topic 中生产消息,能否正常转储取决于任一批消息生产的时间点是否正确
Before Creating the Bug Report
-
[X] I found a bug, not just asking a question, which should be created in GitHub Discussions.
-
[X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
-
[X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
CentOS 7
RocketMQ version
develop
JDK Version
OpenJDK 8
Describe the Bug
表现
- 分层存储激活时,当向新创建的 Topic 中生产消息,后续消息能否正常转储取决于任一批消息生产的时间点;
- 日志:
2024-04-24 10:38:03 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:38:12 INFO MessageStoreDispatcher - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - FlatFileStore destroy file, topic=order-test1, queueId=0
2024-04-24 10:40:21 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:40:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:40:31 INFO TieredCommonExecutor_1 - FlatFileStore destroy file, topic=order-test1, queueId=0
2024-04-24 10:41:40 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:41:52 INFO MessageStoreDispatcher - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:42:12 INFO MessageStoreDispatcher - MessageDispatcher#dispatch, topic=order-test1, queueId=0, offset=0-3000, current=0, remain=3000
- 共生产了三批消息,每批 1000 条
- 10:38:03 第一批消息生产到 broker,冷存中创建了 commitlog 文件,稍后 10:38:12 冷存中创建了 consumequeue 文件,但
FlatFileStore中对应的的FlatMessageFile在10:38:31被删除,所以转储没有进行; - 10:40:21 第二批消息生产到 broker,冷存中创建了 commitlog 文件,没有创建 consumequeue,
FlatMessageFile在10:40:31被删除,转储同样没有进行; - 10:41:40 第三批消息生产到 broker,冷存中创建了 commitlog 文件,稍后 10:41:52 冷存中创建了 consumequeue 文件,并在 10:42:12 进行了正常转储,共 3000 条,没有消息丢失;
分析
经分析,问题抽象如下:
- 分层存储中存在两个定时任务:
dispatch任务与destroyExpiredFile任务,定时周期分别为 20s 与 60s,也就是说两次destroyExpiredFile之间存在三次dispatch; - Bug 的具体表现如下时间轴:
- 如果冷存中Topic没有写入数据且消息的生产时间在时间段2、3,那么这个Topic的转储不能正常进行;
- 如果任意一批消息的生产时间在时间段1,那么包括之前卡住的转储都能够正常进行,也就是说不会丢消息;
dispatch 任务
- 生产消息时:
在
MessageStoreDispatcherImpl#dispatch()中,初始化了要转储的MessageQueue对应的FlatMessageFile,也包括其中的 commitlog 与 consumequeue,二者的构造函数如下:
public FlatCommitLogFile(FileSegmentFactory fileSegmentFactory, String filePath) {
super(fileSegmentFactory, FileSegmentType.COMMIT_LOG, filePath);
this.initOffset(0L);
}
initOffset() 实际上在冷存中创建了文件,实际上是向FlatMessageFile.commitLog.fileSegmentTable 中 add 了一个空的 FileSegment;
public FlatConsumeQueueFile(FileSegmentFactory fileSegmentFactory, String filePath) {
super(fileSegmentFactory, FileSegmentType.CONSUME_QUEUE, filePath);
}
可以看到没有调用initOffset(),也就是说此时consumeQueue.fileSegmentTable 长度为 0,解释了为什么第一次生产消息时只创建了 commitlog 没有创建 consumequeue;
- consumequeue 的创建在
FlatMessageFile初始化后的第一次dispatchWithSemaphore中:
// If set to max offset here, some written messages may be lost
if (!flatFile.isFlatFileInit()) { // return !this.consumeQueue.fileSegmentTable.isEmpty();
currentOffset = Math.max(minOffsetInQueue,
maxOffsetInQueue - storeConfig.getTieredStoreGroupCommitSize());
flatFile.initOffset(currentOffset);
// this.commitLog.initOffset(0L);
// this.consumeQueue.initOffset(offset * MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE);
return CompletableFuture.completedFuture(true);
}
这行代码 flatFile.initOffset(currentOffset); 在冷存中创建了 consumequeue 文件,之后直接返回,等待 20s 后下一次的dispatch进行真正的转储;
destroyExpiredFile任务
- 清理过期文件逻辑
如下
FlatAppendFile#destroyExpiredFile():
public void destroyExpiredFile(long expireTimestamp) {
fileSegmentLock.writeLock().lock();
try {
while (!fileSegmentTable.isEmpty()) {
// first remove expired file from fileSegmentTable
// then close and delete expired file
FileSegment fileSegment = fileSegmentTable.get(0);
if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
fileSegment.getMaxTimestamp() > expireTimestamp) {
log.debug("FileSegment has not expired, filePath={}, fileType={}, " +
"offset={}, expireTimestamp={}, maxTimestamp={}", filePath, fileType,
fileSegment.getBaseOffset(), expireTimestamp, fileSegment.getMaxTimestamp());
break;
}
fileSegment.destroyFile();
if (!fileSegment.exists()) {
fileSegmentTable.remove(0);
metadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset());
}
}
} finally {
fileSegmentLock.writeLock().unlock();
}
}
1.主要问题在于 if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE && fileSegment.getMaxTimestamp() > expireTimestamp) 这一行,因为 commitlog 与 consumequeue 在没有写入消息时 maxTimestamp 默认为 Long.MAX_VALUE,(@lizhimins 这里这个判断条件有什么用意吗) 因此FlatMessageFile的初始化时间到下一次清理过期文件之间至少要执行两次dispatch,一次用于生成 consumequeue,一次用于修改 maxTimestamp 为正常值,否则尚未执行转储的 FlatMessageFile 会被清理;
2.但仅修改时间戳的限制这里的话,那么FlatMessageFile的初始化时间到下一次清理过期文件之间仍然至少要执行一次dispatch,用于生成 consumequeue,否则FlatMessageFile会被FlatStore#load()中的以下代码删除:
if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
this.destroyFile(flatFile.getMessageQueue());
}
解决方案
删去 if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这一行,并在 FlatMessageFile 初始化时立即创建 commitlog 与 consumequeue文件
Steps to Reproduce
开启分层存储,按照时间轴,在不同的时间点生产消息
What Did You Expect to See?
消息正常转储,FlatMessageFile不被删除
What Did You See Instead?
消息被转储前FlatMessageFile被删除
Additional Context
No response
分析的很具体,依次回复几个问题:
-
对于冷存储中的 CommitLog 可以从 0 开始,而 cq 必须与本地位点对齐,只能在 dispatch 时才能确定初始化的位点,因此不会在 factory 中初始化指定,这一点也是为了保证与旧存储格式的兼容。
-
fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这个条件还有一个原因是我发现之前同学实现的旧代码中有缺陷,没有修改 segment metadata 中的 time,导致整个 flatfile 无法过期删除。
-
load 里面初始化的 destory 的间隔目前写的是1分钟,可以改为1小时。
分析的很具体,依次回复几个问题:
- 对于冷存储中的 CommitLog 可以从 0 开始,而 cq 必须与本地位点对齐,只能在 dispatch 时才能确定初始化的位点,因此不会在 factory 中初始化指定,这一点也是为了保证与旧存储格式的兼容。
- fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这个条件还有一个原因是我发现之前同学实现的旧代码中有缺陷,没有修改 segment metadata 中的 time,导致整个 flatfile 无法过期删除。
- load 里面初始化的 destory 的间隔目前写的是1分钟,可以改为1小时。
修改 destroy 间隔应该可以有效规避这个问题 另外 2 提到的问题现在还存在吗,如果不存在的话是否可以去掉这个条件,因为按照 dispatch 的逻辑,如果一个 FlatMessageFile 存在,那么意味着应该有消息等待被转移到冷存,删除一个刚刚创建的 FlatMessageFile 似乎不太合理
This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.
This issue was closed because it has been inactive for 3 days since being marked as stale.