当前位置: 首页 > news >正文

MQ消息的不可靠性发生情况与解决方案

文章目录

    • 问题:
    • 可能出现的情况:
  • 解决流程与兜底方案
    • 第一个方面:确保生产者一定把消息发送到MQ
      • 1.生产者重试机制
      • 2.生产者确认机制
    • 第二个方面:确保MQ不会将消息丢失
      • 数据持久化
        • 交换机持久化
        • 2.队列持久化
        • 3.消息持久化
      • LazyQueue
        • 控制台配置Lazy模式
        • SpringAMQP代码声明Lazy模式
        • 更新已有队列为lazy模式
    • 第三方面.确保消费者一定要处理消息
      • 消费者确认机制(**Consumer Acknowledgement**)
      • 失败重试机制
      • 失败处理策略:
    • 业务幂等性
      • 唯一消息id:
      • 业务判断:
    • 兜底方案
    • 总结:

问题:

  • 我们该如何确保MQ消息的可靠性
  • 如果真的发送失败,有没有其它的兜底方案?

可能出现的情况:

消息丢失的可能性:

发送消息的流程如图所示:

在消息发送的流程中从生产者到消费者每一步都有可能导致消息丢失:

可能的情况:

  • 生产者发送消息时丢失:
    • **网络问题:**生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到**Exchange**
    • 生产者发送消息到达MQ的**Exchange**后,未找到合适的**Queue**
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消费者接收消息后还未处理,MQ突然宕机
    • 消息接收后处理过程中抛出异常

要从三个方面保证MQ的可靠性:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

解决流程与兜底方案

第一个方面:确保生产者一定把消息发送到MQ

确保生产者将消息发送到MQ有两种机制分别是:1.生产者重试机制、2.生产者确认机制

1.生产者重试机制

发生原因:

第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

实现:

使用SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ断网后等情况下,连接超时后,多次重试。

application.yaml文件配置

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

:::info
注意事项:

在网络不稳定的情况下,利用重试机制可以有效提高消息发送的成功率。但是SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

在项目中如果对于该业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

:::

2.生产者确认机制

发生原因:

一般情况下生产者与MQ之间的网络没有问题,基本就不会出现消息丢失的问题,但在少数情况下也有可能出现消息发送到MQ之后消息丢失的问题。

少数情况:

  • 在MQ的内部处理消息的进程出现了问题
  • 生产者发送消息到达MQ之后没有找到对应的交换机(Exchange)
  • 生产者发送消息到达MQ的交换机(Exchange)后,没有找到合适的Queue,因此无法进行路由

方案:

RabbitMQ的生产者消息确认机制包括有:Publisher ConfirmPublisher Return两种。开启消息确认机制情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

如图所示:

图解如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

实现:

**在application.yaml**中添加配置,开启生产者确认:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

publisher-confirm-type有三种模式可选分别是:

  • none:关闭confirm机制,
    • 优点:性能高。
    • 缺点:无法保证消息送达,可能丢失消息。
  • simple:同步阻塞等待MQ的回执( 同步阻塞等待确认 ),会阻塞当前线程直到收到 confirm。
    • 优点: 实现简单 。
    • 缺点: 性能差,吞吐量低
  • correlated:MQ异步回调返回回执( 异步回调确认 )
    • 优点**:** 性能更好,推荐使用
    • 缺点**:** 实现略复杂,需要维护消息状态和回调处理

一般推荐使用:correlated机制,回调机制。

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此可以在配置类中统一设置。在publisher模块定义一个配置类:

package com.publisher.config;import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,提前给CorrelationData中的Future添加回调函数来处理消息回执:

新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

执行结果如下:

可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。

当修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

:::info
注意事项:

  • 开启生产者确认比较消耗MQ性能,一般不建议开启。
  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

:::


第二个方面:确保MQ不会将消息丢失

可能出现的问题:

消息在生产者发送到达MQ之后,MQ不能及时进行保存就有可能会导致消息丢失。

数据持久化

为了提升性能,默认MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化

数据持久化方案有:

  • 交换机持久化
  • 队列持久化
  • 消息持久化
交换机持久化

控制台的Exchanges页面,添加交换机时配置交换机的Durability参数:

设置为Durable就是持久化模式,Transient就是临时模式。

2.队列持久化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

3.消息持久化

控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

:::info
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

:::

LazyQueue

可能出现的情况:

RabbitMQ默认将接收到的信息保存在内存中以降低消息收发的延迟。在某些特殊情况下会导致消息积压:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

配置Lazy Queue(惰性队列)作用:

  • MQ接收到消息后将消息直接存入磁盘,不存入内存从而减小内存的占用,避免消息积压而导致的性能下降。
  • 消费者需要消费消息时才会从磁盘中读取并加载到内存(懒加载)
  • 支持数百万条消息的存储
控制台配置Lazy模式

添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式:

SpringAMQP代码声明Lazy模式

SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}

基于注解来声明队列并设置为Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}
更新已有队列为lazy模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。

可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列

也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:

第三方面.确保消费者一定要处理消息

**可能出现的情况:**RabbitMQ发送消息给消费者后,消息不一定被消费者正确消费可能出现的故障:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

这时候RabbitMQ就需要知道消费者的处理状态,消息处理失败时重新投递消息。

消费者确认机制(Consumer Acknowledgement

当消费者处理消息结束后,向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

:::info
注意:

reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

:::

SpringAMQP实现:

配置文件:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

有三种处理模式:

  • **none**:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • **manual**:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • **auto**:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack,MQ重新发送消息;
    • 如果是消息处理或校验异常,自动返回reject;

失败重试机制

问题:

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,给CPU和内存带来较大的压力。

解决方案:

开启消费者重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

:::info
注意事项:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃在某些对于消息可靠性要求较高的业务场景下,显然不太合适,需要结合失败处理策略进行处理。

:::

失败处理策略:

Spring允们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

示例代码:

1.consumer服务中定义处理失败消息的交换机和队列

public class ErrorMessageConfig {//定义处理失败消息的交换机@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}//定义处理失败消息的队列@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}//关联队列和交换机@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

当出现失败消息时可以在控制台查看并在后续进行处理:

业务幂等性

幂等性指的是:指同一个业务,执行一次或多次对业务状态的影响是一致的。

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

可能出现重复执行的情况:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

唯一消息id:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

实现:

SpringAMQP的MessageConverter自带了MessageID的功能

以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

业务判断:

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

支付修改订单的业务为例,markOrderPaySuccess方法:

@Override
public void markOrderPaySuccess(Long orderId) {
// 1.查询订单
Order old = getById(orderId);
// 2.判断订单状态
if (old == null || old.getStatus() != 1) {// 订单不存在或者订单状态不是1,放弃处理return;
}
// 3.尝试更新订单
Order order = new Order();
order.setId(orderId);
order.setStatus(2);
order.setPayTime(LocalDateTime.now());
updateById(order);
}

优化合并:

@Override
public void markOrderPaySuccess(Long orderId) {lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();//相当于这样的SQL语句// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1}

兜底方案

**最终实现目标:**尽管MQ通知失败也要保证订单的支付状态一致。

实现:

通过主动查询保证订单的一致性

流程:

色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

总结:

保证MQ消息的可靠性:

保证MQ消息的可靠性,采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性

兜底方案:

设置了定时任务,这样即便MQ通知失败,还可以利用定时任务作为兜底方案。

相关文章:

  • 显示器关闭和鼠标键盘锁定工具
  • Pygame事件处理详解:键盘、鼠标与自定义事件
  • 树相关处理
  • 结合五层网络结构讲一下用户在浏览器输入一个网址并按下回车后到底发生了什么?
  • Eclipse 插件开发 1
  • 面试新收获-大模型学习
  • Python编程中的基本语句
  • 长短板理论——AI与思维模型【83】
  • 【C++11】右值引用和移动语义:万字总结
  • Docker Compose--在Ubuntu中安装Docker compose
  • 嵌入式C设计模式---策略模式
  • unity bug
  • SpringBoot程序的创建以及特点,配置文件,LogBack记录日志,配置过滤器、拦截器、全局异常
  • JAVA服务内存缓慢上涨,年轻代GC正常但Full GC频繁,如何定位?
  • [ACTF2020 新生赛]BackupFile题解
  • 用Podman Desktop创建自用的WSL-Fedora Linux子系统
  • LeetCode100题
  • linux blueZ 第五篇:高阶优化与性能调优——蓝牙吞吐、延迟与功耗全攻略
  • 编译语言、半编译语言(混合型)和非编译语言(解释型)的差异
  • ROS 快速入门教程05
  • 教育强国建设基础教育综合改革试点来了!改什么?怎么改?
  • 最高法专门规范涉企案件审执工作:从源头防止趋利性执法司法
  • 李彦宏:DeepSeek不是万能,多模态将是未来基础模型的标配
  • 冯象|那“交出”后的崩溃,如撒旦坠落诸天
  • 上海:全面建设重复使用火箭创新高地、低成本商业卫星规模制造高地
  • 谭秀洪任广西梧州市人大常委会党组书记,此前任北海市委常委