Spark 极速回顾
Spark 极速回顾
Spark的典型部署方式
Spark可以通过多种方式部署,其中最常见的是Spark On Yarn。此方式中,Spark客户端直接连接Yarn,利用Yarn进行资源调度。具体分为Yarn-Cluster和Yarn-Client两种模式,主要区别在于Driver的运行节点。
其他部署方式包括Local和StandAlone等。
Spark的架构
Driver:驱动程序,解析用户代码,生成DAG并协调任务执行。
Executor:执行器,负责在工作节点上运行具体的任务。
Cluster Manager:集群管理器,资源管理核心(Spark On Yarn则表示使用Yarn完成资源的管理与调度)。
Worker Node:物理工作节点,负责启动和管理Executor进程。每个Worker可以运行多个Executor。
Task:执行器执行的最小工作单元。
Spark的作业提交流程
- spark-submit提交作业(指定master、deploy-mode、资源参数)
- ResourceManager分配容器,在NodeManager节点启动ApplicationMaster(即Driver)
- Driver初始化SparkContext,向ResourceManager申请Executor资源
- NodeManager启动ExecutorBackend进程,注册到Driver
- DAGScheduler将Job拆分为Stage,TaskScheduler分发Task到Executor
- Executor执行Task,通过BlockManager管理Shuffle数据
- 作业完成后释放Yarn容器资源
Spark Core之RDD的五大特性
- 分区列表:数据分布式存储在多个Partition
- 血缘依赖:通过Lineage记录RDD转换关系(容错基础)
- 计算函数:每个RDD通过compute方法生成子RDD数据
- 分区函数:仅Key-Value型RDD可指定Partitioner(Hash/Range)
- 数据本地性:优先将Task调度到数据所在节点(移动计算而非数据)
Spark中的宽窄依赖
宽依赖:父RDD的一个分区可能被多个子RDD的分区使用,需要跨节点的数据传输,设计网络传输,开销大,会触发Shuffle
窄依赖:父RDD的分区最多被子RDD的一个分区使用,数据传递是局部的,不需要跨节点的网络传输,无需出发Shuffle
窄依赖的典型算子:map、filter、flatMap
宽依赖的典型算子:groupByKey、reduceByKey
Spark中groupByKey和reduceByKey的区别
reduceByKey会在shuffle之前先进行一个预聚合操作(Map端的预聚合),而groupByKey并不会,网络开销更大。
Spark中的Shuffle简介
Shuffle简介:跨节点数据重分布的过程,通常由宽窄依赖操作触发,将数据按照Key进行重新分组,确保相同Key的数据落入到同一个分区,方便后续聚合或计算。
Shuffle方法:SortShuffle,对数据进行排序来优化Shuffle过程,每个Map任务将数据写入一个临时文件,并在写入过程中对数据进行排序。Reduce任务从这些排序后的文件中读取数据,进行合并。后续优化成了基于堆外内存和二进制数据操作。
Spark中Repartition和Coalesce的关系与区别
两者都是用来改变RDD的分区数量,repartition底层调用的就是coalesce,但是repartition一定会发生shuffle,coalesce会根据传入的参数来判断是否会发生shuffle。
增大partition推荐使用repartition,减少partition推荐使用coalesce。
Spark中广播变量和累加器的基本原理与用途
累加器:仅支持累加操作的变量,通常用于在执行并行操作时对数值进行聚合,其值只能在Driver进程中读取,不能在任务中读取(聚合分布式的改变)。典型应用如计算满足某个条件的数量、计算总和、最大值等。
广播变量:将只读数据缓存到每个节点上,以便在任务中使用,避免在每个任务中重复传输数据,也就是在集群的每个节点上缓存一份(共享大家都需要的数据)。典型应用如需要多次使用的公共只读数据。
注:累加器和广播变量都属于共享变量,分别为结果聚合与广播这两种常见的通信模式。
Spark SQL中RDD、DataFrame、DataSet三者的区别与联系
-
RDD (Resilient Distributed Dataset)
- 定义: RDD是Spark中最基本的数据抽象,代表一个不可变的分布式集合对象,可以通过类名点的方式直接操作数据。
- 特点:
- 不支持查询优化。
- 强类型,支持编译时类型检查。
- 适合处理非结构化数据。
DataFrame
- 定义: DataFrame是一个分布式数据集的表格化表示,类似于关系数据库中的表。
- 特点:
- 使用Catalyst优化器进行查询优化。
- 弱类型,不支持编译时类型检查。
- 适合处理结构化数据。
Dataset
- 定义: Dataset是Spark中的分布式数据集抽象,结合了RDD的强类型和DataFrame的优化特性。
- 特点:
- 使用Catalyst优化器进行查询优化。
- 强类型,支持编译时类型检查。
- 适合Scala/Java复杂业务场景。
区别
- 查询优化: RDD不支持查询优化,而DataFrame和Dataset使用Catalyst优化器进行优化。
- 类型安全: RDD和Dataset是强类型的,支持编译时类型检查;DataFrame是弱类型的。
- 数据结构: RDD适合无结构化数据,DataFrame和Dataset适合结构化数据。
联系
- 转换: RDD、DataFrame和Dataset可以相互转换。DataFrame和Dataset都是基于RDD构建的。
总结
- RDD: 灵活但低效,适合非结构化数据。
- DataFrame: 高效易用,适合结构化数据处理。
- Dataset: 类型安全的高效抽象,适合Scala/Java复杂业务场景。
Spark中BroadCast Join的基本原理与用途
原理:先将小表数据查询出来聚合到Driver端,再广播到各个Executor端,使表与表join时先进行本地join,避免进行网络传输产生shuffle
用途:大表join小表,广播小表,提高性能。
Spark Streaming的基本原理
Spark Streaming通过将实时数据流分解成为小的批次来处理(微批次),也就是一个个由时间片划分的RDD序列,每个批次的数据都作为一个独立作业处理。
Spark Streaming的窗口操作
计算一段时间内的数据聚合,也就是在原来计算批次大小的基础上再进行封装,每次计算多个批次的数据。
Spark Streaming如何消费Kafka数据?
基于Direct的方式,周期性查询Kafka,来获得每个topic+partition的最新offset,从而定义每个batch的offset范围。
SparkStreaming核心
**事件时间处理:**通过Watermark处理延迟数据(而不是根据批数据到达的时间进行处理)。
状态管理:通过mapGroupWithState实现复杂会话逻辑。
**端到端Exactly-Once:**Kafka事务写入(原子性)、幂等输出存储(防止重复发送)、Checkpoint元数据跟踪(元数据持久性与故障恢复性)。
**背压机制:**动态速率调节(根据处理延迟自动调整Kafka的拉取速率),配置参数(spark.streaming.backpressure.enabled=true)
数据倾斜
定义:某个Stage中的某个Task运行数据量/时长远远超过其它Task内的平均运行数据量或时间
如何定位:某个Task运行的数据量或时长超过其它Task平均指标的五倍,某个Executor运行时间过长/OOM错误等,Spark WebUI日志分析(纯人工判断)。
原因:产生shuffle类的算子,例如join、group by、distinct、开窗函数等。
发生阶段:绝大多数是在Reduce阶段,也就是Shuffle重分区之后,导致某个Reduce间数据分布不均匀。
解决方案:
方案一,Spark AQE,自动将倾斜分区拆成多个分区进行join,默认判断是某分区的数据量超过平均分区数据量5倍以上会被Spark进行拆分。
方案二,广播,大表join小表,将小表进行广播。调整spark.sql.autoBroadcastJoinThreshold、spark.sql.broadcastTimeout。
方案三,若是异常key倾斜,则可提前完成异常值过滤。
方案四,加盐关联,仅对倾斜的key值做加盐处理。
Spark Core
内存管理模型
堆内内存 + 堆外内存 + 内存溢出到磁盘。
任务调度机制
DAGScheduler:Job划分为Stage,处理Shuffle依赖并生成TaskSet。
TaskScheduler:分配Task到Executor,支持FIFO/FAIR调度模式
持久化与CheckPoint
持久化类型:MEMORY_ONLY_SER(序列化节省空间)、MEMORY_AND_DISK(内存不足落盘)
Checkpoint:切断RDD血缘,将数据持久化到HDFS上。
Join策略
BroadcastHash:大表join小表,适合于星型模型维度表关联
SortMergeJoin:两表均分桶且排序(两表数据已按Join Key分区且排序),适合于大规模数据关联
ShuffleHash:单表可放入内存,适合于中等规模数据关联
BucketJoin:两表均按相同列分桶,且分桶成倍数关系。
策略 | 网络开销 | 内存开销 | CPU开销 | 适用数据规模 |
---|---|---|---|---|
BroadcastHash | 无 | 高(小表) | 低 | 小表 + 任意大表 |
SortMerge | 高 | 低 | 高(排序) | 大规模数据关联 |
ShuffleHash | 高 | 中 | 中 | 中等规模数据 |
BucketJoin | 无/低 | 低 | 低 | 预分桶数据 |
选择策略:
小表能否广播(小表广播不犹豫)——>BroadcastHash Join
数据是否预分桶(提前分桶是王道)——>BucketJoin
内存是否足够缓存分区数据(中等身材内存够)——>ShuffleHash Join
默认选择(两个巨人来PK)——> SortMerge Join
OOM问题排查
Driver OOM:常见原因,大结果集collect。解决方案,提前过滤数据。
Executor OOM:数据倾斜/内存泄露。
Shuffle OOM:数据倾斜/分区过大。
慢任务如何定位
Spark Web UI界面中通过持续时间、输入输出数据量进行判断。
资源监控可视化界面。
DAG调度核心流程
Action触发Job提交,进入DAG调度器,划分Stage,提交TaskSet,进入Task调度器,调度Task到具体的Executo完成具体的任务执行(Spark Web UI页面中,可详细查看Job、Stage、TaskSet、Task之间的关系,以及DAG的划分)