当前位置: 首页 > news >正文

MQ基础篇

1.初识MQ

1.同步调用

概念:
同步调用是一种程序执行方式,在调用一个函数或服务时,调用方会一直等待被调用方执行完成并返回结果,才会继续执行后续代码 ,期间调用线程处于阻塞状态。

同步调用的优势:

  • 时效性强,等待到结果后才返回。

同步调用的问题:

  • 拓展性差
  • 性能下降
  • 级联失败问题

2.异步调用

概念:
异步调用是一种程序执行机制,调用方发出请求后,无需等待被调用方处理完成并返回结果,就能继续执行后续代码 。它基于消息通知的方式,涉及消息发送者、消息接收者和消息代理三个角色。

异步调用通常是基于消息通知的方式,包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用者
  • 消息接收者:接收和处理消息的人,就是原来的服务提供者
  • 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
    在这里插入图片描述
    异步调用的优势:
  • 耦合度低,拓展性强
  • 异步调用,无需等待,性能好
  • 故障隔离,下游服务故障不影响上游业务
  • 缓存消息,流量削峰填谷

异步调用的问题:

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖于Broker的可靠性

3.MQ技术选型

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

Broker:核心组件,在生产者和消费者间起中介作用,负责接收、存储和转发消息

2.RabbitMQ

安装部署

docker run \-e RABBITMQ_DEFAULT_USER=wang \-e RABBITMQ_DEFAULT_PASS=123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hm-net\-d \rabbitmq:3.8-management

在这里插入图片描述

消息发送的注意事项有哪些?

  • 交换机只能路由消息,无法存储消息
  • 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定

3.Java客户端

1.快速入门

① 引入spring-boot-starter-amqp依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

② 配置rabbitmq服务端信息

spring:rabbitmq:host: 192.168.88.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

③ 利用RabbitTemplate发送消息

@SpringBootTest
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {//1.队列名String queueName = "simple.queue";//2.消息String message = "hello,spring amqp!";//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}}

④ 利用@RabbitListener注解声明要监听的队列,监听消息

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String message) {log.info("Simple queue: {}", message);}
}

2.WorkQueue

实现一个队列绑定多个消费者

Work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) {System.out.println("消费者1接收到消息:" + message + "," + LocalDateTime.now());
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) {System.err.println("消费者2接收到消息:" + message + "," + LocalDateTime.now());
}
@Test
public void testWorkQueue() {//1.队列名String queueName = "work.queue";for(int i=1;i<=50;i++){//2.消息String message = "hello,spring amqp!"+i;//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
listener:simple:prefetch: 1

3.Fanout交换机

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • FanoutExchange的会将消息路由到每个绑定的队列

发送消息到交换机的API:

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";// 发送消息,参数分别是:交换机名称、RoutingKey(暂时为空)、消息rabbitTemplate.convertAndSend(exchangeName, "", message);
}

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式
在这里插入图片描述

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {log.info("消费者1监听到 fanout.queue1的消息: {}", message);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {log.info("消费者2监听到 fanout.queue2的消息: {}", message);
}
@Test
public void testFanoutQueue() {//1.交换机名String exchangeName = "hmall.fanout";//2.消息String message = "hello,everyone!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName,"", message);
}

4.Direct交换机

在这里插入图片描述
Direct交换机与Fanout交换机的差异:

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同RoutingKey,则与Fanout功能类似
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String message) {log.info("消费者1监听到 direct.queue1的消息: {}", message);
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String message) {log.info("消费者2监听到 direct.queue2的消息: {}", message);
}
@Test
public void testDirectQueue() {//1.交换机名String exchangeName = "hmall.direct";//2.消息String message = "hello,blue!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

5.Topic交换机

在这里插入图片描述
Topic交换机相比Direct交换机的差异:

  • Topic的RoutingKey和bindingKey可以是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #: 代表0个或多个词
  • *: 代表1个词
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String message) {log.info("消费者1监听到 topic.queue1的消息: {}", message);
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String message) {log.info("消费者2监听到 topic.queue2的消息: {}", message);
}
@Test
public void testTopicQueue() {//1.交换机名String exchangeName = "hmall.topic";//2.消息String message = "hello,blue!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

6.声明队列交换机

@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange() {
//        return new FanoutExchange("hmall.fanout");return ExchangeBuilder.fanoutExchange("hamll.fanout").build();}@Beanpublic Queue fanoutQueue1() {
//        return new Queue("fanout.queue1");return QueueBuilder.durable("fanout.queue1").build();}@Beanpublic Binding fanoutQueueBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2() {
//        return new Queue("fanout.queue1");return QueueBuilder.durable("fanout.queue2").build();}@Beanpublic Binding fanoutQueueBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String message) {log.info("消费者1监听到 direct.queue1的消息: {}", message);
}

7.消息转换器

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:

在publisher和consumer中都要引入jackson依赖:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

在publisher和consumer中都要配置MessageConverter:

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}
@Test
public void testSendObject() {//1.准备消息Map<String, Object> msg=new HashMap<>(2);msg.put("name","Jack");msg.put("age",18);//3.发送消息rabbitTemplate.convertAndSend("object.queue",msg);
}

相关文章:

  • 深度学习3.1 线性回归
  • 前端基础之《Vue(6)—组件基础(2)》
  • 1.Linux基础指令
  • MATLAB 控制系统设计与仿真 - 37
  • Linux:命令行参数、环境变量
  • [经验总结]Linux双机双网卡Keepalived高可用配置及验证细节
  • 大数据赋能,全面提升‘企业服务平台’实际效能!
  • 浏览器的存储机制 - Storage
  • NO.97十六届蓝桥杯备战|数论板块-最大公约数和最小公倍数|欧几里得算法|秦九韶算法|小红的gcd(C++)
  • 爬虫学习——Scrapy
  • Java编程语言 1.打印数组元素 2.Student类 StudentTest类
  • 【go】什么是Go语言中的GC,作用是什么?调优,sync.Pool优化,逃逸分析演示
  • alertManager部署安装、告警规则配置详解及告警消息推送
  • 华为openEuler操作系统全解析:起源、特性与生态对比
  • 机器学习模型(2/4课时):损失函数
  • 深度学习中的卷积神经网络
  • 命令行工具kubectl
  • 密码学中的盐值是什么?
  • RAII资源管理理解
  • Python 中的数据类型有哪些
  • 商务部:新一轮服务业扩大开放一次性向11个试点省市全面铺开
  • 成都市政府秘书长王忠诚调任遂宁市委副书记
  • 复旦大学史地学系在北碚
  • 影子调查丨义门陈遗址建筑被“没收”风波
  • 人民日报和音:书写周边命运共同体建设新篇章
  • 对话地铁读书人|来自法学教授的科普:读书日也是版权日