RocketMQ事务消息详解
事务消息是指一种特殊类型的消息,它允许生产者发送消息时保证消息的最终一致性(即执行流程可能会存在时差,但最终状态一定是一致的),即在生产者发出消息后,如果发生了错误,系统能够自动回滚,保障消息的可靠性。
事务消息由三部分组成:事务消息的发送、事务消息的回查、事务消息的确认,生产者在执行本地事务时,会同时发送一个半消息到队列中,此时对消费者不可见,当本地事务执行成功时,半消息状态变为”已提交“,消费者可进行消费。如果本地事务执行失败,则会回滚该事务,消息被标记为”回滚“状态,RocketMQ会丢弃该消息。
1. 消息回查机制
如果本地事务在执行过程中执行时间过长或发生宕机,未能及时将事务执行状态返回给 RocketMQ,RocketMQ 使用回查机制来确保消息的最终一致性。Broker 会定期向生产者发送回查消息,询问事务的执行状态,如果多次查询未果,会返回一个兜底的返回状态,确保最终的一致性。
2. 事务消息的实现流程
事务消息的实现需要生产者实现事务监听器(TransactionListener
),该监听器负责处理事务消息的提交、回滚和回查。
public class MyTransactionListener implements TransactionListener {// 执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 执行本地事务,例如数据库操作System.out.println("Executing local transaction...");// 假设本地事务成功return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {e.printStackTrace();// 本地事务失败return LocalTransactionState.ROLLBACK_MESSAGE;}}// 回查事务状态@Overridepublic LocalTransactionState checkLocalTransaction(Message msg) {// 检查本地事务是否成功,例如查询数据库状态System.out.println("Checking local transaction status...");// 假设事务状态已经提交return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic void onTransactionException(Throwable e) {// 处理事务异常System.out.println("Transaction Exception: " + e.getMessage());}
}
3. 事务消息的实现原理
事务消息的实现原理同延迟消息的实现原理,都是一开始不把消息发送目标 Topic 队列,而是发往特定的 Topic : RMQ_SYS_TRANS_HALF_TOPIC
,队列号默认为0,原始消息的 Topic 和队列号存储在属性中,这样一来消息即使被存储也不会被消费者消费,如果收到生产者的成功执行提交请求,则从 CommitLog 中取出该事务消息,根据属性中的原目标 TOPIC 和 queueId,构建一个新的消息发往 CommitLog 及目标队列中,此时消费者便可进行消费。如果本地事务执行失败需要回滚时,则不需要构建新的消息存储在 CommitLog 中,这样消费者就不会消费到了。
并且和延迟消息中 Broker 在启动时会初始化ScheduleMessageService
并创建线程池来定时调度延迟消息队列一样,Broker 在启动时也会起一个定时线程TransactionalMessageCheckService
服务,它会定时的扫描RMQ_SYS_TRANS_HALF_TOPIC
这个 TOPIC 下的消息,去请求生产者的回查接口来检查事务是否执行成功,如果执行成功则恢复原先的 TOPIC 消息供消费者消费,如果执行失败则不投递。