当前位置: 首页 > news >正文

Spark SQL开发实战:从IDEA环境搭建到UDF/UDAF自定义函数实现

利用IDEA开发Spark-SQL
1、创建子模块Spark-SQL,并添加依赖
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
3、创建Spark-SQL的测试代码:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

case class User(id: Int, name: String, age: Int)

object value20 {
  def main(args: Array[String]): Unit = {
    // 创建上下文环境配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
    // 创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    try {
      // 读取json文件
      val df: DataFrame = spark.read.json("Spark-SQL/input/user.json")
      df.show()
      // SQL风格语法
      df.createOrReplaceTempView("user")
      spark.sql("select * from user").show()
      spark.sql("select avg(age) from user").show()

      // DSL风格语法
      df.select("username", "age").show()

      // RDD=>DataFrame=>DataSet
      // RDD
      val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(
        List((1, "zhangsan", 30), (2, "lisi", 40))
      )
      // DataFrame
      val df1: DataFrame = rdd1.toDF("id", "name", "age")
      df1.show()
      //DataSet
      val ds1: Dataset[User] = df1.as[User]
      ds1.show()

      // DataSet=>DataFrame=>RDD
      val df2 = ds1.toDF()
      df2.show()

      val rdd2: RDD[Row] = df2.rdd
      rdd2.foreach(a => println(a.getString(1)))

      rdd1.map {
        case (id, name, age) => User(id, name, age)
      }.toDS().show()

      val rdd3 = ds1.rdd
      rdd3.foreach { a =>
        println(a.age)
        println(a.id)
        println(a.name)
      }
    } catch {
      case e: Exception =>
        println(s"发生异常: ${e.getMessage}")
    } finally {
      spark.stop()
    }
  }
}

自定义函数:
UDF:
import org.apache.spark.sql.SparkSession

object value21 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象
    val sparkConf = new org.apache.spark.SparkConf()
     .setMaster("local[*]")
     .setAppName("SQLDemo")

    // 创建SparkSession对象
    val spark = SparkSession.builder()
     .config(sparkConf)
     .getOrCreate()

    import spark.implicits._

    // 读取json文件
    val df = spark.read.json("Spark-SQL/input/user.json")

    // 注册UDF
    spark.udf.register("addName", (x: String) => "Name:" + x)

    df.createOrReplaceTempView("people")
    spark.sql("select addName(username), age from people").show()

    spark.stop()
  }
}
UDAF(自定义聚合函数)
强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),
countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0之前我们使用的是UserDefinedAggregateFunction作为自定义聚合函数,从 Spark3.0 版本后可以统一采用强类型聚合函数 Aggregator

实验需求:计算平均工资
实现方式一:RDD
import org.apache.spark.{SparkConf, SparkContext}

object value22 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
     .setAppName("app")
     .setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val resRDD = sc.parallelize(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
     .map {
        case (name, salary) => (salary, 1)
      }
     .reduce {
        case ((t1Salary, t1Count), (t2Salary, t2Count)) =>
          (t1Salary + t2Salary, t1Count + t2Count)
      }

    println(resRDD._1 / resRDD._2.toDouble)

    sc.stop()
  }
}

实现方式二:强类型UDAF
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

case class Buff(var sum: Long, var cnt: Long)

class MyAverageUDAF extends Aggregator[Long, Buff, Double] {
  override def zero: Buff = Buff(0, 0)

  override def reduce(b: Buff, a: Long): Buff = {
    b.sum += a
    b.cnt += 1
    b
  }

  override def merge(b1: Buff, b2: Buff): Buff = {
    b1.sum += b2.sum
    b1.cnt += b2.cnt
    b1
  }

  override def finish(reduction: Buff): Double = {
    reduction.sum.toDouble / reduction.cnt
  }

  override def bufferEncoder: Encoder[Buff] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object value23 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    val res = spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
    val df = res.toDF("name", "salary")
    df.createOrReplaceTempView("user")

    val myAverage = new MyAverageUDAF
    spark.udf.register("avgSalary", functions.udaf(myAverage))
    spark.sql("select avgSalary(salary) from user").show()

    spark.stop()
  }
}

相关文章:

  • Maven下载aspose依赖失败的解决方法
  • BeeWorks Meet更适合企业内部使用的原因
  • Linux中线程池的简单实现 -- 线程安全的日志模块,策略模式,线程池的封装设计,单例模式,饿汉式单例模式,懒汉式单例模式
  • streamlit实现非原生的按钮触发效果 + flask实现带信息的按钮触发
  • 前端浏览器窗口交互完全指南:从基础操作到高级控制
  • 论文导读 - 基于大规模测量与多任务深度学习的电子鼻系统实现目标识别、浓度预测与状态判断
  • [计算机科学#3]:布尔逻辑 (计算机数学基础)
  • 【中级软件设计师】编译和解释程序的翻译阶段、符号表 (附软考真题)
  • Lua 第10部分 模式匹配
  • 【嵌入式八股22】排序算法与哈希算法
  • 辞九门回忆
  • windows安装docker,发现没有hyper
  • WSL2里手动安装Docker 遇坑
  • 14【模块学习】74HC595:使用学习
  • SpringMVC 前后端数据交互 中文乱码
  • 微服务基础-Ribbon
  • 同样开源的自动化工作流工具n8n和Dify对比
  • 从零搭建云原生后端系统 —— 一次真实项目实践分享
  • 迷你世界UGC3.0脚本Wiki触发器脚本交互
  • 云原生--核心组件-容器篇-4-认识Dockerfile文件(镜像创建的基础文件和指令介绍)
  • 人社部:将会同更多部门分行业、分领域制定专项培训计划
  • 财政部下达农业生产防灾救灾资金3.76亿元,支持黄淮海等地抗旱保春播
  • 上海首个航空前置货站落户松江综合保税区,通关效率可提升30%
  • 保时捷中国研发中心落户上海虹桥商务区,计划下半年投入运营
  • 解放军仪仗司礼大队参加越南纪念南方解放50周年庆典活动
  • 印巴在克什米尔实控线附近小规模交火,巴防长发出“全面战争”警告