当前位置: 首页 > news >正文

RocketMQ .NET

RocketMQ 是一款由阿里巴巴集团开发并开源给Apache软件基金会的分布式消息及流处理平台。以其高吞吐量、低延迟、高可用性等特点而广受欢迎。支持Java,C++, Python, Go, .NET等。

异步解耦:可以实现上游和下游业务系统的松耦合设计,使得服务部分节点异常不会影响到核心交易系统的正常运转。在电商、金融等分布式系统中,这种解耦设计尤为重要。
削峰填谷:在如秒杀、大促等大型活动中,系统会面临巨大的流量冲击。RocketMQ利用其高性能的消息处理能力,可以有效地应对这种流量冲击,保证系统的稳定运行。
顺序消息:支持顺序消息(分区有序),可以确保消息的先进先出。这在交易系统中的订单创建、支付、退款等流程中尤为重要,因为这些流程对消息的顺序有严格要求。
分布式事务消息:支持分布式事务消息,可以保证分布式事务的强一致性。这在涉及多个服务的分布式系统中非常有用,可以确保数据的一致性和完整性。

RocketMQ优点
高吞吐量和低延迟:能够处理大规模消息流,并提供低延迟的消息传递。这使得它非常适合处理高并发的应用场景,如电子商务和金融交易系统。
可靠性:具有高度可靠的消息传递机制。它支持消息持久化和复制,确保消息不会丢失,并能够在故障发生时进行自动恢复。
分布式扩展:支持水平扩展,可以方便地添加新的消息生产者和消费者来应对负载增加的情况。
易于部署:提供开箱即用的部署方式,非常适合在分布式系统中使用。

RocketMQ架构
在这里插入图片描述
生产者(Producer)
用于产生消息的运行实体,通常集成在业务系统的上游。
主题(Topic)
消息传输和存储的分组容器,内部由多个队列组成。
队列(MessageQueue)
消息传输和存储的实际单元容器,类似于其他消息队列中的分区。
消息(Message)
RocketMQ 的最小传输单元,具备不可变性。
消费者分组(ConsumerGroup)
发布订阅模型中定义的独立的消费身份分组,用于统一管理多个消费者。
消费者(Consumer)
消费消息的运行实体,集成在业务系统的下游。
订阅关系(Subscription)
发布订阅模型中消息过滤、重试、消费进度的规则配置。

部署RocketMQ(docker)

安装 Docker:

#debian
 curl -fsSL https://get.docker.com -o get-docker.sh
 sudo sh get-docker.sh

拉取 RocketMQ 镜像:

使用以下命令从 Docker Hub 拉取最新的 RocketMQ 镜像:

docker pull apache/rocketmq:latest

启动 RocketMQ NameServer:

RocketMQ 的 NameServer 是负责管理所有 Broker 节点的目录服务。可以使用以下命令启动 NameServer:

docker run -d --name rmqnamesrv -p 9876:9876 apache/rocketmq:latest sh mqnamesrv
```bash
## 启动 RocketMQ Broker:
RocketMQ 的 Broker 负责存储消息并处理生产者和消费者的请求。使用以下命令启动 Broker:


```bash
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876"   -e "BROKER_NAME=broker-a"    -e "BROKER_ID=0"    -e "AUTO_CREATE_TOPIC_ENABLE=true"    -e "AUTO_CREATE_SUBSCRIPTION_GROUP=true"    -p 10911:10911 -p 10909:10909 apache/rocketmq:latest sh mqbroker

验证
查看 NameServer 日志:

docker logs -f rmqnamesrv

查看 Broker 日志:

docker logs -f rmqbroker

使用 RocketMQ Console:

如果需要可视化管理 RocketMQ,可以运行 RocketMQ Console:

docker pull styletang/rocketmq-console-ng
docker run -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" styletang/rocketmq-console-ng

在这里插入图片描述
在这里插入图片描述

总结

# 拉取 RocketMQ 镜像
docker pull apache/rocketmq:latest

# 启动 NameServer
docker run -d --name rmqnamesrv -p 9876:9876 apache/rocketmq:latest sh mqnamesrv

# 启动 Broker
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -p 10911:10911 -p 10909:10909 apache/rocketmq:latest sh mqbroker

# 启动 RocketMQ Console
docker pull styletang/rocketmq-console-ng
docker run -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" styletang/rocketmq-console-ng

如果是外网环境
run Broker 外网IP

mkdir -p /usr/data/rocketMQ/data/broker/logs
mkdir -p /usr/data/rocketMQ/data/broker/store
mkdir -p /usr/data/rocketMQ/data/broker/conf/

broket.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 外网ip:9876
brokerIP1 = 外网ip
autoCreateTopicEnable=true
docker run -p 0.0.0.0:10911:10911 -p 0.0.0.0:10909:10909 -d -v /usr/data/rocketMQ/data/broker/logs:/root/logs -v /usr/data/rocketMQ/data/broker/store:/root/store -v /usr/data/rocketMQ/data/broker/conf/broker.conf:/opt/rocketmq/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "autoCreateTopicEnable=true" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq/conf/broker.conf

在这里插入图片描述

.NET RocketMQ

NuGet NewLife.RocketMQ

在这里插入图片描述

Producer

	//main
   XTrace.UseConsole();

   var producer = new Producer
   {
       NameServerAddress = "x.x.x.x:9876",
       Topic = "t0529"
   };

   try
   {
       producer.Start();

       // 发送一条测试消息,以确保 Topic 被创建
       for (var i = 0; i < 10; i++)
       {
           var str = "mqm" + i;
           //var str = Rand.NextString(1337);

           var sr = producer.Publish(str, "TagA");
       }
   }
   catch (Exception ex)
   {
       Console.WriteLine($"Exception: {ex.Message}");
   }
   finally
   {
       producer.Stop();
   }

在这里插入图片描述

Consumer

 var consumer = new Consumer
 {
     Topic = "t0529",
     Group = "test",
     NameServerAddress = "x.x.x.x:9876",

     FromLastOffset = false,
     //SkipOverStoredMsgCount = 0,
     //BatchSize = 20,

     Log = XTrace.Log,
     ClientLog = XTrace.Log,
 };

 consumer.OnConsume = OnConsume;

 consumer.Configure(MqSetting.Current);
 consumer.Start();

 _consumer = consumer;

在这里插入图片描述

相关文章:

  • 华为校招机试 - LRU模拟(20240515)
  • mysql中InnoDB的统计数据
  • oracle tree
  • 数据网络理论基础 第六章 流量和拥塞控制
  • Rabbitmq 搭建使用案例 [附源码]
  • C语言面试题1-10
  • 油烟净化器清新餐饮生活,助力打造绿色餐饮
  • 【渗透测试】|文件上传
  • 动态规划part03 Day43
  • 网络请求客户端WebClient的使用
  • 【Linux】Socket中的心跳机制(心跳包)
  • C语言数据结构堆排序、向上调整和向下调整的时间复杂度的计算、TopK问题等的介绍
  • redis数据类型之Hash,Bitmaps
  • P3128 [USACO15DEC] Max Flow P题解(树上差分,最近公共祖先,图论)
  • HDR视频相关标准-HDR vivid(二)
  • WordPress外贸网站建设的成功要素与技术点
  • 摩尔线程MTT S4000 AI GPU助力30亿参数大模型训练,性能比肩英伟达同类解决方案
  • Go语言GoFly框架快速新增接口/上手写代码
  • 23种设计模式之一— — — —装饰模式详细介绍与讲解
  • Java final关键字
  • 脱发后怎么把头发养回来?脱发自救指南来了
  • 第二十届华表奖提名名单公布,张译、王一博、马丽、郭帆等入围
  • 传媒湃︱《金陵晚报》副刊“雨花石”5月起改为免费刊登
  • 长三角数智文化产业基金意向签约会成功举办
  • 女子隐私被“上墙”莫名遭网暴,网警揪出始作俑者
  • 深圳大学传播学院院长巢乃鹏已任深圳大学副校长