Kafka 命令行操作与 Spark-Streaming 核心编程总结
一、Kafka 命令行操作详解
1.创建 Topic
命令格式:
kafka-topics.sh --create --zookeeper <zk节点列表> --topic <主题名> --partitions <分区数> --replication-factor <副本数>
参数说明:
分区数(partitions):必须指定,决定数据分片存储的并行度。
副本数(replication-factor):必须指定,不能超过 Broker 节点总数,用于数据冗余和高可用。
数据存储:创建后在 Kafka 数据目录生成以主题名-分区编号命名的文件夹(如test1-0)。
2.查看所有 Topic
命令:
kafka-topics.sh --list --zookeeper <zk节点列表>
3.查看 Topic 详细信息
命令:
bash
kafka-topics.sh --describe --zookeeper <zk节点列表> --topic <主题名>
ISR(In-Sync Replicas):与 Leader 同步的副本,可提供服务。
AR(Assigned Replicas):分区的所有副本。
4.删除 Topic
命令:
kafka-topics.sh --delete --zookeeper <zk节点列表> --topic <主题名>
5.生产数据
命令格式:
kafka-console-producer.sh --broker-list <Broker节点列表> --topic <主题名>
说明:数据以追加日志形式写入分区,每条数据仅存在于一个分区,但所有副本均存储数据。
6.消费数据
默认从最新位置消费:
kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表>
从头开始消费:
kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表> --from-beginning
指定消费组(Group ID):
kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表> --consumer-property group.id=<组名>
特性:同一 Topic 的数据只能被同一 Group ID 的 Consumer 消费一次(通过偏移量记录消费进度)。
二、Spark-Streaming 核心编程:Kafka 数据源集成
1.Receiver API 与 Direct API 对比
Receiver API:
需要专用 Executor 接收数据,可能因接收与计算速度不匹配导致内存溢出,适用于早期版本。
Direct API(推荐):
计算 Executor 主动拉取 Kafka 数据,速度可控,适用于 Kafka 0.10 + 版本。
2.Kafka 0-10 Direct 模式实现步骤
(1)打开虚拟机zookpeer与kafka集群
(2)导入依赖
(3)编写代码
(4) 开启Kafka生产者,产生数据
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka
(5)运行程序,接收Kafka生产的数据并进行相应处理
(6)查看消费进度