当前位置: 首页 > news >正文

消息队列知识点详解

消息队列场景

什么是消息队列

可以把消息队列理解一个使用队列来通信的组件,它的本质是交换机+队列的模式,实现发送消息,存储消息,消费消息的过程。

 我们通常说的消息队列,MQ其实就是消息中间件,业界中比较常用的消息中间件有:RabbiyMQ、RockMQ、Kafka等

消息队列的选型

选型的时候我们需要根据业务的场景,结合上述特性来进行选型。

消息队列的使用场景

  • 解耦:在分布式系统中,可以通过消息队列进行异步通信,不再使用OpenFeign等HTTP的网络之间进行通信,这样项目中就不会存在耦合,系统也不会有太大的影响,即使一个系统挂了,请求消息也只是堆积在消息队列中,不会对其他系统造成影响。
  • 异步:加入一个操作涉及到了好几个不同的步骤或者是分布式系统,这些不需要同步执行,那么可以考虑消息队列进行异步操作。假设用于有一个创建订单操作,其中涉及到客户轨迹添加,更新库存,创建订单,拉起支付等功能。那么如果我们使用OpenFeign等网络调用链进行调用,那么此时会产生大量的时间,客户是无法接受的。并且其中像客户轨迹添加这样的操作是不需要同步的,如果使用MQ将客户创建订单时,将后面的所有操作全部放到MQ进行异步操作,随后返回成功信息,这样就可以加快系统的访问速度。
  • 削峰:一个系统访问有流量高峰期,也有流量低峰期,假设12306的购票活动,需要用户创建订单->锁定余票库存->拉起支付->更新记录等,但是此时如果有几千万人次进行抢票,我们的支付服务可能受不了这么大的并发,可能会直接挂掉,所以这里可以加一个MQ,将微服务调用链发给支付服务的所有请求堆在MQ中,在支付服务的最大限度内进行处理,这样就会避免服务崩溃的问题。

消息重复的消费怎么做

生产端为了保证数据的发送成功,可能会重复推送消息直到收到成功ACK(网络、消息队列异常),这样就会在队列内产生重复的消息,一个成熟的MQ框架会有自己的解决方案,比如用空间换时间,存储已经处理过的message_id,给生产者提供一个可靠的发送消息的接口。

但是消费者段却无法根本的解决这个问题,在不高并发的要求下,拉取消息+处理业务逻辑+提交消费者偏移量需要事务做处理,且消费者端可能会挂掉,很可能导致拉取到重复的消息。

这种解决办法也就只有在业务层面做控制,对于已经成功的消息,本地数据库或者缓存来存储业务标识,每次处理前先进行校验,保证幂等性。

消息丢失怎么解决

使用一个消息队列,其实就分为三大块:生产者,中间件,消费者,如果要保证数据的不丢失,就要保证这三个部分不会出问题。

  • 消息生产阶段:生产者会不会丢失消息,取决于消息生产者对异常的处理是否合理。丛消息被生产出来,然后提交给MQ的过程中,只要能正常收到MQ消息队列的ack确认消息,就表示发送成功,所以只要处理好返回值和异常,如果返回错误,就进行重发,那么这个阶段是不会出现问题的。
  • 消息存储阶段:这里使用kafka为例,kafka一般是使用集群部署的,生产者在发送消息时,消息队列一般会写多个节点,一般是多个节点,可以理解为多个备份,即便一个节点挂了,也会保证数据不丢失。
  • 消息消费阶段:消费者接受到消息+处理完毕后,并回复ack的话,那么消息消费阶段就不会消失,这样一来即使其中一个节点挂了,也可以保证数据不丢失

以上三点只要有一个疏漏就会导致消息的丢失

消息队列的可靠性是怎么保证的

  • 消息持久化:确保消息能够持久化是非常关键的,在系统掉点、系统崩溃重启后仍然可以读取持久化的消息并放到消息队列中
  • 消息确认机制:消费者在成功消费消息后,应该向消息队列发送确认,消息队列只有收到确认后才会将消息丛队列中移除。如果没有收到确认,则会在一定时间内重发该消息。在kafka中,消费者通过commitasync提交便宜量,从而确认消息。
  • 消息重试策略:当消费者处理消息失败时,需要有合理的重试策略,可以设置重试次数以及重试间隔时间。

消息队列的顺序性怎么保证的

以kafka为例,可以将顺序的消息发送到同一个主题的同一个分区来保证消息是有序的,但是这个可能会影响消费者的并行处理速度,并且消费者进行消费的时候必须单线程处理顺序消息。

如何保证幂等性写

幂等性写指的是同一操作的多次执行的结果和一次执行的结果相同。假设支付宝付款操作,多次执行但是只会扣一次钱。

  • 唯一标识:为每一个请求生成全局唯一的id,服务端校验该id是否已处理,适用于场景接口调用,消息消费等。
  • 乐观锁:通过版本号或者时间戳等方式控制并发更新,确保多次更新等同于单次操作,适用于场景数据库记录的更新
  • 数据库唯一约束:使用数据库的唯一索引来限制重复的插入,适用于插入的场景
  • 分布式锁:通过锁机制保证一个时刻只有一个请求执行关键操作,适用于高并发场景下的资源争夺。

如何保证数据的一致性,事务消息如何实现。

这里直接举一个用户发起创建订单的例子

假设用户点击支付后,首先创建订单(包含订单表写入、库存表更新等数数据库更新操作),事务完成后拉起支付服务(下游服务),此时我们的需求是,当事务执行出错(发生rollback),不再调用下游服务,当事务执行成功并commit,必须拉起下游服务执行支付操作,但是此时万一发送消息不成功,下游操作就无法感知这个操作,出现数据不一致。

  • 生产者产生消息,发送一条半事务消息到MQ服务器
  • MQ收到消息后,将消息持久化道存储系统,这条消息是待发送的状态
  • MQ服务器返回ACK给生产者,将消息持久化到存储系统,这条消息不会触发推送事件
  • 生产者执行本地事务
  • 如果本地事务执行成功,即commit执行结果到MQ服务器,如果执行失败,发送rollback
  • 如果是正常的commit,MQ服务器更新消息状态为可发送,如果是rollback则删除消息
  • 如果消息更新为可推送,则MQ服务器直接将消息push给消费者,消费者消费完返回ACK
  • 如果MQ服务器长时间收不到生产者的commit或者rollback,他会反查生产者。

kafka

kafka的特点

  • 高吞吐量,低延迟:kafka每秒可以处理几十万条消息,他的最低延迟也只有几毫米,每个topic可以分为对歌partition,consumer group对partition进行consumer操作
  • 可扩展性:kafka集群支持扩展
  • 持久性:消息被持久化到本地磁盘,并且支持数据备份
  • 容错性:允许集群中及节点失败
  • 高并发:数千个客户端同时读写

kafka为什么这么快?

  • 顺序写入优化:kafka将消息顺序写入磁盘,减少了磁盘的寻道时间,这种方式比随机读写效率更高
  • 批处理技术:kafka支持批量发送消息,这意味着生产者在发送消息的时候可以等待到有足够数据量囤积的时候再发送,这种方法减少了网络和磁盘的开销
  • 零拷贝技术:kafka使用零拷贝技术,可以直接将数据从磁盘里发送到网络套接字,避免了再用户空间和内核空间之间相互转换
  • 压缩技术:kafka支持消息压缩,这不仅减少了网络的传输的数据量,提高整体的吞吐量

介绍下Kafka的模型,kafka是推送还是拉取

消费者模型

推送模型(push)

  • 基于推送模型的消息系统,由消息代理记录消费者的状态
  • 消息代理在将消息推送到消费者后,标记这条消息已经消费,但是这种方法无法很好的保证数据被处理
  • 如果要保证数据被处理,需要消息代理记录消息的所有状态,这种太消耗资源,不可取
  • push模式是设置MQ中的,他无法适应消费者的速率,过快会导致消费者拒绝服务,过慢会导致队列内消息积压

拉取模型(pull)

kafka采用的是拉取模型,由消费者自己记录消费的状态,每个消费者顺序的拉取每个分区的消息

  • 两个消费者拉取同一主题的消息,消费者A的消费进度是3,B的进度为6,也叫偏移量
  • 消费者拉取的最大上限称为最高水位,生产者最新写入的消息如果还没有到达备份的数量,那么这个消息对消费者不可见
  • 这种优点是:消费者可以任意控制偏移量来消费任意时刻的消息。

消费者组

kafka中的消费者是以消费者组的形式工作,由一个或者多个消费者构成消费者组,共同消费一个topic,但是每一个分区在同一时刻只能被同一消费者组中的一个消费者消费,但是不同消费者组可以在同一时刻消费一个分区。

消费方式

kafka采用拉取pull的方式从broker中读取数据

优点是pull模式可以根据consumer的消费能力进行拉取

缺点是如果此时kafka中没有数据,消费者可能会陷入循环,一致在拉取null数据,针对这一点kafka的消费者在消费数据时会传入一个时常参数,如果没有数据,consumer会等待一段时间返回,时常为timeout

kafka为什么一个分区只能由消费者组的一个消费者消费

如果两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,可能c1才读到2,c2读到1,那么当c1没读完的时候,c2已经读到3了,这样会造成消息浪费,相当于多线程同时读取一个消息,这是没意义的

消息中间件是如何做到高可用的

消息中间件如何保证高可用呢,单机是没有高可用可言的,高可用都是对集群来说的。

kafka的基础架构,由多个broker组成,每个broker都是一个节点,当你创建一个topic时,他可以划分为partion,而每个partion只放一部分数据,分别存在不同的broker上,也就是说,一个topic数据是分散在不同的broker上的,每个及其存放一部分数据

那么是不是broker挂了partion就挂了?其实在kafka0.8之后提供了副本机制来保证高可用,即每个partion的数据会同步到其他的机器上,形成副本,然后所有副本会选取一个leader出来,让leader和生产者和消费者打交道,其他副本都是follower。写数据时,leader负责把数据同步给所有的follower,读消息时,直接读leader上的数据即可。如果某个broker挂掉了,那么这个broker的partion在其他机器上都有副本的,如果挂的是leader,那么会从follower会重新选择一个leader

相关文章:

  • 基于RK3588+FPGA+AI YOLO的无人船目标检测系统(一)概述
  • 每天五分钟机器学习:凸优化
  • SBTI科学碳目标认证有什么要求?SBTI认证的好处?
  • MES系统中标签模板设计框架与实现思路
  • C++常用函数合集
  • 考研单词笔记 2025.04.21
  • qt画一朵花
  • elasticsearch7.15节点磁盘空间满了迁移数据到新磁盘
  • 【系统架构设计师】信息安全的概念
  • 每天学一个 Linux 命令(30):cut
  • OpenFeign 使用教程:从入门到实践
  • 线性代数-矩阵的秩
  • jvm-获取方法签名的方法
  • redis常用的五种数据类型
  • 如何使用UE Cesium插件实现网页端无算力负担访问?
  • Spring MVC 一个简单的多文件上传
  • 用自然语言指令构建机器学习可视化编程流程:InstructPipe 的创新探索
  • iTwin Tools函数拆解
  • Hiera:一款简洁的层次化视觉transformer
  • 基于SA模拟退火算法的车间调度优化matlab仿真,输出甘特图和优化收敛曲线
  • 工人日报评一些旅行社不收记者律师:“拒客黑名单”暴露心虚病
  • 豫章故郡,剑指演艺经济新高地
  • 纪念沈渭滨︱沈渭滨先生与新修《清史》
  • 商务部新闻发言人就美国以关税手段胁迫其他国家限制对华经贸合作事答记者问
  • 美国多地举行抗议活动,特朗普经济政策支持率创新低
  • 上海警方:男子拍摄女性视频后在网上配发诱导他人违法犯罪文字,被行拘