当前位置: 首页 > news >正文

【RabbitMQ】保证消息不丢失

要确保 RabbitMQ 在消费者(Python 服务)重启或挂掉时消息不丢失,需结合 消息持久化确认机制(ACK)死信队列(DLX) 实现高可靠性:


1. 消息持久化(Durability)

确保消息和队列在 RabbitMQ 服务重启后仍存在:

Java 发布者(设置持久化)
// 创建持久化队列 + 持久化消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明持久化队列(durable=true)channel.queueDeclare("image_queue", true, false, false, null);// 发布持久化消息(deliveryMode=2)String message = "{\"task_id\": \"123\"}";channel.basicPublish("", "image_queue", new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build(),message.getBytes());System.out.println("消息已持久化发送");
}
Python 消费者(声明持久化队列)
channel.queue_declare(queue='image_queue', durable=True)  # durable=True

2. 手动确认(Manual ACK)

消费者处理完消息后显式发送 ACK,避免消息因崩溃丢失:

Python 消费者(关闭自动 ACK)
def callback(ch, method, properties, body):try:print(f"处理消息: {body.decode()}")# 业务逻辑...ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认except Exception as e:print(f"处理失败: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)  # 重新入队channel.basic_consume(queue='image_queue', on_message_callback=callback, auto_ack=False)  # 关闭自动ACK

关键点

  • auto_ack=False:必须关闭自动确认。
  • 处理成功后调用 basic_ack,失败时 basic_nack 并重新入队。

3. 死信队列(DLX)

处理因消费者崩溃或消息超时未被确认的消息:

Java 发布者(声明死信交换机和队列)
// 定义死信交换机和队列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");  // 死信交换机
args.put("x-message-ttl", 60000);  // 消息存活时间(可选)channel.queueDeclare("image_queue", true, false, false, args);
channel.exchangeDeclare("dlx_exchange", "direct");
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "");
Python 消费者(监听死信队列)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.basic_consume(queue='dlx_queue', on_message_callback=handle_dlx_message, auto_ack=False)

作用

  • 消息超过 TTL 或消费者拒绝时,自动路由到死信队列。
  • 可对死信消息进行补偿处理(如重试或记录日志)。

4. 消费者高可用(HA)

确保消费者服务崩溃后能自动恢复:

方案 1:进程守护(如 systemd)
# /etc/systemd/system/python_consumer.service
[Unit]
Description=Python RabbitMQ Consumer
After=network.target[Service]
User=root
WorkingDirectory=/opt/app
ExecStart=/usr/bin/python3 /opt/app/consumer.py
Restart=always  # 崩溃后自动重启[Install]
WantedBy=multi-user.target
方案 2:容器化(Docker)
# Dockerfile
FROM python:3.9
COPY consumer.py /app/
CMD ["python", "/app/consumer.py"]
docker run -d --restart=unless-stopped my_consumer

5. 消息补偿机制(可选)

极端情况下(如 RabbitMQ 崩溃),可通过数据库记录消息状态:

Java 发布者(本地事务)
// 伪代码:本地事务
try {saveToDatabase(task);  // 1. 先存数据库sendToRabbitMQ(task); // 2. 再发消息commitTransaction();
} catch (Exception e) {rollbackTransaction();// 定时任务扫描数据库补偿发送
}

完整流程图

Java Publisher RabbitMQ Python Consumer DLX 发布持久化消息 (deliveryMode=2) 推送消息 (auto_ack=false) basic_ack() basic_nack(requeue=true) 重新投递 alt [处理成功] [处理失败或崩溃] 转入死信队列 触发补偿逻辑 alt [消息超时或多次失败] Java Publisher RabbitMQ Python Consumer DLX

最佳实践总结

措施实现方式作用
消息持久化deliveryMode=2 + queueDeclare(durable=true)防止 RabbitMQ 重启丢失消息
手动 ACKauto_ack=false + basic_ack()/basic_nack()确保消息处理完成才删除
死信队列(DLX)x-dead-letter-exchange + 独立死信消费者处理超时或失败消息
消费者高可用systemd Restart=alwaysdocker --restart=unless-stopped消费者崩溃后自动恢复
消息补偿数据库记录 + 定时任务扫描极端情况下的兜底措施

通过以上组合方案,可确保消息在消费者崩溃、重启或 RabbitMQ 异常时不丢失、不重复、可恢复

相关文章:

  • PaddleX的安装
  • “八股训练营”学习总结
  • C++STL(九) :bitset的介绍与使用
  • 特征工程四:数据特征提取TfidfVectorizer的使用
  • re题(48)BUUCTF-[网鼎杯 2020 青龙组]singal
  • 对日开发 秀丸文本编辑器 宏的基本使用
  • 计算属性 vs methods方法
  • Java大厂面试突击:从Spring Boot自动配置到Kafka分区策略实战解析
  • SVT-AV1源码分析-函数svt_aom_motion_estimation_kernel
  • linux:进程的替换
  • 深入解读:2025 数字化转型管理 参考架构
  • 【算法】回溯法
  • 杭电oj(1010、1015、1241)题解
  • 【沉浸式求职学习day27】
  • 【视频生成模型】通义万相Wan2.1模型本地部署和LoRA微调
  • Python----深度学习(基于DNN的吃鸡预测)
  • 动手学深度学习11.11. 学习率调度器-笔记练习(PyTorch)
  • arcpy列表函数的应用(4)
  • MySQL的锁(InnoDB)【学习笔记】
  • win11报错 ‘wmic‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件 的解决方案
  • 现场|西岸美术馆与蓬皮杜启动新五年合作,新展今开幕
  • 商务部:4月份以来的出口总体延续平稳增长态势
  • 李公明|一周画记:哈佛打响第一枪
  • 清华成立人工智能医院,将构建“AI+医疗+教育+科研”闭环
  • 知名计算机专家、浙江大学教授张森逝世
  • 俄罗斯准备在没有先决条件的情况下与乌克兰进行谈判