Spark-Streaming(1)
Spark Streaming概述:
用于流式计算,处理实时数据流。
数据流以DStream(Discretized Stream)形式表示,内部由一系列RDD组成。
Spark Streaming特点:
易用、容错、易整合到spark体系。
易用性:支持Java、Python、Scala等编程语言,编写实时计算程序如同编写批处理程序。
容错性:无需额外代码和配置即可恢复丢失的数据,确保实时计算的可靠性。
整合性:可在Spark上运行,允许重复使用相关代码进行批处理和实时处理。
Spark-Streaming架构
驱动程序为StreamingContext,处理Spark作业并传给各工作节点。
工作节点接收数据并执行任务,结果备份到其他工作节点。
背压机制协调数据接收能力和资源处理能力,避免数据堆积或资源浪费。
Spark Streaming实操
实验需求:
使用 netcat 工具向9999端口发送数据,通过Spark Streaming读取端口数据并统计单词出现次数。
实验步骤:
配置Spark Streaming对象,设置时间间隔为3秒。
进行扁平化数据处理,统计单词出现次数并输出结果。
代码解析:
数据接收和处理的具体操作,包括扁平化、转换和累加。启动Spark Streaming并处理异常情况。
Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。
对数据的操作也是按照 RDD 为单位来进行的
计算过程由 Spark Engine 来完成
DStream 创建
创建DStream的三种方式:RDD队列、自定义数据源、kafka数据源
RDD队列
可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个DStream 处理。
案例:
需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount
自定义数据源
自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
(在 class 中定义 on start 和 on stop 方法。
on start方法中创建新线程并调用接收数据的方法。
on stop方法为空。)
案例:自定义数据源,实现监控某个端口号,获取该端口号内容。
1) 自定义数据源
class CustomerReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {new Thread("Socket Receiver"){override def run(): Unit ={receive()}}.start()}def receive(): Unit ={var socket:Socket = new Socket(host,port)var input :String = nullvar reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))input = reader.readLine()while(!isStopped() && input != null){store(input)input = reader.readLine()}reader.close()socket.close()restart("restart")}override def onStop(): Unit = {}
}2) 使用自定义的数据源采集数据
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
val ssc = new StreamingContext(sparkConf,Seconds(5))val lineStream = ssc.receiverStream(new CustomerReceiver("node01",9999))val wordStream = lineStream.flatMap(_.split(" "))
val wordAndOneStream = wordStream.map((_,1))
val wordAndCountStream = wordAndOneStream.reduceByKey(_+_)
wordAndCountStream.print()ssc.start()
ssc.awaitTermination()
扁平化数据
将所有数据根据空格切分并进行扁平化处理。
转换成键值对形式,相同单词进行分组累加,实现词频统计。