当前位置: 首页 > news >正文

Spark RDD行动算子与共享变量实战:从数据聚合到分布式通信

RDD行动算子:
行动算子就是会触发action的算子,触发action的含义就是真正的计算数据。
1、reduce
import org.apache.spark.{SparkConf, SparkContext}
object value11 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 使用 reduce 操作对 RDD 元素求和
    val reduceResult = rdd.reduce(_ + _)
    // 输出结果
    println(reduceResult)

    // 关闭 SparkContext
    sc.stop()
  }
}

2、Foreach
import org.apache.spark.{SparkConf, SparkContext}

object value12 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDForeachExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 对 RDD 元素进行遍历并打印
    rdd.foreach(println)

    // 关闭 SparkContext
    sc.stop()
  }
}

3、count
import org.apache.spark.{SparkConf, SparkContext}

object value13{
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDActionExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 使用 count 算子统计 RDD 中元素个数
    val countResult = rdd.count()
    // 打印统计结果
    println(countResult)

    // 关闭 SparkContext
    sc.stop()
  }
}

4、Take
import org.apache.spark.{SparkConf, SparkContext}

object value14 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDActionTakeExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 使用 take 算子获取 RDD 的前 2 个元素
    val takeResult = rdd.take(2)
    // 遍历并打印获取到的元素
    takeResult.foreach(println)

    // 关闭 SparkContext
    sc.stop()
  }
}

5、first
import org.apache.spark.{SparkConf, SparkContext}

object value15 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDFirstExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 使用 first 算子获取 RDD 中的第一个元素
    val firstResult = rdd.first()
    // 打印获取到的第一个元素
    println(firstResult)

    // 关闭 SparkContext
    sc.stop()
  }
}

6、Aggregate
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClassTag

object 、value16{
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDAggregateExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 且分区数为 8 的 RDD
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)

    // 使用 aggregate 算子聚合 RDD 元素,初始值为 0
    val result1: Int = rdd.aggregate(0)(_ + _, _ + _)
    // 使用 aggregate 算子聚合 RDD 元素,初始值为 10
    val result2: Int = rdd.aggregate(10)(_ + _, _ + _)

    // 打印结果
    println(result1)
    println("**********")
    println(result2)

    // 关闭 SparkContext
    sc.stop()
  }
}

7、save 相关算子
import org.apache.spark.{SparkConf, SparkContext}

object value17 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("RDDSaveExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // 保存成 Text 文件
    rdd.saveAsTextFile("Spark-core/output/output")
    // 序列化成对象保存到文件
    rdd.saveAsObjectFile("Spark-core/output/output1")

    // 关闭 SparkContext
    sc.stop()
  }
}

实现原理
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在
Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。
import org.apache.spark.{SparkConf, SparkContext}

object value18 {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("AccumulatorExample").setMaster("local[*]")
    // 创建 SparkContext 对象
    val sc = new SparkContext(conf)

    // 创建包含 1,2,3,4,5 的 RDD
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5))
    // 声明累加器
    val sum = sc.longAccumulator("sum")

    rdd.foreach { num =>
      // 使用累加器
      sum.add(num)
    }

    // 获取累加器的值
    println("sum = " + sum.value)

    // 关闭 SparkContext
    sc.stop()
  }
}
实现原理
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个
或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,
广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务
分别发送。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object  value19{
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象并设置应用名称和运行模式
    val conf = new SparkConf().setAppName("BroadcastExample").setMaster("local[*]")
    // 创建SparkContext对象
    val sparkContext = new SparkContext(conf)

    // 创建RDD
    val rdd1 = sparkContext.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 4)
    // 定义列表
    val list = List(("a", 4), ("b", 5), ("c", 6), ("d", 7))
    // 创建广播变量
    val broadcast: Broadcast[List[(String, Int)]] = sparkContext.broadcast(list)

    // 对RDD进行转换
    val resultRDD = rdd1.map { case (key, num) =>
      var num2 = 0
      for ((k, v) <- broadcast.value) {
        if (k == key) {
          num2 = v
        }
      }
      (key, (num, num2))
    }

    // 收集并打印结果
    resultRDD.collect().foreach(println)

    // 关闭SparkContext
    sparkContext.stop()
  }
}

相关文章:

  • 革新桌面自动化:微软UFO²操作系统深度解析与未来展望
  • 迷你世界UGC3.0脚本Wiki角色模块管理接口 Actor
  • django filter 排除字段
  • 程序代码篇---ESP32 Camera Server
  • 【Redis】zset类型
  • go语言八股文(三)
  • 2个小时1.5w字| React Golang 全栈微服务实战
  • 新增29个专业,科技成为未来主赛道!
  • 04.通过OpenAPI-Swagger规范让Dify玩转Agent
  • Linux操作系统学习---进程地址空间
  • Zabbix
  • Clang中ext_vector_type和address_space __attribute__的使用
  • 《从分遗产说起:JS 原型与继承详解》
  • 测地型GNSS接收机_毫米高精度精准定位
  • NEPCON China 2025 | 具身智能时代来临,灵途科技助力人形机器人“感知升级”
  • 读写算杂志读写算杂志社读写算编辑部2025年第12期目录
  • 现场问题排查-postgresql某表索引损坏导致指定数据无法更新影响卷宗材料上传
  • 97A6-ASEMI无人机专用功率器件97A6
  • 【神经网络与深度学习】端到端方法和多任务学习
  • 2025系统架构师---事件驱动架构
  • 柴德赓、纪庸与叫歇碑
  • 酒店就“保洁员调包住客港币”致歉,称希望尽早达成解决方案
  • 谁将主导“视觉大脑”?中国AI的下一个超级赛道
  • 人民日报读者点题:规范涉企执法,怎样防止问题反弹、提振企业信心?
  • 白酒瓶“神似”北京第一高楼被判侵权,法院一审判赔45万并停售
  • 中青报:“猿辅导员工猝死”事件上热搜,是对健康职场环境的共同关切