当前位置: 首页 > news >正文

PySpark中DataFrame应用升阶及UDF使用

目录

  • 1. 加载数据
  • 2. 列常见操作
    • 2.1 添加新列
    • 2.2 重命名列
    • 2.3 删除指定列
    • 2.4 修改数据
  • 3 空值处理
    • 3.1 丢弃空值
    • 3.2 空值填充
  • 4 聚合操作
    • 4.1 分组聚合
  • 5 用户自定义函数(UDF)
    • 5.1 传统UDF函数
    • 5.2 Pandas UDF(向量化UDF)
  • 参考资料

在这里插入图片描述



import findspark
findspark.init() 
from pyspark.sql import SparkSession
# 创建 Spark 会话
spark = SparkSession.builder \.appName("Test PySpark") \.master("local[*]") \.getOrCreate()
sc=spark.sparkContext
sc.master
'local[*]'

1. 加载数据

mes_df = spark.read.csv('spark练习数据.csv',inferSchema=True,header=True)
mes_df.columns
['销售年份','车辆用途','电池形状','动力类型','电池类型','行驶里程(km)','容量保持率(%)','电量(KWh)','质量(kg)','车辆数(辆)']
print(mes_df.printSchema())
mes_df.show(3)
root|-- 销售年份: integer (nullable = true)|-- 车辆用途: string (nullable = true)|-- 电池形状: string (nullable = true)|-- 动力类型: string (nullable = true)|-- 电池类型: string (nullable = true)|-- 行驶里程(km): integer (nullable = true)|-- 容量保持率(%): double (nullable = true)|-- 电量(KWh): double (nullable = true)|-- 质量(kg): double (nullable = true)|-- 车辆数(辆): integer (nullable = true)None
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
only showing top 3 rows

2. 列常见操作

2.1 添加新列

mes_df2 = mes_df.withColumn('功率',mes_df['电量(KWh)']/mes_df['质量(kg)'])
mes_df2.show(3)
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|              功率|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5|0.0827491408934708|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
only showing top 3 rows

2.2 重命名列

mes_df2 = mes_df2.withColumnsRenamed({'功率':'功率(wh)'})
mes_df2.show(5)
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows

2.3 删除指定列

mes_df3 = mes_df2.drop('电池类型','动力类型')
mes_df3.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows

2.4 修改数据

mes_f = mes_df3.replace({'私人乘用车':'私人'})
mes_f.show(3)
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|销售年份|车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|          功率(wh)|
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|    2013|    私人|    方形|       50000|         92.2|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|    私人|    方形|       85000|         91.6|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|    私人|    方形|       30000|         94.3|    120.4|  1455.0|         5|0.0827491408934708|
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
only showing top 3 rows

3 空值处理

3.1 丢弃空值

mes_df4 = mes_df3.dropna(how = 'all',subset=['车辆用途'])
mes_df4.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows

3.2 空值填充

mes_df5 = mes_df4.fillna({'销售年份':1970,'车辆用途':'私人乘用车'})
mes_df5.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows

4 聚合操作

4.1 分组聚合

g_df = mes_df.groupBy('销售年份').agg({'电量(KWh)':'sum','质量(kg)':'sum'})
# g_df = g_df.withColumnRenamed({'sum(质量(kg))':'总质量','sum(电量(KWh))':'总电量'})
g_df =  g_df.withColumnRenamed('sum(质量(kg))','总质量').withColumnRenamed('sum(电量(KWh))','总电量')
g_df.show(3)
+--------+--------------------+-------------------+
|销售年份|              总质量|             总电量|
+--------+--------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|
|    2015|           2551879.0| 240302.29999999897|
|    2013|             20373.6| 1867.0999999999992|
+--------+--------------------+-------------------+
only showing top 3 rows
g_df.show()
+--------+--------------------+-------------------+
|销售年份|       sum(质量(kg))|     sum(电量(KWh))|
+--------+--------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|
|    2015|           2551879.0| 240302.29999999897|
|    2013|             20373.6| 1867.0999999999992|
|    2014|            220269.0| 20847.199999999968|
|    2019| 2.346486377000002E8|3.259483540000003E7|
|    2016| 4.531763910000002E7|  4402030.500000004|
|    2017|1.7762183689999998E8|1.887332539999987E7|
+--------+--------------------+-------------------+

5 用户自定义函数(UDF)

PySpark 有两种 UDF:传统UDF(非向量化UDF) 和 Pandas UDF(向量化UDF)

传统UDF(非向量化UDF) :通过Python函数逐行处理数据,使用pyspark.sql.functions.udf注册

  • 优点:
  • 适合简单逻辑(如字符串处理、数值转换)
  • 在所有Spark版本(≥1.3)中均可使用
  • 逐行调试方便,便于通过print或日志逐行调试逻辑。
  • 缺点:
  • 性能差,高延迟,尤其在大数据集上可能成为瓶颈。
  • 需手动处理Spark数据类型与Python类型的映射,易因类型不匹配出错。
  • 无法批量处理数据,无法利用现代CPU的SIMD指令加速。

Pandas UDF(向量化UDF):将整个列或分块数据转换为Pandas Series/DataFrame,批量处理。基于Apache Arrow的批量处理模式,使用pyspark.sql.functions.pandas_udf定义。

  • 优点:
  • 高性能,利用Pandas向量化操作,数据通过Arrow以零拷贝方式传输,减少序列化开销。
  • 支持复杂操作,适合处理整列或分组数据(如时间窗口计算、分组聚合)
  • Arrow优化内存布局,减少内存占用。
  • 缺点:
  • 依赖Arrow和Pandas。需要安装PyArrow和Pandas,且版本需与Spark兼容。
  • 调试困难,批量处理逻辑出错时,难以定位具体行的问题。
  • 型处理隐式,需熟悉Pandas与Spark类型的隐式转换规则,类型错误可能更隐晦。

5.1 传统UDF函数

from pyspark.sql.functions import udf # 传统udfdef powerCal(num,num2):return num/num2# udf创建
power_udf = udf(powerCal,DoubleType())
g_df2 = g_df.withColumn('功率',power_udf(g_df['总电量'],g_df['总质量']))
g_df2.show()
+--------+--------------------+-------------------+-------------------+
|销售年份|              总质量|             总电量|               功率|
+--------+--------------------+-------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|0.12600067844570834|
|    2015|           2551879.0| 240302.29999999897|0.09416680806574253|
|    2013|             20373.6| 1867.0999999999992|0.09164310676561822|
|    2014|            220269.0| 20847.199999999968|0.09464427586269501|
|    2019| 2.346486377000002E8|3.259483540000003E7|0.13890911841419995|
|    2016| 4.531763910000002E7|  4402030.500000004|0.09713724252682886|
|    2017|1.7762183689999998E8|1.887332539999987E7|0.10625565937945705|
+--------+--------------------+-------------------+-------------------+

5.2 Pandas UDF(向量化UDF)

from pyspark.sql.functions import pandas_udf # 向量化udfdef powerCal2(num,num2):return num/num2# udf创建
power_pudf = pandas_udf(powerCal,DoubleType())
g_df3 = g_df.withColumn('功率',power_pudf(g_df['总电量'],g_df['总质量']))
g_df3.show()
+--------+--------------------+-------------------+-------------------+
|销售年份|              总质量|             总电量|               功率|
+--------+--------------------+-------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|0.12600067844570834|
|    2015|           2551879.0| 240302.29999999897|0.09416680806574253|
|    2013|             20373.6| 1867.0999999999992|0.09164310676561822|
|    2014|            220269.0| 20847.199999999968|0.09464427586269501|
|    2019| 2.346486377000002E8|3.259483540000003E7|0.13890911841419995|
|    2016| 4.531763910000002E7|  4402030.500000004|0.09713724252682886|
|    2017|1.7762183689999998E8|1.887332539999987E7|0.10625565937945705|
+--------+--------------------+-------------------+-------------------+
spark.stop()

参考资料

《Python+Spark 2.0+Hadoop机器学习与大数据实战》, 林大贵,清华大学出版社,2017-12,9787302490739

相关文章:

  • 手写SpringMVC(基本框架)
  • 集成方案 | Docusign + 甄零科技,赋能企业海外业务高效增长!
  • OpenCV实验室工具的使用
  • 高能效计算:破解算力增长与能源约束的科技密码
  • 基于 Amazon RDS 数据库之间复制数据并屏蔽个人身份信息
  • 缺省处理、容错处理
  • java 类的实例化过程,其中的相关顺序 包括有继承的子类等复杂情况,静态成员变量的初始化顺序,这其中jvm在干什么
  • 关于定时任务原理
  • Mysql如何高效的查询数据是否存在
  • Jenkins(CI/CD工具)
  • OceanBase单机重启和配置修改
  • 自动伴随无人机说明文档
  • Redis 缓存并发问题深度解析:击穿、雪崩与穿透防治指南
  • 使用 LLM助手进行 Python 数据可视化
  • Python 数据可视化进阶:精准插入图表到指定 Excel 工作表
  • 手撕——贪吃蛇小游戏(下)
  • 如何通过挖掘需求、SEO优化及流量变现成功出海?探索互联网产品的盈利之道
  • Java高频面试之并发编程-08
  • C++/SDL 进阶游戏开发 —— 双人塔防(代号:村庄保卫战 14)
  • 前端分页与瀑布流最佳实践笔记 - React Antd 版
  • 《奇袭白虎团》原型人物之一赵顺合辞世,享年95岁
  • 大家聊中国式现代化|陶希东:打造高水平安全韧性城市,给群众看得见的安全感
  • 解放日报头版头条:“五个中心”蹄疾步稳谱新篇
  • 上海市市管干部任职前公示:赵亮拟为地区区长人选
  • CSR周刊:李宁打造世界地球日特别活动,珀莱雅发布2024年度可持续发展报告
  • 伊朗港口爆炸最新情况:14死700多伤,大火延烧,调查困难