当前位置: 首页 > news >正文

黑马商城(六)RabbitMQ

一、同步调用

二、异步调用

三、MQ技术选型

快速Docker部署:

docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall\-d \rabbitmq:3.8-management

RabbitMQ:

数据隔离: 

案例:

 快速入门(实现基于MQ收发消息):

        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

spring:rabbitmq:host: 192.168.50.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

Work Queues:

案例:

构造了两个方法模拟两个消费者

package com.itheima.consumer.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String message){System.out.println("消费者1接收到消息: "+message+ ","+ LocalDateTime.now());}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String message){System.err.println("消费者2......接收到消息: "+message+ ","+ LocalDateTime.now());}}
package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue(){//1.队列名String queueName="work.queue";//2.消息for (int i=1;i<=50;i++){String message="hello.spring amqp_"+i;//3.发送消息rabbitTemplate.convertAndSend(queueName,message);}}
}

    @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String message) throws InterruptedException {System.out.println("消费者1接收到消息: "+message+ ","+ LocalDateTime.now());Thread.sleep(25);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String message) throws InterruptedException {System.err.println("消费者2......接收到消息: "+message+ ","+ LocalDateTime.now());Thread.sleep(200);}

默认性能不影响分配规则

交换机:

Fanout交换机:

 案例:

    @Testpublic void testFanoutQueue(){//1.交换机名String exName="hmall.fanout";//2.消息String message="hello.everyone!";//3.发送消息rabbitTemplate.convertAndSend(exName,null,message);}
    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String message){log.info("消费者1监听到fanout.queue1的消息:{}",message);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String message){log.info("消费者2监听到fanout.queue2的消息:{}",message);}

Direct交换机:

案例:

    @RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String message){log.info("消费者1监听到direct.queue1的消息:{}",message);}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String message){log.info("消费者2监听到 direct.queue2的消息:{}",message);}
    @Testpublic void testDirectQueue(){//1.交换机名String exName="hmall.direct";//2.消息String message="hello.yellow!";//3.发送消息rabbitTemplate.convertAndSend(exName,"yellow",message);}

Topic交换机:

案例:
    @Testpublic void testTopicQueue(){//1.交换机名String exName="hmall.topic";//2.消息String message="天气不错";//3.发送消息rabbitTemplate.convertAndSend(exName,"china.weather",message);}

声明队列交换机: 

package com.itheima.consumer.Config;import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");/* return ExchangeBuilder.fanoutExchange("hamll.fanout").build();*/}@Beanpublic Queue fanoutQueue1(){return QueueBuilder.durable("fanout.queue1").build();/*return new Queue("fanout.queue1");*/}@Beanpublic Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2(){return QueueBuilder.durable("fanout.queue2").build();}@Beanpublic Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
案例:

法一:
package com.itheima.consumer.Config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct");/* return ExchangeBuilder.fanoutExchange("hamll.direct").build();*/}@Beanpublic Queue directQueue1(){return QueueBuilder.durable("direct.queue1").build();/*return new Queue("direct.queue1");*/}//Direct交换机的Key每次只能绑定一个Key@Beanpublic Binding directQueue1BindingRed(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return QueueBuilder.durable("direct.queue2").build();/*return new Queue("direct.queue1");*/}//Direct交换机的Key每次只能绑定一个Key@Beanpublic Binding directQueue2BindingRed(Queue directQueue2,DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2BindingYellow(Queue directQueue2,DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
法二(基于注解):

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))/*@RabbitListener(queues = "direct.queue1")*/public void listenDirectQueue1(String message){log.info("消费者1监听到direct.queue1的消息:{}",message);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))/*@RabbitListener(queues = "direct.queue2")*/public void listenDirectQueue2(String message){log.info("消费者2监听到 direct.queue2的消息:{}",message);}

消息转换器:

案例:

package com.itheima.publisher.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageConverterConfiguration {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

    @RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> message){log.info("消费者2监听到 direct.queue2的消息:{}",message);}

四、业务改造

配置依赖:

        <!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置yaml文件:

spring:rabbitmq:host: 192.168.50.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

在Common中配置一个Json消息转换器:

package com.hmall.common.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

交易微服务中置消费者 Consumer--Listener(监听):

package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue",durable = "true"),exchange = @Exchange(name = "pay.direct",type = ExchangeTypes.DIRECT),key = {"pay.success"}))public void listenPaySuccess(Long orderId){orderService.markOrderPaySuccess(orderId);}}

 支付微服务中设置Publisher--基于MQ发送消息:

注入 RabbitTemplate  来实现

    private final RabbitTemplate rabbitTemplate;   @Override@Transactionalpublic void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {// 1.查询支付单PayOrder po = getById(payOrderFormDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额/*userService.deductMoney(payOrderFormDTO.getPw(), po.getAmount());*/userClient.deductMoney(payOrderFormDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderFormDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}//5.修改订单状态//异步通知try {rabbitTemplate.convertAndSend("pay.direct","pay.success",po.getBizOrderNo());} catch (Exception e) {log.error("发送支付状态通知失败,订单id:{}",po.getBizOrderNo(),e);//兜底方案}}

相关文章:

  • 利用java语言,怎样开发和利用各种开源库和内部/自定义框架,实现“提取-转换-加载”(ETL)流程的自动化
  • python+selenium+pytest自动化测试chrome driver版本下载
  • 用Qt和deepseek创建自己的问答系统
  • Oracle DBA 高效运维指南:高频实用 SQL 大全
  • CentOS笔记本合上盖子不休眠
  • CentOS创建swap内存
  • 【音视频】FFmpeg解封装
  • 多路转接poll服务器
  • 一键配置多用户VNC远程桌面:自动化脚本详解
  • taobao.trades.sold.get(淘宝店铺订单接口)
  • go语言八股文
  • 【Nova UI】七、SASS 全局变量体系:组件库样式开发的坚固基石
  • Vue3集成sass
  • 【gpt生成-其二】以go语言为例,详细讲解 并发模型:线程/协程/ Actor 实现
  • sqoop的参数及初体验
  • RHCE 作业二(密钥登录实验)
  • 香港科技大学广州|先进材料学域金融科技学域博士招生宣讲会—天津大学专场!!!(暨全额奖学金政策)
  • 【LLM】Ollama:容器化并加载本地 GGUF 模型
  • 中国人寿财险广西分公司:金融助推乡村振兴全面发展
  • 如何改电脑网络ip地址完整教程
  • 世界读书日丨人均一年超10本!你达到上海平均阅读水平了吗
  • 新任遂宁市委副书记王忠诚已任市政府党组书记
  • 美菲开始举行年度军演,外交部:菲公然站在地区国家的对立面
  • 习近平向加蓬当选总统恩圭马致贺电
  • “我们一直都是面向全世界做生意”,“世界超市”义乌一线走访见闻
  • 花卉引流+商场促销,上海浦东用“花经济”带动“消费热”