RocketMQ 存储核心:深入解析 CommitLog 设计原理
一、引言
在分布式消息队列系统中,消息存储的可靠性和高吞吐能力是衡量系统优劣的核心指标。Apache RocketMQ 作为一款高性能、高可用的分布式消息中间件,其独特的 CommitLog 存储机制在消息持久化过程中扮演了关键角色。本文将深入剖析 CommitLog 的设计思想、实现原理及优化策略,揭示其如何支撑 RocketMQ 实现每秒数十万级消息处理能力。
二、CommitLog 的核心定位
1. 什么是 CommitLog?
CommitLog 是 RocketMQ 消息存储的核心物理文件,所有 Topic 的消息均以顺序追加(Append-Only)的方式写入同一个 CommitLog 文件。这种设计颠覆了传统“每个 Topic 独立存储”的模式,通过统一的顺序写机制最大化磁盘 I/O 效率。
2. 设计哲学
-
全局顺序写:所有消息顺序写入,规避磁盘随机 I/O 性能瓶颈。
-
物理连续存储:消息按到达顺序连续存储,消除碎片化。
-
逻辑与物理分离:消息的消费队列(ConsumeQueue)和索引文件(IndexFile)作为逻辑视图,与物理存储解耦。
三、CommitLog 的物理结构
1. 文件组织
-
文件路径:
${ROCKET_HOME}/store/commitlog/
-
文件命名:以文件初始偏移量命名(如
00000000000000000000
) -
固定大小:默认 1GB(可配置),写满后创建新文件
2. 消息存储格式
每条消息在 CommitLog 中的存储结构如下
字段 | 长度(字节) | 说明 |
---|---|---|
Total Size | 4 | 消息总长度 |
Magic Code | 4 | 固定值 0xAABBCCDD |
Body CRC | 4 | 消息体 CRC 校验码 |
Queue ID | 4 | 消息所属队列 ID |
Flag | 4 | 消息标记(事务/压缩等) |
Queue Offset | 8 | 在 ConsumeQueue 的物理偏移 |
Physical Offset | 8 | 在 CommitLog 的物理偏移 |
SysFlag | 4 | 系统标记(压缩/事务等) |
Born Timestamp | 8 | 消息生成时间戳 |
Born Host | 8 | 生产者地址 |
Store Timestamp | 8 | 存储时间戳 |
Store Host | 8 | Broker 地址 |
Reconsume Times | 4 | 重试次数 |
Prepared Transaction Offset | 8 | 事务相关偏移量 |
Body Length | 4 | 消息体长度 |
Body | Body Length | 消息体内容 |
Topic Length | 1 | Topic 名称长度 |
Topic | Topic Length | Topic 名称 |
Properties Length | 2 | 属性字段长度 |
Properties | 变长 | 扩展属性(Key-Value) |
四、commitLog方法属性介绍
主要属性:
/*** 映射文件的队列*/protected final MappedFileQueue mappedFileQueue;/*** 所属的消息存储组件*/protected final DefaultMessageStore defaultMessageStore;/*** 刷新CommitLog服务的组件*/private final FlushCommitLogService flushCommitLogService;/*** 缓冲区 刷新到CommitLog服务*///If TransientStorePool enabled, we must flush message to FileChannel at fixed periods//如果开启了TransientStorePool 我们必须有一个固定的频率刷新消息到FileChannelprivate final FlushCommitLogService commitLogService;/*** 追加消息的回调函数*/private final AppendMessageCallback appendMessageCallback;/*** 写入消息的线程本地副本*/private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;/*** topic-queueid -> offset*/protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);/*** topic-queueid -> offset*/protected Map<String/* topic-queueid */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);/*** 确认偏移量*/protected volatile long confirmOffset = -1L;/*** 锁开始的时间*/private volatile long beginTimeInLock = 0;/*** 写入消息的锁*/protected final PutMessageLock putMessageLock;/*** 完成的存储路径*/private volatile Set<String> fullStorePaths = Collections.emptySet();/*** 多路分发的组件*/protected final MultiDispatch multiDispatch;/*** 刷新磁盘的监控组件*/private final FlushDiskWatcher flushDiskWatcher;
主要方法:
1.构造函数
public CommitLog(final DefaultMessageStore defaultMessageStore) {//commitLog的存储路径String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();//是否包含一个切割的符号 存储路径里如果包含了一个切割符号 逗号 那么就走这个多个路径的MappedFile队列//正常情况下走的是普通的MappedFile队列if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);} else {this.mappedFileQueue = new MappedFileQueue(storePath, //存储路径defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), //mappedFileSize 文件的大小 默认是1GdefaultMessageStore.getAllocateMappedFileService()); //分配文件的服务}this.defaultMessageStore = defaultMessageStore;//如果是走同步刷盘的策略的话 就是GroupCommitService 如果是异步刷盘就是FlushRealTimeServiceif (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();} else {this.flushCommitLogService = new FlushRealTimeService();}//this.commitLogService = new CommitRealTimeService();this.appendMessageCallback = new DefaultAppendMessageCallback();putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {@Overrideprotected PutMessageThreadLocal initialValue() {return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());}};//写入消息的时候是否默认使用可重入的锁 默认是truethis.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();this.multiDispatch = new MultiDispatch(defaultMessageStore, this);flushDiskWatcher = new FlushDiskWatcher();}
2. start方法
//对commitLog组件进行启用public void start() {// 启动刷盘服务this.flushCommitLogService.start();//设置刷新磁盘的监控组件flushDiskWatcher.setDaemon(true);flushDiskWatcher.start();//如果启动了瞬时池化技术if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {this.commitLogService.start();}}
3.recoverNormally方法
可以参考之前的博客:
https://blog.csdn.net/u013127325/article/details/147397018
4.asyncPutMessage方法
可以参考之前的博客:
RocketMQ CommitLog异步写入机制深度解析:从asyncPutMessage看亿级消息吞吐设计-CSDN博客