当前位置: 首页 > news >正文

java—12 kafka

目录

一、消息队列的优缺点

二、常用MQ

1. Kafka

2. RocketMQ

3. RabbitMQ

4. ActiveMQ

5. ZeroMQ

6. MQ选型对比

适用场景——从公司基础建设力量角度出发

适用场景——从业务场景角度出发

四、基本概念和操作

1. kafka常用术语

2. kafka常用指令

3. 单播消息(group.id相同)

4. 多播消息(group.id不相同)

5. 消费组

​编辑6. 稀疏索引

五、kafka集群

1. 集群搭建

2. 集群启动和验证

3. Topic的意义

4. Topic和Partition

5. 分区

6. 副本

7. 集群操作指令

8. 多分区&多副本

9. 多分区消费组

10. Controller

11. Rebalance机制

12. HW和LEO

六、高频面试题

1. 如何防止消息丢失?

2. 如何防止消息的重复消费?

3. 如何做到顺序消费?

4. 如何解决消息积压的问题?

5. 如何实现延迟队列?

6. Kafka如何做到单机上百万的高吞吐量呢?

7. Kafka高吞吐——非零拷贝技术

8. Kafka高吞吐——零拷贝技术


一、消息队列的优缺点

消息队列的优点:

  1. ① 实现系统解耦
  2. ② 实现异步调用
  3. ③ 流量削峰

消息队列的缺点:

  1. ① 系统可用性降低
  2. ② 提升系统的复杂度
  3. ③ 数据一致性问题

二、常用MQ

1. Kafka

Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实 现为一个分布式的日志提交系统(a distributed commit log),之后成为Apache项目的一 部分。Kafka性能高效、可扩展良好并且可持久化。它的分区特性,可复制和可容错都是 其不错的特性。

优点: 客户端语言丰富:支持Java、.Net、PHP、Ruby、Python、Go等多种语言 高性能:单机写入TPS约在100万条/秒,消息大小10个字节; 提供完全分布式架构,并有replica机制,拥有较高的可用性和可靠性,理论上支持消息无限堆积; 消费者采用Pull方式获取消息。消息有序,通过控制能够保证所有消息被消费且仅被消费一次; 在日志领域比较成熟,被多家公司和多个开源项目使用。有管理界面Kafka-Manager;

缺点: Kafka单机超过64个队列/分区时,Load时会发生明显的飙高现象。队列越多,负载越高,发送消息响 应时间变长; 使用短轮询方式,实时性取决于轮询间隔时间; 消费失败不支持重试; 社区更新较慢。

2. RocketMQ

RocketMQ出自阿里的开源产品,用Java语言实现,在设计时参考了Kafka,并做出了自 己的一些改进,消息可靠性上比Kafka更好。RocketMQ在阿里内部被广泛应用在订单, 交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

优点:单机支持1万以上持久化队列; RocketMQ的所有消息都是持久化的,先写入系统PAGECACHE,然后刷盘,可以保证内存与磁盘都有 一份数据,而访问时,直接从内存读取。 模型简单,接口易用(JMS的接口很多场合并不太实用); 性能非常好,可以允许大量堆积消息在Broker中; 支持多种消费模式,包括集群消费、广播消费等; 各个环节分布式扩展设计,支持主从和高可用;开发度较活跃,版本更新很快。

缺点:支持的客户端语言不多,目前是Java及C++,其中C++还不成熟; RocketMQ社区关注度及成熟度也不及Kafka; 没有Web管理界面,提供了一个 CLI (命令行界面) 管理工具带来查询、管理和诊断各种问题; 没有在MQ核心里实现JMS等接口;

3. RabbitMQ

RabbitMQ于2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企 业消息系统,是当前最主流的消息中间件之一。它提供了多种技术可以让你在性能和可靠 性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;灵 活的路由,消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提 供了多种内置交换机类型。

优点:由于Erlang语言的特性,消息队列性能较好,支持高并发; 健壮、稳定、易用、跨平台、支持多种语言、文档齐全; 有消息确认机制和持久化机制,可靠性高; 高度可定制的路由; 管理界面较丰富,在互联网公司也有较大规模的应用,社区活跃度高。

缺点:实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易 于使用和部署,但是使得其运行速度较慢,因为中央节点增加了延迟,消息封装后也比较大;需要学习比 较复杂的接口和协议,学习和维护成本较高。 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做二次开发和维护;

4. ActiveMQ

ActiveMQ是由Apache出品,ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到 企业的应用环境中,并有许多高级功能。

优点:可以用JDBC:可以将数据持久化到数据库。虽然使用JDBC会降低ActiveMQ的性能,但是数据库一直都 是开发人员最熟悉的存储介质; 支持JMS规范:支持JMS规范提供的统一接口; 支持自动重连和错误重试机制; 有安全机制:支持基于shiro,jaas等多种安全配置机制,可以对Queue/Topic进行认证和授权; 监控完善:拥有完善的监控,包括WebConsole,JMX,Shell命令行,Jolokia的RESTful API; 界面友善:提供的WebConsole可以满足大部分情况,还有很多第三方的组件可以使用,比如hawtio;

缺点:社区活跃度不及RabbitMQ高; 根据其他用户反馈,会出莫名其妙的问题,会丢失消息; 目前重心放到activemq6.0产品Apollo,对5.x的维护较少; 不适合用于上千个队列的应用场景;

5. ZeroMQ

号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普 通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字 的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议 (TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用 于node与node间的通信,node可以是主机或者是进程。

引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的 一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列 库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈 的一部分,之后进入Linux内核”。

与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不 是一个服务器,更像一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通 讯、进程通讯和线程通讯抽象为统一的API接口。支持“Request-Reply “,”Publisher Subscriber“,”Parallel Pipeline”三种基本模型和扩展模型。

6. MQ选型对比

总结:消息队列的选型需要根据具体应用需求而定,ZeroMQ小而美,RabbitMQ大而稳,Kakfa和RocketMQ快而强劲。

适用场景——从公司基础建设力量角度出发

中小型软件公司,建议选RabbitMQ 首先:erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。他的弊端 也很明显,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸, RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重 要。

其次:不考虑Kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大, 选消息中间件,应首选功能比较完备的,所以kafka排除。 最后:不考虑RocketMQ的原因是,RocketMQ是阿里出品,如果阿里放弃维护RocketMQ, 中小型公司一般抽不出人来进行RocketMQ的定制化开发,因此不推荐。

大型软件公司 根据具体使用在RocketMQ和kafka之间二选一。 一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。 针对RocketMQ,大型软件公司也可以抽出人手对RocketMQ进行定制化开发,毕竟国内有 能力改JAVA源码的人,还是相当多的。 至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。

适用场景——从业务场景角度出发

RocketMQ定位于非日志的可靠消息传输(日志场景也OK),目前RocketMQ在阿里集团被 广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主 要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集 和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求, 适合产生大量数据的互联网服务的数据收集业务。

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主 要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更 多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要 求还在其次。

三、安装

四、基本概念和操作

1. kafka常用术语

名词解释
Broker【节点】一个Kafka节点就是一个Broker,一个和多个Broker可以组成 一个Kafka集群
Topic【主题】Kafka根据topic对消息进行归类,发布到kafka集群的每套消 息都需要指定一个topic,topic是一个逻辑概念,物理上是不存在的
Producer【生产者】用于向Kafka中发送消息
Consumer【消费者】从Kafka中获取消息
Consumer Group【消费组】每个Consumer都会归属于一个消费组,一条消息可以同时 被多个不同的消费组消费,但是只能被一个消费组中的消费者消费
Partition【分片】物理上的概念,可以将一个topic上的数据拆分为多分放到 Partition中,每个Patition内部的消息是有序的。

2. kafka常用指令

创建主题

  • ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse -- partitions 1 --replication-factor 1

查看主题

  • ./kafka-topics.sh --list --bootstrap-server localhost:9092

开启消息生产端

  • ./kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092

开启消息消费端

  • ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092
  • ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 -- from-beginning

消息是会被存储在kafka中的文件里的,并且是顺序存储的,消息有偏移量的概念,所以我们可以指定偏移量去读取某个位置的消息。

3. 单播消息(group.id相同)

一个消费组里,只会有一个消费者能够消费到某个topic中的消息。

首先,打开两个窗口,分别执行如下语句开启消费端,那么就在“museGroup1”消费组中创 建了两个Consumer

  • ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 -- consumer-property group.id=museGroup1

然后:Producer端发送3条消息,我们发现,只有一个Consumer收到了消息。

4. 多播消息(group.id不相同)

当业务场景中,需要同一个topic下的消息被多个消费者消费,那么我们可以采用创建多个消费组的方 式,那么这种方式就是多播消息。打开两个窗口,分别执行如下指令:

  • ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer property group.id=museGroup1
  • ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer property group.id=museGroup2

最后,Producer端发送3条消息,我们发现,两个Consumer都收到了消息。如下所示:

5. 消费组

查看当前主题下有哪些消费组

  • ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看消费组中的具体信息

  • ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group museGroup1 --describe

其中,展现出来的信息有如下含义:

  • CURRENT-OFFSET:当前消费组已消费消息的偏移量。
  • LOG-END-OFFSET:主题对应分区消息的结束偏移量(水位——HW)。
  • LAG:当前消费组堆积未消费的消息数量。

__consumer_offsets-N

kafka默认创建了一个拥有50个分区的主题,名称为:“__consumer_offsets”。

consumer会将消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去 的时候,【key】=consumerGroupID+topic+分区号,【value】=当前offset的值。

Kafka会定期清理topic里的消息,最后就保留最新的那条数据。可以通过如下公式确定 consumer消费的offset要提交到哪个__consumer_offsets   hash(consumerGroupID)% 主题“__consumer_offsets”的分区数量

6. 稀疏索引

五、kafka集群

1. 集群搭建

创建kafka-cluster目录,并解压kafka_2.13-3.0.0.tgz为3份kafka

修改kafka1的server.propertis配置文件

  • broker.id=10 listeners=PLAINTEXT://localhost:9093 log.dirs=/Users/muse/Lesson/ServerContext/kafka-cluster/kafka1/kafka-logs

修改kafka2的server.propertis配置文件

  • broker.id=11 listeners=PLAINTEXT://localhost:9094 log.dirs=/Users/muse/Lesson/ServerContext/kafka-cluster/kafka2/kafka-logs

修改kafka3的server.propertis配置文件

  • broker.id=12 listeners=PLAINTEXT://localhost:9095 log.dirs=/Users/muse/Lesson/ServerContext/kafka-cluster/kafka3/kafka-logs

2. 集群启动和验证

启动kafka1,kafka2和kafka3

  • ./kafka1/bin/kafka-server-start.sh -daemon ./kafka1/config/server.properties
  • ./kafka2/bin/kafka-server-start.sh -daemon ./kafka2/config/server.properties
  • ./kafka3/bin/kafka-server-start.sh -daemon ./kafka3/config/server.properties

验证3个Broker是否启动成功

  • 首先,可以通过ps -ef | grep kafka来查看进程是否启动
  • 其次:在zookeeper中查看/brokers/ids下中是否有相应的brokerId目录生成

3. Topic的意义

试想一下,当我们要尝试发送 /消费消息的时候需要注意什 么呢?“这有啥需要注意的, 发送不就得了!” 结果,我们发现了一个非常重 大的问题,大家都往Kafka中 发送消息,所有的消息都混合 在了一起,就类似所有快递公 司的快递员(Producer端)把 快递都扔到了一个大仓库里, 结果,去取快递的小伙伴们 (Consumer端)面对堆积如 山且混乱不堪的“快递山”—— 疯了。。。

4. Topic和Partition

5. 分区

一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。 并且分区也可以提供消息并发存储的能力。

6. 副本

如果分片挂掉了,数据就丢失了。那么为了提高系统的可用性。我们把分片复制多个,这 就是副本了。但是,副本的产生,也会随之带来数据一致性的问题,即:有的副本写数据 成功,但是有的副本写数据失败。

Leader:kafka的读写操作都发生在leader上,leader负责把数据同步给follower。 当leader挂掉了,那么经过主从选举,从多个follower中选举产生一个新的leader。

Follower:follower接收leader同步过来的数据,它不提供读写(主要是为了保证多副本数据与消费 的一致性)

7. 集群操作指令

为名称为muse-rp的Topic创建2个分区(--partitions)3个副本(--replication-factor)

  • ./kafka-topics.sh --create --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095 --partitions 2 --replication-factor 3

删除Topic

  • ./kafka-topics.sh --delete --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095

查看分区和副本分布情况

  • ./kafka-topics.sh --describe --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095

消息发送和消费

  • ./kafka-console-producer.sh --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095
  • ./kafka-console-consumer.sh --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095 --consumer-property group.id=museGroup1

8. 多分区&多副本

为名称为muse-rp的Topic创建---------------------------- 2个分区(--partitions) 3个副本(--replication-factor)

9. 多分区消费组

一个partition只能被一个消费组中的一个消费者消费,这样设计的目的是保证消息的有序 性,但是在多个partition的多个消费者消费的总顺序性是无法得到保证的。

partition的数量决定了消费组中Consumer的数量,建议同一个消费组中的Consumer数量 不要超过partition的数量,否则多余的Consumer就无法消费到消息了。

但是,如果消费者挂掉了,那么就会触发rebalance机制,会由其他消费者来消费该分区。

10. Controller

Kafka集群中的Broker在ZK中创建临时序号节点,序号最小的节点也就是最先创建的那个 节点,将作为集群的Controller,负责管理整个集群中的所有分区和副本的状态。

Controller控制器的作用如下:

  • ① 当某个分区的leader副本出现故障时,由控制器负责为该分区选举出新的leader副本。
  • ② 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数 据信息。
  • ③ 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让 新分区被其他节点感知到。

11. Rebalance机制

当消费者没有指明分区消费时,消费组里的消费者和分区关系发生了变化,那么就会触发 rebalance机制。这个机制会重新调整消费者消费哪个分区。

在触发rebalance机制之前,消费者消费哪个分区有3种策略:

1> range:通过公式来计算某个消费者消费哪个分区。

2> 轮询:大家轮流对分区进行消费。

3> sticky:在触发rebalance之后,在消费者消费的原分区不变的基础上进行调整。

12. HW和LEO

HW(HighWatermark)俗称高水位,取一个partition对应的ISR中最小的LEO(log-end offset)作为HW;

Consumer最多只能消费到HW所在的位置,每个副本都有HW,Leader和Follower各自负 责更新自己的HW的状态。

对于Leader新写入的消息,Consumer不能立刻消费,Leader会等待该消息被所有ISR中的 副本同步后更新HW,此时消息才能被Consumer所消费。这样就能保证如果Leader所在的 broker失效,该消息仍然可以从新选举的Leader中获取。

具体逻辑请看下一张图:

六、高频面试题

1. 如何防止消息丢失?

针对发送方:将ack设置为1或者-1/all可以防止消息丢失,如果要做到99.9999%,ack要设 置为all,并把min.insync.replicas配置成分区备份数

针对接收方:把自动提交修改为手动提交(enable-auto-commit)。

2. 如何防止消息的重复消费?

一条消息被消费者消费多次,如果为了不消费到重复的消息,我们需要在消费 端增加幂等性处理,例如:

  • ① 通过mysql插入业务id作为主键,因为主键具有唯一性,所以一次只能插 入一条业务数据。
  • ② 使用redis或zk的分布式锁,实现对业务数据的幂等操作。

3. 如何做到顺序消费?

针对发送方:在发送时将ack配置为非0,确保消息至少同步到leader之后再返回ack继续发 送。但是,只能保证分区内部的消息是顺序的,而无法保证一个Topic下的多个分 区总的消息是有序的。

针对接收方:消息发送到一个分区中,只配置一个消费组的消费者来接收消息,那么这个 Consumer所接收到的消息就是有顺序的了,不过这也就牺牲掉了性能。

4. 如何解决消息积压的问题?

消息积压会导致很多问题,比如:磁盘被打满、Producer发送消息导致kafka 性能过慢,然后就有可能发生服务雪崩。解决的方案如下所示:

① 提升一个Consumer的处理能力。即:在一个消费者中启动多个线程,让 多线程加快消费的速度。

② 提升总体Consumer的处理能力。启动多个消费组,增加Consumer的数量 从而提高消费能力。

③ 如果业务运行,设定某个时间内,如果消息仍没有被消费,那么Consumer收到消息后,直接废弃掉,不执行下面的业务逻辑。

5. 如何实现延迟队列?

应用场景:订单创建成功后如果超过30分钟没有付款,则需要取消订单。

  • ① 创建一个表示“订单30分钟未支付”的Topic,如:order_not_paid_30min, 表示延迟30分钟的消息队列。
  • ② Producer发送消息的时候,消息内容要带上订单生成的时间create_time。
  • ③ Consumer消费Topic中的消息,如果发现now减去create_time不足30分钟, 则不去消费;记录当前的offset,不去消费当前以及之后的消息。
  • ④ 通过记录的offset去获取消息,如果发现消息已经超过30分钟且订单状态 是“未支付”,那么则将订单状态设置为“取消”,然后获取下一个offset的 消息。

6. Kafka如何做到单机上百万的高吞吐量呢?

  • 写入数据:主要是依靠页面缓存技术 + 磁盘顺序写实现的。
  • 读取数据:主要依靠零拷贝技术实现的。

7. Kafka高吞吐——非零拷贝技术

通过下图过程的描述,很明显可以看到存在两次没必要的copy,一次是从OS Cache里拷 贝到Kafka进程的缓存里,接着又从Kafka进程的缓存里拷贝回OS的Socket缓存里。而且 为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是Kafka进程在执行,一 会儿上下文切换到操作系统来执行,所以这种方式来读取数据是比较消耗性能的。

8. Kafka高吞吐——零拷贝技术

Kafka为了解决非零拷贝这个问题,在读数据的时候是引入零拷贝技术。也就是说,通过 OS的sendfile技术直接让OS Cache中的数据发送到网卡后传输给下游的消费者,中间跳 过了两次拷贝数据的步骤。通过零拷贝技术,就不需要把OS Cache里的数据拷贝到应用 缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝

相关文章:

  • [特殊字符][特殊字符] HarmonyOS相关实现原理聊聊![特殊字符][特殊字符]
  • BY免费空间去掉?i=1
  • 使用eclipse将原有tomcat插件工程调整为的Dynamic Web Module工程(保姆级教程)
  • 原生微信小程序,canvas生成凭证,保存到手机
  • 数据结构-选择排序(Python)
  • 配置RSUniVLM环境(自用)
  • 多模态大模型 Qwen2.5-VL 的学习之旅
  • 无标注文本的行业划分(行业分类)算法 —— 无监督或自监督学习
  • 以太网的mac帧格式
  • 优化uniappx页面性能,处理页面滑动卡顿问题
  • WebServiceg工具
  • 中心极限定理(CLT)习题集 · 题目篇
  • 深入浅出学会函数(上)
  • C++ 模板特化 (Template Specialization)
  • 如何规避矩阵运营中的限流风险及解决方案
  • springboot整合redis实现缓存
  • mapbox高阶,高程影像、行政区边界阴影效果实现
  • Windows 安装 JDK
  • Qt 处理 XML 数据
  • HarmonyOS:一多能力介绍:一次开发,多端部署
  • 外交部:美国是国际军控与防扩散体系的最大破坏者
  • 岭南非遗大IP来上海了,舞剧《英歌》在文化广场连演两场
  • 商务部谈中欧汽车谈判进展
  • 世卫发布预防少女怀孕新指南,呼吁终止童婚、延长女孩受教育时间
  • 习近平举行仪式欢迎肯尼亚总统鲁托访华
  • 王励勤当选中国乒乓球协会新一任主席