当前位置: 首页 > news >正文

Spark-Streaming(1)

Spark Streaming概述

用于流式计算,处理实时数据流。

数据流以DStream(Discretized Stream)形式表示,内部由一系列RDD组成。

Spark Streaming特点

易用、容错、易整合到spark体系。

        易用性:支持Java、Python、Scala等编程语言,编写实时计算程序如同编写批处理程序。

        容错性:无需额外代码和配置即可恢复丢失的数据,确保实时计算的可靠性。

        整合性:可在Spark上运行,允许重复使用相关代码进行批处理和实时处理。

Spark-Streaming架构

驱动程序为StreamingContext,处理Spark作业并传给各工作节点。

工作节点接收数据并执行任务,结果备份到其他工作节点。

背压机制协调数据接收能力和资源处理能力,避免数据堆积或资源浪费。

Spark Streaming实操

实验需求

        使用 netcat 工具向9999端口发送数据,通过Spark Streaming读取端口数据并统计单词出现次数。

实验步骤

        配置Spark Streaming对象,设置时间间隔为3秒。

        进行扁平化数据处理,统计单词出现次数并输出结果。

代码解析

       数据接收和处理的具体操作,包括扁平化、转换和累加。启动Spark Streaming并处理异常情况。

        Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。

         对数据的操作也是按照 RDD 为单位来进行的

         计算过程由 Spark Engine 来完成

DStream 创建

创建DStream的三种方式:RDD队列、自定义数据源、kafka数据源

        RDD队列

        可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个DStream 处理。

案例:

需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount

        自定义数据源

自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

(在 class 中定义 on start 和 on stop 方法。
   on start方法中创建新线程并调用接收数据的方法。
   on stop方法为空。)

案例:自定义数据源,实现监控某个端口号,获取该端口号内容。

1)	自定义数据源
class CustomerReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {new Thread("Socket Receiver"){override def run(): Unit ={receive()}}.start()}def receive(): Unit ={var socket:Socket = new Socket(host,port)var input :String = nullvar reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))input = reader.readLine()while(!isStopped() && input != null){store(input)input = reader.readLine()}reader.close()socket.close()restart("restart")}override def onStop(): Unit = {}
}2)	使用自定义的数据源采集数据
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
val ssc = new StreamingContext(sparkConf,Seconds(5))val lineStream = ssc.receiverStream(new CustomerReceiver("node01",9999))val wordStream = lineStream.flatMap(_.split(" "))
val wordAndOneStream = wordStream.map((_,1))
val wordAndCountStream = wordAndOneStream.reduceByKey(_+_)
wordAndCountStream.print()ssc.start()
ssc.awaitTermination()

扁平化数据
将所有数据根据空格切分并进行扁平化处理。
转换成键值对形式,相同单词进行分组累加,实现词频统计。

相关文章:

  • 【Git】Git的远程分支已删除,为何本地还能显示?
  • oracle将表字段逗号分隔的值进行拆分,并替换值
  • ​CTGCache ​CTG-Cache TeleDB
  • 【MySQL数据库】表的约束
  • 工程投标k值分析系统(需求和功能说明)
  • 使用Multipart Form-Data一次请求获取多张图片
  • 真我推出首款 AI 翻译耳机,支持 32 种语言翻译
  • 2.5 函数的拓展
  • LangGraph(二)——QuickStart样例中的第二步
  • C++ std::forward 详解
  • 【源码】【Java并发】【ThreadLocal】适合中学者体质的ThreadLocal源码阅读
  • 在 40 亿整数中捕获“恰好出现两次”的数字
  • 动态提示词(小模型)、RAG和提示词系统
  • 【CPP】固定大小内存池
  • 蓝牙 6.0 发布,解锁无线科技新可能
  • 【TeamFlow】4.3.2 细化时间单位
  • ISO15189认证有什么要求?ISO15189认证流程
  • 15.三数之和(LeetCode)java
  • 数据集 | 柑橘果目标检测数据集
  • 云原生--CNCF-1-云原生计算基金会介绍(云原生生态的发展目标和未来)
  • 全国双拥模范城(县)名单
  • 首开股份:去年亏损约81.4亿元,是公司发展史上极其困难的一年
  • 张文宏:加强基层医疗体系建设,提升传染病早期监测和预警能力
  • 深一度|上海半马,展示“体育+”无限可能的路跑狂欢
  • “你是做什么的?”——人们能否对工作说不?
  • 龙登高谈近世的基层治理及制度变迁