当前位置: 首页 > news >正文

RabbitMQ 优先级队列详解


在这里插入图片描述

本文是博主在记录使用 RabbitMQ 在执行业务时遇到的问题和解决办法,因此查阅了相关资料并做了以下记载,记录了优先级队列的机制和使用要点。

本文为长文,详细介绍了相关的知识,可作为学习资料看。


文章目录

  • 一、优先级队列介绍
    • 1、消息是在入优先级队列时排序,还是等消费者拉取消息时排序?
      • 1.1 消息入队列立即排序,即先排序
      • 1.2 优先级队列通常遵循以下原则
    • 2、高优先级消息源源不断,优先级队列如何处理低优先级消息?
      • 2.1 时间衰减机制(Aging)
      • 2.2 分层队列 + 比例调度
      • 2.3 动态优先级调整
      • 2.4 容量限制与降级
      • 2.5 空闲时段处理
    • 3、优先级参数是声明队列的时候设置,还是在发送消息时设置?
      • 3.1 声明队列时:启用优先级支持
      • 3.2 发送消息时:指定消息优先级
      • 3.3 优先级规则
      • 3.4 完整流程示例
        • 步骤 1:声明支持优先级的队列
        • 步骤 2:发送带优先级的消息
        • 步骤 3:消费者验证优先级
  • 二、优先级队列的常见问题和最佳实践
    • 1、常见问题与解决方案
    • 2、使用优先级队列的最佳实践
    • 3、普通队列变成优先级队列会导致已有消息丢失
    • 4、要注意优先级队列的《预计取数》设置
    • 5、多个消费者消费一个队列,优先级消息如何分配?
  • 三、Java 示例实现优先级队列
    • 1、添加 Maven 依赖
    • 2、完整 Java 代码实现
    • 3、关键代码解释
      • 3.1 声明优先级队列
      • 3.2 发送优先级消息
      • 3.3 消费消息
    • 4、运行与验证
    • 5、参数设置
    • 6、异常处理
    • 7、多线程消费

一、优先级队列介绍

首先我们要知道,RabbitMQ 是一个消息中间件,支持多种消息队列模式,而优先级队列就是其中一种,优先级队列允许不同优先级的消息被存储,高优先级 的消息会被 先消费

  • 配置队列的优先级范围比较常见的是例如 0 ~ 255 之间的数值,数值越大优先级越高
  • 优先级的配置时机是在生产者在发送消息时,消费者在处理时,队列会按优先级顺序传递消息

上面标黄的这句话会引出一个新的问题,比如,优先级队列如何影响消息的排序?如果队列已经有消息了,新来的高优先级消息会插到前面吗?或者只在消费者获取消息的时候才按优先级排序?

1、消息是在入优先级队列时排序,还是等消费者拉取消息时排序?

我们知道优先级队列和普通队列不同,普通队列是先进先出(FIFO),而优先级队列会根据消息的优先级来决定处理顺序。

假设队列中已经有三个消息,优先级分别为1、2、3(假设数字越大优先级越高),顺序是按照到达时间排列的。现在来了一条优先级为4的新消息,会引发先排序还是后排序的问题。

(1)先排序:消息入队列立即排序

消息到达队列先排序,立即将这条优先级为 4 的新消息插入到最前面,消费者直接获取优先级最高的消息。

  • 能做到消费者每次取消息时不需要额外的排序开销。
  • 队列内部的结构需要支持快速插入和删除,以保持消息的有序性,如 二叉堆结构

(2)后排序:消费者拉消息时排序

消息先入队列,消费者来取的时候,队列会按照优先级顺序将消息提供给消费者。

  • 消费者拉取消息时排序,需要一个能快速查找最高优先级消息的结构,如堆(heap)

上面两种做法大家可以思考一下优缺点。

1.1 消息入队列立即排序,即先排序

优先级队列通常会在消息入队时根据其优先级插入到正确的位置,而不是等到消费者获取时才进行排序。

该方式可以确保消费者每次获取到的都是当前队列中优先级最高的消息,而不需要每次消费时都重新排序整个队列。

因此,当一个新的高优先级消息到达时,它会被立即插入到队列中比它优先级低的消息前面,这样在后续的消费过程中,消费者会优先处理这条高优先级的消息。

1.2 优先级队列通常遵循以下原则

  • 入队时排序:大多数消息队列系统(如RabbitMQ)在消息入队时即根据优先级调整顺序。新到达的高优先级消息会立即插入到队列中合适的位置(如前面),而非等待消费者拉取时才排序。这意味着队列内部始终按优先级维护有序状态。
  • 数据结构支持:队列通常使用 堆(Heap) 或类似结构,确保插入和提取最高优先级消息的时间复杂度为O(log n) 。例如,新消息D(优先级4)到达后,会被插入到比现有消息更高优先级的位置。
  • 消费者行为透明:消费者获取消息时,无需感知排序过程,直接按队列现有顺序取出最高优先级的消息。队列在入队时已完成排序,因此消费时无额外开销。
示例场景:
现有队列:消息A1)、B2)、C3)→ 顺序为 ABC(假设数字越大优先级越高)。  
新消息D4)到达 → 队列调整为 D4)→ C3)→ B2)→ A1)。  
消费者下次获取时,直接取出D
  • 优势:提前排序确保消费效率,避免每次拉取时计算。
  • 限制:高频插入高优先级消息可能导致调整成本增加,但通常影响可控。

注意:RabbitMQ 的优先级队列有一个最大优先级的限制(0到255),并且如果队列中已经有大量消息,插入一个高优先级消息可能会导致性能问题,因为它需要调整内部结构。


2、高优先级消息源源不断,优先级队列如何处理低优先级消息?

在了解了消息如何入队列之后,就会紧接着遇到下一个问题,高优先级的消息一直进来,队列可能会一直优先处理这些高优先级的,导致低优先级的消息被 饿死 ,也就是永远得不到处理。这时候我们需要考虑怎么让低优先级的消息也能有机会被处理到。

2.1 时间衰减机制(Aging)

  • 原理:动态提升长时间未处理消息的优先级。
  • 实现:为每个消息记录等待时间(如时间戳)。
  • 定义优先级增长函数(如线性增长 新优先级 = 原优先级 + k*等待时间 )。
  • 定期扫描队列,更新低优先级消息的权重。
  • 效果:避免低优先级任务无限期等待,平衡公平性。

2.2 分层队列 + 比例调度

  • 原理:通过多级队列和固定比例分配资源。
  • 实现:将队列分为高、中、低三级,按比例分配处理机会,如 3:2:1
  • 使用加权轮询(WRR)或赤字轮询(DRR)算法调度。
  • 示例:网络 QoS 中为不同流量类型分配带宽。(自行查阅)

2.3 动态优先级调整

  • 原理:根据系统负载动态调整优先级策略。
  • 实现:监控队列深度和处理延迟,当高优先级队列超过阈值时,临时提升低优先级任务的权重。
  • 结合反馈控制(如PID控制器)自动调整参数。
  • 场景:实时系统在过载时降级非关键任务。

2.4 容量限制与降级

  • 原理:限制高优先级队列容量,强制资源释放。
  • 实现:设置高优先级队列的最大长度(如1000条),超出容量后,新到的高优先级消息降级为中等优先级。
  • 结合断路器模式拒绝过量请求。
  • 优势:防止高优先级洪泛导致系统崩溃。

2.5 空闲时段处理

  • 原理:利用系统空闲时间处理积压任务。
  • 实现:定义空闲检测机制,如 CPU利用率 < 20% 持续 5 秒 ,触发后台线程批量处理低优先级队列。
  • 结合预取和缓存优化处理效率。
  • 案例:数据库在低峰期执行维护任务。

常见的策略都在上面展示出来了,如果可以跟据业务需求进行调整,选择其中的一种或者组合使用。在使用的过程中要注意好如何权衡与优化。

  • 公平性 vs 吞吐量:严格按优先级排序最大化吞吐量,但需牺牲公平性;引入Aging或分层队列可改善公平性。
  • 实时性要求:硬实时系统可能需要绝对优先级,而软实时系统可结合超时重试机制。
  • 监控与调优:跟踪低优先级任务的平均等待时间(如 Prometheus 监控),动态调整Aging因子和队列比例(如基于强化学习)。

常见的一些应用场景,可以根据示例搜索如何使用。

  • 操作系统调度:Linux CFS 调度器通过虚拟运行时间(vruntime)实现公平性。
  • 消息队列系统:RabbitMQ 通过 x-max-priority 和插件支持优先级衰减。
  • 微服务架构:Envoy 网关通过流量优先级分级保障关键 APISLA

通过上述策略的组合,系统能够在高负载下兼顾高优先级任务的及时处理和低优先级任务的最终可达性。


3、优先级参数是声明队列的时候设置,还是在发送消息时设置?

在 RabbitMQ 中,优先级队列的配置需要同时在声明队列时和发送消息时设置,二者缺一不可。

3.1 声明队列时:启用优先级支持

必须在队列声明阶段通过参数 x-max-priority 显式启用优先级功能,并定义优先级范围。

Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);  // 定义优先级范围:0-10(共11级)
channel.queueDeclare(
    "priority_queue", 
    true,   // 持久化队列
    false, 
    false, 
    args    // 传入参数
);

注意事项

  • 必须显式声明:如果队列未设置 x-max-priority,发送消息时指定的优先级将被忽略
  • 取值范围:推荐值 0-10,数值越高性能损耗越大(RabbitMQ 官方建议不超过 255)。
  • 不可修改:已创建的队列无法动态修改优先级范围,需删除重建。

3.2 发送消息时:指定消息优先级

在消息属性中通过 priority 字段设置具体优先级值。

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .priority(7)  // 设置优先级为7(需≤队列的x-max-priority)
    .deliveryMode(2)  // 持久化消息
    .build();

channel.basicPublish(
    "", 
    "priority_queue", 
    props, 
    "消息内容".getBytes()
);

3.3 优先级规则

  • 数值越大优先级越高:0 最低,最大值由 x-max-priority 决定。
  • 自动截断:若消息优先级超过 x-max-priority,会被截断为该最大值。
  • 如队列声明 x-max-priority=5,消息设置 priority=8 会被视为 5。
  • 默认优先级:未显式设置时,优先级为 0。

3.4 完整流程示例

步骤 1:声明支持优先级的队列
// 创建连接和Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    
    // 定义优先级参数
    Map<String, Object> args = new HashMap<>();
    args.put("x-max-priority", 10);
    
    // 声明队列
    channel.queueDeclare(
        "orders", 
        true,   // 持久化队列
        false, 
        false, 
        args
    );
}
步骤 2:发送带优先级的消息
// 发送高优先级订单消息
AMQP.BasicProperties highPriorityProps = new AMQP.BasicProperties.Builder()
    .priority(8)
    .build();
channel.basicPublish("", "orders", highPriorityProps, "VIP订单".getBytes());

// 发送普通优先级消息(默认0)
channel.basicPublish("", "orders", null, "普通订单".getBytes()); 
步骤 3:消费者验证优先级
DeliverCallback callback = (consumerTag, delivery) -> {
    String msg = new String(delivery.getBody(), "UTF-8");
    int priority = delivery.getProperties().getPriority();
    System.out.println("收到优先级 " + priority + " 的消息: " + msg);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("orders", false, callback, consumerTag -> {});

二、优先级队列的常见问题和最佳实践

1、常见问题与解决方案

问题场景原因解决方案
消息优先级未生效队列未声明 x-max-priority删除旧队列,重新声明带优先级的队列
高优先级消息积压低优先级消息持续高负载导致饥饿现象结合时间衰减(Aging)机制提升老消息优先级
消费者未按优先级顺序处理消息使用 basicConsume 自动分配消息改用 basicGet 手动拉取消息并按优先级排序
优先级数值超过队列限制生产者设置值 > x-max-priority在生产者端做优先级范围校验
消息顺序错乱消费者使用了 prefetchCount > 1,导致低优先级消息被预取channel.basicQos(1); 每次只接收一条消息

2、使用优先级队列的最佳实践

  • 限制优先级层级:建议使用 0-5 级,避免过多层级增加调度复杂度。
  • 监控队列状态:通过 RabbitMQ Management Plugin 监控:
    • queue_messages_ready(按优先级分组)
    • message_age(消息等待时间)
  • 防御性设计:
    // 生产者端校验优先级合法性
    int safePriority = Math.min(requestedPriority, queueMaxPriority);
    
  • 混合队列策略:
    // 高、中、低优先级分队列处理
    channel.queueDeclare("high_pri", true, false, false, Map.of("x-max-priority", 10));
    channel.queueDeclare("low_pri", true, false, false, null); // 无优先级队列
    

3、普通队列变成优先级队列会导致已有消息丢失

如果在队列已经存在的情况下,修改 x-max-priority 参数会出现问题。

比如,原本队列没有设置优先级,后来想启用,可能需要重新声明队列,但这会导致已有的消息丢失。

4、要注意优先级队列的《预计取数》设置

优先级队列和消费者的 预取计数(prefetch count) 之间的关系。

  • 如果消费者设置了较高的预取数,可能会一次性获取多个消息,这时候即使有更高优先级的消息到达,已经被预取的消息可能不会被中断,导致高优先级消息不能及时处理。
  • 因此,可能需要合理设置预取数,或者使用 单条预取(prefetch=1) 来保证高优先级消息尽快被处理。

5、多个消费者消费一个队列,优先级消息如何分配?

如果有多个消费者,高优先级的消息会优先被哪个消费者获取?可能取决于消费者的空闲情况,但优先级高的消息会先被投递给空闲的消费者。

三、Java 示例实现优先级队列

1、添加 Maven 依赖

pom.xml 中添加 RabbitMQ Java 客户端依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

2、完整 Java 代码实现

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class PriorityQueueExample {

    private final static String QUEUE_NAME = "java_priority_queue";

    public static void main(String[] args) {
        // 创建生产者线程
        Thread producerThread = new Thread(() -> {
            try {
                sendMessages();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        });

        // 创建消费者线程
        Thread consumerThread = new Thread(() -> {
            try {
                consumeMessages();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        });

        producerThread.start();
        consumerThread.start();
    }

    /**
     * 生产者发送消息
     */
    private static void sendMessages() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明优先级队列参数
            Map<String, Object> args = new HashMap<>();
            args.put("x-max-priority", 10); // 设置最大优先级为10

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, args);

            // 发送不同优先级的消息
            int[] priorities = {1, 3, 5};
            for (int priority : priorities) {
                String message = "Message with priority " + priority;
                
                // 设置消息属性(包含优先级)
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                        .priority(priority)
                        .build();

                channel.basicPublish("", QUEUE_NAME, props, message.getBytes());
                System.out.println("Sent: " + message);
            }
        }
    }

    /**
     * 消费者接收消息
     */
    private static void consumeMessages() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 定义消息回调处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                String message = new String(body);
                int priority = properties.getPriority();
                System.out.println("Consumed: " + message + " | Priority: " + priority);
            }
        };

        // 开始消费(自动确认)
        channel.basicConsume(QUEUE_NAME, true, consumer);
        
        // 保持消费者线程运行
        try {
            Thread.sleep(5000); // 等待5秒确保消息处理完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }
    }
}

3、关键代码解释

3.1 声明优先级队列

Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10); // 必须参数
channel.queueDeclare(QUEUE_NAME, true, false, false, args);
  • x-max-priority:队列支持的最大优先级值(1-255)
  • 参数必须通过 Map<String, Object> 传递

3.2 发送优先级消息

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .priority(priority) // 设置优先级
        .build();
channel.basicPublish("", QUEUE_NAME, props, message.getBytes());

3.3 消费消息

DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        int priority = properties.getPriority(); // 获取优先级
    }
};

4、运行与验证

  • 启动生产者:发送优先级为 1、3、5 的消息
  • 启动消费者:观察输出顺序应为 5 → 3 → 1

输出示例:

Sent: Message with priority 1
Sent: Message with priority 3
Sent: Message with priority 5
Consumed: Message with priority 5 | Priority: 5
Consumed: Message with priority 3 | Priority: 3
Consumed: Message with priority 1 | Priority: 1

5、参数设置

  • 队列持久化:channel.queueDeclare(..., true, ...) 中第二个参数表示队列持久化
  • 消息持久化:通过 AMQP.BasicProperties.Builder().deliveryMode(2) 设置

6、异常处理

  • 必须处理 IOExceptionTimeoutException
  • 使用 try-with-resources 自动关闭连接:
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // ...
    }
    

7、多线程消费

  • 如果启动多个消费者,高优先级消息会被空闲消费者优先获取
  • 建议设置 prefetchCount=1:单条预取
    channel.basicQos(1);
    

通过以上 Java 实现,可以完整地使用 RabbitMQ 的优先级队列功能。如果需要进一步优化,可以结合线程池、消息确认机制等扩展功能。

相关文章:

  • 自适应调度器:动态分配测试资源
  • kubernetes》》k8s》》ConfigMap 、Secret
  • 数据结构(3)
  • 一图掌握 MySQL 核心要点
  • 中国CRM系统推荐:如何选择最适合企业的客户管理工具?
  • 机器学习中 提到的张量是什么?
  • CS5346 - Task Abstraction and Task Taxonomy 任务抽象和分类
  • 聊聊类模板
  • 波束形成(BF)从算法仿真到工程源码实现-第九节-延迟相减波束形成(delay sub)
  • 【Vue】v-if和v-show的区别
  • 鸿蒙开发-注解
  • 实时语音交互数字人VideoChat,可自定义形象与音色,支持音色克隆,首包延迟低至3s
  • WebRTC实时通话EasyRTC嵌入式音视频通信SDK,构建智慧医疗远程会诊高效方案
  • 【C++进阶六】list模拟实现
  • ShareX:多功能截图与录屏工具
  • AD917X系列JESD204B MODE7使用
  • C++23 新特性:auto(x) 和 auto{x} 的衰变复制
  • swift菜鸟教程15-18(枚举,结构体,类,属性)
  • LINUX基础 [四] - Linux工具
  • uniapp自定义tabbar,根据角色动态显示不同tabbar,无闪动问题
  • “梅花奖”快闪走入上海张园,朱洁静在石库门前起舞
  • 价格周报|猪价继续回暖:二次育肥热度仍存,对猪价仍有一定支撑
  • “五一”假期云南铁路预计发送旅客超330万人次
  • 财政部、证监会:加强对会计师事务所从事证券服务业务的全流程监管
  • 在黄岩朵云书院,邂逅陈丹燕与月季花的故事
  • 专访|白俄罗斯共产党中央第一书记瑟兰科夫:只有大家联合起来,才能有效应对当前危机所带来的冲击