当前位置: 首页 > news >正文

2.RabbitMQ - 入门

RabbitMQ 入门

文章目录

  • RabbitMQ 入门
  • 一、快速入门
    • 1.1 介绍
    • 1.2 创建项目
    • 1.3 简单入门
  • 二、Work模型
  • 三、交换机
    • 3.1 Fanout
    • 3.2 Direct
    • 3.3 Topic
  • 四、声明队列和交换机
    • 4.1 配置文件
    • 4.2 注解
  • 五、消息转换器

一、快速入门

1.1 介绍

官方的API较为麻烦,我们使用官方推荐的Spring AMQP客户端

RabbitMQ tutorial - “Hello World!” | RabbitMQ 是使用Java完成HelloWord的示例

Spring AMQP客户端实在Java客户端的基础上做了一层封装,让我们使用RabbitMQ变得更加的简单

image-20240511092119183

其中客户端Spring AMQP客户端:

image-20240511092320815

AMQP和Spring AMQP的介绍如下所示

我们使用Spring AMQP的时候,自动使用了AMQP协议

image-20240511092655082

Spring AMQP的官网地址:Spring AMQP

1.2 创建项目

创建一个聚合工程

image-20240511094937051

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency></dependencies>
</project>

1.3 简单入门

我们先实现一个简单的:publisher向queue发送消息,consumer从queue中获取消息,先省去一个交换机

需求如下

  • 利用控制台创建队列simple.queue

    image-20240511100732002

  • 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息

1、父工程中引入spring-amqp依赖,这样publisher和consumer服务都可以使用

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、在每个微服务里面引入MQ服务端信息,连接到微服务

virtual-host虚拟主机,username用户名, password密码

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /testhostusername: testallpassword: guest

3、发送消息

SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。

@Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testSimpleQueue(){// 队列名称String queueName = "simple.queue";//消息String message = "hello,simple.queue";//发送消息rabbitTemplate.convertAndSend(queueName,message);
}

4、控制台结果

image-20240511101655789

消息内容如下图所示

image-20240511101726913

  • 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

1、监听队列,获取消息

@Slf4j
@Component
public class MqListener {//这个注解是监听哪个队列,queues是监听队列的名字@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者收到了simple.queue的消息:【" + msg +"】");}
}

2、启动本微服务查看结果

image-20240511140308141

image-20240511095500723

二、Work模型

Work Queues,任务模型,简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

其实就是一种用法而已,没有太高大上

基本思路如下

  1. 在RabbitMQ控制台创建一个队列,名为work.queue

image-20240511142328747

  1. 在publiser服务中定义测试方法,在1s内产生50条消息,发送到work.queue队列
@Autowired
private RabbitTemplate rabbitTemplate;@Test
void testWorkQueue() throws InterruptedException {String queueName = "work.queue";for (int i = 1; i <= 50; i++) {String msg = "hello, worker, message_" + i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}
}
  1. 在consumer服务中定义两个消息监听者,都监听work.queue队列

  2. 消费者1每秒处理50条消息,消费者2每秒处理5条消息

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 work.queue的消息:【" + msg +"】");// Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2 收到了 work.queue的消息...... :【" + msg +"】");// Thread.sleep(200);
}
  1. 效果图

下面的效果图没截取全,下面接受消息的总数是50

image-20240511173052060

之后我们将注释打开,一个消费者睡眠20ms,一个消费者睡眠200ms,效果图,依然是一人一个,一人一半,所以说投递消息的时候并没有考虑到消费者的消费能力

image-20240511173149702

  1. 结论
  • 向一个队列发送消息,这个队列有两个消费者,那这同一条消息只会被消费一次

  • 默认情况下,如果队列中有多个消费者,那么在队列投递消息时,会采用一种轮训的机制,一个消费者一条

  • 可以在consumer工程中添加listener.simple.prefetch=1参数,每次只能回去一条消息,处理完成才能获取下一个消息

假如消费者处理消息处理不过来,消息就会在队列中进行积累,出现消息堆积问题,此时我们应该加快消息的消费速度,比如同一个队列绑定多个消费者,但是为了充分利用消费者的性能,或选择添加下列的prefetch参数

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /testhostusername: testallpassword: guestlistener:simple:prefetch: 1

效果图

image-20240511173409534

三、交换机

真正的生产环境都会经过exchange发送消息,而不是直接发送到队列

  • Fanout:广播
  • Direct:定向
  • Topic:话题

交换机本身具备一些路由功能,而我们实际业务需求往往是需要去做路由的

image-20240511174513683

3.1 Fanout

Fanout Exchange将接收到的消息广播到每一个跟其绑定的queue,所示叫广播模式

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • FanoutExchange的会将消息路由到每个绑定的队列

image-20240511174639379

实现思路如下所示

  1. 在RabbitMQ控制台,声明队列fanout.queue1和fanout.queue2

image-20240514085409051

  1. 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定

注意,声明队列的时候记得要选择队列的类型

image-20240514085623118

下图所示进行绑定

image-20240514090041526

  1. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg +"】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 fanout.queue2的消息:【" + msg +"】");
}
  1. 在publish中编写测试方法,向hmall.fanout发送消息
@Test
void testSendFanout() {String exchangeName = "hmall.fanout";String msg = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
  1. 实现效果图

image-20240514090521259

3.2 Direct

目前希望发送的消息不是所有人都接收到,目前场景下我们会用到Direct交换机

比如,支付成功后,就要改变订单状态为已支付,支付失败的话需要将支将订单状态改为支付失败或者取消

对于交易服务,不论成功与失败,都需要给交易服务发送请求,进而修改订单状态

支付成功可能需要给用户发送短信通知,但是支付失败可能就不需要给用户发送短信通知了

相当于通知服务只监听支付成功的消息,而交易服务要监听支付成功、支付失败的消息

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

image-20240514092038512

需求如下

  1. 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2

image-20240514092430115

  1. 在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定

image-20240514092525950

如下图所示,绑定的时候一次不能绑定两个key,所以一个队列有两个key的话,我们需要多次添加

image-20240514092656663

image-20240514092817730

  1. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg +"】");
}
  1. 在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息
@Test
void testSendDirect() {String exchangeName = "hmall.direct";String msg = "蓝色通知,警报解除,哥斯拉是放的气球";rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
  1. 效果图

确实只有队列1收到

image-20240514093343102

3.3 Topic

Topic交换机和Direct交换机非常的像,都是通过RoutingKey来控制消息转发给谁,路由给谁

区别在于,Topic交换机的RoutingKey可以使多个单词的列表并且以“.”分割

china.news 代表有中国的新闻消息

china.weather 代表中国的天气消息

japan.news 则代表日本新闻

japan.weather 代表日本的天气消息

Queue与Exchange指定BindingKey时可以使用通配符

  • #:代指0个活多个单词
  • *:代指一个单词

image-20240514095244524

需求如下所示

image-20240514095405700

  1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

image-20240514095741739

  1. 在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定

image-20240514095803832

image-20240514095919843

  1. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 topic.queue1的消息:【" + msg +"】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 topic.queue2的消息:【" + msg +"】");
}
  1. 在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
@Test
void testSendTopic() {String exchangeName = "hmall.topic";String msg = "今天天气挺不错,我的心情的挺好的";rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);rabbitTemplate.convertAndSend(exchangeName, "china.news", "中国的新闻");rabbitTemplate.convertAndSend(exchangeName, "japan.news", "日本新闻");
}
  1. 效果图

image-20240514100223246

四、声明队列和交换机

在控制台创建队列和交换机非常容易出错、效率极低,实际应该采用Java代码进行创建

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建

因为交换机有许多的类型,所以ExchangeBuilder有多个实现类

image-20240514101059146

  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

4.1 配置文件

在消费者方面声明队列和Fanout类型交换机

@Configuration
public class FanoutConfiguration {//声明FanoutExchange交换机import org.springframework.amqp.core.FanoutExchange;@Beanpublic FanoutExchange fanoutExchange() {// ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("hmall.fanout2");}//声明队列import org.springframework.amqp.core.Queue;@Beanpublic Queue fanoutQueue3() {// QueueBuilder.durable("ff").build(); durable是持久化,当前的队列是持久的队列 与下面new的形式相同return new Queue("fanout.queue3");}//将上面声明的队列和交换机绑定@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {//将哪个队列绑定到哪个交换机return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}//声明队列@Beanpublic Queue fanoutQueue4() {return new Queue("fanout.queue4");}//绑定队列和交换机@Beanpublic Binding fanoutBinding4() {//将哪个队列绑定到哪个交换机//下面是直接调用了队列和交换机的方法进行绑定的//凡是加了Bean的方法都会被动态代理,当我们调用方法时,Spring首先会检查Spring容器中是否有对应的Bean//如果有的话,就不会执行方法中的任何内容,直接从容器中取出Bean对象即可,//如果没有,便执行方法注入Bean即可//所以说这里虽然是直接调用了fanoutQueue4()和fanoutExchange()方法,但是这两个方法也要有@Bean注解return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}
}

假如是Direct类型的交换机的话,就显得非常的麻烦

@Configuration
public class DirectConfiguration {//交换机@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct");}//队列@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}//绑定hmall.direct交换机与directQueue1队列@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}//绑定hmall.direct交换机与directQueue1队列@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}

4.2 注解

如下图所示,Direct交换机形式

@Slf4j
@Component
public class MqListener {@RabbitListener(bindings = @QueueBinding(//队列value = @Queue(name = "direct.queue1", durable = "true"),//交换机exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),//Routing keykey = {"red", "blue"}))public void listenDirectQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg + "】");}}

五、消息转换器

需求:测试利用SpringAMQP发送对象类型的消息

  1. 声明一个名为object.queue的队列

image-20240514110035568

  1. 编写测试单元,向队列中直接发送一条消息,消息类型为Map
    @Testvoid testSendObject() {Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);//我们发送的是一个对象,Spring在接受这个对象的时候会将其转换成字节的形似rabbitTemplate.convertAndSend("object.queue", msg);}
  1. 在控制台查看消息

下种情况是因为Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

image-20240514110435176

  1. 建议采用JSON序列化替换默认的JDK序列化

  2. 在publisher和consumer中引入Jackson依赖

<!--Jackson-->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>
  1. 在publisher和consumer中都要配置MessageConverter

发布和接收都要有这段代码

@Bean
public MessageConverter jacksonMessageConvertor(){//import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;return new Jackson2JsonMessageConverter();
}
  1. 效果图

image-20240514111740634

相关文章:

  • 【KWDB 创作者计划】_深度学习篇---归一化反归一化
  • MineWorld,微软研究院开源的实时交互式世界模型
  • 【Ubuntu】关于系统分区、挂载点、安装位置的一些基本信息
  • 新品发布 | 6 秒全谱成像,VIX-N320 内置推扫式高光谱相机重磅发布
  • 容器化-Docker-进阶
  • 【PCB工艺】运放电路中的负反馈机制
  • (19)VTK C++开发示例 --- 分隔文本读取器
  • 【⼆分查找】⼆分查找(easy)
  • 基于cubeMX的hal库STM32实现MQ2烟雾浓度检测
  • ZLMediaKit支持JT1078实时音视频
  • 深度学习--ResNet残差神经网络解析
  • 配置 Apache 的 HTTPS
  • 四川气象数据智能体示范应用入围中国信通院“开源大模型+”案例
  • jmeter中监控服务器ServerAgent
  • ctfhub-RCE
  • 用 C++ 模拟 Axios 的 then 方法处理异步网络请求
  • 深入探究Linux项目自动化构建工具:make与Makefile
  • RK3588 Buildroot 动态变更logo
  • 【数据可视化-24】巧克力销售数据的多维度可视化分析
  • 2025.04.23【Treemap】树状图数据可视化指南
  • 湖南永州公安全面推行“项目警官制”,为重点项目建设护航
  • 从“龙队”到“龙副”,国乒这批退役球员为何不爱当教练了
  • 中国和阿塞拜疆签署互免签证协定
  • 乌克兰关切有中国公司帮助俄罗斯制造军事硬件,外交部:坚决反对无端指责
  • 外交部答澎湃:愿同阿曼在国际和地区事务中加强沟通协调
  • 王毅将出席中国一中亚外长第六次会晤、金砖国家外长会晤和第十五次金砖国家安全事务高级代表会议