
Source Code Analysis of SparkSQL Join


Table of Contents

    • I. Background
      • 1. Join Strategy Selection
      • 2. Hash Join Implementation
      • 3. Sort Merge Join Implementation
    • Summary

I. Background

SparkSQL is by now the workhorse of offline (batch) computation, so mastering SparkSQL's joins goes a long way toward mastering the engine itself.

I have been meaning to write this summary for a while; after running into some Broadcast-related issues today, I finally have the occasion to summarize SparkSQL's joins.

In the previous post we covered SparkSQL Join Deep Dive: The Three Implementations Revealed.

To understand how SparkSQL joins work more deeply, we can read the source. The following is a source-level analysis of SparkSQL joins:

1. Join Strategy Selection

SparkSQL implements its join strategies in the org.apache.spark.sql.execution.joins package. In the doExecute method of the join operator, a suitable join strategy is selected based on statistics and configuration. The sketch below condenses that selection logic, simplified for readability:

def doExecute(): RDD[InternalRow] = {
  val leftKeys = leftKeysArray
  val rightKeys = rightKeysArray
  if (joinType == JoinType.CROSS) {
    // Explicit cross join: every left row pairs with every right row.
    CrossHashJoin.doJoin(left, right, leftKeys, rightKeys, joinType,
      condition, leftFilters, rightFilters)
  } else if (left.output.nonEmpty && right.output.nonEmpty) {
    leftKeys.length match {
      case 0 =>
        // No equi-join keys: fall back to a Cartesian product.
        CartesianProduct.doJoin(left, right, joinType, condition,
          leftFilters, rightFilters)
      case 1 =>
        if (joinType == JoinType.INNER) {
          // A single equi-join key on an inner join: use a hash join.
          HashJoin.doJoin(left, right, leftKeys, rightKeys, joinType,
            condition, leftFilters, rightFilters)
        } else {
          // Outer joins: use a sort merge join to preserve ordering.
          SortMergeJoin.doJoin(left, right, leftKeys, rightKeys, joinType,
            condition, leftFilters, rightFilters)
        }
      case _ =>
        // Multiple equi-join keys: use a sort merge join.
        SortMergeJoin.doJoin(left, right, leftKeys, rightKeys, joinType,
          condition, leftFilters, rightFilters)
    }
  } else {
    // One of the children produces no output, so the join result is empty.
    sparkContext.emptyRDD[InternalRow]
  }
}
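A quick way to check which strategy the planner actually chose for a query is to print the physical plan with explain(); the operator names it shows (BroadcastHashJoin, ShuffledHashJoin, SortMergeJoin) correspond to the classes in this package. A minimal sketch; the datasets and column names below are purely illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-strategy-demo").getOrCreate()
import spark.implicits._

// Two small illustrative datasets.
val users  = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
val orders = Seq((1, 9.99), (1, 5.00), (2, 3.50)).toDF("user_id", "amount")

// The physical plan printed by explain() names the chosen join operator,
// e.g. BroadcastHashJoin for small tables or SortMergeJoin for large ones.
users.join(orders, $"id" === $"user_id").explain()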

2. Hash Join Implementation

The Hash Join implementation lives mainly in the HashJoin class. Its main steps are:

  1. Choose the build and probe sides: based on statistics, the smaller table becomes the build side
  2. Build the hash table: group the build side's rows into a hash table keyed by the join key
  3. Probe phase: look up each probe-side row in the hash table by its join key
  4. Join: assemble output rows according to the join type (inner join, outer join, and so on)
object HashJoin {
  def doJoin(
      left: RDD[InternalRow],
      right: RDD[InternalRow],
      leftKeys: Array[Expression],
      rightKeys: Array[Expression],
      joinType: JoinType,
      condition: Option[Expression],
      leftFilters: Option[Expression],
      rightFilters: Option[Expression]): RDD[InternalRow] = {

    // 1. Choose the build and probe sides: the smaller relation is hashed,
    //    the larger one streams through and probes the hash table.
    val (buildSide, _) = chooseSides(left, right)
    val (buildRdd, probeRdd, buildKeys, probeKeys) =
      if (buildSide == BuildSide.LEFT) (left, right, leftKeys, rightKeys)
      else (right, left, rightKeys, leftKeys)

    // 2. Build phase: key the build side by its join key. Keys are
    //    materialized as Seq rather than Array so they compare by value.
    val keyedBuild = buildRdd.map(row => (buildKeys.map(_.eval(row)).toSeq, row))

    // 3. Probe phase: key the probe side the same way; the shuffle performed
    //    by cogroup brings matching keys into the same partition.
    val keyedProbe = probeRdd.map(row => (probeKeys.map(_.eval(row)).toSeq, row))

    // 4. Join phase: match keys, apply the residual condition, and assemble
    //    output rows according to the join type.
    keyedProbe.cogroup(keyedBuild).flatMap { case (_, (probeRows, buildRows)) =>
      probeRows.flatMap { p =>
        val matches = buildRows.filter(b => condition.forall(_.eval(p, b)))
        if (matches.nonEmpty) {
          matches.map(b => JoinedRow(p, b))
        } else if (joinType == JoinType.INNER) {
          Nil
        } else {
          // Outer joins pad the unmatched probe row with nulls on the build
          // side (buildSideNullRow: an all-null row with the build side's
          // schema). Spark builds on the inner side of an outer join, so the
          // probe side is always the outer side. A FULL join additionally
          // emits build rows that no probe row matched, elided for brevity.
          Seq(JoinedRow(p, buildSideNullRow))
        }
      }
    }
  }
}
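Stripped of Spark's abstractions, the build/probe pattern reduces to a few lines of plain Scala. The toy inner hash join below (all types, names, and data are illustrative) shows the same two phases on in-memory collections:

case class User(id: Int, name: String)
case class Order(userId: Int, amount: Double)

def hashJoin(build: Seq[User], probe: Seq[Order]): Seq[(Order, User)] = {
  // Build phase: hash the smaller side by its join key.
  val hashTable: Map[Int, Seq[User]] = build.groupBy(_.id)
  // Probe phase: stream the larger side and look up matches.
  probe.flatMap { o =>
    hashTable.getOrElse(o.userId, Seq.empty).map(u => (o, u))
  }
}

val users  = Seq(User(1, "alice"), User(2, "bob"))
val orders = Seq(Order(1, 9.99), Order(1, 5.00), Order(3, 2.00))
hashJoin(users, orders).foreach(println)  // the key-3 order finds no match and is dropped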

3. Sort Merge Join Implementation

The Sort Merge Join implementation lives mainly in the SortMergeJoin class. Its main steps are:

  1. Sort: sort both tables by the join key
  2. Merge: walk the two sorted datasets with two pointers
  3. Join: assemble output rows according to the join type
import scala.collection.mutable

object SortMergeJoin {
  def doJoin(
      left: RDD[InternalRow],
      right: RDD[InternalRow],
      leftKeys: Array[Expression],
      rightKeys: Array[Expression],
      joinType: JoinType,
      condition: Option[Expression],
      leftFilters: Option[Expression],
      rightFilters: Option[Expression]): RDD[InternalRow] = {

    // 1. Sort: shuffle both sides by join key so matching keys are
    //    co-partitioned, then sort within each partition. (The partitioner
    //    and the key ordering are elided here.)
    val sortedLeft = left.keyBy(row => leftKeys.map(_.eval(row)).toSeq)
      .repartitionAndSortWithinPartitions(partitioner)
    val sortedRight = right.keyBy(row => rightKeys.map(_.eval(row)).toSeq)
      .repartitionAndSortWithinPartitions(partitioner)

    // 2. Merge: walk the two sorted streams with two pointers.
    sortedLeft.zipPartitions(sortedRight) { (leftIter, rightIter) =>
      val l = leftIter.buffered
      val r = rightIter.buffered
      val out = mutable.ArrayBuffer.empty[InternalRow]

      while (l.hasNext && r.hasNext) {
        val cmp = keyOrdering.compare(l.head._1, r.head._1)
        if (cmp < 0) {
          // Left key is smaller: it can never match; pad it for outer joins.
          if (joinType == JoinType.LEFT || joinType == JoinType.FULL)
            out += JoinedRow(l.head._2, rightNullRow)
          l.next()
        } else if (cmp > 0) {
          // Right key is smaller: it can never match; pad it for outer joins.
          if (joinType == JoinType.RIGHT || joinType == JoinType.FULL)
            out += JoinedRow(leftNullRow, r.head._2)
          r.next()
        } else {
          // 3. Equal keys: collect the run of duplicate keys on each side and
          //    emit the cross product, applying the residual condition.
          val key = l.head._1
          val leftRun = mutable.ArrayBuffer.empty[InternalRow]
          val rightRun = mutable.ArrayBuffer.empty[InternalRow]
          while (l.hasNext && keyOrdering.compare(l.head._1, key) == 0) leftRun += l.next()._2
          while (r.hasNext && keyOrdering.compare(r.head._1, key) == 0) rightRun += r.next()._2
          for (lr <- leftRun; rr <- rightRun if condition.forall(_.eval(lr, rr)))
            out += JoinedRow(lr, rr)
        }
      }

      // Drain the leftovers: rows remaining on one side have no match and
      // appear in the output only for the corresponding outer join.
      if (joinType == JoinType.LEFT || joinType == JoinType.FULL)
        l.foreach { case (_, row) => out += JoinedRow(row, rightNullRow) }
      if (joinType == JoinType.RIGHT || joinType == JoinType.FULL)
        r.foreach { case (_, row) => out += JoinedRow(leftNullRow, row) }

      out.iterator
    }
  }
}
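The two-pointer merge is easier to see on plain sorted sequences. The toy inner sort merge join below (names and data are illustrative) sorts both inputs, advances the pointer on the smaller key, and emits the cross product of equal-key runs:

def sortMergeJoin(left: Seq[(Int, String)], right: Seq[(Int, String)]): Seq[(Int, String, String)] = {
  val l = left.sortBy(_._1)
  val r = right.sortBy(_._1)
  val out = scala.collection.mutable.ArrayBuffer.empty[(Int, String, String)]
  var i = 0
  var j = 0
  while (i < l.length && j < r.length) {
    if (l(i)._1 < r(j)._1) i += 1       // left key smaller: advance left
    else if (l(i)._1 > r(j)._1) j += 1  // right key smaller: advance right
    else {
      val key = l(i)._1
      // Collect the run of equal keys on each side...
      val li = i; while (i < l.length && l(i)._1 == key) i += 1
      val rj = j; while (j < r.length && r(j)._1 == key) j += 1
      // ...and emit every pairing of the two runs.
      for (x <- li until i; y <- rj until j)
        out += ((key, l(x)._2, r(y)._2))
    }
  }
  out.toSeq
}

val left  = Seq((2, "l2"), (1, "l1a"), (1, "l1b"))
val right = Seq((1, "r1"), (3, "r3"))
sortMergeJoin(left, right).foreach(println)  // prints (1,l1a,r1) and (1,l1b,r1)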

Summary

This write-up has detailed how SparkSQL implements joins: Broadcast Join, Hash Join (including Shuffle Hash Join), and Sort Merge Join. Analyzing their implementation principles, workflows, and applicable scenarios gives a clearer picture of the internal mechanics of SparkSQL's join operators.
In practice, choosing the right join strategy is critical to SparkSQL query performance. Picking a strategy that fits the table sizes, data distribution, and available memory can improve join performance significantly.
A solid grasp of these internals makes it much easier to optimize SparkSQL queries and raise the efficiency of large-scale data processing.
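In day-to-day tuning, the strategy choice can be steered through standard Spark SQL settings and join hints. A short sketch; the table names are illustrative and the threshold value is just an example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").appName("join-tuning").getOrCreate()

// Tables no larger than this threshold are broadcast automatically
// (10 MB is the default); set -1 to disable automatic broadcasting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

// Prefer sort merge join over shuffle hash join (true by default).
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

val orders = spark.table("orders")
val users  = spark.table("users")

// Force a broadcast hash join regardless of statistics via an explicit hint.
orders.join(broadcast(users), Seq("user_id")).explain()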

