Spark-SQL 项目
一、项目概述
(一)实验目标
- 统计有效数据条数:筛选出uid、phone、addr三个字段均无空值的记录并计数。
- 提取用户数量最多的前 20 个地址:按地址分组统计用户数,按降序排序后取前 20 名。
(二)数据说明
- 数据格式
- 输入数据为 JSON 格式,字段包括uid(用户 ID)、phone(电话号码)、addr(地址)。
- 数据特点:部分记录存在格式不规范问题(如单引号混用、字段值缺失、地址格式不统一,例如 “江苏省 苏州”“广东省 中山” 等),需先清洗转换。
- 示例数据:
json
{"uid":"1000166111","phone":"17703771999","addr":"河南省 南阳"}
{"uid":"1000432103","phone":"15388889881","addr":"云南省 昆明"}
- 有效数据定义
同时满足以下条件的记录:- uid不为空(非null且非空字符串);
- phone不为空(非null且非空字符串);
- addr不为空(非null且非空字符串)。
二、实验准备
(一)环境配置
- 软件依赖
- Spark 3.x+(需启用 Hive 支持以使用get_json_object函数);
- 编程语言:Scala/Python(本文以 Scala 为例,Python 代码可通过 PySpark 实现);
- 配置文件:确保spark.sql.warehouse.dir指向 HDFS 或本地路径(如hdfs://node01:9000/user/hive/warehouse)。
- 数据准备
- 将 JSON 数据保存为文件(如user_data.json),确保每行一个 JSON 对象;
- 若存在格式错误(如单引号),先用文本处理工具(如sed 's/\'/"/g')统一为双引号。
三、数据处理流程
(一)数据读取与格式转换
1. 读取原始数据
使用 Spark 的 JSON 数据源直接加载数据,自动推断 Schema:
scala
val rawDF = spark.read.json("path/to/user_data.json")
rawDF.printSchema() // 检查字段是否正确解析(可能因格式问题导致字段类型为String)
2. 字段提取与清洗
通过get_json_object函数(Spark SQL 内置函数)解析 JSON 字段,处理不规范格式:
scala
// 方法1:Spark SQL语句(推荐,清晰易读)
rawDF.createOrReplaceTempView("raw_data")
val parsedDF = spark.sql("""
SELECT
get_json_object(raw_data.data, '$.uid') AS uid, -- 提取uid
get_json_object(raw_data.data, '$.phone') AS phone, -- 提取phone
trim(get_json_object(raw_data.data, '$.addr')) AS addr -- 提取addr并去除前后空格
FROM raw_data
""")
// 方法2:DataFrame API(适合编程式处理)
import org.apache.spark.sql.functions.expr
val parsedDF = rawDF.select(
expr("get_json_object(data, '$.uid')").as("uid"),
expr("get_json_object(data, '$.phone')").as("phone"),
expr("trim(get_json_object(data, '$.addr'))").as("addr")
)
(二)统计有效数据条数
1. 筛选有效数据
过滤掉任一字段为空的记录:
scala
val validDF = parsedDF.filter(
col("uid").isNotNull &&
col("phone").isNotNull &&
col("addr").isNotNull
)
或通过SQL语句:
spark.sql("SELECT * FROM parsed_data WHERE uid IS NOT NULL AND phone IS NOT NULL AND addr IS NOT NULL")
2. 计数
scala
val validCount = validDF.count()
println(s"有效数据条数:$validCount")
或通过SQL返回结果:
spark.sql("SELECT COUNT(*) AS valid_data_count FROM valid_data").show()
(三)统计用户数量最多的前 20 个地址
1. 分组聚合
按addr分组,统计每个地址的用户数(直接使用count(*),因uid唯一,也可count(DISTINCT uid),需根据业务需求选择):
scala
val addrGroupDF = validDF.groupBy("addr").count().withColumnRenamed("count", "user_count")
2. 排序与筛选
按用户数降序排序,取前 20 条:
scala
val top20Addresses = addrGroupDF.orderBy(desc("user_count")).limit(20)
top20Addresses.show(false) // 展示结果,地址不换行
3. SQL 完整实现
spark.sql("""
SELECT
addr,
COUNT(*) AS user_count-- 或COUNT(DISTINCT uid)去重统计
FROM valid_data
GROUP BY addr
ORDER BY user_count DESC
LIMIT 20
""").show()
五、扩展与优化建议
(一)数据清洗增强
- 地址标准化:使用正则表达式或自定义函数清洗地址(如 “江苏省苏州” 统一为 “江苏省苏州市”);
- 手机号格式校验:添加正则表达式过滤无效手机号(如^1[3-9]\d{9}$)。
(二)性能优化
- 分区与缓存:对大数据集使用repartition分区,对高频访问的中间表(如validDF)调用cache();
- 列式存储:将结果数据保存为 Parquet 格式(validDF.write.parquet("output/valid_data")),提升后续查询效率。
(三)结果输出
将最终结果导出到 HDFS、本地文件或数据库:
scala
top20Addresses.write
.mode("overwrite")
.csv("output/top20_addresses") // 保存为CSV文件