当前位置: 首页 > news >正文

spark-streaming(二)

DStream创建(kafka数据源)

1.在idea中的 pom.xml 中添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
</dependency>

2.创建一个新的object,并写入以下代码

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord/*** 通过 DirectAPI 0 - 10 消费 Kafka 数据* 消费的 offset 保存在 _consumer_offsets 主题中*/
object DirectAPI {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")val ssc = new StreamingContext(sparkConf, Seconds(3))// 定义 Kafka 相关参数val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",ConsumerConfig.GROUP_ID_CONFIG -> "kafka","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer])// 通过读取 Kafka 数据,创建 DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara))// 提取出数据中的 value 部分val valueDStream = kafkaDStream.map(record => record.value())// WordCount 计算逻辑valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()ssc.awaitTermination()}
}    

3.在虚拟机中,开启kafka、zookeeper、yarn、dfs集群

4.创建一个新的topic---kafka,用于接下来的操作

查看所有的topic(是否创建成功)

开启kafka生产者,用于产生数据

启动idea中的代码,在虚拟机中输入数据

输入后可以在idea中查看到

查看消费进度

相关文章:

  • NeRF:原理 + 实现 + 实践全流程配置+数据集测试【Ubuntu20.04 】【2025最新版】
  • 【1区SCI】Fusion entropy融合熵,多尺度,复合多尺度、时移多尺度、层次 + 故障识别、诊断-matlab代码
  • CE第一次作业
  • 协作开发攻略:Git全面使用指南 — 第一部分 Git基础
  • 3台CentOS虚拟机部署 StarRocks 1 FE+ 3 BE集群
  • 与终端同居日记:Shell交响曲の终极共舞指南
  • 海量聊天消息处理:ShardingJDBC分库分表、ClickHouse冷热数据分离、ES复合查询方案、Flink实时计算与SpringCloud集成
  • C++ RPC以及cmake
  • Oracle 11g RAC ASM磁盘组剔盘、加盘实施过程
  • 基于 CentOS 的 Docker Swarm 集群管理实战指南
  • CentOS 7 基于 Nginx 的 HTML 部署全流程指南
  • 智能吸顶灯/摄影补光灯专用!FP7195双通道LED驱动,高效节能省空间 !
  • 保姆级教程:用EndNote 20让参考文献自动分组排序(中文在前,英文在后)
  • 【bug修复】一次诡异的接口数据显示 bug 排查之旅
  • Java高频面试之并发编程-07
  • Docker部署一款开源的极简服务器监控工具Ward内网穿透远程使用
  • 23种设计模式-行为型模式之策略模式(Java版本)
  • 记录学习的第三十一天
  • 基于PHP+Uniapp的互联网医院源码:电子处方功能落地方案
  • IDEA启动报错Failed to create JVM. JVM path的解决办法
  • 无视规范开“远端”、企业云端被窃密,国安部:莫让运维成运“危”
  • 加拿大财长:加拿大需要抗击美国关税
  • 广东东莞调整普通住宅价格标准:一类镇街上浮300余元/平方米
  • 韩国称DeepSeek未经同意将用户数据传至境外,外交部回应
  • 冲击一英里4分钟大关,基普耶贡挑战女子中长跑极限
  • 时隔七年,上合组织国家电影节再度在中国举办