当前位置: 首页 > news >正文

RocketMQ 的详细使用教程

以下是 RocketMQ 的详细使用教程,涵盖安装配置、基础操作、高级功能及高可用部署等内容:


一、安装与配置

1. 单机模式安装
  1. 下载与解压
    • 从 RocketMQ 官网 下载对应版本,解压后进入 bin 目录15。

示例命令:

unzip rocketmq-all-5.1.4-bin-release.zip
cd rocketmq-5.1.4
  1. 启动服务

NameServer:RocketMQ 的注册中心。

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log  # 查看启动日志

Broker:消息代理服务。

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
  1. 验证安装

使用内置工具测试消息收发:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer  # 生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer  # 消费者
2. 集群部署(高可用)
  1. 双主双从架构

在两台服务器上分别部署 NameServer 和 Broker,配置文件 broker-master.propertiesbroker-slave.properties,关键参数:

namesrvAddr=192.168.100.43:9876;192.168.100.44:9876  # 多 NameServer 地址
brokerRole=SYNC_MASTER  # 主节点同步复制
flushDiskType=ASYNC_FLUSH  # 异步刷盘

启动命令:

nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &
  1. 配置说明
    • brokerId=0 表示主节点,>0 为从节点5。
    • fileReservedTime=48 设置消息保留时间(小时)5。

二、基础使用

1. 创建 Topic

命令行创建

  • bash复制sh bin/mqadmin createTopic -n localhost:9876 -c DefaultCluster -t testTopic
  • 控制台创建:访问 http://IP:8080/admin,输入默认账号 admin/admin,在主题管理页面创建4。
2. 生产者发送消息
// 初始化生产者
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();// 发送消息
Message msg = new Message("testTopic", "tagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);
producer.shutdown();
3. 消费者订阅消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("testTopic", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("Received: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

三、高级功能

1. 事务消息

实现两阶段提交

TransactionMQProducer producer = new TransactionMQProducer("transactionGroup");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务(如数据库操作)return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.UNKNOW;}
});
producer.sendMessageInTransaction(msg, null);
2. 延迟消息

支持 18 个延迟级别(1s~2h):

Message msg = new Message("testTopic", "tagA", "Delayed Message".getBytes());
msg.setDelayTimeLevel(3);  // 10秒延迟
producer.send(msg);
3. 消息过滤
  • Tag 过滤:消费者订阅时指定 Tag(如 consumer.subscribe("testTopic", "tagA || tagB"))。
  • SQL 过滤:通过消息属性筛选(需 Broker 配置 enablePropertyFilter=true)。

四、高可用与性能优化

1. 消息存储机制
  • CommitLog:顺序写入消息内容,提升吞吐量(顺序写速度可达 600MB/s)2。
  • ConsumeQueue:逻辑队列,存储消息索引,加速消费2。
2. 刷盘策略
  • 同步刷盘:消息写入磁盘后才返回成功,数据可靠性高,性能较低。
  • 异步刷盘:消息写入 PageCache 后立即返回,性能高,适合高吞吐场景25。
3. 负载均衡
  • 生产者负载均衡:默认轮询发送到不同 Broker 的队列2。
  • 消费者负载均衡:集群模式下自动分配队列,支持 AllocateMessageQueueAveragely(平均分配)和 AllocateMessageQueueByCircle(环状分配)2。

五、监控与管理

1. 控制台功能
  • 实时监控:查看消息堆积、TPS、消费者状态等4。
  • 主题管理:动态创建/删除 Topic,调整队列数量。
  • 日志查看:支持按时间范围检索 Broker 和消费者日志。
2. 日志配置
  • 日志路径:~/logs/rocketmqlogs/,可通过 logback.xml 调整日志级别和格式。

六、常见问题

  1. 消息重复消费
    • 解决方案:消费者实现幂等性(如通过唯一业务 ID 去重)2。
  1. Broker 启动失败
    • 检查端口冲突(默认 NameServer 端口 9876,Broker 端口 10911)5。

总结

RocketMQ 的核心使用流程包括安装部署、Topic 管理、消息生产与消费,高级功能涵盖事务消息、延迟消息和高可用集群配置。实际应用中需根据业务场景选择刷盘策略(同步/异步)和复制方式(同步/异步主从),并结合控制台监控优化性能。更多配置细节可参考 官方文档7。

相关文章:

  • MySQL启动Failed to start LSB: start and stop MySQL
  • JAVA 异常+File
  • 分享4-5月工信部排考计划
  • Altium Designer安装教程
  • LeetCode19.删除链表的倒数第N个节点
  • __call__ 方法
  • 文章记单词 | 第38篇(六级)
  • 《GPT-4.1深度解析:AI进化新标杆,如何重塑行业未来?》
  • 2025年03月中国电子学会青少年软件编程(Python)等级考试试卷(六级)答案 + 解析
  • [python]@staticmethod
  • 【AI提示词】退休规划顾问专家
  • SAP系统中MD01与MD02区别
  • Manus AI:突破多语言手写识别技术壁垒之路
  • 嵌入式设备网络的动态ID分配机制实现
  • yolo系列发展
  • Linux系统编程---多进程
  • Linux 系统编程 day5 进程管道
  • 启动vite项目报Unexpected “\x88“ in JSON
  • FreeFileSync:文件同步对比工具
  • 2025年03月中国电子学会青少年软件编程(Python)等级考试试卷(五级)真题
  • 观察|中日航线加速扩容,航空公司如何抓住机会?
  • 中物联声明:反对美对华物流、海事和造船领域301调查措施
  • 8个月女婴被指受虐后体重仅6斤?潮州警方:未发现虐待,父母有抚养意愿
  • 第六季了,姐姐们还能掀起怎样的风浪
  • 美政府公布1968年罗伯特·肯尼迪遇刺事件档案
  • 九部门:将符合条件的家政从业人员纳入公租房等保障范围