当前位置: 首页 > news >正文

Kafka 命令行操作与 Spark-Streaming 核心编程总结

一、Kafka 命令行操作详解

1.创建 Topic

命令格式:

kafka-topics.sh --create --zookeeper <zk节点列表> --topic <主题名> --partitions <分区数> --replication-factor <副本数>

参数说明:

分区数(partitions):必须指定,决定数据分片存储的并行度。

副本数(replication-factor):必须指定,不能超过 Broker 节点总数,用于数据冗余和高可用。

数据存储:创建后在 Kafka 数据目录生成以主题名-分区编号命名的文件夹(如test1-0)。

2.查看所有 Topic

命令:

kafka-topics.sh --list --zookeeper <zk节点列表>

3.查看 Topic 详细信息

命令:

bash

kafka-topics.sh --describe --zookeeper <zk节点列表> --topic <主题名>

ISR(In-Sync Replicas):与 Leader 同步的副本,可提供服务。

AR(Assigned Replicas):分区的所有副本。

4.删除 Topic

命令:

kafka-topics.sh --delete --zookeeper <zk节点列表> --topic <主题名>

5.生产数据

命令格式:

kafka-console-producer.sh --broker-list <Broker节点列表> --topic <主题名>

说明:数据以追加日志形式写入分区,每条数据仅存在于一个分区,但所有副本均存储数据。

6.消费数据

默认从最新位置消费:

kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表>

从头开始消费:

kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表> --from-beginning

指定消费组(Group ID):

kafka-console-consumer.sh --topic <主题名> --bootstrap-server <Broker节点列表> --consumer-property group.id=<组名>

特性:同一 Topic 的数据只能被同一 Group ID 的 Consumer 消费一次(通过偏移量记录消费进度)。

二、Spark-Streaming 核心编程:Kafka 数据源集成

1.Receiver API 与 Direct API 对比

Receiver API:

需要专用 Executor 接收数据,可能因接收与计算速度不匹配导致内存溢出,适用于早期版本。

Direct API(推荐):

计算 Executor 主动拉取 Kafka 数据,速度可控,适用于 Kafka 0.10 + 版本。

2.Kafka 0-10 Direct 模式实现步骤

(1)打开虚拟机zookpeer与kafka集群

(2)导入依赖

(3)编写代码

    (4) 开启Kafka生产者,产生数据

    kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

    (5)运行程序,接收Kafka生产的数据并进行相应处理

    (6)查看消费进度

    相关文章:

  1. 让Docker端口映射受Firewall管理而非iptables
  2. Python爬虫爬取图片并存储到MongoDB(注意:仅尝试存储一条空的示例数据到MongoDB,验证MongoDB的联通性)
  3. Vue3 setup、计算属性、侦听器、响应式API
  4. 【go语言】window环境从源码编译go
  5. 游戏引擎学习第241天:将OpenGL VSync 和 sRGB 扩展
  6. 【c++】【STL库】vector类详解
  7. Unity 使用 ADB 实时查看手机运行性能
  8. [linux]设置邮件发送告警功能
  9. 【C++】入门基础【下】
  10. 编译 C++ 报错“找不到 g++ 编译器”的终极解决方案(含 Windows/Linux/macOS)
  11. 2025最新系统 Linux 教程(六)
  12. HTML5 服务器发送事件 (Server-Sent Events):实现网页自动获取服务器更新
  13. 第53.5讲 | 小项目实战:用 SHAP 值解释农作物产量预测模型 [特殊字符][特殊字符]
  14. Next.js v15 eslint 规则配置
  15. Spring Boot知识点详解
  16. 27、Session有什么重⼤BUG?微软提出了什么⽅法加以解决?
  17. 【基础】Node.js 介绍、安装及npm 和 npx功能了解
  18. 如何快速高效学习Python?
  19. 界面开发框架DevExpress XAF实践:如何在Blazor项目中集成.NET Aspire?(二)
  20. (第三篇)Springcloud之Ribbon负载均衡
  21. 李强主持召开国务院常务会议
  22. 一周观展|五一假期将到,特展大展陆续开幕
  23. 四川一国企“80后”掌门人为报领导“知遇之恩”,盲目决策致数亿损失
  24. 新城市志|中国消费第一城,迎来“补贴力度最大”购物节
  25. 游戏论|迟来的忍者与武士:从《刺客信条:影》论多元话语的争议
  26. 一年吸引30多万人次打卡,江苏这个渔村是怎么做到的?