当前位置: 首页 > news >正文

RabbitMQ全栈实践手册:从零搭建消息中间件到SpringAMQP高阶玩法

目录

前言

认识MQ

同步调用

异步调用

技术选型

安装

SpringAMQP

交换机类型

队列交换机绑定

环境搭建

Fanout交换机

声明队列和交换机

消息发送

消息接收

总结

Direct交换机

声明队列和交换机

消息发送

消息接收

总结

Topic交换机

声明队列和交换机

消息发送

消息接收

总结

消息转换器

测试默认转换器

配置JSON转换器

消费者接收Object


前言

RabbitMQ是目前主流消息中间件,基本现在的中小公司都用的RabbitMQ,我所在公司目前好几套

系统,消息中间件也是使用RabbitMQ,RabbitMQ并发好像最多10万,单个MQ应该QPS 5 6万应

该不成问题,对于大部分中小公司来说,完全够了,市面上QPS能几百的公司都顶天了,能用

kafka那些公司基本都是大公司大数据量。

认识MQ

同步调用

同步调用很好理解,就是我要等待你响应,然后才能进行下一步,例如A系统RPC调用B系统,B系

统响应后你才能继续下一步,这里拿一张图举例子:

  1. 支付后调用用户服务扣减余额
  2. 更新交易流水
  3. 调用交易服务 更新订单状态
  4. 调用通知服务 短信通知用户
  5. 调用积分服务 更新用户积分

可以看到都要去等待服务的响应才能进行下一步,其实我们也不需要对方的响应结果做处理,万一

后续产品经理增加服务,越来越多可能调用个支付服务都要等3~4秒钟,对于用户的体验非常不

好,如果哪个服务挂了或者阻塞还会引起服务雪崩。

异步调用

异步调用一般有三个角色:

消息发送者:发送消息的人 向消息代理投递消息(外卖员)

消息Broker:管理、暂存、转发消息(外卖柜)

消息接收者:接收和处理消息的人(我拿外卖 吃饭)

现在我们使用消息Broker(外卖柜)把消息投递到Broker,就不用等待服务响应才能继续往下走,

投递完消息就结束,这样节省了时间,也不会发生级联失败的场景,同时也解耦合了,无论产品经

理后续增加什么服务,我都不用去修改支付服务代码,我只需要去消息Broker(外卖柜)拿到消息

处理即可。

技术选型

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

目比较常见的MQ实现:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也

好。

安装

docker安装RabbitMQ

docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management

15672:RabbitMQ提供的管理控制台端口

5672:RabbitMQ的消息发送处理接口 

RabbitMQ架构图

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方

  • consumer:消费者,也就是消费消息的一方

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

SpringAMQP

RabbitMQ采用的是AMQP协议,具有夸语言特性,RabbitMQ提供了多种语言客户端,任何语言只

要遵循AMQP协议收发消息,都可以与RabbitMQ交互。

RabbitMQ提供的Java客户端比较复杂,基本我们都使用Spring提供一套消息收发模板工具:

SpringAMQP,并且还基于SpringBoot对其实现了自动装配。

Spring AMQP官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发消息。

交换机类型

交换机类型有四种:

Fanout:广播,将消息交个所有绑定到交换机的队列。

Direct:订阅,基于RoutingKey(路由Key)发送个订阅了消息的队列。

Topic:通配符订阅,与Direct类似,支只不过RoutingKey可以使用通配符。

Headers:头匹配,基于MQ的消息头匹配,用的交互。

在开发中,用的最多的就是Direct交换机。

队列交换机绑定

我们可以在控制台手动创建交换机、队列,然后手动将队列和交换机绑定。处了这种还可以在

springboot中,手动创建交换机和队列,然后手动绑定放到IOC容器中,这两种方式一般不怎么常

用,现在基本用的最多的是基于注解声明绑定,开发中也基本用这种模式,简单方便。

环境搭建

导入RabbitMQ依赖:

        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

application.yml配置RabbitMQ信息连接RabbitMQ

spring:rabbitmq:host: 182.92.96.XXX       #  RabbitMQ服务器的IP地址port: 5672                #  RabbitMQ服务器的端口号virtual-host: /           #  RabbitMQ服务器的虚拟主机名称username: itheima         #  RabbitMQ服务器的登录用户名password: 123321          #  RabbitMQ服务器的登录密码

Fanout交换机

声明队列和交换机

如果在MQ没有这个交换机和队列,会自动创建交换机和队列,并且将他们进行绑定。

@Component
public class RabbitMQListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.queue1"),exchange = @Exchange(name = "fanout.exchange",type = ExchangeTypes.FANOUT)))public void ListenFanoutQueue1(String msg){System.out.println("消费者1接收fantou.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.queue2"),exchange = @Exchange(name = "fanout.exchange",type = ExchangeTypes.FANOUT)))public void ListenFanoutQueue2(String msg){System.out.println("消费者2接收fantou.queue2的消息:【" + msg + "】");}
}

消息发送

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testFanoutQueue(){String exchangeName = "fanout.exchange";String message = "Hello,Spring AMQP! 我叫陶然同学";rabbitTemplate.convertAndSend(exchangeName,null,message);}
}

消息接收

总结

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

声明队列和交换机

相比Fanout交换机,Direct交换机多了RoutingKey,如果我发送消息指定的RoutingKey是red,那么

两个消费者都能接收消息,如果我发送消息指定的RoutingKey是blue,那么只有消费者1才能接收

到消息。

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "direct.exchange"),key = {"red","blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者1接收direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "direct.exchange"),key = {"red","yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者2接收direct.queue2的消息:【" + msg + "】");}

消息发送

    @Testpublic void testDirectQueue(){String exchangeName = "direct.exchange";String message = "Hello,Spring AMQP! 我叫陶然同学";rabbitTemplate.convertAndSend(exchangeName,"red",message);}

消息接收

总结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪一个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic交换机

声明队列和交换机

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少敲好1个词
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopictQueue1(String msg){System.out.println("消费者1接收topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopictQueue2(String msg){System.out.println("消费者2接收topic.queue1的消息:【" + msg + "】");}

消息发送

    @Testpublic void testTopicQueue(){String exchangeName = "topic.exchange";String message = "Hello,Spring AMQP! 我叫陶然同学";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}

消息接收

总结

描述下Direct交换机Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以.分割
  • Topic交换机与队列的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

消息转换器

Spring的消息发送接收消息体是一个Object,而在数据传输时,会把你的消息序列化为字节发送

MQ,接收消息时把字节反序列化为Java对象,但是默认情况Spring采用JDK序列化方式,众所周

知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性查

测试默认转换器

我们发送Map结构消息到object.queue队列

    @Testpublic void testSendMap() throws InterruptedException {// 准备消息Map<String,Object> msg = new HashMap<>();msg.put("name", "陶然同学");msg.put("age", 22);// 发送消息rabbitTemplate.convertAndSend("object.queue", msg);}

查看object.queue消息,可以看到消息体非常不友好,看不懂~~~

配置JSON转换器

显然这种JDK序列化方式不适合我们,我们希望体积更小、可读性更高,因此可以使用JSON方式

来做序列化。

引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

配置转换器在发送者和接受者都配置

    @Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

消费者接收Object

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用

Map接收,格式如下:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}

相关文章:

  • 【后端】主从单体数据库故障自动切换,容灾与高可用
  • 2025最新Facefusion3.1.2使用Docker部署,保姆级教程,无需配置环境
  • 29、简要描述三层架构开发模式以及三层架构有哪些好处?
  • Maven进阶知识
  • Python循环语句-for循环(基础语法,range语句,临时变量作用域,嵌套应用)
  • 数据结构与算法-单链表专题
  • Netmiko 源码解析
  • openEuler对比CentOS的核心优势分析
  • 论文阅读:2025 arxiv Reward Shaping to Mitigate Reward Hacking in RLHF
  • Android学习总结之Retrofit篇
  • 生成器(generator)
  • 从新手到高手:小程序开发进阶技巧分享
  • 搭建spark-local模式
  • 《USB技术应用与开发》第四讲:实现USB鼠标
  • RabbitMQ安装流程(Windows环境)
  • 矩阵系统私信功能开发技术实践,支持OEM
  • 传统TDs系统。
  • CentOS7 部署 Ollama 全栈指南:构建安全远程大模型服务
  • Eigen线性代数求解器(分解类)
  • 代码随想录算法训练营Day31 | 56. 合并区间 738.单调递增的数字
  • 申花四连胜领跑中超,下轮榜首大战对蓉城将是硬仗考验
  • 人民日报:光荣属于每一个挺膺担当的奋斗者
  • 应勇:以法治力量服务黄河流域生态保护和高质量发展
  • 委员呼吁提高政府机构电话号码准确性,辽宁阜新回应
  • 永辉超市一季度净利降近八成,未来12个月至18个月是改革成果集中释放期
  • 大家聊中国式现代化|郑崇选:提升文化软实力,打造文化自信自强的上海样本