SparkSQL Join Source Code Analysis
Table of Contents
- I. Background
- 1. Join Strategy Selection
- 2. Hash Join Implementation
- 3. Sort Merge Join Implementation
- Summary
I. Background
SparkSQL is, by now, the heavyweight of offline computation, so getting a grip on SparkSQL's joins goes a long way toward getting a grip on the heavyweight itself.
I had been meaning to write this summary for a while; running into some Broadcast-related issues today finally gave me the push to pull together an overview of SparkSQL joins.
In the previous post we covered "SparkSQL Join Deep Dive: The Three Implementations Revealed".
To understand the implementation of SparkSQL joins more deeply, we can study the source code. What follows is a source-level analysis, presented as simplified, annotated sketches:
1. Join Strategy Selection
SparkSQL implements its join operators in the org.apache.spark.sql.execution.joins package. The join strategy is selected according to statistics and configuration; in current Spark this decision is actually made at physical-planning time (the JoinSelection strategy in SparkStrategies) rather than inside a single doExecute method. The simplified sketch below captures the decision logic:
// Simplified sketch of the strategy decision (not Spark's literal source):
def doExecute(): RDD[InternalRow] = {
  val leftKeys = leftKeysArray
  val rightKeys = rightKeysArray
  if (joinType == JoinType.CROSS) {
    // explicit cross join
    CrossHashJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition,
      leftFilters, rightFilters)
  } else if (left.output.nonEmpty && right.output.nonEmpty) {
    leftKeys.length match {
      case 0 =>
        // no equi-join keys: fall back to a Cartesian product
        CartesianProduct.doJoin(left, right, joinType, condition, leftFilters, rightFilters)
      case 1 =>
        if (joinType == JoinType.INNER) {
          // single equi-key inner join: hash join
          HashJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition,
            leftFilters, rightFilters)
        } else {
          // outer joins: sort merge join preserves the ordering of both sides
          SortMergeJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition,
            leftFilters, rightFilters)
        }
      case _ =>
        // multiple equi-keys: sort merge join
        SortMergeJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition,
          leftFilters, rightFilters)
    }
  } else {
    // one child produces no output, so the join result is empty
    sparkContext.emptyRDD[InternalRow]
  }
}
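From the user's side, the easiest way to check which strategy the planner actually chose is to print the physical plan. A minimal sketch, assuming two DataFrames named large and small (illustrative names, not from the source above):

import org.apache.spark.sql.functions.broadcast

// Hint that the small table should be broadcast, then inspect the physical plan.
val joined = large.join(broadcast(small), Seq("id"))
joined.explain()  // the plan should show BroadcastHashJoinExec rather than SortMergeJoinExec

Without the hint, the planner falls back to its size estimates, so explain() is a quick way to verify that a broadcast actually happened.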
2. Hash Join Implementation
The hash join logic lives mainly in the HashJoin class. The main steps are:
- Choose the build and probe sides: based on statistics, the smaller table becomes the build side
- Build the hash table: hash the build side's rows by their join keys
- Probe: look up each probe-side row in the hash table by its join key
- Join: emit output rows according to the join type (inner, outer, and so on)
// Simplified sketch. chooseBuildSide, evalCondition, joinRows and nullRow are
// placeholder helpers, and JoinType / BuildSide stand in for Spark's internal
// classes; cogroup models the per-partition hash table.
object HashJoin {
  def doJoin(
      left: RDD[InternalRow],
      right: RDD[InternalRow],
      leftKeys: Array[Expression],
      rightKeys: Array[Expression],
      joinType: JoinType,
      condition: Option[Expression]): RDD[InternalRow] = {

    // 1. Choose the build side (the smaller relation) and the probe side.
    val buildSide = chooseBuildSide(left, right)

    // Key each side by its evaluated join keys. Seq rather than Array, so the
    // keys compare by value when hashed during the shuffle.
    val keyedLeft  = left.map(row => (leftKeys.map(_.eval(row)).toSeq, row))
    val keyedRight = right.map(row => (rightKeys.map(_.eval(row)).toSeq, row))
    val (keyedBuild, keyedProbe) =
      if (buildSide == BuildSide.LEFT) (keyedLeft, keyedRight)
      else (keyedRight, keyedLeft)

    // 2 + 3. The shuffle co-locates equal keys; within each partition the build
    // rows form the hash table that the probe rows are looked up in. cogroup
    // expresses exactly that per-key pairing.
    keyedProbe.cogroup(keyedBuild).flatMap { case (_, (probeRows, buildRows)) =>
      def matches(p: InternalRow, b: InternalRow): Boolean =
        condition.forall(c => evalCondition(c, p, b))

      // 4. Emit output rows according to the join type. For simplicity this
      // sketch treats the probe side as the left/outer side.
      joinType match {
        case JoinType.INNER =>
          for (p <- probeRows; b <- buildRows if matches(p, b)) yield joinRows(p, b)
        case JoinType.LEFT =>
          // probe side is preserved; pad with nulls when nothing matches
          probeRows.flatMap { p =>
            val matched = buildRows.filter(b => matches(p, b)).map(b => joinRows(p, b))
            if (matched.nonEmpty) matched else Seq(joinRows(p, nullRow))
          }
        case JoinType.RIGHT | JoinType.FULL =>
          // mirror image of LEFT (FULL pads both sides); omitted for brevity
          ???
      }
    }
  }
}
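To exercise this hash join path from user code, Spark 3.0+ accepts a join hint. A small sketch, assuming DataFrames named orders and users (illustrative names):

// Ask the planner for a shuffled hash join instead of the default sort merge join.
val joined = orders.join(users.hint("shuffle_hash"), Seq("user_id"))
joined.explain()  // shows ShuffledHashJoinExec when the hint can be honored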
3. Sort Merge Join Implementation
The sort merge join logic lives mainly in the SortMergeJoin class. The main steps are:
- Sort: sort both tables by their join keys
- Merge: walk the two sorted datasets in lockstep with two pointers
- Join: emit output rows according to the join type
// Simplified sketch. keyOf, keyOrdering, evalCondition, joinRows, nullLeft,
// nullRight and the JoinKey type are placeholder helpers; an implicit
// Ordering[JoinKey] is assumed to be in scope for the sort.
import scala.collection.mutable

object SortMergeJoin {
  def doJoin(
      left: RDD[InternalRow],
      right: RDD[InternalRow],
      leftKeys: Array[Expression],
      rightKeys: Array[Expression],
      joinType: JoinType,
      condition: Option[Expression]): RDD[InternalRow] = {

    // 1. Sort: key both sides, shuffle with the same partitioner so equal keys
    // land in the same partition, and sort within each partition.
    val partitioner = new HashPartitioner(left.getNumPartitions)
    val sortedLeft = left.map(row => (keyOf(row, leftKeys), row))
      .repartitionAndSortWithinPartitions(partitioner)
    val sortedRight = right.map(row => (keyOf(row, rightKeys), row))
      .repartitionAndSortWithinPartitions(partitioner)

    // 2. Merge: advance two pointers over the co-partitioned, sorted iterators.
    sortedLeft.zipPartitions(sortedRight) { (leftRaw, rightRaw) =>
      val leftIter = leftRaw.buffered   // BufferedIterator exposes head without consuming it
      val rightIter = rightRaw.buffered
      val out = mutable.ArrayBuffer.empty[InternalRow]

      // drain all consecutive rows whose key equals `key`
      def drainEqual(iter: BufferedIterator[(JoinKey, InternalRow)], key: JoinKey) = {
        val group = mutable.ArrayBuffer.empty[InternalRow]
        while (iter.hasNext && keyOrdering.equiv(iter.head._1, key)) group += iter.next()._2
        group
      }

      while (leftIter.hasNext && rightIter.hasNext) {
        val cmp = keyOrdering.compare(leftIter.head._1, rightIter.head._1)
        if (cmp < 0) {
          // left key is smaller: this left row has no match
          val l = leftIter.next()._2
          if (joinType == JoinType.LEFT || joinType == JoinType.FULL) out += joinRows(l, nullRight)
        } else if (cmp > 0) {
          // right key is smaller: this right row has no match
          val r = rightIter.next()._2
          if (joinType == JoinType.RIGHT || joinType == JoinType.FULL) out += joinRows(nullLeft, r)
        } else {
          // 3. Equal keys: drain the full run of duplicates on both sides first,
          // then emit the cross product of the two groups.
          val key = leftIter.head._1
          val leftGroup = drainEqual(leftIter, key)
          val rightGroup = drainEqual(rightIter, key)
          for (l <- leftGroup; r <- rightGroup if condition.forall(c => evalCondition(c, l, r)))
            out += joinRows(l, r)
        }
      }

      // Leftover rows on either side only matter for outer joins.
      if (joinType == JoinType.LEFT || joinType == JoinType.FULL)
        leftIter.foreach { case (_, l) => out += joinRows(l, nullRight) }
      if (joinType == JoinType.RIGHT || joinType == JoinType.FULL)
        rightIter.foreach { case (_, r) => out += joinRows(nullLeft, r) }

      out.iterator
    }
  }
}
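The heart of the merge phase is easier to see without the Spark types around it. Below is a self-contained, runnable sketch of the two-pointer inner merge join over already-sorted inputs (all names are illustrative; this is the textbook algorithm, not Spark's code):

// Inner merge join of two key-sorted sequences; duplicate keys produce a cross product.
def mergeJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)])
                      (implicit ord: Ordering[K]): Seq[(A, B)] = {
  val out = scala.collection.mutable.ArrayBuffer.empty[(A, B)]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val c = ord.compare(left(i)._1, right(j)._1)
    if (c < 0) i += 1           // left key smaller: advance the left pointer
    else if (c > 0) j += 1      // right key smaller: advance the right pointer
    else {
      // equal keys: find the full run of duplicates on each side
      val k = left(i)._1
      var iEnd = i; while (iEnd < left.length && ord.equiv(left(iEnd)._1, k)) iEnd += 1
      var jEnd = j; while (jEnd < right.length && ord.equiv(right(jEnd)._1, k)) jEnd += 1
      for (x <- i until iEnd; y <- j until jEnd) out += ((left(x)._2, right(y)._2))
      i = iEnd
      j = jEnd
    }
  }
  out.toSeq
}

// Example: mergeJoin(Seq(1 -> "a", 2 -> "b", 2 -> "c"), Seq(2 -> "x", 3 -> "y"))
// returns Seq(("b", "x"), ("c", "x"))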
Summary
This post walked through how SparkSQL implements joins, covering Broadcast Join, Hash Join (including Shuffle Hash Join), and Sort Merge Join. Examining their implementation principles, workflows, and applicable scenarios gives a much clearer picture of what the join operator does internally.
In practice, choosing the right join strategy is critical to SparkSQL query performance: matching the strategy to table sizes, data distribution, and available memory can speed joins up dramatically.
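Concretely, two configuration knobs drive most of this choice. A quick sketch of where they are set (the values shown mirror Spark's defaults, not tuning advice):

// Broadcast any relation whose estimated size is below this threshold (10MB by default);
// setting it to -1 disables broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
// When set to false, the planner becomes more willing to pick a shuffled hash join
// over the default sort merge join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")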
With this understanding of how SparkSQL joins are implemented, we can optimize queries more deliberately and get better efficiency and performance out of large-scale data processing.