当前位置: 首页 > news >正文

RocketMQ 存储核心:深入解析 CommitLog 设计原理

一、引言

在分布式消息队列系统中,消息存储的可靠性高吞吐能力是衡量系统优劣的核心指标。Apache RocketMQ 作为一款高性能、高可用的分布式消息中间件,其独特的 CommitLog 存储机制在消息持久化过程中扮演了关键角色。本文将深入剖析 CommitLog 的设计思想、实现原理及优化策略,揭示其如何支撑 RocketMQ 实现每秒数十万级消息处理能力。

二、CommitLog 的核心定位

1. 什么是 CommitLog?

CommitLog 是 RocketMQ 消息存储的核心物理文件,所有 Topic 的消息均以顺序追加(Append-Only)的方式写入同一个 CommitLog 文件。这种设计颠覆了传统“每个 Topic 独立存储”的模式,通过统一的顺序写机制最大化磁盘 I/O 效率。

2. 设计哲学
  • 全局顺序写:所有消息顺序写入,规避磁盘随机 I/O 性能瓶颈。

  • 物理连续存储:消息按到达顺序连续存储,消除碎片化。

  • 逻辑与物理分离:消息的消费队列(ConsumeQueue)和索引文件(IndexFile)作为逻辑视图,与物理存储解耦。

三、CommitLog 的物理结构

 1. 文件组织

  • 文件路径${ROCKET_HOME}/store/commitlog/

  • 文件命名:以文件初始偏移量命名(如 00000000000000000000

  • 固定大小:默认 1GB(可配置),写满后创建新文件

2. 消息存储格式

每条消息在 CommitLog 中的存储结构如下

字段长度(字节)说明
Total Size4消息总长度
Magic Code4固定值 0xAABBCCDD
Body CRC4消息体 CRC 校验码
Queue ID4消息所属队列 ID
Flag4消息标记(事务/压缩等)
Queue Offset8在 ConsumeQueue 的物理偏移
Physical Offset8在 CommitLog 的物理偏移
SysFlag4系统标记(压缩/事务等)
Born Timestamp8消息生成时间戳
Born Host8生产者地址
Store Timestamp8存储时间戳
Store Host8Broker 地址
Reconsume Times4重试次数
Prepared Transaction Offset8事务相关偏移量
Body Length4消息体长度
BodyBody Length消息体内容
Topic Length1Topic 名称长度
TopicTopic LengthTopic 名称
Properties Length2属性字段长度
Properties变长扩展属性(Key-Value) 

四、commitLog方法属性介绍

主要属性:

/*** 映射文件的队列*/protected final MappedFileQueue mappedFileQueue;/*** 所属的消息存储组件*/protected final DefaultMessageStore defaultMessageStore;/*** 刷新CommitLog服务的组件*/private final FlushCommitLogService flushCommitLogService;/*** 缓冲区  刷新到CommitLog服务*///If TransientStorePool enabled, we must flush message to FileChannel at fixed periods//如果开启了TransientStorePool 我们必须有一个固定的频率刷新消息到FileChannelprivate final FlushCommitLogService commitLogService;/*** 追加消息的回调函数*/private final AppendMessageCallback appendMessageCallback;/*** 写入消息的线程本地副本*/private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;/*** topic-queueid -> offset*/protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);/*** topic-queueid -> offset*/protected Map<String/* topic-queueid */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);/*** 确认偏移量*/protected volatile long confirmOffset = -1L;/*** 锁开始的时间*/private volatile long beginTimeInLock = 0;/*** 写入消息的锁*/protected final PutMessageLock putMessageLock;/*** 完成的存储路径*/private volatile Set<String> fullStorePaths = Collections.emptySet();/*** 多路分发的组件*/protected final MultiDispatch multiDispatch;/*** 刷新磁盘的监控组件*/private final FlushDiskWatcher flushDiskWatcher;

主要方法:

1.构造函数

public CommitLog(final DefaultMessageStore defaultMessageStore) {//commitLog的存储路径String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();//是否包含一个切割的符号 存储路径里如果包含了一个切割符号 逗号 那么就走这个多个路径的MappedFile队列//正常情况下走的是普通的MappedFile队列if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);} else {this.mappedFileQueue = new MappedFileQueue(storePath, //存储路径defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), //mappedFileSize 文件的大小 默认是1GdefaultMessageStore.getAllocateMappedFileService()); //分配文件的服务}this.defaultMessageStore = defaultMessageStore;//如果是走同步刷盘的策略的话 就是GroupCommitService 如果是异步刷盘就是FlushRealTimeServiceif (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();} else {this.flushCommitLogService = new FlushRealTimeService();}//this.commitLogService = new CommitRealTimeService();this.appendMessageCallback = new DefaultAppendMessageCallback();putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {@Overrideprotected PutMessageThreadLocal initialValue() {return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());}};//写入消息的时候是否默认使用可重入的锁 默认是truethis.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();this.multiDispatch = new MultiDispatch(defaultMessageStore, this);flushDiskWatcher = new FlushDiskWatcher();}

2. start方法

    //对commitLog组件进行启用public void start() {// 启动刷盘服务this.flushCommitLogService.start();//设置刷新磁盘的监控组件flushDiskWatcher.setDaemon(true);flushDiskWatcher.start();//如果启动了瞬时池化技术if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {this.commitLogService.start();}}

3.recoverNormally方法

可以参考之前的博客:

https://blog.csdn.net/u013127325/article/details/147397018

4.asyncPutMessage方法

可以参考之前的博客:

RocketMQ CommitLog异步写入机制深度解析:从asyncPutMessage看亿级消息吞吐设计-CSDN博客

 

相关文章:

  • UARA串口开发基础
  • PCB硬件电路设计_pcb布线设计
  • SpringAI集成本地部署DeepSeek大模型服务(Ollama)
  • Android开发,实现一个简约又好看的登录页
  • 深入理解java线程池
  • [RoarCTF 2019]Easy Calc 详解
  • 空洞/膨胀卷积
  • clangd-vscode配置
  • 网络安全之红队LLM的大模型自动化越狱
  • LinuxAgent开源程序是一款智能运维助手,通过接入 DeepSeek API 实现对 Linux 终端的自然语言控制,帮助用户更高效地进行系统运维工作
  • 遗传算法实现单货架库位优化
  • 在Linux系统中安装Anaconda的完整指南
  • vue3代码规范管理;基于vite和vue3、 eslint、prettier、stylelint、husky规范;git触发eslint校验
  • JavaWeb:vueaxios
  • 光触发RFID:破解物流、电力、资产管理三大领域的“不可能三角”
  • 基于 Python 的实现:居民用电量数据分析与可视化
  • 基于SpringBoot的食物营养分析与推荐网站系统
  • 22.晶振的信号与布局布线处理
  • 安卓基础(泛型)
  • 跨语言哈希一致性:C# 与 Java 的 MD5 之战?
  • “天链”继续上新!长三乙火箭成功发射天链二号05星
  • 加拿大警方:已确认有9人在温哥华驾车撞人事件中遇难
  • 王一博赛车故障退赛冲上热搜,工作室回应:下次再战
  • 马上评丨发钱奖励结婚,支持婚育就该系统性发力
  • 清华数字政府与治理研究院揭牌:服务数字政府建设需求
  • 哈马斯同意释放剩余所有以方被扣押人员,以换取停火五年