当前位置: 首页 > news >正文

RocketMQ事务消息详解

        事务消息是指一种特殊类型的消息,它允许生产者发送消息时保证消息的最终一致性(即执行流程可能会存在时差,但最终状态一定是一致的),即在生产者发出消息后,如果发生了错误,系统能够自动回滚,保障消息的可靠性。

        事务消息由三部分组成:事务消息的发送、事务消息的回查、事务消息的确认,生产者在执行本地事务时,会同时发送一个半消息到队列中,此时对消费者不可见,当本地事务执行成功时,半消息状态变为”已提交“,消费者可进行消费。如果本地事务执行失败,则会回滚该事务,消息被标记为”回滚“状态,RocketMQ会丢弃该消息。

1. 消息回查机制

如果本地事务在执行过程中执行时间过长或发生宕机,未能及时将事务执行状态返回给 RocketMQ,RocketMQ 使用回查机制来确保消息的最终一致性。Broker 会定期向生产者发送回查消息,询问事务的执行状态,如果多次查询未果,会返回一个兜底的返回状态,确保最终的一致性。

2. 事务消息的实现流程

        事务消息的实现需要生产者实现事务监听器(TransactionListener),该监听器负责处理事务消息的提交、回滚和回查。

public class MyTransactionListener implements TransactionListener {// 执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 执行本地事务,例如数据库操作System.out.println("Executing local transaction...");// 假设本地事务成功return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {e.printStackTrace();// 本地事务失败return LocalTransactionState.ROLLBACK_MESSAGE;}}// 回查事务状态@Overridepublic LocalTransactionState checkLocalTransaction(Message msg) {// 检查本地事务是否成功,例如查询数据库状态System.out.println("Checking local transaction status...");// 假设事务状态已经提交return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic void onTransactionException(Throwable e) {// 处理事务异常System.out.println("Transaction Exception: " + e.getMessage());}
}

3. 事务消息的实现原理

        事务消息的实现原理同延迟消息的实现原理,都是一开始不把消息发送目标 Topic 队列,而是发往特定的 Topic : RMQ_SYS_TRANS_HALF_TOPIC,队列号默认为0,原始消息的 Topic 和队列号存储在属性中,这样一来消息即使被存储也不会被消费者消费,如果收到生产者的成功执行提交请求,则从 CommitLog 中取出该事务消息,根据属性中的原目标 TOPIC 和 queueId,构建一个新的消息发往 CommitLog 及目标队列中,此时消费者便可进行消费。如果本地事务执行失败需要回滚时,则不需要构建新的消息存储在 CommitLog 中,这样消费者就不会消费到了。

        并且和延迟消息中 Broker 在启动时会初始化ScheduleMessageService并创建线程池来定时调度延迟消息队列一样,Broker 在启动时也会起一个定时线程TransactionalMessageCheckService服务,它会定时的扫描RMQ_SYS_TRANS_HALF_TOPIC这个 TOPIC 下的消息,去请求生产者的回查接口来检查事务是否执行成功,如果执行成功则恢复原先的 TOPIC 消息供消费者消费,如果执行失败则不投递。

相关文章:

  • c#-命名和书写规范
  • Java虚拟机(JVM)家族发展史及版本对比
  • C语言之阶乘2.0
  • H3C Magic路由器安全警报来啦![特殊字符][特殊字符]
  • uniapp 仿小红书轮播图效果
  • 深度解析 TransmittableThreadLocal(TTL):原理、实战与优化指南
  • Node.js 学习入门指南
  • Linux 内核 IPv4 套接字创建机制与协议表管理深度解析
  • 全链路数据仓建设指南:从构建流程到应用场景
  • 银河麒麟系统安装vscode
  • 2023 国考
  • JAVA中包装类型的数值比较问题
  • SPH Engineering - 无人机技术开发专家
  • shell脚本2
  • k8s基于角色的访问控制(RBAC)
  • 使用ffmpeg 将图片合成为视频,填充模糊背景,并添加两段音乐
  • SiamFC算法深度解析
  • 解决微信开发者工具报错 “Component is not found in path wx://not-found“ 代码修改后热更新报错
  • 【无人机】无人机遥控器设置与校准,飞行模式的选择,无线电控制 (RC) 设置
  • 被封号如何申诉?Google Play开发者账号申诉模版分享
  • 商务部:美方应彻底取消所有对华单边关税措施
  • 国防部:希望美方不要有“受迫害妄想症”,总拿别人当借口
  • 大理洱源4.8级地震致442户房屋受损,无人员伤亡
  • 牛市早报|特朗普称或将“大幅降低”对华关税,外交部回应
  • 聚焦“共赢蓝色未来”,首届 “海洋命运共同体”上海论坛举行
  • 金融监管总局:支持将上海打造成具有国际竞争力的再保险中心