Spark SQL Development in Practice: From IDEA Environment Setup to Implementing Custom UDF/UDAF Functions
Developing Spark SQL with IDEA
1. Create a Spark-SQL sub-module and add the dependency:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
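2. Create the input data file Spark-SQL/input/user.json. The code below queries username and age columns, so a minimal sketch of the file contents (an assumption; spark.read.json expects one JSON object per line) could be:
{"username": "zhangsan", "age": 20}
{"username": "lisi", "age": 30}
{"username": "wangwu", "age": 40}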
3. Create the Spark SQL test code:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
case class User(id: Int, name: String, age: Int)
object value20 {
def main(args: Array[String]): Unit = {
// Create the configuration object for the Spark context
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
// Create the SparkSession
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
try {
// Read the JSON file
val df: DataFrame = spark.read.json("Spark-SQL/input/user.json")
df.show()
// SQL-style syntax
df.createOrReplaceTempView("user")
spark.sql("select * from user").show()
spark.sql("select avg(age) from user").show()
// DSL-style syntax
df.select("username", "age").show()
// RDD => DataFrame => Dataset
// RDD
val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(
List((1, "zhangsan", 30), (2, "lisi", 40))
)
// DataFrame
val df1: DataFrame = rdd1.toDF("id", "name", "age")
df1.show()
// Dataset
val ds1: Dataset[User] = df1.as[User]
ds1.show()
// Dataset => DataFrame => RDD
val df2 = ds1.toDF()
df2.show()
val rdd2: RDD[Row] = df2.rdd
rdd2.foreach(a => println(a.getString(1)))
rdd1.map {
case (id, name, age) => User(id, name, age)
}.toDS().show()
val rdd3 = ds1.rdd
rdd3.foreach { a =>
println(a.age)
println(a.id)
println(a.name)
}
} catch {
case e: Exception =>
println(s"发生异常: ${e.getMessage}")
} finally {
spark.stop()
}
}
}
Custom functions:
UDF:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object value21 {
def main(args: Array[String]): Unit = {
// Create the SparkConf object
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("SQLDemo")
// Create the SparkSession
val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
import spark.implicits._
// Read the JSON file
val df = spark.read.json("Spark-SQL/input/user.json")
// Register the UDF
spark.udf.register("addName", (x: String) => "Name:" + x)
df.createOrReplaceTempView("people")
spark.sql("select addName(username), age from people").show()
spark.stop()
}
}
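Besides registering a UDF by name for use in SQL strings, the same function can be wrapped with functions.udf and applied directly in the DSL style. A minimal sketch under the same assumptions about user.json (the object name value21b and the column function addName2 are illustrative, not part of the original example):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
object value21b {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SQLDemo")
      .getOrCreate()
    // Read the same input file as above
    val df = spark.read.json("Spark-SQL/input/user.json")
    // Wrap a plain Scala function as a column-level UDF
    val addName2 = udf((x: String) => "Name:" + x)
    // Apply the UDF directly to columns; no temp view or SQL string is needed
    df.select(addName2(df("username")), df("age")).show()
    spark.stop()
  }
}
The output should match the SQL version above, since both call the same underlying function.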
UDAF (user-defined aggregate functions)
Both the strongly typed Dataset and the weakly typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions. Before Spark 3.0, custom aggregations were written by extending UserDefinedAggregateFunction; since Spark 3.0, the strongly typed Aggregator can be used uniformly instead.
Example requirement: compute the average salary
Implementation 1: RDD
import org.apache.spark.{SparkConf, SparkContext}
object value22 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setAppName("app")
.setMaster("local[*]")
val sc = new SparkContext(sparkConf)
// Map each (name, salary) record to (salary, 1), then reduce to (total salary, total count)
val resRDD = sc.parallelize(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
.map {
case (name, salary) => (salary, 1)
}
.reduce {
case ((t1Salary, t1Count), (t2Salary, t2Count)) =>
(t1Salary + t2Salary, t1Count + t2Count)
}
// Average salary = total salary / total count
println(resRDD._1 / resRDD._2.toDouble)
sc.stop()
}
}
Implementation 2: strongly typed UDAF
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
// Aggregation buffer: running sum and count of salaries
case class Buff(var sum: Long, var cnt: Long)
// IN = Long (one salary value), BUF = Buff, OUT = Double (the average)
class MyAverageUDAF extends Aggregator[Long, Buff, Double] {
// Initial value of the buffer
override def zero: Buff = Buff(0, 0)
// Fold one input value into the buffer
override def reduce(b: Buff, a: Long): Buff = {
b.sum += a
b.cnt += 1
b
}
// Combine two partial buffers
override def merge(b1: Buff, b2: Buff): Buff = {
b1.sum += b2.sum
b1.cnt += b2.cnt
b1
}
// Compute the final result from the buffer
override def finish(reduction: Buff): Double = {
reduction.sum.toDouble / reduction.cnt
}
// Encoders for the buffer type and the output type
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
object value23 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val res = spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))
val df = res.toDF("name", "salary")
df.createOrReplaceTempView("user")
val myAverage = new MyAverageUDAF
// Wrap the Aggregator as an untyped UDAF and register it for use in SQL
spark.udf.register("avgSalary", functions.udaf(myAverage))
spark.sql("select avgSalary(salary) from user").show()
spark.stop()
}
}
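The Aggregator can also be used without SQL at all: calling toColumn turns it into a TypedColumn that is passed straight to select on a strongly typed Dataset. A minimal sketch reusing MyAverageUDAF from above (the object name value24 and the intermediate Dataset[Long] of salaries are illustrative assumptions):
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object value24 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    // Build a Dataset[Long] of salaries; the Aggregator's input type is Long
    val salaries = spark.sparkContext
      .makeRDD(List(("zhangsan", 20L), ("lisi", 30L), ("wangwu", 40L)))
      .toDS()
      .map(_._2)
    // toColumn yields a TypedColumn[Long, Double] that select can consume directly
    val avgColumn = new MyAverageUDAF().toColumn.name("avg_salary")
    salaries.select(avgColumn).show()
    spark.stop()
  }
}
This avoids the temp view and SQL string entirely while reusing the same aggregation logic.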