详解RabbitMQ工作模式之工作队列模式
目录
工作队列模式
概念
特点
应用场景
工作原理
注意事项
代码案例
引入依赖
常量类
编写生产者代码
编写消费者1代码
编写消费者2代码
先运行生产者,后运行消费者
先运行消费者,后运行生产者
工作队列模式
概念
在工作队列模式中,一个生产者(producer)将任务发布到队列中,多个消费者(consumer)从队列中获取任务并执行。这种模式的主要目标是提高任务的并行处理能力,从而提高系统的吞吐量和效率。
特点
可以有多个消费者,但一条消息只能被一个消费者获取。
消费者在处理完某条消息后,才会收到下一条消息。
RabbitMQ采用轮询(Round-Robin)或公平分发(Fair Dispatch)的方式将消息发送给消费者。
应用场景
1.任务分发:将任务分发给多个工作者(消费者),以便并行处理。这对于需要高吞吐量和任务处理效率的应用程序非常有用。例如,图像处理、视频编码、数据转换等应用可以使用工作队列模式来并行处理大量任务。
2.负载均衡:当有多个消费者时,工作队列模式可以用来实现负载均衡。任务将均匀分布给可用的消费者,以确保每个消费者都有工作可做,而且不会超负荷。
3.后台任务处理:在Web应用程序中,后台任务处理是一个常见的需求。工作队列模式可用于处理与Web请求无关的长时间运行任务,而不会影响用户体验。例如,发送电子邮件、生成报告、备份数据等后台任务可以使用工作队列来处理。
工作原理
1.生产者发送任务:生产者将任务封装为消息,并将其发送到RabbitMQ队列中。
2.RabbitMQ分发任务:RabbitMQ根据配置的分发策略(如轮询或公平分发)将任务分发给消费者。
3.消费者处理任务:消费者从队列中获取任务并执行。在处理完任务后,消费者会向RabbitMQ发送确认消息,表示任务已完成。
4.RabbitMQ确认任务完成:在收到消费者的确认消息后,RabbitMQ会将该任务从队列中移除。
注意事项
1.消息确认:为了确保消息不会丢失,消费者在处理完任务后需要向RabbitMQ发送确认消息。如果消费者在处理任务时失败或崩溃,RabbitMQ会将该任务重新分发给其他消费者。
2.负载均衡:RabbitMQ默认采用轮询方式将消息分发给消费者。如果需要更复杂的负载均衡策略,可以考虑使用其他分发策略或自定义交换机类型。
3.错误处理:在生产者和消费者中都需要添加适当的错误处理逻辑,以处理可能出现的异常情况,如连接失败、消息发送失败等。
代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version> </dependency>
常量类
public class Constants {public static final String HOST = "47.98.109.138";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "aaa";//工作队列模式public static final String WORK_QUEUE = "work.queue";
}
编写生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功~");//6. 资源释放channel.close();connection.close();}
}
编写消费者1代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//6. 资源释放
// channel.close();
// connection.close();}
}
编写消费者2代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//TODOSystem.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// //6. 资源释放
// channel.close();
// connection.close();}
}
先运行生产者,后运行消费者
查看管理界面
我们此时会看到,先启动的消费者会消费掉队列中所有的消息。
先运行消费者,后运行生产者
此时我们能看到,两个消费者都能够消费消息。