rocketmq icon indicating copy to clipboard operation
rocketmq copied to clipboard

[Bug] 分层存储激活时,当向新创建的 Topic 中生产消息,能否正常转储取决于任一批消息生产的时间点是否正确

Open bxfjb opened this issue 1 year ago • 2 comments

Before Creating the Bug Report

  • [X] I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • [X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

CentOS 7

RocketMQ version

develop

JDK Version

OpenJDK 8

Describe the Bug

表现

  • 分层存储激活时,当向新创建的 Topic 中生产消息,后续消息能否正常转储取决于任一批消息生产的时间点;
  • 日志:
2024-04-24 10:38:03 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:38:12 INFO MessageStoreDispatcher - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:38:31 INFO TieredCommonExecutor_1 - FlatFileStore destroy file, topic=order-test1, queueId=0

2024-04-24 10:40:21 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:40:31 INFO TieredCommonExecutor_1 - Destroy Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:40:31 INFO TieredCommonExecutor_1 - FlatFileStore destroy file, topic=order-test1, queueId=0

2024-04-24 10:41:40 INFO ReputMessageService - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/COMMIT_LOG/cfcd208400000000000000000000
2024-04-24 10:41:52 INFO MessageStoreDispatcher - Constructing Posix FileSegment, filePath: /home/work/hdd1/rocketmq/tieredstore/609b5c2c_c5tst-common-rocketmq/c5tst-common-rocketmq-raft1/order-test1/0/CONSUME_QUEUE/cfcd208400000000000000000000
2024-04-24 10:42:12 INFO MessageStoreDispatcher - MessageDispatcher#dispatch, topic=order-test1, queueId=0, offset=0-3000, current=0, remain=3000
  • 共生产了三批消息,每批 1000 条
  1. 10:38:03 第一批消息生产到 broker,冷存中创建了 commitlog 文件,稍后 10:38:12 冷存中创建了 consumequeue 文件,但FlatFileStore中对应的的FlatMessageFile在10:38:31被删除,所以转储没有进行;
  2. 10:40:21 第二批消息生产到 broker,冷存中创建了 commitlog 文件,没有创建 consumequeue,FlatMessageFile在10:40:31被删除,转储同样没有进行;
  3. 10:41:40 第三批消息生产到 broker,冷存中创建了 commitlog 文件,稍后 10:41:52 冷存中创建了 consumequeue 文件,并在 10:42:12 进行了正常转储,共 3000 条,没有消息丢失;

分析

经分析,问题抽象如下:

  • 分层存储中存在两个定时任务:dispatch 任务与destroyExpiredFile任务,定时周期分别为 20s 与 60s,也就是说两次destroyExpiredFile之间存在三次dispatch
  • Bug 的具体表现如下时间轴: timeline drawio
  1. 如果冷存中Topic没有写入数据且消息的生产时间在时间段2、3,那么这个Topic的转储不能正常进行;
  2. 如果任意一批消息的生产时间在时间段1,那么包括之前卡住的转储都能够正常进行,也就是说不会丢消息;

dispatch 任务

  • 生产消息时: 在 MessageStoreDispatcherImpl#dispatch()中,初始化了要转储的 MessageQueue 对应的 FlatMessageFile,也包括其中的 commitlog 与 consumequeue,二者的构造函数如下:
public FlatCommitLogFile(FileSegmentFactory fileSegmentFactory, String filePath) {
    super(fileSegmentFactory, FileSegmentType.COMMIT_LOG, filePath);
    this.initOffset(0L);
}

initOffset() 实际上在冷存中创建了文件,实际上是向FlatMessageFile.commitLog.fileSegmentTable 中 add 了一个空的 FileSegment

public FlatConsumeQueueFile(FileSegmentFactory fileSegmentFactory, String filePath) {
    super(fileSegmentFactory, FileSegmentType.CONSUME_QUEUE, filePath);
}

可以看到没有调用initOffset(),也就是说此时consumeQueue.fileSegmentTable 长度为 0,解释了为什么第一次生产消息时只创建了 commitlog 没有创建 consumequeue;

  • consumequeue 的创建在 FlatMessageFile 初始化后的第一次 dispatchWithSemaphore 中:
// If set to max offset here, some written messages may be lost
if (!flatFile.isFlatFileInit()) {  // return !this.consumeQueue.fileSegmentTable.isEmpty();
    currentOffset = Math.max(minOffsetInQueue,
        maxOffsetInQueue - storeConfig.getTieredStoreGroupCommitSize());
    flatFile.initOffset(currentOffset);
    // this.commitLog.initOffset(0L);
    // this.consumeQueue.initOffset(offset * MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE);
    return CompletableFuture.completedFuture(true);
}

这行代码 flatFile.initOffset(currentOffset); 在冷存中创建了 consumequeue 文件,之后直接返回,等待 20s 后下一次的dispatch进行真正的转储;

destroyExpiredFile任务

  • 清理过期文件逻辑 如下FlatAppendFile#destroyExpiredFile():
public void destroyExpiredFile(long expireTimestamp) {
    fileSegmentLock.writeLock().lock();
    try {
        while (!fileSegmentTable.isEmpty()) {

            // first remove expired file from fileSegmentTable
            // then close and delete expired file
            FileSegment fileSegment = fileSegmentTable.get(0);

            if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
                fileSegment.getMaxTimestamp() > expireTimestamp) {
                log.debug("FileSegment has not expired, filePath={}, fileType={}, " +
                        "offset={}, expireTimestamp={}, maxTimestamp={}", filePath, fileType,
                    fileSegment.getBaseOffset(), expireTimestamp, fileSegment.getMaxTimestamp());
                break;
            }

            fileSegment.destroyFile();
            if (!fileSegment.exists()) {
                fileSegmentTable.remove(0);
                metadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset());
            }
        }
    } finally {
        fileSegmentLock.writeLock().unlock();
    }
}

1.主要问题在于 if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE && fileSegment.getMaxTimestamp() > expireTimestamp) 这一行,因为 commitlog 与 consumequeue 在没有写入消息时 maxTimestamp 默认为 Long.MAX_VALUE,(@lizhimins 这里这个判断条件有什么用意吗) 因此FlatMessageFile的初始化时间到下一次清理过期文件之间至少要执行两次dispatch,一次用于生成 consumequeue,一次用于修改 maxTimestamp 为正常值,否则尚未执行转储的 FlatMessageFile 会被清理; 2.但仅修改时间戳的限制这里的话,那么FlatMessageFile的初始化时间到下一次清理过期文件之间仍然至少要执行一次dispatch,用于生成 consumequeue,否则FlatMessageFile会被FlatStore#load()中的以下代码删除:

if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
    this.destroyFile(flatFile.getMessageQueue());
}

解决方案

删去 if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这一行,并在 FlatMessageFile 初始化时立即创建 commitlog 与 consumequeue文件

Steps to Reproduce

开启分层存储,按照时间轴,在不同的时间点生产消息

What Did You Expect to See?

消息正常转储,FlatMessageFile不被删除

What Did You See Instead?

消息被转储前FlatMessageFile被删除

Additional Context

No response

bxfjb avatar Apr 24 '24 13:04 bxfjb

分析的很具体,依次回复几个问题:

  1. 对于冷存储中的 CommitLog 可以从 0 开始,而 cq 必须与本地位点对齐,只能在 dispatch 时才能确定初始化的位点,因此不会在 factory 中初始化指定,这一点也是为了保证与旧存储格式的兼容。

  2. fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这个条件还有一个原因是我发现之前同学实现的旧代码中有缺陷,没有修改 segment metadata 中的 time,导致整个 flatfile 无法过期删除。

  3. load 里面初始化的 destory 的间隔目前写的是1分钟,可以改为1小时。

lizhimins avatar Apr 26 '24 06:04 lizhimins

分析的很具体,依次回复几个问题:

  1. 对于冷存储中的 CommitLog 可以从 0 开始,而 cq 必须与本地位点对齐,只能在 dispatch 时才能确定初始化的位点,因此不会在 factory 中初始化指定,这一点也是为了保证与旧存储格式的兼容。
  2. fileSegment.getMaxTimestamp() != Long.MAX_VALUE 这个条件还有一个原因是我发现之前同学实现的旧代码中有缺陷,没有修改 segment metadata 中的 time,导致整个 flatfile 无法过期删除。
  3. load 里面初始化的 destory 的间隔目前写的是1分钟,可以改为1小时。

修改 destroy 间隔应该可以有效规避这个问题 另外 2 提到的问题现在还存在吗,如果不存在的话是否可以去掉这个条件,因为按照 dispatch 的逻辑,如果一个 FlatMessageFile 存在,那么意味着应该有消息等待被转移到冷存,删除一个刚刚创建的 FlatMessageFile 似乎不太合理

bxfjb avatar Apr 26 '24 07:04 bxfjb

This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.

github-actions[bot] avatar Apr 27 '25 00:04 github-actions[bot]

This issue was closed because it has been inactive for 3 days since being marked as stale.

github-actions[bot] avatar May 01 '25 00:05 github-actions[bot]