【RabbitMQ】保证消息不丢失
要确保 RabbitMQ 在消费者(Python 服务)重启或挂掉时消息不丢失,需结合 消息持久化、确认机制(ACK) 和 死信队列(DLX) 实现高可靠性:
1. 消息持久化(Durability)
确保消息和队列在 RabbitMQ 服务重启后仍存在:
Java 发布者(设置持久化)
// 创建持久化队列 + 持久化消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明持久化队列(durable=true)channel.queueDeclare("image_queue", true, false, false, null);// 发布持久化消息(deliveryMode=2)String message = "{\"task_id\": \"123\"}";channel.basicPublish("", "image_queue", new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build(),message.getBytes());System.out.println("消息已持久化发送");
}
Python 消费者(声明持久化队列)
channel.queue_declare(queue='image_queue', durable=True) # durable=True
2. 手动确认(Manual ACK)
消费者处理完消息后显式发送 ACK,避免消息因崩溃丢失:
Python 消费者(关闭自动 ACK)
def callback(ch, method, properties, body):try:print(f"处理消息: {body.decode()}")# 业务逻辑...ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认except Exception as e:print(f"处理失败: {e}")ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # 重新入队channel.basic_consume(queue='image_queue', on_message_callback=callback, auto_ack=False) # 关闭自动ACK
关键点:
auto_ack=False
:必须关闭自动确认。- 处理成功后调用
basic_ack
,失败时basic_nack
并重新入队。
3. 死信队列(DLX)
处理因消费者崩溃或消息超时未被确认的消息:
Java 发布者(声明死信交换机和队列)
// 定义死信交换机和队列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 死信交换机
args.put("x-message-ttl", 60000); // 消息存活时间(可选)channel.queueDeclare("image_queue", true, false, false, args);
channel.exchangeDeclare("dlx_exchange", "direct");
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "");
Python 消费者(监听死信队列)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.basic_consume(queue='dlx_queue', on_message_callback=handle_dlx_message, auto_ack=False)
作用:
- 消息超过 TTL 或消费者拒绝时,自动路由到死信队列。
- 可对死信消息进行补偿处理(如重试或记录日志)。
4. 消费者高可用(HA)
确保消费者服务崩溃后能自动恢复:
方案 1:进程守护(如 systemd)
# /etc/systemd/system/python_consumer.service
[Unit]
Description=Python RabbitMQ Consumer
After=network.target[Service]
User=root
WorkingDirectory=/opt/app
ExecStart=/usr/bin/python3 /opt/app/consumer.py
Restart=always # 崩溃后自动重启[Install]
WantedBy=multi-user.target
方案 2:容器化(Docker)
# Dockerfile
FROM python:3.9
COPY consumer.py /app/
CMD ["python", "/app/consumer.py"]
docker run -d --restart=unless-stopped my_consumer
5. 消息补偿机制(可选)
极端情况下(如 RabbitMQ 崩溃),可通过数据库记录消息状态:
Java 发布者(本地事务)
// 伪代码:本地事务
try {saveToDatabase(task); // 1. 先存数据库sendToRabbitMQ(task); // 2. 再发消息commitTransaction();
} catch (Exception e) {rollbackTransaction();// 定时任务扫描数据库补偿发送
}
完整流程图
最佳实践总结
措施 | 实现方式 | 作用 |
---|---|---|
消息持久化 | deliveryMode=2 + queueDeclare(durable=true) | 防止 RabbitMQ 重启丢失消息 |
手动 ACK | auto_ack=false + basic_ack() /basic_nack() | 确保消息处理完成才删除 |
死信队列(DLX) | x-dead-letter-exchange + 独立死信消费者 | 处理超时或失败消息 |
消费者高可用 | systemd Restart=always 或 docker --restart=unless-stopped | 消费者崩溃后自动恢复 |
消息补偿 | 数据库记录 + 定时任务扫描 | 极端情况下的兜底措施 |
通过以上组合方案,可确保消息在消费者崩溃、重启或 RabbitMQ 异常时不丢失、不重复、可恢复。