当前位置: 首页 > news >正文

启动你的RocketMQ之旅(六)-Broker详细——主从复制

前言
👏作者简介:我是笑霸final。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏

上一章节:启动你的RocketMQ之旅(五)-Broker详细——消息传输

目录

  • 概述
  • 主节点发送消息方法WriteSocketService
    • 主向从发心跳包的格式
    • 从节点接收数据
    • 主节点接收从数据ReadSocketService

概述

上一节介绍了rocketmq的消息通信原理,这一节主要介绍它的高可用机制-主从复制。
RocketMQ 的主从复制(Master-Slave Replication)机制是其高可用性架构的重要组成部分,主要用于提高系统的可靠性和数据的安全性。通过主从架构,RocketMQ 能够在主节点(Master)发生故障时,迅速切换到从节点(Slave),以保证消息服务的连续性。

主从架构概述
在 RocketMQ 中,Broker 分为 Master 和 Slave 两种角色。一个 Master 可以对应多个 Slave,但每个 Slave 只能属于一个 Master。Master 节点负责处理客户端的读写请求,而 Slave 节点则主要负责数据备份和提供只读服务(可选配置)。这种设计确保了即使 Master 发生故障,系统也能快速恢复服务,并且不会丢失数据。

数据同步方式
RocketMQ 支持两种类型的数据同步方式:

  • 同步双写(Sync):在这种模式下,Producer 发送的消息会同时写入 Master 和 Slave。只有当消息成功写入 Master 和所有关联的 Slave 后,才会向 Producer 返回确认响应。【供了更高的数据一致性,但也带来了较高的延迟
  • 异步复制(Async):Producer 只需将消息发送给 Master 并收到确认即可。随后,Master 会异步地将消息同步给 Slave。【消息写入速度快,但存在一定的数据丢失风险】

源码图片

这三个类就是关于Ha服务的 HAservice的构造方法和HAConnection的构造方法在这里插入图片描述
在这里插入图片描述在这里插入图片描述在这里插入图片描述

这上面两个构造方法内 都还有4个新的类
● WriteSocketService通常负责处理向网络通道写入数据的任务,也就是将数据发送到远程节点。
● ReadSocketService则专注于从网络通道读取数据,接收来自远程节点的信息。
● groupTransferService:负责将主节点写入的消息组按照一定的策略分发到从节点,确保消息在集群间的同步。
● HAClient通常用来实现主从节点间的通信和协调,包括但不限于心跳检测、状态同步、数据复制等操作,确保系统的高可用性和数据一致性。

在这里插入图片描述

主节点发送消息方法WriteSocketService

public void run() {HAConnection.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 使用选择器(Selector)等待1秒钟,检测是否有网络事件(如连接请求或数据接收)this.selector.select(1000);if (-1 == HAConnection.this.slaveRequestOffset) {//如果从节点请求的偏移量尚未初始化(值为-1) 睡眠10ms 继续下一轮循环Thread.sleep(10);continue;}// 如果从节点请求的偏移量尚未初始化(值为-1)if (-1 == this.nextTransferFromWhere) {// 从阶段请求偏移量为0的情况if (0 == HAConnection.this.slaveRequestOffset) {//获得主节点最大偏移量(最后一个commitLog)long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();// 对齐到CommitLog映射文件大小的整数倍,以便进行数据传输// 对齐到单个文件的起始偏移量masterOffset =masterOffset- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());if (masterOffset < 0) {masterOffset = 0;}// 设置从哪里开始向从节点传输数据this.nextTransferFromWhere = masterOffset;} else {this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;}log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr+ "], and slave request " + HAConnection.this.slaveRequestOffset);}/*** 这段代码的核心逻辑是检查是否到了应该发送心跳包的时间(根据lastWriteTimestamp与当前时间差判断),* 如果是,则构建心跳包并尝试发送;如果不是心跳发送时间,则尝试发送实际数据。* 不论发送的是心跳包还是实际数据,都会根据transferData方法的返回结果决定是否继续循环等待下一次发送机会。*///如果最后一次写入已经完成if (this.lastWriteOver) {// 计算当前系统时间与上一次写入完成的时间戳之间的间隔(毫秒)long interval =HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;// 间隔时间超过了发送心跳最大时间 5sif (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {// Build Header  构建心跳包头信息this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(this.nextTransferFromWhere);this.byteBufferHeader.putInt(0);this.byteBufferHeader.flip();//发送心跳包this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}} else {//发送实际数据this.lastWriteOver = this.transferData();if (!this.lastWriteOver)continue;}//从commitLog中读取数据SelectMappedBufferResult selectResult =HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if (selectResult != null) {int size = selectResult.getSize();if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();}//发送下一个偏移量long thisOffset = this.nextTransferFromWhere;this.nextTransferFromWhere += size;// 缓冲区从新设置为size大小selectResult.getByteBuffer().limit(size);this.selectMappedBufferResult = selectResult;// Build Headerthis.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();// 发送数据this.lastWriteOver = this.transferData();} else {// 这行代码是在等待特定条件满足或者等待一段时间后继续执行HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);}} catch (Exception e) {HAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}// 关闭服务//这一行代码从等待线程表中移除当前线程。在高可用(HA)服务中,可能存在一个线程等待表,记录着等待特定条件满足的线程。// 执行此行代码意味着当前线程完成了它的等待任务,不再需要留在等待线程表中。HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();if (this.selectMappedBufferResult != null) {this.selectMappedBufferResult.release();//释放缓存区}this.makeStop();readSocketService.makeStop();haService.removeConnection(HAConnection.this);SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();} catch (IOException e) {HAConnection.log.error("", e);}HAConnection.log.info(this.getServiceName() + " service end");}

上面方法大致流程(循环里):
● 获得主节点最后一个commitLog文件偏移量。
● 查是否到了应该发送心跳包的时间
● 如果是,则构建心跳包并尝试发送;如果不是心跳发送时间,则尝试发送实际数据。
● 不论发送的是心跳包还是实际数据,都会根据transferData方法的返回结果决定是否继续循环等待下一次发送机会。

进入transferData()方法

    private boolean transferData() throws Exception {int writeSizeZeroTimes = 0;// Write Header 写头 如果byteBufferHeader还有空间/*** 当连续三次发送的数据为0则表示数据发送完毕*/while (this.byteBufferHeader.hasRemaining()) {// 返回这次实际发出的数据int writeSize = this.socketChannel.write(this.byteBufferHeader);if (writeSize > 0) {writeSizeZeroTimes = 0;this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();} else if (writeSize == 0) {if (++writeSizeZeroTimes >= 3) {break;}} else {throw new Exception("ha master write header error < 0");}}if (null == this.selectMappedBufferResult) {return !this.byteBufferHeader.hasRemaining();}writeSizeZeroTimes = 0;// Write Bodyif (!this.byteBufferHeader.hasRemaining()) {// commitLog中的数据 也是如果连续3次发送的数据为0则认为发送完毕while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());if (writeSize > 0) {writeSizeZeroTimes = 0;this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();} else if (writeSize == 0) {if (++writeSizeZeroTimes >= 3) {break;}} else {throw new Exception("ha master write body error < 0");}}}boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {this.selectMappedBufferResult.release();this.selectMappedBufferResult = null;}return result;}

● 通过循环发送数据,确保数据完整传输,当writeSize大于0时,表示有数据成功发送,当writeSize等于0时,累计次数,连续三次等于0则认为发送完毕。这种方法有效地处理了网络不稳定或阻塞等情况,提高了数据传输的鲁棒性(稳定性:我也不知道为啥当初要音译)。
● 整个方法确保了数据的原子性发送,只有当头和正文均发送完毕时,才返回true,表示传输完成,从而确保了主从节点之间数据的一致性。

主向从发心跳包的格式

在这里插入图片描述在这里插入图片描述
心跳包 大小12字节包含:
● 8字节的 写入偏移量
● 4字节的 消息大小

从节点接收数据

HAClient的run()的方法在这里插入图片描述
分析while循环代码,下面processReadEvent就是接收方法在这里插入图片描述
在这里插入图片描述
dispatchReadRequest就是保存消息的方法在这里插入图片描述

主节点接收从数据ReadSocketService

ReadSocketService.run()方法中processReadEvent是具体接收的方法,在processReadEvent里面whie循环里面是核心过程在这里插入图片描述

 while (this.byteBufferRead.hasRemaining()) {try {// 从Channel里面读数据到 byteBufferReadint readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {readSizeZeroTimes = 0;this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();// 因为偏移量信息通常是8字节if ((this.byteBufferRead.position() - this.processPosition) >= 8) {// 确保8字节对齐 8int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);//读取一个长整型的偏移量 (8位)long readOffset = this.byteBufferRead.getLong(pos - 8);this.processPosition = pos;// ack 表示主节点已知从节点已经处理到的位置HAConnection.this.slaveAckOffset = readOffset;if (HAConnection.this.slaveRequestOffset < 0) {// 检查slaveRequestOffset是否小于0,如果是,则说明这是从节点首次反馈其处理位置,// 将readOffset赋值给slaveRequestOffset并记录日志。HAConnection.this.slaveRequestOffset = readOffset;log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);}// 通知主节点已经收到了从节点的ACK,可以依据slaveAckOffset来决定下一轮数据同步的起始位置。HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);}} else if (readSize == 0) {// 连续三次读到的数据为0 则认为消息读取完毕跳出循环if (++readSizeZeroTimes >= 3) {break;}} else {log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");return false;}} catch (IOException e) {log.error("processReadEvent exception", e);return false;}}

这段代码是RocketMQ 主从复制(Master-Slave Replication)中 主节点处理从节点反馈的逻辑 的核心实现。它负责读取从节点发送过来的 ACK 消息,并更新主节点对从节点同步状态的认知

总结

  • ACK 数据格式: 从节点发送的 ACK 数据是一个长整型(8 字节),表示从节点已处理的消息偏移量。
  • 对齐处理: 在读取偏移量时,确保数据对齐到 8 字节边界,避免读取不完整或错误的数据。
  • 状态更新: 主节点会根据从节点发送的偏移量更新 slaveAckOffset 和 slaveRequestOffset,以便后续数据同步。
  • 通知机制: 主节点通过 notifyTransferSome 方法通知自己,可以依据从节点的 ACK 偏移量决定下一轮同步的起始位置。
  • 异常处理: 对各种异常情况进行处理(如读取失败、IO 异常等),确保系统的健壮性。

相关文章:

  • 如何在 PowerShell 脚本中调用外部 Windows 命令
  • TypeScript基础数据类型详解
  • [论文解析]Mip-Splatting: Alias-free 3D Gaussian Splatting
  • 【Java面试笔记:进阶】22.AtomicInteger底层实现原理是什么?如何在自己的产品代码中应用CAS操作?
  • 自然语言处理——语言转换
  • Java社区门诊系统源码 SaaS医院门诊系统源码 可上线运营
  • 怎样理解ceph?
  • mac笔记本安装brew、nvm、git等完整版
  • AI数字人:未来职业的重塑(9/10)
  • react的fiber 用法
  • 启动 n8n 步骤指南
  • [C++ 11] --- 线程异步
  • 2025新版修复蛇年运势测试风水起名系统源码
  • 常见的六种大语言模型微调框架
  • 【MATLAB第118期】基于MATLAB的双通道CNN多输入单输出分类预测方法
  • 【油猴脚本 0】油猴脚本工程化开发 vue3 element-plus
  • Scrapy爬取动态网页:简洁高效的实战指南
  • 深入理解二叉树遍历:递归与栈的双重视角
  • Python AI图像生成方案指南
  • Flutter 移动端开发:集成淘宝 API 实现商品数据实时展示 APP
  • 图像编辑新增一款开源模型,阶跃星辰发布Step1X-Edit
  • 文旅部副部长饶权出任国家文物局局长
  • 敲定!今年将制定金融法、金融稳定法
  • 合同约定拿850万保底利润?重庆市一中院:约定无效,发回重审
  • 长三角数智文化产业基金意向签约会成功举办
  • 马上评丨马拉松“方便门”被处罚,是一针清醒剂