【无标题】Spark-SQL编程(2)
以下是今天学习的知识点以及代码测试:
Spark-SQL核心编程(四)
实验内容:
利用IDEA开发Spark-SQL。
实验步骤:
利用IDEA开发Spark-SQL
- 创建子模块Spark-SQL,并添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
- 创建Spark-SQL的测试代码:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
case class User(id:Int, name:String, age:Int)
object SparkSQLDemo {
def main(args: Array[String]): Unit = {
// 创建上下文环境配置对象
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SQLDemo")
.set("spark.some.config.option", "some-value") // 可选配置
// 创建SparkSession对象
val spark: SparkSession = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
import spark.implicits._
// 1. 读取json文件
val df: DataFrame = spark.read.json("F:\\bigdata\\spark\\2.4.7\\bin\\data\\user.json")
println("原始数据:")
df.show()
// 2. SQL风格语法
df.createOrReplaceTempView("user")
println("SQL查询所有数据:")
spark.sql("SELECT * FROM user").show()
println("SQL计算平均年龄:")
spark.sql("SELECT avg(age) as avg_age FROM user").show()
// 3. DSL风格语法
println("DSL查询部分字段:")
df.select($"username", $"age").show()
println("DSL过滤查询:")
df.filter($"age" > 25).show()
// 4. RDD <=> DataFrame <=> DataSet 转换
// 4.1 RDD => DataFrame => DataSet
println("RDD => DataFrame => DataSet 转换:")
val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(
List((1, "zhangsan", 30), (2, "lisi", 40), (3, "wangwu", 25))
)
// RDD转DataFrame
val df1: DataFrame = rdd1.toDF("id", "name", "age")
println("RDD转DataFrame:")
df1.show()
// DataFrame转DataSet
val ds1: Dataset[User] = df1.as[User]
println("DataFrame转DataSet:")
ds1.show()
// 4.2 DataSet => DataFrame => RDD
println("DataSet => DataFrame => RDD 转换:")
val df2 = ds1.toDF()
println("DataSet转DataFrame:")
df2.show()
val rdd2: RDD[Row] = df2.rdd
println("DataFrame转RDD并打印:")
rdd2.foreach(row => println(s"Row: id=${row.getInt(0)}, name=${row.getString(1)}, age=${row.getInt(2)}"))
// 4.3 直接RDD转DataSet
println("直接RDD转DataSet:")
rdd1.map {
case (id, name,age) => User(id, name, age)
}.toDS().show()
// 4.4 DataSet转RDD
val rdd3 = ds1.rdd
println("DataSet转RDD并打印字段:")
rdd3.foreach(user => println(s"User: id=${user.id}, name=${user.name}, age=${user.age}"))
// 5. 聚合操作示例
println("聚合操作示例:")
ds1.groupBy("age").count().show()
// 6. 类型安全的DataSet操作
println("类型安全的DataSet操作:")
ds1.filter(_.age > 30).show()
// 关闭SparkSession
spark.stop()
}
}
第六节 Spark-SQL核心编程(五)
自定义函数:
UDF:
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")
spark.udf.register("addName",(x:String)=>"Name:"+x)
df.createOrReplaceTempView("people")
spark.sql("select addName(username),age from people").show()
spark.stop()
UDAF(自定义聚合函数)
强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),
countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0之前我们使用的是UserDefinedAggregateFunction作为自定义聚合函数,从 Spark3.0 版本后可以统一采用强类型聚合函数 Aggregator
实验需求:计算平均工资
实现方式一:RDD
import org.apache.spark.{SparkConf, SparkContext}
object RDD{
def main(args: Array[String]): Unit = {
// 1. 创建Spark配置和上下文
val sparkConf: SparkConf = new SparkConf()
.setAppName("SalaryAverageCalculation")
.setMaster("local[*]") // 使用本地所有可用核
val sc: SparkContext = new SparkContext(sparkConf)
try {
// 2. 创建RDD并计算平均工资
val resRDD: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
.map {
case (name, salary) => {
(salary, 1) // 转换为(工资, 计数)的元组
}
}
.reduce {
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2) // 累加工资总和和人数
}
}
// 3. 计算并打印平均工资
if (resRDD._2 != 0) {
val averageSalary = resRDD._1.toDouble / resRDD._2
println(f"总工资: ${resRDD._1}")
println(f"总人数: ${resRDD._2}")
println(f"平均工资: $averageSalary%.2f") // 保留两位小数
} else {
println("警告:没有有效数据计算平均工资")
}
} finally {
// 4. 确保无论如何都关闭SparkContext
sc.stop()
}
}
}
实现方式二:强类型UDAF
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
// 定义一个用于存储总和和计数的缓冲区类
case class Buff(var sum: Long, var cnt: Long)
// 自定义聚合函数类,继承自 Aggregator
class MyAverageUDAF extends org.apache.spark.sql.expressions.Aggregator[Long, Buff, Double] {
// 初始化缓冲区
override def zero: Buff = Buff(0, 0)
// 合并输入值到缓冲区
override def reduce(b: Buff, a: Long): Buff = {
b.sum += a
b.cnt += 1
b
}
// 合并两个缓冲区
override def merge(b1: Buff, b2: Buff): Buff = {
b1.sum += b2.sum
b1.cnt += b2.cnt
b1
}
// 完成聚合操作,计算最终结果
override def finish(reduction: Buff): Double = {
reduction.sum.toDouble / reduction.cnt
}
// 定义缓冲区的编码器
override def bufferEncoder: Encoder[Buff] = Encoders.product
// 定义输出结果的编码器
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
object Cyy {
def main(args: Array[String]): Unit = {
// 创建 SparkConf 对象,设置应用名称和运行模式
val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
// 创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkconf).getOrCreate()
import spark.implicits._
// 创建 RDD 并存储人员姓名和工资信息
val res: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
// 将 RDD 转换为 DataFrame
val df: DataFrame = res.toDF("name", "salary")
// 在 Spark 中注册临时视图
df.createOrReplaceTempView("user")
// 创建自定义聚合函数实例
var myAverage = new MyAverageUDAF
// 在 Spark 中注册聚合函数
spark.udf.register("avgSalary", functions.udaf(myAverage))
// 执行 SQL 查询,计算平均工资并显示结果
spark.sql("select avgSalary(salary) from user").show()
// 关闭 SparkSession 连接
spark.stop()
}
}