当前位置: 首页 > news >正文

【RabbitMQ消息队列】详解(一)

初识RabbitMQ

RabbitMQ 是一个开源的消息代理软件,也被称为消息队列中间件,它遵循 AMQP(高级消息队列协议),并且支持多种其他消息协议。
在这里插入图片描述

核心概念

  • 生产者(Producer):创建消息并将其发送到 RabbitMQ 的应用程序。生产者并不关心消息会被发送到哪个队列,而是将消息发送到交换器(Exchange)。
  • 消费者(Consumer):从队列中获取消息并进行处理的应用程序。消费者监听特定的队列,一旦队列中有新消息,就会将其取出并处理。
  • 队列(Queue):是 RabbitMQ 内部用于存储消息的缓冲区。它是一个先进先出(FIFO)的数据结构,多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息。
  • 交换器(Exchange):接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列。常见的交换器类型有直连交换器(Direct Exchange)、主题交换器(Topic Exchange)、扇形交换器(Fanout Exchange)和头交换器(Headers Exchange)。
  • 绑定(Binding):是交换器和队列之间的关联关系。通过绑定,交换器可以知道将消息路由到哪些队列。

工作模式

  • 简单模式(Simple Mode):一个生产者将消息发送到一个队列,一个消费者从该队列接收消息。
  • 工作队列模式(Work Queues):多个消费者从同一个队列中竞争获取消息,以实现任务的分发和负载均衡。
  • 发布 / 订阅模式(Publish/Subscribe):生产者将消息发送到扇形交换器,交换器将消息广播到所有绑定的队列,每个绑定的队列都有一个消费者接收消息。
  • 路由模式(Routing):生产者将消息发送到直连交换器,交换器根据消息的路由键将消息路由到绑定键与之匹配的队列。
  • 主题模式(Topics):生产者将消息发送到主题交换器,交换器根据消息的路由键和绑定键的匹配规则将消息路由到相应的队列。

优势

  • 解耦:生产者和消费者之间通过消息队列进行通信,它们不需要直接相互了解,从而降低了系统之间的耦合度。
  • 异步通信:生产者发送消息后可以继续执行其他任务,不需要等待消费者处理完消息,提高了系统的响应速度和吞吐量。
  • 流量削峰:在高并发场景下,消息队列可以作为缓冲区,将大量的请求暂存起来,避免后端服务因瞬间高流量而崩溃。
  • 可扩展性:可以通过增加消费者的数量来提高系统的处理能力,以应对不断增长的业务需求。

应用场景

  • 异步处理:例如用户注册时,将发送邮件、短信等通知的任务放入消息队列,由专门的消费者异步处理,提高注册接口的响应速度。
  • 系统解耦:在微服务架构中,各个服务之间通过消息队列进行通信,降低服务之间的耦合度,提高系统的可维护性和可扩展性。
  • 流量削峰:在电商系统的秒杀活动中,将用户的请求放入消息队列,后端服务按照一定的速率从队列中取出请求进行处理,避免系统因瞬间高流量而崩溃。
  • 日志收集:将各个服务产生的日志信息发送到消息队列,由专门的日志处理服务从队列中获取日志信息进行存储和分析。

与其他消息队列的比较

  • Kafka:Kafka 是一个高吞吐量的分布式消息系统,主要用于处理大规模的实时数据流。与 RabbitMQ 相比,Kafka 的吞吐量更高,更适合处理海量数据的实时传输和处理,但在消息的可靠性和灵活性方面相对较弱。
  • ActiveMQ:ActiveMQ 是一个老牌的消息队列中间件,支持多种消息协议,功能较为全面。与 RabbitMQ 相比,ActiveMQ 的性能相对较低,配置和管理也较为复杂。

RabbitMQ 是一个功能强大、性能稳定、易于使用的消息队列中间件,广泛应用于各种分布式系统和微服务架构中。

生产者

1. 导入 pika

import pika

pika 是 Python 里用于与 RabbitMQ 消息代理交互的库,借助它能够实现消息的发送与接收。

2. 建立与 RabbitMQ 服务器的连接

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

pika.ConnectionParameters('localhost'):创建一个连接参数对象,这里指定 RabbitMQ 服务器的地址为 localhost,也就是本地机器。要是你的 RabbitMQ 服务器部署在其他机器上,就得把 localhost 替换成对应的 IP 地址或者域名。
pika.BlockingConnection():构建一个阻塞式的连接对象。所谓阻塞式连接,意味着在连接建立之后,程序会暂停等待,直到操作完成。

3. 创建通道

channel = connection.channel()

在 RabbitMQ 里,通道是进行大部分操作的基础。通道是轻量级的连接,可在一个连接上创建多个通道,以此来提升效率。借助通道,你能够声明队列、发送和接收消息等。

4. 声明队列

channel.queue_declare(queue='queue1')

channel.queue_declare():这是一个方法,其用途是声明一个队列。如果指定的队列不存在,就会创建该队列;若队列已经存在,则不会有任何影响。
queue='queue1':指定队列的名称为 queue1

5. 发布消息到队列

channel.basic_publish(exchange='', routing_key='queue1', body='hello world!')

channel.basic_publish():这是一个用于发布消息的方法。
exchange='':指定交换器的名称。这里为空字符串,代表使用默认的交换器(也就是简单模式)。默认交换器会根据路由键把消息路由到对应的队列。
routing_key='queue1':指定路由键。由于使用的是默认交换器,所以消息会被路由到名称为 hello 的队列。
body='hello world!':指定要发送的消息内容,此处消息内容为 hello world!

import pika# 1.建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))# 2.创建通道
channel = connection.channel()# 3.声明队列
channel.queue_declare(queue='queue1')# 4.发布消息到队列
channel.basic_publish(exchange='', routing_key='queue1', body='hello world!')channel.close()connection.close()

消费者

1. 定义回调函数

def callback(ch, method, properties, body):print(" [x] Received %r" % body)

这是一个回调函数,当消费者从队列中接收到消息时,会调用这个函数。
函数的参数含义如下:
ch:通道对象,用于与 RabbitMQ 进行交互。
method:包含消息传递的元数据,如路由键、交换器等。
properties:消息的属性,如消息头、优先级等。
body:消息的实际内容,以字节形式表示

2.开始消费消息

channel.basic_consume(queue='queue1', auto_ack=True, on_message_callback=callback)

channel.basic_consume():该方法用于启动一个消费者,从指定队列中接收消息。
queue='queue1':指定要消费的队列名称为 queue1
auto_ack=True:表示自动确认消息。当消费者接收到消息后,会立即向 RabbitMQ 发送确认信号,告知 RabbitMQ 该消息已被处理,可以从队列中删除。
on_message_callback=callback:指定当接收到消息时要调用的回调函数。

3. 启动消息消费循环

channel.start_consuming()

channel.start_consuming():启动一个无限循环,持续从队列中接收消息,并调用回调函数处理接收到的消息。直到程序被手动终止(如按下 CTRL+C)。

import pika# 1.建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))# 2.创建通道
channel = connection.channel()# 3.声明队列
channel.queue_declare(queue='queue1')# 4.定义回调函数
def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 5.开始消费消息
channel.basic_consume(queue='queue1', auto_ack=True, on_message_callback=callback)# 6.启动消息消费循环
channel.start_consuming()

相关文章:

  • Linux系统类型及常用操作命令总结
  • 第三方软件检测报告:热门办公软件评估及功能表现如何?
  • 电力系统失步解列与振荡解析
  • Java 内存泄漏 详解
  • 【AI提示词】领导力教练
  • 4.2.1 MYSQL语句,索引,视图,存储过程,触发器
  • 第十三步:vue
  • 【PVR】《Adaptive Palm Vein Recognition Method》
  • React Testing Library
  • Java学习手册:开发 Web 网站要知道的知识
  • T检验、F检验及样本容量计算学习总结
  • 2025第16届蓝桥杯省赛之研究生组D题最大数字求解
  • 学习spark总结
  • 常见锁策略
  • 关系型数据库PostgreSQL vs MySQL 深度对比:专业术语+白话解析+实战案例
  • Customizing Materials Management with SAP ERP Operations
  • AI日报 - 2025年04月28日
  • (26)VTK C++开发示例 ---将点坐标写入PLY文件
  • Java多线程实现顺序执行
  • 界面打印和重定向同时实现
  • 伊朗港口爆炸已致46人死亡
  • 全国电影工作会:聚焦扩大电影国际交流合作,提升全球影响力
  • 人社部:就业政策储备充足,将会根据形势变化及时推出
  • 今年3月全国查处违反中央八项规定精神问题16994起
  • 乌方称泽连斯基与特朗普进行简短会谈
  • 敲定!今年将制定金融法、金融稳定法