当前位置: 首页 > news >正文

Kafka详解——介绍与部署

1. 什么是 Kafka?

Kafka 是一个分布式的消息队列系统,最初由 LinkedIn 开发,后来成为 Apache 开源项目。它的主要用途包括实时数据处理、日志收集、数据流管道构建等。Kafka 具备高吞吐量、可扩展性、持久性和容错性,广泛应用于大数据和实时流处理场景。

核心概念

  1. Producer(生产者):负责向 Kafka 发送消息的数据发布方。
  2. Consumer(消费者):从 Kafka 读取消息的应用程序。
  3. Topic(主题):消息的分类,生产者将消息发布到特定的 Topic,消费者订阅感兴趣的 Topic。
  4. Partition(分区):每个 Topic 可划分为多个分区,支持并行处理,提高吞吐量。
  5. Broker(代理):Kafka 集群中的服务器节点,负责存储数据和处理请求。
  6. Zookeeper:用于管理 Kafka 集群的元数据,跟踪 Broker、Partition 以及消费者的状态。

Kafka 的工作流程

  1. 生产者发送消息

    生产者将消息发布到指定的 Topic。Topic 内部有多个分区,生产者按照轮询或键哈希的方式将消息分配到不同分区。
  2. Broker 存储消息

    Broker 接收到消息后,持久化到磁盘,并为每条消息分配一个偏移量(offset)。
  3. 消费者读取消息

    消费者订阅 Topic,从对应的分区读取消息,并记录已消费的偏移量,确保数据不重复、不丢失。

Kafka 的特点

  • 高吞吐量:支持大规模消息处理,利用分区并行处理能力。
  • 持久化存储:消息持久化到磁盘,支持数据恢复。
  • 水平扩展:通过增加 Broker 扩容,支持百万级 TPS(每秒事务处理量)。
  • 容错性:分区副本机制确保数据高可用,Broker 故障时其他节点能接管。
  • 流处理能力:与 Kafka Streams、Flink 等工具结合,支持实时数据处理。

常见应用场景

  1. 日志收集:收集服务器日志,统一传输到大数据平台。
  2. 监控系统:实时收集应用程序指标,构建监控告警系统。
  3. 数据管道:作为数据流的中间件,在系统间传输和处理数据。
  4. 消息队列:解耦系统,提高服务扩展性和容错能力。

2. Kafka的部署

前置准备:需要有3台免密互通的虚拟机并安装好JAVA环境,未安装可参考:

本地部署大数据集群前置准备https://blog.csdn.net/m0_73641796/article/details/145994787?spm=1001.2014.3001.5501

2.1 Zookeeper安装

上传Zookeeper并修改配置文件

tar -zxf apache-zookeeper-3.7.1-bin.tar.gz -C /export/server/
cd /export/server/
mv apache-zookeeper-3.7.1-bin/ zookeeper
cd zookeeper/
mkdir zkData
cd zkData/
echo "1" >> myid;
cd ../conf/
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 以下内容为修改内容
dataDir=/export/server/zookeeper/zkData

# 以下内容为新增内容
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888

分发Zookeeper

cd /export/server/
scp -r zookeeper node2:`pwd`/
scp -r zookeeper node3:`pwd`/

# 分别将不同虚拟机/export/server/zookeeper/zkData目录下myid文件进行修改
vim /export/server/zookeeper/zkData/myid

# node1:1
# node2:2
# node3:3

封装启停脚本

cd /usr/bin
vim zk.sh
--添加如下内容:
#!/bin/bash

case $1 in
"start"){
	for i in node1 node2 node3
	do
        echo ---------- zookeeper $i 启动 ------------
		ssh $i "/export/server/zookeeper/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in node1 node2 node3
	do
        echo ---------- zookeeper $i 停止 ------------    
		ssh $i "/export/server/zookeeper/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in node1 node2 node3
	do
        echo ---------- zookeeper $i 状态 ------------    
		ssh $i "/export/server/zookeeper/bin/zkServer.sh status"
	done
};;
esac

# 给zk.sh文件授权
chmod 777 zk.sh

2.2 启停Zookeeper

# 启动ZK服务
zk.sh start
# 查看ZK服务状态
zk.sh status
# 停止ZK服务
zk.sh stop

2.3  Kafka安装

上传kafka并修改配置文件

tar -zxf kafka_2.12-3.6.1.tgz -C /export/server/
cd /export/server/
mv kafka_2.12-3.6.1/ kafka
cd kafka/config
vim server.properties
--修改如下配置
broker.id=1
advertised.listeners=PLAINTEXT://node1:9092
log.dirs=/export/server/kafka/kafka-logs
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka

分发kafka

cd /export/server/

scp -r kafka node2:`pwd`/
scp -r kafka node3:`pwd`/

# 分别修改Node2与Node3的配置文件
vim /export/server/kafka/config/server.properties
--node2
broker.id=2
advertised.listeners=PLAINTEXT://node2:9092
--node3
broker.id=3
advertised.listeners=PLAINTEXT://node3:9092

配置环境变量

--在node1,node2,node3均执行以下操作:
 
vim /etc/profile
--添加如下内容:
#KAFKA_HOME
export KAFKA_HOME=/export/server/kafka
export PATH=$PATH:$KAFKA_HOME/bin
 
source /etc/profile

2.4  启停Kafka

启动前请先启动ZooKeeper服务

cd /export/server/kafka
# 执行启动指令
bin/kafka-server-start.sh -daemon config/server.properties
# 执行关闭指令
bin/kafka-server-stop.sh

 封装启停脚本

cd /usr/bin
vim kfk.sh
--添加如下内容:
#! /bin/bash

case $1 in
"start"){
    for i in node1 node2 node3
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties"
    done
};;
"stop"){
    for i in node1 node2 node3
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/export/server/kafka/bin/kafka-server-stop.sh "
    done
};;
esac

# 给文件授权
chmod 777 kfk.sh

# 启动kafka
kfk.sh start
# 停止Kafka
kfk.sh stop

注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

2.5 联合脚本

因为Kafka启动前,需要先启动ZooKeeper,关闭时,又需要将所有Kafka全部关闭后,才能关闭ZooKeeper,这样,操作起来感觉比较麻烦,所以可以将之前的2个脚本再做一次封装。

新建xcall脚本,具体实现可参考:

集群批量命令执行工具 xcall 配置指南https://blog.csdn.net/m0_73641796/article/details/146318713?sharetype=blogdetail&sharerId=146318713&sharerefer=PC&sharesource=m0_73641796&spm=1011.2480.3001.8118创建cluster.sh脚本文件

vim /usr/bin/cluster.sh
--添加如下内容:
#!/bin/bash

case $1 in
"start"){
        echo ================== 启动 Kafka集群 ==================
        #启动 Zookeeper集群
        zk.sh start

        #启动 Kafka集群
        kfk.sh start

        };;
"stop"){
    echo "================== 停止 Kafka 集群 =================="

    # 停止 Kafka
    kfk.sh stop

    # 检查 Kafka 进程是否完全退出
    max_wait=30  # 最多等待 30 秒
    count=0
    while true
    do
        kafka_count=$(xcall jps | grep -c Kafka)
        echo "当前未停止的 Kafka 进程数为 $kafka_count"

        # 判断 Kafka 是否全部停止
        if [ "$kafka_count" -eq 0 ]; then
            echo "Kafka 已完全停止"
            break
        fi

        # 如果超过最大等待时间,还没停干净,强制结束
        count=$((count+1))
        if [ $count -ge $max_wait ]; then
            echo "Kafka 停止超时,强制杀死残留进程"
            xcall "pkill -f kafka"
            break
        fi

        sleep 1
    done

    # 停止 Zookeeper
    zk.sh stop
};;
esac

--添加权限
chmod 777 /usr/bin/cluster.sh

脚本调用方式

# 集群启动
cluster.sh start
# 集群关闭
cluster.sh stop

相关文章:

  • 封装红黑树实现map和set(部分常用接口)(C++)
  • Redis的IO多路复用机制:高效的网络通信设计
  • Redis常用数据类型和使用常见以及基本操作举例(适合初学者,以医药连锁管理系统为背景)
  • 安装并使用anaconda(宏观版)
  • https握手过程
  • 【 node.js 】windows 电脑:node版本更换
  • [HelloCTF]PHPinclude-labs超详细WP-Level 1-FILE协议
  • Qt Widgets、Qt Quick
  • AI建模智能生成:从2D到3D,AI只需一步!
  • C语言——结构体、联合、枚举
  • DEEPSEEK可能是 “特洛伊木马“:论第三方数据模型的潜在安全威胁
  • 基于x11vnc的ubuntu远程桌面
  • Elasticsearch分页查询、关键词高亮与性能优化全解析
  • LeetCode热题100JS(54/100)第十天|124|200|994|207|208
  • 数图亮相第三届全国生鲜创新峰会,赋能生鲜零售数字化转型
  • 【力扣100】简要总结之哈希
  • 配置blender的python环境
  • 红黑树的部分实现(C++)
  • IPD解读——IPD重构产品研发与产品管理
  • C程序设计(第五版)及其参考解答,附pdf
  • 中国人保一季度业绩“分化”:财险净利增超92%,寿险增收不增利
  • 国家发改委:我国能源进口来源多元,企业减少甚至停止自美能源进口对国内能源供应没有影响
  • 国家发改委:是否进口美国饲料粮、油料不会影响我国粮食供应
  • 北上广深还是小城之春?“五一”想好去哪玩了吗
  • 加拿大警方:已确认有9人在温哥华驾车撞人事件中遇难
  • 多地征集农村假冒伪劣食品违法线索,全链条整治“三无”产品