当前位置: 首页 > news >正文

Spark-SQL 项目

一、项目概述

(一)实验目标

  1. 统计有效数据条数:筛选出uid、phone、addr三个字段均无空值的记录并计数。
  2. 提取用户数量最多的前 20 个地址:按地址分组统计用户数,按降序排序后取前 20 名。

(二)数据说明

  1. 数据格式
    • 输入数据为 JSON 格式,字段包括uid(用户 ID)、phone(电话号码)、addr(地址)。
    • 数据特点:部分记录存在格式不规范问题(如单引号混用、字段值缺失、地址格式不统一,例如 “江苏省 苏州”“广东省 中山” 等),需先清洗转换。
    • 示例数据

json

{"uid":"1000166111","phone":"17703771999","addr":"河南省 南阳"}

{"uid":"1000432103","phone":"15388889881","addr":"云南省 昆明"}

  1. 有效数据定义
    同时满足以下条件的记录:
    • uid不为空(非null且非空字符串);
    • phone不为空(非null且非空字符串);
    • addr不为空(非null且非空字符串)。

二、实验准备

(一)环境配置

  1. 软件依赖
    • Spark 3.x+(需启用 Hive 支持以使用get_json_object函数);
    • 编程语言:Scala/Python(本文以 Scala 为例,Python 代码可通过 PySpark 实现);
    • 配置文件:确保spark.sql.warehouse.dir指向 HDFS 或本地路径(如hdfs://node01:9000/user/hive/warehouse)。
  2. 数据准备
    • 将 JSON 数据保存为文件(如user_data.json),确保每行一个 JSON 对象;
    • 若存在格式错误(如单引号),先用文本处理工具(如sed 's/\'/"/g')统一为双引号。

三、数据处理流程

(一)数据读取与格式转换

1. 读取原始数据

使用 Spark 的 JSON 数据源直接加载数据,自动推断 Schema:

scala

val rawDF = spark.read.json("path/to/user_data.json")

rawDF.printSchema() // 检查字段是否正确解析(可能因格式问题导致字段类型为String)

2. 字段提取与清洗

通过get_json_object函数(Spark SQL 内置函数)解析 JSON 字段,处理不规范格式:

scala

// 方法1:Spark SQL语句(推荐,清晰易读)

rawDF.createOrReplaceTempView("raw_data")

val parsedDF = spark.sql("""

SELECT

get_json_object(raw_data.data, '$.uid') AS uid, -- 提取uid

get_json_object(raw_data.data, '$.phone') AS phone, -- 提取phone

trim(get_json_object(raw_data.data, '$.addr')) AS addr -- 提取addr并去除前后空格

FROM raw_data

""")

// 方法2:DataFrame API(适合编程式处理)

import org.apache.spark.sql.functions.expr

val parsedDF = rawDF.select(

expr("get_json_object(data, '$.uid')").as("uid"),

expr("get_json_object(data, '$.phone')").as("phone"),

expr("trim(get_json_object(data, '$.addr'))").as("addr")

)

(二)统计有效数据条数

1. 筛选有效数据

过滤掉任一字段为空的记录:

scala

val validDF = parsedDF.filter(

col("uid").isNotNull &&

col("phone").isNotNull &&

col("addr").isNotNull

)

或通过SQL语句:

spark.sql("SELECT * FROM parsed_data WHERE uid IS NOT NULL AND phone IS NOT NULL AND addr IS NOT NULL")

2. 计数

scala

val validCount = validDF.count()

println(s"有效数据条数:$validCount")

或通过SQL返回结果:

spark.sql("SELECT COUNT(*) AS valid_data_count FROM valid_data").show()

(三)统计用户数量最多的前 20 个地址

1. 分组聚合

按addr分组,统计每个地址的用户数(直接使用count(*),因uid唯一,也可count(DISTINCT uid),需根据业务需求选择):

scala

val addrGroupDF = validDF.groupBy("addr").count().withColumnRenamed("count", "user_count")

2. 排序与筛选

按用户数降序排序,取前 20 条:

scala

val top20Addresses = addrGroupDF.orderBy(desc("user_count")).limit(20)

top20Addresses.show(false) // 展示结果,地址不换行

3. SQL 完整实现

spark.sql("""

SELECT

addr,

COUNT(*) AS user_count-- 或COUNT(DISTINCT uid)去重统计

FROM valid_data

GROUP BY addr

ORDER BY user_count DESC

LIMIT 20

""").show()

五、扩展与优化建议

(一)数据清洗增强

  1. 地址标准化:使用正则表达式或自定义函数清洗地址(如 “江苏省苏州” 统一为 “江苏省苏州市”);
  2. 手机号格式校验:添加正则表达式过滤无效手机号(如^1[3-9]\d{9}$)。

(二)性能优化

  1. 分区与缓存:对大数据集使用repartition分区,对高频访问的中间表(如validDF)调用cache();
  2. 列式存储:将结果数据保存为 Parquet 格式(validDF.write.parquet("output/valid_data")),提升后续查询效率。

(三)结果输出

将最终结果导出到 HDFS、本地文件或数据库:

scala

top20Addresses.write

.mode("overwrite")

.csv("output/top20_addresses") // 保存为CSV文件

相关文章:

  • 爬虫(requests库,logging库)
  • react 父子组件通信 子 直接到父, 父 forwardref子
  • window上 elasticsearch v9.0 与 jmeter5.6.3版本 冲突,造成es 启动失败
  • 关于在Springboot中设置时间格式问题
  • Git -> Git 所有提交阶段的回滚操作
  • 测试-时间规模化定律可以改进世界基础模型吗?
  • [Java · 铢积寸累] 数据结构 — 二维数组 - 概念引入
  • 【YOLOv8-pose部署至RK3588】模型训练→转换RKNN→开发板部署
  • docker保存镜像到本地
  • AutoJs相关学习
  • Spring Boot中`logging.config`配置项的详解及使用说明
  • Vscode指定缓存路径 .vscode 路径
  • 嘻游组件解密工具实战教程:资源解包与UI替换全流程
  • Java从入门到“放弃”(精通)之旅——抽象类和接口⑨
  • Linux新手快速入门指南
  • XML内容解析成实体类
  • 【Python笔记 03 】运算符
  • 基于大疆行业无人机的特色解决方案-无线通信篇:基于蜂窝以及自组网MESH的无线通信C2链路
  • 一文详解Pytorch环境搭建:Mac电脑pip安装Pytorch开发环境
  • 空间数据工程——如何使用 Python 和 ArcPy 对 Vision Zero 多边形的值进行地理处理
  • 夜读丨修车与“不凑合”
  • 俄外长拉夫罗夫将出席金砖国家外长会
  • 低轨卫星“千帆星座”已完成五批次组网卫星发射,未来还有这些计划
  • 泽连斯基提议乌俄“立即、全面和无条件”停火
  • 魔都眼·上海车展③ |被外籍展商围观的国产品牌
  • 这场宣介会,重庆市委书记和中联部部长同台为外宾答疑解惑