当前位置: 首页 > news >正文

【MQ篇】RabbitMQ之工作队列模式!

在这里插入图片描述

目录

    • 引言
    • 一、 回顾简单模式的局限 😔
    • 二、 认识工作队列模式:多劳多得的“团队协作” 💪
    • 三、 Java (Spring Boot) 代码实战:让多只兔子一起工作! 🐇🐇🐇💪
    • 四、 深入理解:更公平的任务分配与消息确认机制的重要性 🧐
    • 五、 工作队列模式的优势与适用场景总结 🎯
    • 六、 总结:团队协作,效率倍增! 🤝

🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!

🌟了解 MQ 请看 : 【MQ篇】初识MQ!

其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏(已完结)】…等

如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning

引言

在上一篇文章中,我们一起探索了 RabbitMQ 的简单模式,认识了那只勤劳的“单人快递员”小兔子。然而,在实际应用中,我们经常会遇到需要处理大量耗时任务的场景。这时,简单模式可能会显得力不从心。为了解决这个问题,RabbitMQ 引入了“工作队列模式”(Work Queues)。本文将深入讲解工作队列模式,通过生动的比喻和详细的 Java (Spring Boot) 代码示例,展示如何利用多只“兔子Worker”高效地处理任务,提升系统吞吐量!🚀

了解RabbitMQ简单模式请看:【MQ篇】RabbitMQ之简单模式!

一、 回顾简单模式的局限 😔

在简单模式下,即使我们启动多个消费者监听同一个队列,每个消息也只会被其中的一个消费者处理。如果任务处理时间较长,单个消费者可能会成为性能瓶颈,导致消息堆积。

二、 认识工作队列模式:多劳多得的“团队协作” 💪

在这里插入图片描述

  1. 概念:任务共享的处理流水线
    工作队列模式的核心思想是将需要处理的任务(消息)放入一个队列中,然后让多个并行的工作者(消费者)从队列中获取任务并进行处理。每个任务只会被一个工作者处理,从而实现任务的并行化处理,提高整体的处理速度。

  2. 角色:高效分拣的“团队成员”

    • 生产者 (Producer): 仍然是消息的发送者,将需要处理的任务封装成消息并发送到队列。就像“总调度员”,将待处理的“包裹”放入公共的“分拣中心”。 📦➡️🏢
    • 队列 (Queue): 存储待处理任务的中心仓库。就像“分拣中心”的传送带,等待着工作者来领取任务。 📥
    • 多个消费者 (Workers): 多个并行的工作者,它们同时监听同一个队列,并从中获取任务进行处理。就像多名“分拣员”,从传送带上取走“包裹”进行处理。 🧑‍🔧🧑‍🏭👩‍⚕️
  3. 工作流程:任务是如何被高效分拣的? 流程图更清晰哦!

    • 连接 (Connect): 生产者和多个消费者都连接到 RabbitMQ 服务器。🔗
    • 声明队列 (Declare Queue): 生产者和所有消费者都声明同一个队列。🏷️
    • 发送任务 (Publish): 生产者将待处理的任务消息发送到队列中。✉️➡️📦
    • 消费者获取任务 (Consume): 多个消费者同时监听该队列,当队列中有新消息时,它们会尝试获取任务。RabbitMQ 会根据一定的策略(例如,轮询)将消息分发给不同的消费者。👂➡️📦
    • 处理任务 (Process): 获取到任务的消费者开始处理该任务。💻
    • 确认 (Acknowledge): 消费者在成功处理完任务后,向 RabbitMQ 发送确认。✅
  4. 核心特点:并行处理、提高吞吐量 🚀

    • 并行处理: 多个消费者可以同时处理队列中的任务,显著提高处理速度。
    • 负载均衡(基本): RabbitMQ 会尝试将消息均匀地分发给不同的消费者,实现基本的负载均衡。
    • 提高吞吐量: 在单位时间内可以处理更多的任务。
  5. 适用场景:需要并行处理耗时任务的“高效流水线” 🏭
    工作队列模式非常适合需要并行处理耗时任务的场景,例如:

    • 图片、视频等多媒体文件的处理。 🏞️🎬
    • 大数据分析和计算。 📊
    • 复杂的报表生成。 📈
    • 批量发送邮件或通知。 📧🔔

三、 Java (Spring Boot) 代码实战:让多只兔子一起工作! 🐇🐇🐇💪

接下来,我们通过 Spring Boot 的示例来演示工作队列模式。

  1. 项目依赖 (pom.xml):
    与简单模式相同,添加 Spring AMQP 的依赖。

  2. RabbitMQ 连接配置 (application.properties):
    配置 RabbitMQ 连接信息。

  3. 生产者 (TaskSender.java):
    创建一个发送任务消息的服务组件:

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;@Service
    public class TaskSender {@Autowiredprivate RabbitTemplate rabbitTemplate;private static final String QUEUE_NAME = "task.queue"; // 定义任务队列名称 🏷️public void sendTask(String task) {System.out.println(" [TaskSender] Sending task: " + task + " to " + QUEUE_NAME + " ➡️📦");rabbitTemplate.convertAndSend(QUEUE_NAME, task);System.out.println(" [TaskSender] Task sent! ✅");}
    }
    
  4. 消费者 (TaskWorker.java):
    创建多个消费者组件来并行处理任务:

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;@Component
    public class TaskWorker {@RabbitListener(queues = "task.queue")public void processTask1(String task) {System.out.println(" [Worker 1] Received task: " + task + " 📥");simulateProcessing(task);System.out.println(" [Worker 1] Task processed: " + task + " 👍");}
    }@Component
    public class TaskWorker2 {@RabbitListener(queues = "task.queue")public void processTask2(String task) {System.out.println(" [Worker 2] Received task: " + task + " 📥");simulateProcessing(task);System.out.println(" [Worker 2] Task processed: " + task + " 👍");}
    }// 模拟耗时任务
    private static void simulateProcessing(String task) {try {Thread.sleep(2000); // 模拟 2 秒的处理时间 ⏳⏳} catch (InterruptedException e) {Thread.currentThread().interrupt();}
    }
    

    注意: 我们创建了两个不同的 @Component 类 (TaskWorkerTaskWorker2),它们都监听同一个队列 "task.queue"。Spring AMQP 会自动创建这些消费者实例,并让它们并行地从队列中获取任务。

  5. 启动和测试 (RabbitmqWorkQueueDemoApplication.java):
    在你的 Spring Boot 主类中注入 TaskSender 并发送一些任务:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
    public class RabbitmqWorkQueueDemoApplication implements CommandLineRunner {@Autowiredprivate TaskSender sender;public static void main(String[] args) {SpringApplication.run(RabbitmqWorkQueueDemoApplication.class, args);}@Overridepublic void run(String... args) throws Exception {sender.sendTask("Task 1");sender.sendTask("Task 2");sender.sendTask("Task 3");sender.sendTask("Task 4");}
    }
    

    运行你的 Spring Boot 应用,你会看到 TaskSender 发送的任务被 TaskWorkerTaskWorker2 并行地接收和处理。

    在这里插入图片描述

四、 深入理解:更公平的任务分配与消息确认机制的重要性 🧐

  1. 消息的轮询分发 (Round-Robin):
    默认情况下,RabbitMQ 会采用轮询的方式将队列中的消息分发给不同的消费者。这意味着每个消费者会依次接收到消息,从而实现基本的负载均衡。

  2. 消息确认机制 (Acknowledgement) 的重要性:确保任务不丢失 🛡️
    在工作队列模式下,显式地配置和使用消息确认机制尤为重要。如果一个 Worker 在处理任务的过程中崩溃,RabbitMQ 会将该任务重新放回队列中,以便其他 Worker 可以重新处理,从而保证任务不会丢失。在 Spring Boot 中,你可以通过配置 spring.rabbitmq.listener.acknowledge-modemanual,然后在你的 @RabbitListener 方法中注入 Channel 参数,并手动调用 channel.basicAck(deliveryTag, false) 进行确认。

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;@Component
    public class ReliableTaskWorker {@RabbitListener(queues = "task.queue")public void processTask(String task, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {System.out.println(" [ReliableWorker] Received task: " + task + " 📥");simulateProcessing(task);System.out.println(" [ReliableWorker] Task processed: " + task + " 👍");try {channel.basicAck(tag, false); // 手动发送确认} catch (Exception e) {// 处理确认失败的情况e.printStackTrace();}}// 模拟耗时任务 (与之前相同)private static void simulateProcessing(String task) { /* ... */ }
    }
    
  3. 公平分发 (Fair Dispatch):让能力更强的 Worker 处理更多任务 ⚖️
    默认的轮询分发在 Worker 处理能力不同的情况下可能并不理想。RabbitMQ 提供了“公平分发”的机制,在 Worker 确认处理完当前消息之前,不要再向其发送新的消息。这样,处理速度快的 Worker 就能处理更多的任务,从而更有效地利用系统资源。在 Spring Boot 中,你可以在配置文件中进行设置:
    在这里插入图片描述

五、 工作队列模式的优势与适用场景总结 🎯

  • 提高处理速度: 通过并行处理显著缩短任务的处理时间。
  • 提高系统吞吐量: 在单位时间内可以处理更多的任务。
  • 增强系统弹性: 即使某个 Worker 宕机,队列中的任务仍然可以被其他 Worker 处理。
  • 实现基本的负载均衡: 将任务分发给多个 Worker 处理。

工作队列模式非常适合需要处理大量后台任务,并且希望通过并行处理来提高效率和吞吐量的应用场景。

六、 总结:团队协作,效率倍增! 🤝

RabbitMQ 的工作队列模式就像一个高效的团队,通过将任务分发给多个 Worker 并行处理,极大地提高了系统的处理能力和吞吐量。掌握了工作队列模式,你就能更好地应对需要处理大量后台任务的场景,让你的 RabbitMQ 应用更加强大!💪

相关文章:

  • 【无标题】spark安装部署
  • 16.第二阶段x64游戏实战-分析二叉树结构
  • CAMAT
  • FreeRTOS深度解析:队列集(Queue Sets)的原理与应用
  • 域名 → IP 的解析全过程
  • 【PCB工艺】推挽电路及交越失真
  • 厚铜PCB制造中的散热结构工艺控制要点
  • 探秘Transformer系列之(30)--- 投机解码
  • JavaScript 改变this指向
  • LeetCode第164题_最大间距
  • 图文结合 - 光伏系统产品设计PRD文档 -(慧哥)慧知开源充电桩平台
  • 前端 JavaScript 处理流式响应的坑
  • DeepSeek+Mermaid:轻松实现可视化图表自动化生成(附实战演练)
  • Ubuntu使用war包部署Jenkins并通过systemcl管理
  • 【Java面试笔记:基础】11.Java提供了哪些IO方式? NIO如何实现多路复用?
  • 【Java学习笔记】选择结构
  • ACI multipod 二、IPN (Inter-Pod Network)
  • 【最新版】沃德代驾源码全开源+前端uniapp
  • [蓝桥杯 2025 省 Python B] 异或和
  • IDEA中Quarkus框架(3.13版本)容器编排、压测与调优、注意事项等
  • 央行上海总部:受益于过境免签政策,上海市外卡刷卡支付交易量稳步增长
  • 上海楼市明显复苏:一季度房地产开发投资增长5.1%,土地市场重燃战火
  • 中华人民共和国和阿塞拜疆共和国关于建立全面战略伙伴关系的联合声明
  • 金地集团:保交楼为经营的首要任务,将根据融资性现金流恢复程度等进行投资决策
  • 格力电器:选举董明珠为公司第十三届董事会董事长
  • 俄乌就不打击民用基础设施释放对话信号