当前位置: 首页 > news >正文

【无标题】Spark-SQL编程(2)

以下是今天学习的知识点以及代码测试:

Spark-SQL核心编程(四)

实验内容:

利用IDEA开发Spark-SQL。

实验步骤:

利用IDEA开发Spark-SQL

  1. 创建子模块Spark-SQL,并添加依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

  1. 创建Spark-SQL的测试代码:

import org.apache.spark.SparkConf

import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.Row

case class User(id:Int, name:String, age:Int)

object SparkSQLDemo {

  def main(args: Array[String]): Unit = {

    // 创建上下文环境配置对象

    val sparkConf = new SparkConf()

      .setMaster("local[*]")

      .setAppName("SQLDemo")

      .set("spark.some.config.option", "some-value") // 可选配置

    // 创建SparkSession对象

    val spark: SparkSession = SparkSession.builder()

      .config(sparkConf)

      .getOrCreate()

    import spark.implicits._

    // 1. 读取json文件

    val df: DataFrame = spark.read.json("F:\\bigdata\\spark\\2.4.7\\bin\\data\\user.json")

    println("原始数据:")

    df.show()

    // 2. SQL风格语法

    df.createOrReplaceTempView("user")

    println("SQL查询所有数据:")

    spark.sql("SELECT * FROM user").show()

    println("SQL计算平均年龄:")

    spark.sql("SELECT avg(age) as avg_age FROM user").show()

    // 3. DSL风格语法

    println("DSL查询部分字段:")

    df.select($"username", $"age").show()

    println("DSL过滤查询:")

    df.filter($"age" > 25).show()

    // 4. RDD <=> DataFrame <=> DataSet 转换

    // 4.1 RDD => DataFrame => DataSet

    println("RDD => DataFrame => DataSet 转换:")

    val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(

      List((1, "zhangsan", 30), (2, "lisi", 40), (3, "wangwu", 25))

    )

    // RDD转DataFrame

    val df1: DataFrame = rdd1.toDF("id", "name", "age")

    println("RDD转DataFrame:")

    df1.show()

    // DataFrame转DataSet

    val ds1: Dataset[User] = df1.as[User]

    println("DataFrame转DataSet:")

    ds1.show()

    // 4.2 DataSet => DataFrame => RDD

    println("DataSet => DataFrame => RDD 转换:")

    val df2 = ds1.toDF()

    println("DataSet转DataFrame:")

    df2.show()

    val rdd2: RDD[Row] = df2.rdd

    println("DataFrame转RDD并打印:")

    rdd2.foreach(row => println(s"Row: id=${row.getInt(0)}, name=${row.getString(1)}, age=${row.getInt(2)}"))

    // 4.3 直接RDD转DataSet

    println("直接RDD转DataSet:")

    rdd1.map {

      case (id, name,age) => User(id, name, age)

    }.toDS().show()

    // 4.4 DataSet转RDD

    val rdd3 = ds1.rdd

    println("DataSet转RDD并打印字段:")

    rdd3.foreach(user => println(s"User: id=${user.id}, name=${user.name}, age=${user.age}"))

    // 5. 聚合操作示例

    println("聚合操作示例:")

    ds1.groupBy("age").count().show()

    // 6. 类型安全的DataSet操作

    println("类型安全的DataSet操作:")

    ds1.filter(_.age > 30).show()

    // 关闭SparkSession

    spark.stop()

  }

}

第六节 Spark-SQL核心编程(五)

自定义函数:

UDF:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")

spark.udf.register("addName",(x:String)=>"Name:"+x)

df.createOrReplaceTempView("people")
spark.sql("select addName(username),age from people").show()

spark.stop()

UDAF(自定义聚合函数)

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),

countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0之前我们使用的是UserDefinedAggregateFunction作为自定义聚合函数,从 Spark3.0 版本后可以统一采用强类型聚合函数 Aggregator

实验需求:计算平均工资

实现方式一:RDD

import org.apache.spark.{SparkConf, SparkContext}

object RDD{

  def main(args: Array[String]): Unit = {

    // 1. 创建Spark配置和上下文

    val sparkConf: SparkConf = new SparkConf()

      .setAppName("SalaryAverageCalculation")

      .setMaster("local[*]")  // 使用本地所有可用核

    val sc: SparkContext = new SparkContext(sparkConf)

    try {

      // 2. 创建RDD并计算平均工资

      val resRDD: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))

        .map {

          case (name, salary) => {

            (salary, 1)  // 转换为(工资, 计数)的元组

          }

        }

        .reduce {

          (t1, t2) => {

            (t1._1 + t2._1, t1._2 + t2._2)  // 累加工资总和和人数

          }

        }

      // 3. 计算并打印平均工资

      if (resRDD._2 != 0) {

        val averageSalary = resRDD._1.toDouble / resRDD._2

        println(f"总工资: ${resRDD._1}")

        println(f"总人数: ${resRDD._2}")

        println(f"平均工资: $averageSalary%.2f")  // 保留两位小数

      } else {

        println("警告:没有有效数据计算平均工资")

      }

    } finally {

      // 4. 确保无论如何都关闭SparkContext

      sc.stop()

    }

  }

}

实现方式:强类型UDAF

import org.apache.spark.SparkConf

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.Encoder

import org.apache.spark.sql.Encoders

import org.apache.spark.sql.functions

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.DataFrame

// 定义一个用于存储总和和计数的缓冲区类

case class Buff(var sum: Long, var cnt: Long)

// 自定义聚合函数类,继承自 Aggregator

class MyAverageUDAF extends org.apache.spark.sql.expressions.Aggregator[Long, Buff, Double] {

  // 初始化缓冲区

  override def zero: Buff = Buff(0, 0)

  // 合并输入值到缓冲区

  override def reduce(b: Buff, a: Long): Buff = {

    b.sum += a

    b.cnt += 1

    b

  }

  // 合并两个缓冲区

  override def merge(b1: Buff, b2: Buff): Buff = {

    b1.sum += b2.sum

    b1.cnt += b2.cnt

    b1

  }

  // 完成聚合操作,计算最终结果

  override def finish(reduction: Buff): Double = {

    reduction.sum.toDouble / reduction.cnt

  }

  // 定义缓冲区的编码器

  override def bufferEncoder: Encoder[Buff] = Encoders.product

  // 定义输出结果的编码器

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble

}

object Cyy {

  def main(args: Array[String]): Unit = {

    // 创建 SparkConf 对象,设置应用名称和运行模式

    val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")

    // 创建 SparkSession 对象

    val spark: SparkSession = SparkSession.builder().config(sparkconf).getOrCreate()

    import spark.implicits._

    // 创建 RDD 并存储人员姓名和工资信息

    val res: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))

    // 将 RDD 转换为 DataFrame

    val df: DataFrame = res.toDF("name", "salary")

    // 在 Spark 中注册临时视图

    df.createOrReplaceTempView("user")

    // 创建自定义聚合函数实例

    var myAverage = new MyAverageUDAF

    // 在 Spark 中注册聚合函数

    spark.udf.register("avgSalary", functions.udaf(myAverage))

    // 执行 SQL 查询,计算平均工资并显示结果

    spark.sql("select avgSalary(salary) from user").show()

    // 关闭 SparkSession 连接

    spark.stop()

  }

}

    

相关文章:

  • 玩转Docker | 使用Docker部署Xnote笔记工具
  • 从Gradio App创建Discord Bot/Slack Bot/Website Widget(2)——从Gradio App创建Slack Bot
  • 智谱开源 9B/32B 系列模型,性价比超 DeepSeek-R1,Z.ai 平台上线
  • 疾控01-实验室信息管理系统需求分析
  • 2025.4.15六年之约day11
  • linux0.11内核源码修仙传第十三章——进程调度之fork函数
  • 用DeepSeek AI高效制作专业PPT
  • DES对称加密算法实操(python)
  • 入门-C编程基础部分:4、数据类型
  • 【力扣】day1
  • 第十一章 网络编程
  • 【设计模式】适配器模式:让不兼容的接口和谐共处
  • java开发中的设计模式之工厂模式
  • 设计模式:命令模式-解耦请求与执行的完美方案
  • DB-GPT 最新0.7.0版本Windows 部署
  • Differentiable Micro-Mesh Construction 论文阅读
  • 龙虎榜——20250415
  • centos时间不正确解决
  • GPTNet如何革新创意与效率
  • 本地实现Rtsp视频流推送
  • 新加坡选情渐热:播客、短视频各显神通,总理反对身份政治
  • “梅花奖”快闪走入上海张园,朱洁静在石库门前起舞
  • 马上评丨马拉松“方便门”被处罚,是一针清醒剂
  • 广西北海市人大常委会副主任李安洪已兼任合浦县委书记
  • 习近平同肯尼亚总统鲁托会谈
  • 看正背面月壤、听火星上的声音,记者探营“中国航天日”科普展