当前位置: 首页 > news >正文

Spark-streaming核心编程

1.导入依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>

<version>3.0.0</version>

</dependency>

2.编写代码

创建SparkConfStreamingContext

定义Kafka相关参数,如bootstrap serversgroup idkeyvaluedeserializer

使用KafkaUtils.createDirectStream方法创建DStream,该方法接受StreamingContext、位置策略、消费者策略等参数。

提取数据中的value部分,并进行word count计算。

启动StreamingContext并等待其终止。

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectAPI {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

    val ssc = new StreamingContext(sparkConf,Seconds(3))

    //定义kafka相关参数

    val kafkaPara :Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG

      ->"node01:9092,node02:9092,node03:9092",

      ConsumerConfig.GROUP_ID_CONFIG->"kafka",

      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",

      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"

    )

    //通过读取kafka数据,创建DStream

    val kafkaDStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](

      ssc,LocationStrategies.PreferConsistent,

      ConsumerStrategies.Subscribe[String,String](Set("kafka"),kafkaPara)

    )

    //提取出数据中的value部分

    val valueDStream :DStream[String] = kafkaDStream.map(record=>record.value())

    //wordCount计算逻辑

    valueDStream.flatMap(_.split(" "))

      .map((_,1))

      .reduceByKey(_+_)

      .print()

    ssc.start()

    ssc.awaitTermination()

  }

  }

3.运行程序

开启Kafka集群。

4.使用Kafka生产者产生数据。

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

​5运行Spark Streaming程序,接收Kafka生产的数据并进行处理。

6.查看消费进度

使用Kafka提供的kafka-consumer-groups.sh脚本查看消费组的消费进度。

相关文章:

  • 甘特图Vue3 | 原生绘制
  • leetcode 69和367
  • 构造函数体赋值和初始化列表
  • 面试题:在1亿个数据中取前10个最大的数据(Java实现)
  • 【数据结构】Map与Set结构详解
  • 开源交易所源码,交易所开发
  • 时序数据库IoTDB构建的能源电力解决方案
  • 无人设备遥控之调度自动化技术篇
  • 从岗位依附到能力生态:AI革命下“什么叫就业”的重构与价值
  • Python3(8) 字符串
  • 使用HYPRE库并行装配IJ稀疏矩阵指南: 矩阵预分配和重复利用
  • 数据集-目标检测系列- F35 战斗机 检测数据集 F35 plane >> DataBall
  • 数据分析之技术干货业务价值​​ powerquery 分组排序后取TOP
  • Code Splitting 分包策略
  • 【网络原理】从零开始深入理解TCP的各项特性和机制.(一)
  • 立錡科技优化 HDD、LPDDR、SoC 供电的高性能降压转换器
  • Python实现技能记录系统
  • 【华为OD机试真题】428、连续字母长度 | 机试真题+思路参考+代码解析(E卷)(C++)
  • Browser-Use WebUI:让AI自动使用浏览器帮你查询信息执行任务
  • StableDiffusionPipeline原理解读——引导尺度是如何调整噪声残差的
  • 美联储报告披露关税战冲击波:消费信心下降,经济担忧加深
  • 国家市监总局:民生无小事,严打民生领域侵权假冒违法行为
  • 《2025职场人阅读报告》:超半数会因AI改变阅读方向
  • 《哪吒之魔童降世》电影版权方诉《仙侠神域》游戏运营方侵权案开庭
  • 银行板块整体走强,工行、农行、中行股价再创新高
  • 云南洱源县4.8级地震:房屋受损442户,无人员伤亡报告