RabbitMQ 复习总结
多年未用,温故。。。。。
一、使用场景
-
异步通信解耦
- 场景示例:用户注册后需发送邮件/短信,通过消息队列异步通知邮件/短信服务,提升主流程响应速度。
- 优势:避免同步等待,提升系统吞吐量。
-
流量削峰填谷
- 场景示例:电商秒杀场景中,订单请求通过队列缓存,下游系统按处理能力消费消息,避免系统压垮。
- 核心机制:消息堆积能力(默认保留3天),支持亿级消息缓存。
-
服务解耦与容错
- 场景示例:订单系统与库存系统解耦,订单消息写入队列后直接返回成功,库存系统异步处理。
- 容错能力:镜像队列(主从模式)保障节点故障时服务可用性。
-
分布式事务补偿
- 场景示例:通过发布确认机制(Publisher Confirms)确保消息可靠投递,结合消费端幂等处理实现最终一致性。
二、竞品对标分析
竞品名称 | 核心优势 | 适用场景 | 劣势 | 吞吐量(TPS) | 响应速度(延迟) | 可靠性 |
---|---|---|---|---|---|---|
Apache Kafka | 高吞吐量、分布式架构、持久化存储 | 大数据实时处理、日志聚合 | 复杂度较高,需维护ZooKeeper | 百万级 | 低至2ms | 高 |
RocketMQ | 顺序消息、事务消息、高吞吐量 | 电商订单、金融级场景 | 社区活跃度低于Kafka | 百万级 | 毫秒级 | 高 |
ActiveMQ | JMS规范兼容、轻量级 | Java生态内传统应用集成 | 性能弱于RabbitMQ | 万级 | ms级 | 中 |
ZeroMQ | 轻量级、无中心化代理 | 嵌入式系统、高性能网络通信 | 缺乏消息持久化机制 | - | - | 低 |
RabbitMQ | 灵活路由、多种消息模式、跨平台 | 任务队列、发布-订阅、事件驱动 | 吞吐量相对较低 | 万级 | 微秒级(低吞吐量时) | 高 |
关键对比点:
- 吞吐量:Kafka和RocketMQ在吞吐量上表现优异,适合大规模数据处理;RabbitMQ吞吐量较低,但可通过集群优化提升。
- 响应速度:RabbitMQ在低吞吐量时延迟最低,适合对实时性要求高的场景;Kafka在高吞吐量时仍能保持低延迟。
- 可靠性:所有消息队列均支持持久化存储和消息确认机制,确保消息不丢失;RabbitMQ通过镜像队列模式提供高可用性。
- 场景适用性:RabbitMQ适用于需要灵活路由和多种消息模式的场景;Kafka适用于大数据实时处理和日志聚合;RocketMQ适用于电商和金融级场景。
三、常用Exchange类型
类型 | 路由规则 | 性能排序 | 典型场景 |
---|---|---|---|
Fanout | 广播模式,所有绑定队列均接收消息(忽略Routing Key) | 最高 | 群发通知、日志广播 |
Direct | 精确匹配Routing Key与Binding Key | 次之 | 订单状态更新、定向推送 |
Topic | 通配符匹配(* 匹配单个单词,# 匹配多级单词) | 较低 | 主题订阅(如日志按级别分类) |
Headers | 匹配消息头中的键值对(不依赖Routing Key) | 最低 | 复杂条件路由(如多属性组合筛选) |
四、Docker安装部署方法
1. 环境要求
- 依赖:Docker已安装并运行
- 系统支持:Linux/Windows/macOS(推荐Linux生产环境)
2. 安装步骤
# 1. 拉取RabbitMQ镜像(带管理界面)
docker pull rabbitmq:3.12.0-management# 2. 启动容器(开放端口+数据持久化)
docker run -d \--name rabbitmq \-p 5672:5672 \ # AMQP协议端口-p 15672:15672 \ # 管理界面端口-v /opt/rabbitmq/data:/var/lib/rabbitmq \ # 数据持久化目录rabbitmq:3.12.0-management# 3. 安装延迟队列插件(可选)
# 3.1 下载插件文件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez# 3.2 复制插件到容器
docker cp rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/opt/rabbitmq/plugins# 3.3 启用插件并重启容器
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker restart rabbitmq
3. 访问管理界面
地址:http://{服务器IP}:15672
默认账号:guest/guest(仅限本地登录,生产环境需创建新用户)
4. 生产环境建议
- 数据持久化:通过-v参数挂载本地目录,避免容器重启后数据丢失。
- 安全配置:
# 创建新用户并赋予权限
docker exec rabbitmq rabbitmqctl add_user admin admin123
docker exec rabbitmq rabbitmqctl set_user_tags admin administrator
docker exec rabbitmq rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
- 集群部署:通过Docker Compose实现多节点集群,需额外配置镜像队列和负载均衡。
五、Java集成方法
1. 添加Maven依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>
2. 消息生产者示例
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class Producer {private final static String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("admin");factory.setPassword("admin123");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明持久化队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 发送消息String message = "Order ID: 12345";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
3. 消息消费者示例
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("admin");factory.setPassword("admin123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 手动ACK确认消息处理完成channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}
4. 关键配置说明
- 连接池:生产环境建议使用连接池(如com.rabbitmq.http.client.Client)复用连接。
- 消息确认:
- 生产者:启用Publisher Confirms确保消息到达Broker。
- 消费者:设置autoAck=false,手动发送basicAck避免消息丢失。 - 异常处理:捕获ShutdownSignalException等异常,实现重试或告警逻辑。