启动你的RocketMQ之旅(五)-Broker详细——消息传输
前言:
👏作者简介:我是笑霸final。
📝个人主页: 笑霸final的主页2
📕系列专栏:java专栏
📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
🔥如果感觉博主的文章还不错的话,👍点赞👍 + 👀关注👀 + 🤏收藏🤏
上一章节:启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)
下一章节:启动你的RocketMQ之旅(六)-Broker详细——主从复制
目录
- 一、概述
- 二、消息传输
- 2.1SendMessageProcessor
- 2.2 非批量消息 asyncSendMessage
- 2.3 延迟消息
- 2.4 事务消息接收
一、概述
RocketMQ 的 Broker 是消息系统中的核心组件,它负责消息的存储、传输和消费者的请求响应等功能。它的特点和功能如下:
- 消息传输
生产者将消息发送到 Broker,Broker 接收消息后将其写入 CommitLog,并根据消息的主题和队列信息将其存储在相应的 ConsumeQueue 中。
消费者订阅特定的主题,并从 Broker 获取消息进行处理。Broker 根据消费者的订阅关系提供消息。
- 高可用性
主从复制:为了保证高可用性,Broker 支持同步和异步两种方式的主从复制。同步复制保证数据一致性但可能增加延迟,异步复制则提高性能但在故障时可能会丢失部分数据。
水平扩展:RocketMQ 集群可以包含多个 Broker 节点,通过添加新的 Broker 节点来实现系统的水平扩展,从而提高消息存储容量和吞吐量。
- 消息存储
CommitLog:这是 Broker 中存储消息的主要文件,所有消息都以顺序写入的方式记录在 CommitLog 文件中。
ConsumeQueue:逻辑队列,每个主题(Topic)和队列(Queue)对应一个 ConsumeQueue 文件,用于存储消息在 CommitLog 中的物理偏移量等信息,方便快速检索消息。
- 存储优化策略
顺序写入:利用顺序写入的方式提高消息存储效率。
零拷贝技术:减少数据在内核空间和用户空间之间拷贝次数,降低 CPU 和内存开销。
批量处理:支持批量发送和接收消息,减少网络交互次数,提高吞吐量。
二、消息传输
这是 RocketMQ Broker 接收消息的一个简化版源码分析流程。
- BrokerController:作为整个 Broker 的控制中心,它负责启动和管理 Broker 的各个组件。在这个阶段,BrokerController 会调用 NettyRemotingServer 的 start() 方法来启动 Netty 服务端。
- NettyRemotingServer:这是一个基于 Netty 实现的网络通信模块,用于处理与客户端之间的网络通信。当 start() 方法被调用时,它会绑定到指定的端口并开始监听来自客户端的消息发送请求。
- 当 Netty 服务端接收到客户端的消息发送请求后,会触发 processMessageReceived() 方法的执行。
- NettyRemotingAbstract:这是一个抽象类,提供了处理网络通信的基本功能。在 processMessageReceived() 方法中,会对接收到的消息进行初步处理,如解码、验证等,并将其封装成一个请求对象。
- 经过初步处理后的请求会被传递给 SendMessageProcessor 进行进一步处理。SendMessageProcessor 是专门负责处理消息发送请求的处理器。
- 在 asyncProcessRequest() 方法中,SendMessageProcessor 会对请求进行更详细的解析,并决定如何处理该请求。如果请求是合法的消息发送请求,那么它会调用 DefaultMessageStore 的 asyncSendMessage() 方法来异步发送消息。
- DefaultMessageStore 是 RocketMQ 中负责消息存储的核心组件。在 asyncSendMessage() 方法中,它会创建一个新的线程或使用现有的线程池来异步处理消息发送请求。
- 在异步线程中,DefaultMessageStore 会调用自己的 asyncPutMessage() 方法来实际存储消息。
- 在 asyncPutMessage() 方法中,DefaultMessageStore 会将消息写入 CommitLog 文件,并更新相应的 ConsumeQueue 和 IndexFile 等索引信息。完成消息存储后,DefaultMessageStore 会返回一个结果给 SendMessageProcessor,表示消息已经成功存储或者存储过程中发生了错误。
2.1SendMessageProcessor
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 对请求头进行解析SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);// todo 根据请求头走批量还是非批量if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {// todo 非批量return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}
段代码是 RocketMQ Broker 中处理消息发送请求的核心逻辑之一,具体实现了 asyncProcessRequest 方法。
- ChannelHandlerContext ctx:表示当前的网络上下文,包含了与客户端通信的相关信息。
- RemotingCommand request:表示从客户端接收到的消息请求对象。
- 返回值:一个 CompletableFuture,用于异步处理请求并最终返回结果。
2.2 非批量消息 asyncSendMessage
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 1 包装一些信息给相应final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}// 2 得到剧具体消息final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();// 得到topic信息TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// 请求消息 没有选择队列,就随机if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}// 构建存储到磁盘的类 ,消息内部保存信息MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());//topic信息msgInner.setQueueId(queueIdInt);// 队列id//todo 消息重试和死信队列if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);//具体消息字节码msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked laterorigProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);// todo 是否是事务消息if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {// Broker如果不支持事务response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 一个异步方法,用于处理事务消息的预提交阶段。这个方法会将消息暂存到事务消息存储中,putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 完成对信息的落盘【异步】putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}
这段代码是 RocketMQ Broker 中处理非批量消息发送的核心逻辑,具体实现了 asyncSendMessage 方法,它负责将客户端发送的消息存储到磁盘中,并返回结果给客户端。
大致流程:
这里构建了一个msgInner对象,用于异步刷盘 。此对象保存了
- Topic:消息所属的主题名称。
- Queue ID:消息所在的队列编号。
- Body:消息的实际内容,即消息体,以字节码形式存储。
- Flag:消息的标志位,包含消息的属性信息。
- Properties:用户自定义的消息属性,转换为Map格式,可以从MessageExtBrokerInner中提取出来或设置>进去。
- Born Timestamp:消息创建的时间戳。
- Born Host:消息产生的主机地址,即生产者客户端的网络地址。
- Store Host:消息存储的服务端地址,即Broker的地址。
- Reconsume Times:消息已经被重新消费的次数。
- Cluster Name:消息所在的集群名称。
- Transaction Flag(间接存储):通过检查用户自定义属性判断是否为事务消息。
最终根据是事务消息还是普通消息,决定把msgInner异步保存到事务消息存储中还是异步落盘,然后封装返回对象返回给producer
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());//topic信息
msgInner.setQueueId(queueIdInt);//todo 消息重试和死信队列if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
MessageAccessor.setProperties(msgInner, origProps);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
MessageExtBrokerInner
:
- 这是一个内部类,表示消息在 Broker 内部的存储格式。
- 设置消息的主题、队列 ID、消息体、标志位、属性、时间戳、生产者地址、Broker 地址等信息。
重试和死信队列
:
- 调用 handleRetryAndDLQ 方法处理消息重试和死信队列相关的逻辑。如果处理失败,则直接返回响应。
2.3 延迟消息
//非事务消息 或者 是已经提交事务的消息if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {/*** 延迟消息 并且 延迟等级要大于0*/// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 检查延时级别是否超过最大允许值,超过最大值就设置为最大值 18if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}//修改主题为内置的延迟主题topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;//根据延时级别计算出新的队列IDqueueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId//根据延时级别计算出新的队列IDMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//更新消息的topic和queueId字段。msg.setTopic(topic);msg.setQueueId(queueId);}}
这段代码展示了 RocketMQ 中处理 非事务消息 或 已提交的事务消息 的逻辑,特别是针对 延迟消息 的特殊处理
流程
1、检查延时级别是否超过最大允许值,超过最大值就设置为最大值 18
2、修改主题为延迟主题为SCHEDULE_TOPIC_XXXX
3、根据延迟等级计算出新的队列id ;id=延迟等级-1
4、存储原始主题和队列ID到消息属性中,以便后续恢复。
5、更新消息的topic和queueId字段。
然后将其作为普通消息进行存储,追加到commitlog文件中中。
注意:
延迟级别的配置
: 默认情况下,RocketMQ 支持 18 个延迟级别,分别对应不同的时间间隔(如 1 秒、5 秒、10 秒等)。
延迟消息的可靠性
:延迟消息的可靠性依赖于 Broker 的调度能力。如果 Broker 出现故障,可能会导致延迟消息的投递时间不准确。
性能影响
:延迟消息的处理会增加 Broker 的调度负担,特别是在高并发场景下,需要合理设计延迟队列的数量和大小。
定时任务处理
public void start() {if (started.compareAndSet(false, true)) {super.load();//启动一个定时器 守护线程this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {//得到延迟Integer level = entry.getKey();//获取当前遍历到的延迟级别。Long timeDelay = entry.getValue();//获取当前延迟级别的延迟时间。//从offsetTable获取对应延迟级别的偏移量,若无则后续赋值为0。Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}//如果存在有效的延迟时间,则为每个延迟级别创建并调度一个DeliverDelayedMessageTimerTask任务,// 任务将在FIRST_DELAY_TIME=1000 毫秒后执行,开始检查并投递达到延迟时间的消息。if (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}/*** 调度一个固定频率的定时任务,每隔 1000 * 10毫秒执行一次。* 在任务内部,如果服务仍处于启动状态,则调用ScheduleMessageService.this.persist()方法持久化服务状态。*/this.timer.scheduleAtFixedRate(new TimerTask() {/*** 在run方法内,尝试持久化服务状态,如果出现异常则记录错误日志。*/@Overridepublic void run() {try {if (started.get()) {// 持久化ScheduleMessageService.this.persist();}} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}
在定时器里,如果延迟时间不为null则为每个延迟级别创建并调度一个DeliverDelayedMessageTimerTask任务,任务将在1000 毫秒后执行,开始检查并投递达到延迟时间的消息。DeliverDelayedMessageTimerTask此类继承了TimerTask,下面是他的run方法
在DeliverDelayedMessageTimerTask中根据
SCHEDULE_TOPIC_XXXX
名称和延时等级对应的queueId获取消息队列,然后从commitlog中读取消息,还原消息的原有信息(消息的原topic信息)再将消息持久化到commitlog文件中,这样消费者就可以拉取消息了.
public void executeOnTimeup() {// 定位到特定延时级别的系统延时消息队列,以便后续从中读取和处理已到达投递时间的延时消息。ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore//查找或创建指定主题和队列ID的消费队列对象。如果消费队列不存在,则先创建一个并加入到映射表中,最后返回这个消费队列对象。.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;//初始偏移量if (cq != null) {//从ConsumeQueue(消费队列)中获取索引缓冲区。SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;//下一个偏移量int i = 0;//当需要从扩展地址获取更多有关消息的信息时,会使用此类实例来装载这些扩展数据,便于进一步解析和处理消息。ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//遍历缓冲区,读取每个消息的偏移量、大小及标签码(tagsCode)等元数据。long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {//如果消息具有扩展信息,则加载扩展内容到ConsumeQueueExt.CqExtUnit对象中。tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}long now = System.currentTimeMillis();//根据当前时间计算实际的投递时间戳。 返回的时间long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);//下一个偏移量nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;if (countdown <= 0) {// 如果当前时间已经超过投递时间,则从CommitLog中查找并加载完整消息。MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {// 将原始的MessageExt对象转换成MessageExtBrokerInner对象MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}// 将处理过的延时消息重新存入消息存储系统(例如CommitLog)的过程。PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {// 消息写入成功if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {// 如果启用了延时消息统计,则进行一系列统计信息的更新:ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());}continue;} else {// 写入失败 容错机制 当重新投递延时消息失败时,不是立刻停止处理,// 而是记录错误日志并重新安排任务稍后再次尝试投递,确保延时消息能在合适的时间得到处理。// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);}}} else {// 还没到投递时间 则按剩余时间重新调度任务ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of for//处理延时消息队列中的所有消息之后或者在循环内遇到需要延迟投递的消息时,进行的任务调度和偏移量更新操作:nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {//这段代码的作用是校验提供的偏移量是否在当前消费队列(ConsumeQueue)的有效范围内long cqMinOffset = cq.getMinOffsetInQueue();long cqMaxOffset = cq.getMaxOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",offset, cqMinOffset, cqMaxOffset, cq.getQueueId());}if (offset > cqMaxOffset) {failScheduleOffset = cqMaxOffset;log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",offset, cqMinOffset, cqMaxOffset, cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}
- 定位延时消息队列:根据延时级别找到对应的系统延时消息队列(ConsumeQueue)。
- 初始化变量:设置初始偏移量(failScheduleOffset)和下一个待检查的偏移量(nextOffset)。
- 获取索引缓冲区:从延时消息队列中获取索引缓冲区,用于读取队列中的消息元数据。
- 循环处理消息:
○ 读取元数据:遍历缓冲区,读取每个消息的偏移量、大小及标签码(tagsCode)等元数据。
○ 处理扩展信息:如果消息具有扩展信息,则加载扩展内容到ConsumeQueueExt.CqExtUnit对象中。
○ 计算投递时间:根据当前时间计算实际的投递时间戳。
○ 判断消息是否可投递:如果当前时间已经超过投递时间,则从CommitLog中查找并加载完整消息。
○ 重新投递消息:若消息有效,则对其进行适当处理(如messageTimeup方法),并将其重新发布到目标主题。同时更新统计信息和指标,并根据发布结果决定是否需要重新调度任务。
○ 延期投递:若消息未达到投递时间,则按剩余延时时间重新调度任务。 - 循环结束后处理:循环结束后,如果没有更多立即可投递的消息,则根据当前偏移量设置下一次任务的触发时间。
- 错误处理:如果在处理过程中发现提供的偏移量超出延时消息队列的有效范围,则调整failScheduleOffset为队列的最小或最大有效偏移量,并记录错误日志。
- 最终调度:无论上述过程如何,在方法结束时,都会根据failScheduleOffset设置一个定时任务,确保即使出现异常也能继续检查延时消息队列中的其他消息。
总结
首先将延时消息换了一个topic名称进行持久化,这样消费者就无法获取消息,然后有定时任务,会将消息还原到原有的topic信息,这样消费者又可以重新拉取消息了。
2.4 事务消息接收
调用这个方法时,Broker会接收并持久化这个半事务消息,但并不会立即将其暴露给消费者,而是等待生产者后续提交或回滚事务状态的确认。只有当生产者通知Broker事务已经成功提交时,Broker才会将消息标记为可消费状态;反之,如果事务回滚,Broker则会丢弃这条消息,从而确保分布式事务的一致性。
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {// 记录消息的新属性// 把消息的topic 记录到 REAL_TOPIC 的属性中MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());//把消息的 QueueId 记录到 REAL_QID 的属性中MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));// 更新系统标志位 为0 非事务类型msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));// 将消息的topic 设置为 RMQ_SYS_TRANS_HALF_TOPIC 半事务topicmsgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());// 设置队列idmsgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner;}
RocketMQ并非将事务消息保存至消息中 client 指定的 queue,而是记录了原始的 topic 和 queue 后,把这个事务消息保存在 - 特殊的内部 topic:RMQ_SYS_TRANS_HALF_TOPIC - 序号为 0 的 queue。这套 topic 和 queue 对消费者不可见,因此里面的消息也永远不会被消费。这就保证在事务提交成功之前,这个事务消息对 Consumer 是消费不到的。
如何事务反查
进入到sndCheckMessage()方法
/*** 【重点】处理 EndTransactionRequest 请求*/@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);...OperationResult result = new OperationResult();// 提交事务if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {// 从commitLog中查出原始的prepared消息,要求producer在发送半消息和comit消息都要同一个brokerresult = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 检查获取的消息与请求的消息是否匹配RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 将prepareMessage构建为要发送给consumer的消息MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());...// 调用MessageStore的消息存储接口提交消息,使用真正的topic和queueIdRemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {// 将prepareMessage标记为deletethis.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {// 收到的是rollback,查出原始Prepare消息result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 检查获取的消息与请求的消息是否匹配RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 将prepareMessage标记为deletethis.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}...// 返回响应return response;}
如果接收到的是Commit,则将原本的half消息构建为普通消息,然后使用真正的topic和queueId将消息保存到store,这时可以被consumer消费;然后将原本的half消息标记为delete状态(这里因为对consumer不可见,无需撤销消息,且因为RocketMQ也无法真正的删除一条消息,因为是顺序写文件的)
如果是Rollback,则直接将half消息标记为delete返回响应
服务端在接收到ROLLBACK_MESSAGE的指令后,会根据事务消息的事务ID等信息找到对应的消息,并将其从CommitLog中清除。这里的清除实际上是指在后续的清理流程中,将包含回滚事务消息的文件段标记为可回收,待下次刷盘或清理时,这部分空间可以被重用,从而达到逻辑上的删除效果。
不论提交还是回滚,都将操作结果封装到响应命令response中,并返回给客户端。如果在处理事务过程中遇到错误,将错误码和错误原因填充到响应命令中