2.RabbitMQ - 入门
RabbitMQ 入门
文章目录
- RabbitMQ 入门
- 一、快速入门
- 1.1 介绍
- 1.2 创建项目
- 1.3 简单入门
- 二、Work模型
- 三、交换机
- 3.1 Fanout
- 3.2 Direct
- 3.3 Topic
- 四、声明队列和交换机
- 4.1 配置文件
- 4.2 注解
- 五、消息转换器
一、快速入门
1.1 介绍
官方的API较为麻烦,我们使用官方推荐的Spring AMQP客户端
RabbitMQ tutorial - “Hello World!” | RabbitMQ 是使用Java完成HelloWord的示例
Spring AMQP客户端实在Java客户端的基础上做了一层封装,让我们使用RabbitMQ变得更加的简单
其中客户端Spring AMQP客户端:
AMQP和Spring AMQP的介绍如下所示:
我们使用Spring AMQP的时候,自动使用了AMQP协议
Spring AMQP的官网地址:Spring AMQP
1.2 创建项目
创建一个聚合工程
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency></dependencies>
</project>
1.3 简单入门
我们先实现一个简单的:publisher向queue发送消息,consumer从queue中获取消息,先省去一个交换机
需求如下:
-
利用控制台创建队列simple.queue
-
在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
1、父工程中引入spring-amqp依赖,这样publisher和consumer服务都可以使用
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、在每个微服务里面引入MQ服务端信息,连接到微服务
virtual-host虚拟主机,username用户名, password密码
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /testhostusername: testallpassword: guest
3、发送消息
SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。
@Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testSimpleQueue(){// 队列名称String queueName = "simple.queue";//消息String message = "hello,simple.queue";//发送消息rabbitTemplate.convertAndSend(queueName,message);
}
4、控制台结果
消息内容如下图所示
- 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
1、监听队列,获取消息
@Slf4j
@Component
public class MqListener {//这个注解是监听哪个队列,queues是监听队列的名字@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者收到了simple.queue的消息:【" + msg +"】");}
}
2、启动本微服务查看结果
二、Work模型
Work Queues,任务模型,简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
其实就是一种用法而已,没有太高大上
基本思路如下:
- 在RabbitMQ控制台创建一个队列,名为work.queue
- 在publiser服务中定义测试方法,在1s内产生50条消息,发送到work.queue队列
@Autowired
private RabbitTemplate rabbitTemplate;@Test
void testWorkQueue() throws InterruptedException {String queueName = "work.queue";for (int i = 1; i <= 50; i++) {String msg = "hello, worker, message_" + i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}
}
-
在consumer服务中定义两个消息监听者,都监听work.queue队列
-
消费者1每秒处理50条消息,消费者2每秒处理5条消息
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 work.queue的消息:【" + msg +"】");// Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2 收到了 work.queue的消息...... :【" + msg +"】");// Thread.sleep(200);
}
- 效果图
下面的效果图没截取全,下面接受消息的总数是50
之后我们将注释打开,一个消费者睡眠20ms,一个消费者睡眠200ms,效果图,依然是一人一个,一人一半,所以说投递消息的时候并没有考虑到消费者的消费能力
- 结论
-
向一个队列发送消息,这个队列有两个消费者,那这同一条消息只会被消费一次
-
默认情况下,如果队列中有多个消费者,那么在队列投递消息时,会采用一种轮训的机制,一个消费者一条
-
可以在consumer工程中添加listener.simple.prefetch=1参数,每次只能回去一条消息,处理完成才能获取下一个消息
假如消费者处理消息处理不过来,消息就会在队列中进行积累,出现消息堆积问题,此时我们应该加快消息的消费速度,比如同一个队列绑定多个消费者,但是为了充分利用消费者的性能,或选择添加下列的prefetch参数
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /testhostusername: testallpassword: guestlistener:simple:prefetch: 1
效果图
三、交换机
真正的生产环境都会经过exchange发送消息,而不是直接发送到队列
- Fanout:广播
- Direct:定向
- Topic:话题
交换机本身具备一些路由功能,而我们实际业务需求往往是需要去做路由的
3.1 Fanout
Fanout Exchange将接收到的消息广播到每一个跟其绑定的queue,所示叫广播模式
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- FanoutExchange的会将消息路由到每个绑定的队列
实现思路如下所示
- 在RabbitMQ控制台,声明队列fanout.queue1和fanout.queue2
- 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
注意,声明队列的时候记得要选择队列的类型
下图所示进行绑定
- 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg +"】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 fanout.queue2的消息:【" + msg +"】");
}
- 在publish中编写测试方法,向hmall.fanout发送消息
@Test
void testSendFanout() {String exchangeName = "hmall.fanout";String msg = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
- 实现效果图
3.2 Direct
目前希望发送的消息不是所有人都接收到,目前场景下我们会用到Direct交换机
比如,支付成功后,就要改变订单状态为已支付,支付失败的话需要将支将订单状态改为支付失败或者取消
对于交易服务,不论成功与失败,都需要给交易服务发送请求,进而修改订单状态
支付成功可能需要给用户发送短信通知,但是支付失败可能就不需要给用户发送短信通知了
相当于通知服务只监听支付成功的消息,而交易服务要监听支付成功、支付失败的消息
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
需求如下:
- 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
- 在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
如下图所示,绑定的时候一次不能绑定两个key,所以一个队列有两个key的话,我们需要多次添加
- 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg +"】");
}
- 在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息
@Test
void testSendDirect() {String exchangeName = "hmall.direct";String msg = "蓝色通知,警报解除,哥斯拉是放的气球";rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
- 效果图
确实只有队列1收到
3.3 Topic
Topic交换机和Direct交换机非常的像,都是通过RoutingKey来控制消息转发给谁,路由给谁
区别在于,Topic交换机的RoutingKey可以使多个单词的列表并且以“.”分割
china.news 代表有中国的新闻消息
china.weather 代表中国的天气消息
japan.news 则代表日本新闻
japan.weather 代表日本的天气消息
Queue与Exchange指定BindingKey时可以使用通配符:
- #:代指0个活多个单词
- *:代指一个单词
需求如下所示
- 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
- 在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定
- 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 topic.queue1的消息:【" + msg +"】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 topic.queue2的消息:【" + msg +"】");
}
- 在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
@Test
void testSendTopic() {String exchangeName = "hmall.topic";String msg = "今天天气挺不错,我的心情的挺好的";rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);rabbitTemplate.convertAndSend(exchangeName, "china.news", "中国的新闻");rabbitTemplate.convertAndSend(exchangeName, "japan.news", "日本新闻");
}
- 效果图
四、声明队列和交换机
在控制台创建队列和交换机非常容易出错、效率极低,实际应该采用Java代码进行创建
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
因为交换机有许多的类型,所以ExchangeBuilder有多个实现类
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
4.1 配置文件
在消费者方面声明队列和Fanout类型交换机
@Configuration
public class FanoutConfiguration {//声明FanoutExchange交换机import org.springframework.amqp.core.FanoutExchange;@Beanpublic FanoutExchange fanoutExchange() {// ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("hmall.fanout2");}//声明队列import org.springframework.amqp.core.Queue;@Beanpublic Queue fanoutQueue3() {// QueueBuilder.durable("ff").build(); durable是持久化,当前的队列是持久的队列 与下面new的形式相同return new Queue("fanout.queue3");}//将上面声明的队列和交换机绑定@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {//将哪个队列绑定到哪个交换机return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}//声明队列@Beanpublic Queue fanoutQueue4() {return new Queue("fanout.queue4");}//绑定队列和交换机@Beanpublic Binding fanoutBinding4() {//将哪个队列绑定到哪个交换机//下面是直接调用了队列和交换机的方法进行绑定的//凡是加了Bean的方法都会被动态代理,当我们调用方法时,Spring首先会检查Spring容器中是否有对应的Bean//如果有的话,就不会执行方法中的任何内容,直接从容器中取出Bean对象即可,//如果没有,便执行方法注入Bean即可//所以说这里虽然是直接调用了fanoutQueue4()和fanoutExchange()方法,但是这两个方法也要有@Bean注解return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}
}
假如是Direct类型的交换机的话,就显得非常的麻烦
@Configuration
public class DirectConfiguration {//交换机@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct");}//队列@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}//绑定hmall.direct交换机与directQueue1队列@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}//绑定hmall.direct交换机与directQueue1队列@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}
4.2 注解
如下图所示,Direct交换机形式
@Slf4j
@Component
public class MqListener {@RabbitListener(bindings = @QueueBinding(//队列value = @Queue(name = "direct.queue1", durable = "true"),//交换机exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),//Routing keykey = {"red", "blue"}))public void listenDirectQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg + "】");}}
五、消息转换器
需求:测试利用SpringAMQP发送对象类型的消息
- 声明一个名为object.queue的队列
- 编写测试单元,向队列中直接发送一条消息,消息类型为Map
@Testvoid testSendObject() {Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);//我们发送的是一个对象,Spring在接受这个对象的时候会将其转换成字节的形似rabbitTemplate.convertAndSend("object.queue", msg);}
- 在控制台查看消息
下种情况是因为Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
-
建议采用JSON序列化替换默认的JDK序列化
-
在publisher和consumer中引入Jackson依赖
<!--Jackson-->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>
- 在publisher和consumer中都要配置MessageConverter
发布和接收都要有这段代码
@Bean
public MessageConverter jacksonMessageConvertor(){//import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;return new Jackson2JsonMessageConverter();
}
- 效果图