RocketMQ 的详细使用教程
以下是 RocketMQ 的详细使用教程,涵盖安装配置、基础操作、高级功能及高可用部署等内容:
一、安装与配置
1. 单机模式安装
- 下载与解压
-
- 从 RocketMQ 官网 下载对应版本,解压后进入
bin
目录15。
- 从 RocketMQ 官网 下载对应版本,解压后进入
示例命令:
unzip rocketmq-all-5.1.4-bin-release.zip
cd rocketmq-5.1.4
- 启动服务
NameServer:RocketMQ 的注册中心。
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log # 查看启动日志
Broker:消息代理服务。
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
- 验证安装
使用内置工具测试消息收发:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer # 生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer # 消费者
2. 集群部署(高可用)
- 双主双从架构
在两台服务器上分别部署 NameServer 和 Broker,配置文件 broker-master.properties
和 broker-slave.properties
,关键参数:
namesrvAddr=192.168.100.43:9876;192.168.100.44:9876 # 多 NameServer 地址
brokerRole=SYNC_MASTER # 主节点同步复制
flushDiskType=ASYNC_FLUSH # 异步刷盘
启动命令:
nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &
- 配置说明
-
brokerId=0
表示主节点,>0
为从节点5。fileReservedTime=48
设置消息保留时间(小时)5。
二、基础使用
1. 创建 Topic
命令行创建:
- bash复制sh bin/mqadmin createTopic -n localhost:9876 -c DefaultCluster -t testTopic
- 控制台创建:访问
http://IP:8080/admin
,输入默认账号admin/admin
,在主题管理页面创建4。
2. 生产者发送消息
// 初始化生产者
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();// 发送消息
Message msg = new Message("testTopic", "tagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);
producer.shutdown();
3. 消费者订阅消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("testTopic", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("Received: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
三、高级功能
1. 事务消息
实现两阶段提交:
TransactionMQProducer producer = new TransactionMQProducer("transactionGroup");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务(如数据库操作)return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.UNKNOW;}
});
producer.sendMessageInTransaction(msg, null);
2. 延迟消息
支持 18 个延迟级别(1s~2h):
Message msg = new Message("testTopic", "tagA", "Delayed Message".getBytes());
msg.setDelayTimeLevel(3); // 10秒延迟
producer.send(msg);
3. 消息过滤
- Tag 过滤:消费者订阅时指定 Tag(如
consumer.subscribe("testTopic", "tagA || tagB")
)。 - SQL 过滤:通过消息属性筛选(需 Broker 配置
enablePropertyFilter=true
)。
四、高可用与性能优化
1. 消息存储机制
- CommitLog:顺序写入消息内容,提升吞吐量(顺序写速度可达 600MB/s)2。
- ConsumeQueue:逻辑队列,存储消息索引,加速消费2。
2. 刷盘策略
- 同步刷盘:消息写入磁盘后才返回成功,数据可靠性高,性能较低。
- 异步刷盘:消息写入 PageCache 后立即返回,性能高,适合高吞吐场景25。
3. 负载均衡
- 生产者负载均衡:默认轮询发送到不同 Broker 的队列2。
- 消费者负载均衡:集群模式下自动分配队列,支持
AllocateMessageQueueAveragely
(平均分配)和AllocateMessageQueueByCircle
(环状分配)2。
五、监控与管理
1. 控制台功能
- 实时监控:查看消息堆积、TPS、消费者状态等4。
- 主题管理:动态创建/删除 Topic,调整队列数量。
- 日志查看:支持按时间范围检索 Broker 和消费者日志。
2. 日志配置
- 日志路径:
~/logs/rocketmqlogs/
,可通过logback.xml
调整日志级别和格式。
六、常见问题
- 消息重复消费
-
- 解决方案:消费者实现幂等性(如通过唯一业务 ID 去重)2。
- Broker 启动失败
-
- 检查端口冲突(默认 NameServer 端口
9876
,Broker 端口10911
)5。
- 检查端口冲突(默认 NameServer 端口
总结
RocketMQ 的核心使用流程包括安装部署、Topic 管理、消息生产与消费,高级功能涵盖事务消息、延迟消息和高可用集群配置。实际应用中需根据业务场景选择刷盘策略(同步/异步)和复制方式(同步/异步主从),并结合控制台监控优化性能。更多配置细节可参考 官方文档7。