当前位置: 首页 > news >正文

关于Spark知识点与代码测试的学习总结

在大数据技术飞速发展的时代,Spark 作为分布式计算框架的佼佼者,在数据处理、分析和机器学习等领域展现出强大的性能和广泛的应用前景。作为一名大数据相关专业的大学生,为了提升自己在大数据领域的技能,适应未来职业发展的需求,在老师的带领下,我深入学习了 Spark 技术。

我在自己的笔记本电脑上搭建了 Spark 开发环境,包括安装 Java、Scala、Spark 等软件,并配置相关环境变量。通过本地环境的搭建,我熟悉了 Spark 的安装和配置过程,为后续的实践操作打下了基础。

在学习知识点和代码测试中,使我掌握了 Spark 的核心原理和架构,包括 RDD、Spark SQL、Spark Streaming 等核心组件的使用方法。

当然在学习的过程中也是困难重重,在搭建 Spark 开发环境时,遇到了 Java 版本不兼容、Scala 和 Spark 版本匹配等问题。在学习 Scala 和 Spark 编程时,对一些语法和 API 的使用不太熟悉,导致代码编写困难等,还好通过查找资料和仔细审查等解决了这些难题。以下就是我在学习过程中掌握的知识点以及所学代码的测试:

                                                       安装Spark

一、Spark

1.Spark 是一种由 Scala 语言开发的快速、通用、可扩展的大数据分析引擎 

2.Spark Core 中提供了 Spark 最基础与最核心的功能 

3.Spark SQL 是 Spark 用来操作结构化数据的组件。通过 Spark SQL,用户可以使用SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。 

4.Spark Streaming 是 Spark 平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的 API。 

  1. 开启三台linux虚拟机

2.准备好spark的压缩包

步骤:

  1. 将spark的安装包上传到虚拟机node01中(建议路径:/opt/software/spark)并解压缩文件。将解压文件夹重命名为spark-local

解压:tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz

重命名:mv spark-3.0.0-bin-hadoop3.2.tgz spark-local

  1. 启动Local环境。进入spark-local中,执行如下命令:

bin/spark-shell

启动成功后,可以输入网址进行Web UI监控页面进行访问。

  1. 命令行工具

在解压缩文件夹下的 data 目录中,添加 word.txt 文件。

在命令行工具中执行如下代码指令。

sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

  1. 提交测试应用

在spark-local路径中输入以下指令:

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master local[2] \

./examples/jars/spark-examples_2.12-3.0.0.jar \

5.

指令说明:

1) --class 表示要执行程序的主类,此处可以更换为咱们自己写的应用程序

2) --master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟 CPU 核数量

3) spark-examples_2.12-3.0.0.jar 运行的应用类所在的 jar 包,实际使用时,可以设定为咱们自己打的 jar 包

4) 数字 10 表示程序的入口参数,用于设定当前应用的任务数量

运行结果:

Spark运行架构

运行架构

Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。

RDD相关概念

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

 RDD : 弹性分布式数据集

累加器:分布式共享只写变量

广播变量:分布式共享只读变量

RDD

什么是 RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

Spark-Core编程

 Spark Core

Spark-Core编程(二)

RDD转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型。

Value类型:
  1. map

➢ 函数签名

def map[U: ClassTag](f: T => U): RDD[U]

➢ 函数说明

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD_function")
val sparkContext = new SparkContext(sparkConf)

val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))
val dataRDD1: RDD[Int] = dataRDD.map(
  num => {
    num * 2
  }
)
val dataRDD2: RDD[String] = dataRDD1.map(
  num => {
    "" + num
  }
)
sparkContext.stop()

代码测试:

  1. mapPartitions

➢ 函数签名

def mapPartitions[U: ClassTag](

f: Iterator[T] => Iterator[U],

preservesPartitioning: Boolean = false): RDD[U]

➢ 函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
  datas => {
    datas.filter(_==2)
  }
)

map 和 mapPartitions 的区别:

➢ 数据处理角度

Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。

➢ 功能的角度

Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

➢ 性能的角度

Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。

  1. mapPartitionsWithIndex

➢ 函数签名

def mapPartitionsWithIndex[U: ClassTag](

f: (Int, Iterator[T]) => Iterator[U],

preservesPartitioning: Boolean = false): RDD[U]

➢ 函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

  1. flatMap

➢ 函数签名

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

➢ 函数说明

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射。

val dataRDD = sparkContext.makeRDD(List(
  List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
  list => list
)

map和flatMap的区别:

map会将每一条输入数据映射为一个新对象。

flatMap包含两个操作:会将每一个输入对象输入映射为一个新集合,然后把这些新集合连成一个大集合。

  1. glom

➢ 函数签名

def glom(): RDD[Array[T]]

➢ 函数说明

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

val dataRDD = sparkContext.makeRDD(List(
  1,2,3,4
),1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()

  1. groupBy

➢ 函数签名

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

➢ 函数说明

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中。

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(
  _%2
)

  1. filter

函数签名

def filter(f: T => Boolean): RDD[T]

函数说明

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出

现数据倾斜。

val dataRDD = sparkContext.makeRDD(List(
  1,2,3,4
),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)

val dataRDD2 = dataRDD.filter(_%2 == 1)

  1. sample

函数签名

def sample(

 withReplacement: Boolean,

 fraction: Double,

 seed: Long = Utils.random.nextLong): RDD[T]

➢ 函数说明

根据指定的规则从数据集中抽取数据

val dataRDD = sparkContext.makeRDD(List(
  1,2,3,4
),1)

// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 01 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子

val dataRDD1 = dataRDD.sample(false, 0.5)

// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子

val dataRDD2 = dataRDD.sample(true, 2)

 Spark Core

Spark-Core编程(三)

Value类型:

9) distinct

➢ 函数签名

def distinct()(implicit ord: Ordering[T] = null): RDD[T]

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

➢ 函数说明

将数据集中重复的数据去重

val dataRDD = sparkContext.makeRDD(List(
  1,2,3,4,1,2
))
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)

10) coalesce

➢ 函数签名

def coalesce(numPartitions: Int, shuffle: Boolean = false,

 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

 (implicit ord: Ordering[T] = null)

 : RDD[T]

➢ 函数说明

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

val dataRDD = sparkContext.makeRDD(List(
  1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)

11) repartition

➢ 函数签名

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

➢ 函数说明

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。

val dataRDD = sparkContext.makeRDD(List(
  1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)

12) sortBy

➢ 函数签名

def sortBy[K](

 f: (T) => K,

ascending: Boolean = true,

 numPartitions: Int = this.partitions.length)

 (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

➢ 函数说明

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。

val dataRDD = sparkContext.makeRDD(List(
  1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
val dataRDD2 = dataRDD.sortBy(num=>num, true, 4)

双Value类型:

13) intersection

➢ 函数签名

def intersection(other: RDD[T]): RDD[T]

➢ 函数说明

对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)

14) union

➢ 函数签名

def union(other: RDD[T]): RDD[T]

➢ 函数说明

对源 RDD 和参数 RDD 求并集后返回一个新的 RDD(重复数据不会去重)

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)

15) subtract

➢ 函数签名

def subtract(other: RDD[T]): RDD[T]

➢ 函数说明

以源 RDD 元素为主,去除两个 RDD 中重复元素,将源RDD的其他元素保留下来。(求差集)

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)

16) zip

➢ 函数签名

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

➢ 函数说明

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD

中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

val dataRDD1 = sparkContext.makeRDD(List("a","b","c","d"))
val dataRDD2 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD = dataRDD1.zip(dataRDD2)

 Spark Core

 Spark-Core编程(四)

Key-Value类型:

17) partitionBy

➢ 函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

➢ 函数说明

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

val rdd: RDD[(Int, String)] =
  sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)

val rdd2: RDD[(Int, String)] =
  rdd.partitionBy(new HashPartitioner(2))

18) groupByKey

➢ 函数签名

def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

➢ 函数说明

将数据源的数据根据 key 对 value 进行分组

val dataRDD1 =
  sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

19) reduceByKey

➢ 函数签名

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

➢ 函数说明

可以将数据按照相同的 Key 对 Value 进行聚合

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)

reduceByKey 和 groupByKey 的区别:

从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚

合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那

么还是只能使用 groupByKey

20) aggregateByKey

➢ 函数签名

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,

 combOp: (U, U) => U): RDD[(K, U)]

➢ 函数说明

将数据根据不同的规则进行分区内计算和分区间计算val dataRDD1 =
  sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 =
  dataRDD1.aggregateByKey(0)(_+_,_+_)

21) foldByKey

➢ 函数签名

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

➢ 函数说明

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

val dataRDD1 =
  sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

22) combineByKey

➢ 函数签名

def combineByKey[C](

 createCombiner: V => C,//将当前值作为参数进行附加操作并返回

 mergeValue: (C, V) => C,// 在分区内部进行,将新元素V合并到第一步操作得到的C中

 mergeCombiners: (C, C) => C): RDD[(K, C)]//将第二步操作得到的C进行分区间计算

➢ 函数说明

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于

aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

示例:现有数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),求每个key的总值及每个key对应键值对的个数

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRDD: RDD[(String, (Int, Int))] = input.combineByKey(
  (_, 1), //a=>(a,1)
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //acc_1为数据源的value,acc_2为key出现的次数,二者进行分区内部的计算
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //将分区内部计算的结果进行分区间的汇总计算,得到每个key的总值以及每个key出现的次数
)

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:

reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同

FoldByKey: 每一个key 对应的数据和初始值进行分区内计算,分区内和分区间计算规则相

AggregateByKey:每一个 key 对应的数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同

CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区

内和分区间计算规则不相同。

 Spark Core

Spark-Core编程(四)

23) sortByKey

➢ 函数签名

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)

 : RDD[(K, V)]

➢ 函数说明

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)

24) join

➢ 函数签名

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

➢ 函数说明

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的

(K,(V,W))的 RDD

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)

25) leftOuterJoin

➢ 函数签名

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

➢ 函数说明

类似于 SQL 语句的左外连接

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",4)))
val dataRDD2 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

26) cogroup

➢ 函数签名

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

➢ 函数说明

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD

val dataRDD1 = sc.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sc.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =
  dataRDD1.cogroup(dataRDD2)

Spark Core

Spark-Core编程(五)

RDD行动算子:

行动算子就是会触发action的算子,触发action的含义就是真正的计算数据。

1) reduce

➢ 函数签名

def reduce(f: (T, T) => T): T

➢ 函数说明

聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val reduceResult: Int = rdd.reduce(_+_)
println(reduceResult)

2) collect

➢ 函数签名

def collect(): Array[T]

➢ 函数说明

在驱动程序中,以数组 Array 的形式返回数据集的所有元素

3) foreach

➢ 函数签名

def foreach(f: T => Unit): Unit = withScope {

 val cleanF = sc.clean(f)

 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))

}

➢ 函数说明

分布式遍历 RDD 中的每一个元素,调用指定函数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

rdd.collect().foreach(println)

4) count

➢ 函数签名

def count(): Long

➢ 函数说明

返回 RDD 中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

val countResult: Long = rdd.count()
println(countResult)

5) first

➢ 函数签名

def first(): T

➢ 函数说明

返回 RDD 中的第一个元素

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

val firstResult: Int = rdd.first()
println(firstResult)

6) take

➢ 函数签名

def take(num: Int): Array[T]

➢ 函数说明

返回一个由 RDD 的前 n 个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

val takeResult: Array[Int] = rdd.take(2)

takeResult.foreach(println)

7) takeOrdered

➢ 函数签名

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

➢ 函数说明

返回该 RDD 排序后的前 n 个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))

val result: Array[Int] = rdd.takeOrdered(2)
result.foreach(println)

8) aggregate

➢ 函数签名

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

➢ 函数说明

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),8)
// 将该 RDD 所有元素相加得到结果
val result1: Int = rdd.aggregate(0)(_+_, _+_)
val result2: Int = rdd.aggregate(10)(_+_,_+_)

println(result1)
println("**********")

9) fold

➢ 函数签名

def fold(zeroValue: T)(op: (T, T) => T): T

➢ 函数说明

折叠操作,aggregate 的简化版操作

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val foldResult: Int = rdd.fold(0)(_+_)
println(foldResult)

10) countByKey

➢ 函数签名

def countByKey(): Map[K, Long]

➢ 函数说明

统计每种 key 的个数

val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,"b"), (3, "c"), (3, "c")))

val result: collection.Map[Int, Long] = rdd.countByKey()
print(result)

11) save 相关算子

➢ 函数签名

def saveAsTextFile(path: String): Unit

def saveAsObjectFile(path: String): Unit

def saveAsSequenceFile(

 path: String,

 codec: Option[Class[_ <: CompressionCodec]] = None): Unit //了解即可

➢ 函数说明

将数据保存到不同格式的文件中

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// 保存成 Text 文件
rdd.saveAsTextFile("Spark-core/output/output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("Spark-core/output/output1")

Spark Core

 Spark-Core编程(六)

累加器

实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在

Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

val rdd = sparkContext.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sparkContext.longAccumulator("sum");
rdd.foreach(
  num => {
    // 使用累加器
    sum.add(num)
  }
)
// 获取累加器的值
println("sum = " + sum.value)

自定义累加器实现wordcount:

创建自定义累加器:

class WordCountAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]] {
  var map:mutable.Map[String,Long] = mutable.Map()

  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[String, mutable.Map[String,Long]] = new WordCountAccumulator

  override def reset(): Unit = map.clear()

  override def add(v: String): Unit = {
    map(v) = map.getOrElse(v,0L)+1L
  }

  override def merge(other: AccumulatorV2[String, mutable.Map[String,Long]
  ]): Unit = {
    val map1 = map
    val map2 = other.value
    map = map1.foldLeft(map2)(
      (innerMap,kv)=>{
        innerMap(kv._1) = innerMap.getOrElse(kv._1,0L)+kv._2
        innerMap
      }
    )
  }
  override def value: mutable.Map[String,Long] = map
}

调用自定义累加器:

val rdd = sparkContext.makeRDD(
  List("spark","scala","spark hadoop","hadoop")
)
val acc = new WordCountAccumulator
sparkContext.register(acc)

rdd.flatMap(_.split(" ")).foreach(
  word=>acc.add(word)
)
println(acc.value)

广播变量

实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个

或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,

广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务

分别发送。

val rdd1 = sparkContext.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
  val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7))

  val broadcast :Broadcast[List[(String,Int)]] = sparkContext.broadcast(list)
  val resultRDD :RDD[(String,(Int,Int))] = rdd1.map{
    case (key,num)=> {
      var num2 = 0
      for((k,v)<-broadcast.value){
        if(k == key) {
          num2 = v
        }
      }
      (key,(num,num2))
    }
  }
  resultRDD.collect().foreach(println)
  sparkContext.stop()
}

第三章第一节 Spark-SQL简介

Spark-SQL 概述

Spark SQL 是 Spark 用于结构化数据(structured data)处理的 Spark 模块。

Spark-SQL 特点

 易整合。无缝的整合了 SQL 查询和 Spark 编程

 统一的数据访问。使用相同的方式连接不同的数据源

 兼容 Hive。在已有的仓库上直接运行 SQL 或者 HQL

标准数据连接。通过 JDBC 或者 ODBC 来连接

 DataFrame 是什么

在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中 的二维表格。DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型。

 DataSet 是什么

DataSet 是分布式数据集合。DataSet 是 Spark 1.6 中添加的一个新抽象,是 DataFrame 的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 map,flatMap,filter 等等)

 DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象

 用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性;

 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet 中的字段名称;

 DataSet 是强类型的。比如可以有 DataSet[Car],DataSet[Person]。

DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将 DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。

第三章第二节 Spark-SQL核心编程(一)

DataFrame

Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。

创建 DataFrame

在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame

有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive

Table 进行查询返回。

从 Spark 数据源进行创建

Spark-SQL支持的数据类型:

➢ 在 spark 的 bin/data 目录中创建 user.json 文件

{"username":"zhangsan","age":20}

{"username":"lisi","age":17}

➢ 读取 json 文件创建 DataFrame

 val df = spark.read.json("F:\\bigdata\\spark\\2.4.7\\bin\\data\\user.json")

展示数据:

df.show

SQL 语法

SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要

有临时视图或者全局视图来辅助

  1. 读取 JSON 文件创建 DataFrame

 val df1 = spark.read.json("F:\\bigdata\\spark\\2.4.7\\bin\\data\\user.json")

  1. 对 DataFrame 创建一个临时表

df1.createOrReplaceTempView("people")

  1. 通过 SQL 语句实现查询全表

val sqlDF = spark.sql("select * from people")

  1. 结果展示

sqlDF.show

第三章第三节 Spark-SQL核心编程(二)

DataFrame

DSL 语法

DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。 可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了。

  1. 创建一个 DataFrame

 val df = spark.read.json("F:\\bigdata\\spark\\2.4.7\\bin\\data\\user.json")

  1. 查看 DataFrame 的 Schema 信息

df.printSchema

  1. 只查看"username"列数据

df.select("username").show()

  1. 查看"username"列数据以及"age+1"数据

注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名

df.select($"username",$"age" + 1).show

  1. 查看"age"大于"18"的数据

df.filter($"age">18).show

  1. 按照"age"分组,查看数据条数

df.groupBy("age").count.show

RDD 转换为 DataFrame

在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入 import spark.implicits._ 这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必 须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。

spark-shell 中无需导入,自动完成此操作。

val idRDD = sc.textFile("F:\\bigdata\\spark\\2.4.7\\bin\\data\\id.txt")

idRDD.toDF("id").show

实际开发中,一般通过样例类将 RDD 转换为 DataFrame

case class User(name:String, age:Int)

sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show

DataFrame 转换为 RDD

DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD

val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF

val rdd = df.rdd

val array = rdd.collect

注意:此时得到的 RDD 存储类型为 Row

array(0)

array(0)(0)

array(0).getAs[String]("name")

第三章第四节 Spark-SQL核心编程(三)

DataSet

DataSet 是具有强类型的数据集合,需要提供对应的类型信息。

创建 DataSet

1) 使用样例类序列创建 DataSet

case class Person(name: String, age: Long)

val caseClassDS = Seq(Person("zhangsan",2)).toDS()

caseClassDS.show

2) 使用基本类型的序列创建 DataSet

val ds = Seq(1,2,3,4,5).toDS

ds.show

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

RDD 转换为 DataSet

SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结 构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。

case class User(name:String, age:Int)

sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS

DataSet 转换为 RDD

DataSet 其实也是对 RDD 的封装,所以可以直接获取内部的 RDD

case class User(name:String, age:Int)

sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS

val rdd = res3.rdd

rdd.collect

DataFrame 和 DataSet 转换

DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。

DataFrame 转换为 DataSet

case class User(name:String, age:Int)

val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")

val ds = df.as[User]

DataSet 转换为 DataFrame

val ds = df.as[User]

val df = ds.toDF

Spark-SQL核心编程(四)

实验内容:

利用IDEA开发Spark-SQL。

实验步骤:

利用IDEA开发Spark-SQL

  1. 创建子模块Spark-SQL,并添加依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

  1. 创建Spark-SQL的测试代码:

import org.apache.spark.SparkConf

import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.Row

case class User(id:Int, name:String, age:Int)

object SparkSQLDemo {

  def main(args: Array[String]): Unit = {

    // 创建上下文环境配置对象

    val sparkConf = new SparkConf()

      .setMaster("local[*]")

      .setAppName("SQLDemo")

      .set("spark.some.config.option", "some-value") // 可选配置

    // 创建SparkSession对象

    val spark: SparkSession = SparkSession.builder()

      .config(sparkConf)

      .getOrCreate()

    import spark.implicits._

    // 1. 读取json文件

    val df: DataFrame = spark.read.json("F:\\bigdata\\spark\\2.4.7\\bin\\data\\user.json")

    println("原始数据:")

    df.show()

    // 2. SQL风格语法

    df.createOrReplaceTempView("user")

    println("SQL查询所有数据:")

    spark.sql("SELECT * FROM user").show()

    println("SQL计算平均年龄:")

    spark.sql("SELECT avg(age) as avg_age FROM user").show()

    // 3. DSL风格语法

    println("DSL查询部分字段:")

    df.select($"username", $"age").show()

    println("DSL过滤查询:")

    df.filter($"age" > 25).show()

    // 4. RDD <=> DataFrame <=> DataSet 转换

    // 4.1 RDD => DataFrame => DataSet

    println("RDD => DataFrame => DataSet 转换:")

    val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(

      List((1, "zhangsan", 30), (2, "lisi", 40), (3, "wangwu", 25))

    )

    // RDD转DataFrame

    val df1: DataFrame = rdd1.toDF("id", "name", "age")

    println("RDD转DataFrame:")

    df1.show()

    // DataFrame转DataSet

    val ds1: Dataset[User] = df1.as[User]

    println("DataFrame转DataSet:")

    ds1.show()

    // 4.2 DataSet => DataFrame => RDD

    println("DataSet => DataFrame => RDD 转换:")

    val df2 = ds1.toDF()

    println("DataSet转DataFrame:")

    df2.show()

    val rdd2: RDD[Row] = df2.rdd

    println("DataFrame转RDD并打印:")

    rdd2.foreach(row => println(s"Row: id=${row.getInt(0)}, name=${row.getString(1)}, age=${row.getInt(2)}"))

    // 4.3 直接RDD转DataSet

    println("直接RDD转DataSet:")

    rdd1.map {

      case (id, name,age) => User(id, name, age)

    }.toDS().show()

    // 4.4 DataSet转RDD

    val rdd3 = ds1.rdd

    println("DataSet转RDD并打印字段:")

    rdd3.foreach(user => println(s"User: id=${user.id}, name=${user.name}, age=${user.age}"))

    // 5. 聚合操作示例

    println("聚合操作示例:")

    ds1.groupBy("age").count().show()

    // 6. 类型安全的DataSet操作

    println("类型安全的DataSet操作:")

    ds1.filter(_.age > 30).show()

    // 关闭SparkSession

    spark.stop()

  }

}

第三章第六节 Spark-SQL核心编程(五)

自定义函数:

UDF:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")

spark.udf.register("addName",(x:String)=>"Name:"+x)

df.createOrReplaceTempView("people")
spark.sql("select addName(username),age from people").show()

spark.stop()

UDAF(自定义聚合函数)

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),

countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0之前我们使用的是UserDefinedAggregateFunction作为自定义聚合函数,从 Spark3.0 版本后可以统一采用强类型聚合函数 Aggregator

实验需求:计算平均工资

实现方式一:RDD

import org.apache.spark.{SparkConf, SparkContext}

object RDD{

  def main(args: Array[String]): Unit = {

    // 1. 创建Spark配置和上下文

    val sparkConf: SparkConf = new SparkConf()

      .setAppName("SalaryAverageCalculation")

      .setMaster("local[*]")  // 使用本地所有可用核

    val sc: SparkContext = new SparkContext(sparkConf)

    try {

      // 2. 创建RDD并计算平均工资

      val resRDD: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))

        .map {

          case (name, salary) => {

            (salary, 1)  // 转换为(工资, 计数)的元组

          }

        }

        .reduce {

          (t1, t2) => {

            (t1._1 + t2._1, t1._2 + t2._2)  // 累加工资总和和人数

          }

        }

      // 3. 计算并打印平均工资

      if (resRDD._2 != 0) {

        val averageSalary = resRDD._1.toDouble / resRDD._2

        println(f"总工资: ${resRDD._1}")

        println(f"总人数: ${resRDD._2}")

        println(f"平均工资: $averageSalary%.2f")  // 保留两位小数

      } else {

        println("警告:没有有效数据计算平均工资")

      }

    } finally {

      // 4. 确保无论如何都关闭SparkContext

      sc.stop()

    }

  }

}

实现方式:强类型UDAF

import org.apache.spark.SparkConf

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.Encoder

import org.apache.spark.sql.Encoders

import org.apache.spark.sql.functions

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.DataFrame

// 定义一个用于存储总和和计数的缓冲区类

case class Buff(var sum: Long, var cnt: Long)

// 自定义聚合函数类,继承自 Aggregator

class MyAverageUDAF extends org.apache.spark.sql.expressions.Aggregator[Long, Buff, Double] {

  // 初始化缓冲区

  override def zero: Buff = Buff(0, 0)

  // 合并输入值到缓冲区

  override def reduce(b: Buff, a: Long): Buff = {

    b.sum += a

    b.cnt += 1

    b

  }

  // 合并两个缓冲区

  override def merge(b1: Buff, b2: Buff): Buff = {

    b1.sum += b2.sum

    b1.cnt += b2.cnt

    b1

  }

  // 完成聚合操作,计算最终结果

  override def finish(reduction: Buff): Double = {

    reduction.sum.toDouble / reduction.cnt

  }

  // 定义缓冲区的编码器

  override def bufferEncoder: Encoder[Buff] = Encoders.product

  // 定义输出结果的编码器

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble

}

object Cyy {

  def main(args: Array[String]): Unit = {

    // 创建 SparkConf 对象,设置应用名称和运行模式

    val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")

    // 创建 SparkSession 对象

    val spark: SparkSession = SparkSession.builder().config(sparkconf).getOrCreate()

    import spark.implicits._

    // 创建 RDD 并存储人员姓名和工资信息

    val res: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))

    // 将 RDD 转换为 DataFrame

    val df: DataFrame = res.toDF("name", "salary")

    // 在 Spark 中注册临时视图

    df.createOrReplaceTempView("user")

    // 创建自定义聚合函数实例

    var myAverage = new MyAverageUDAF

    // 在 Spark 中注册聚合函数

    spark.udf.register("avgSalary", functions.udaf(myAverage))

    // 执行 SQL 查询,计算平均工资并显示结果

    spark.sql("select avgSalary(salary) from user").show()

    // 关闭 SparkSession 连接

    spark.stop()

  }

}

    

第七节 Spark-SQL核心编程(六)

MySQL

Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对

DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。

IDEA通过JDBC对MySQL进行操作:

  1. 导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

MySQL8  <version>8.0.11</version>

  1. 读取数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

//通用的load方式读取


spark.read.format("jdbc")
  .option("url","jdbc:mysql://localhost:3306/system")
  .option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver
  .option("user","root")
  .option("password","123456")
  .option("dbtable","user")
  .load().show()

spark.stop()

//通用的load方法的另一种形式
spark.read.format("jdbc")
  .options(
    Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))
  .load().show()

//通过JDBC
val pros :Properties = new Properties()
pros.setProperty("user","root")
pros.setProperty("password","123456")
val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)
df.show()

  1. 写入数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._
val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),
  Stu("zs", 30)))
val ds:Dataset[Stu] = rdd.toDS()

ds.write.format("jdbc")
  .option("url","jdbc:mysql://localhost:3306/system")
  .option("driver","com.mysql.jdbc.Driver")
  .option("user","root")
  .option("password","123456")
  .option("dbtable","user2")
  .mode(SaveMode.Append)
  .save()

spark.stop()

 Spark-SQL核心编程(七)

代码操作Hive

1. 导入依赖。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.3</version>
</dependency>

可能出现下载jar包的问题:

D:\maven\repository\org\pentaho\pentaho-aggdesigner-algorithm\5.1.5-jhyde

2. 将hive-site.xml 文件拷贝到项目的 resources 目录中。

3. 代码实现。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("hive")
val spark:SparkSession = SparkSession.builder()
  .enableHiveSupport()
  .config(sparkConf)
  .getOrCreate()

spark.sql("show databases").show()
spark.sql("create database spark_sql")
spark.sql("show databases").show()

注意:

  1. 如果在执行操作时,出现如下错误:

可以在代码最前面增加如下代码解决:

System.setProperty("HADOOP_USER_NAME", "node01")

此处的 node01 改为自己的 hadoop 用户名称

  1. 在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址: config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse")

Spark-Streaming

DStream实操

案例一:WordCount案例

需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数

实验步骤:

  1. 添加依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

  1. 编写代码

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val ssc = new StreamingContext(sparkConf,Seconds(3))

val lineStreams = ssc.socketTextStream("node01",9999)
val wordStreams = lineStreams.flatMap(_.split(" "))
val wordAndOneStreams = wordStreams.map((_,1))
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
wordAndCountStreams.print()

ssc.start()
ssc.awaitTermination()

  1. 启动netcat发送数据

nc -lk 9999

 Spark-Streaming核心编程(一)

DStream 创建

创建DStream的三种方式:RDD队列、自定义数据源、kafka数据源

RDD队列

可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个DStream 处理。

案例:

需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount

代码:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
val ssc = new StreamingContext(sparkConf, Seconds(4))

val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()

ssc.start()
for (i <- 1 to 5) {
  rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
  Thread.sleep(2000)
}
ssc.awaitTermination()

结果展示:

自定义数据源

自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

案例:自定义数据源,实现监控某个端口号,获取该端口号内容。

  1. 自定义数据源

class CustomerReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def onStart(): Unit = {
    new Thread("Socket Receiver"){
      override def run(): Unit ={
        receive()
      }
    }.start()
  }

  def receive(): Unit ={
    var socket:Socket = new Socket(host,port)
    var input :String = null
    var reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))

    input = reader.readLine()
    while(!isStopped() && input != null){
      store(input)
      input = reader.readLine()
    }
    reader.close()
    socket.close()

    restart("restart")
  }

  override def onStop(): Unit = {}
}

  1. 使用自定义的数据源采集数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
val ssc = new StreamingContext(sparkConf,Seconds(5))

val lineStream = ssc.receiverStream(new CustomerReceiver("node01",9999))

val wordStream = lineStream.flatMap(_.split(" "))
val wordAndOneStream = wordStream.map((_,1))
val wordAndCountStream = wordAndOneStream.reduceByKey(_+_)
wordAndCountStream.print()

ssc.start()
ssc.awaitTermination()

 

Spark-Streaming核心编程(四)

有状态转化操作

val updateFunc = (values:Seq[Int],state:Option[Int])=>{
  val currentCount = values.foldLeft(0)(_+_)
  val previousCount = state.getOrElse(0)
  Some(currentCount+previousCount)
}
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("update")
val ssc = new StreamingContext(sparkConf,Seconds(5))
ssc.checkpoint("./ck")

val lines = ssc.socketTextStream("node01",9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val stateDStream = pairs.updateStateByKey[Int](updateFunc)
stateDStream.print()

ssc.start()
ssc.awaitTermination()

WindowOperations

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

  • 窗口时长:计算内容的时间范围;
  • 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("window")
val ssc = new StreamingContext(sparkConf,Seconds(3))
ssc.checkpoint("./ck")

val lines = ssc.socketTextStream("node01",9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(12),Seconds(6))
wordCounts.print()

ssc.start()
ssc.awaitTermination()

相关文章:

  • element-ui dropdown 组件源码分享
  • 【c++】AVL树模拟实现
  • Comfy UI 笔记
  • 文章记单词 | 第47篇(六级)
  • 面试记录1-春招补录0427
  • 基础学习:(9)vit -- vision transformer 和其变体调研
  • 《大型网站技术架构-核心原理与案例分析》笔记
  • UV工具的安装与使用
  • Leetcode:283. 移动零
  • Scala 函数柯里化及闭包
  • 343. 整数拆分
  • Tailwind CSS 实战:基于 Kooboo 构建企业官网页面(二)
  • 【SF】在 Android 显示系统中,图层合成方式 Device 和 Client 的区别
  • 信创系统资产清单采集脚本:主机名+IP+MAC 一键生成 CSV
  • 汽车产业链主表及类别表设计
  • 2、Linux操作系统下,ubuntu22.04版本安装搜狗输入法
  • ACM会议模板设置单排作者数量
  • 低压电工常见知识点
  • Java——琐碎知识点一
  • 国家与省市县 标准地图服务网站 审图号地图下载
  • 国家发改委答澎湃:将指导限购城市针对长期摇号家庭和无车家庭等重点群体定向增发购车指标
  • 四川苍溪县教育局通报“工作人员辱骂举报学生”:停职检查
  • 四川一国企“80后”掌门人为报领导“知遇之恩”,盲目决策致数亿损失
  • 冯象|那“交出”后的崩溃,如撒旦坠落诸天
  • 吏亦有道|秦汉的发明家与技术传承
  • 可移动可变形的新型超材料问世