PySpark中DataFrame应用升阶及UDF使用
目录
- 1. 加载数据
- 2. 列常见操作
- 2.1 添加新列
- 2.2 重命名列
- 2.3 删除指定列
- 2.4 修改数据
- 3 空值处理
- 3.1 丢弃空值
- 3.2 空值填充
- 4 聚合操作
- 4.1 分组聚合
- 5 用户自定义函数(UDF)
- 5.1 传统UDF函数
- 5.2 Pandas UDF(向量化UDF)
- 参考资料
import findspark
findspark.init()
from pyspark.sql import SparkSession
# 创建 Spark 会话
spark = SparkSession.builder \.appName("Test PySpark") \.master("local[*]") \.getOrCreate()
sc=spark.sparkContext
sc.master
'local[*]'
1. 加载数据
mes_df = spark.read.csv('spark练习数据.csv',inferSchema=True,header=True)
mes_df.columns
['销售年份','车辆用途','电池形状','动力类型','电池类型','行驶里程(km)','容量保持率(%)','电量(KWh)','质量(kg)','车辆数(辆)']
print(mes_df.printSchema())
mes_df.show(3)
root|-- 销售年份: integer (nullable = true)|-- 车辆用途: string (nullable = true)|-- 电池形状: string (nullable = true)|-- 动力类型: string (nullable = true)|-- 电池类型: string (nullable = true)|-- 行驶里程(km): integer (nullable = true)|-- 容量保持率(%): double (nullable = true)|-- 电量(KWh): double (nullable = true)|-- 质量(kg): double (nullable = true)|-- 车辆数(辆): integer (nullable = true)None
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
|销售年份| 车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 50000| 92.2| 128.0| 1525.0| 5|
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 85000| 91.6| 128.0| 1525.0| 5|
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 30000| 94.3| 120.4| 1455.0| 5|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
only showing top 3 rows
2. 列常见操作
2.1 添加新列
mes_df2 = mes_df.withColumn('功率',mes_df['电量(KWh)']/mes_df['质量(kg)'])
mes_df2.show(3)
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|销售年份| 车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)| 功率|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 50000| 92.2| 128.0| 1525.0| 5|0.0839344262295082|
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 85000| 91.6| 128.0| 1525.0| 5|0.0839344262295082|
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 30000| 94.3| 120.4| 1455.0| 5|0.0827491408934708|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
only showing top 3 rows
2.2 重命名列
mes_df2 = mes_df2.withColumnsRenamed({'功率':'功率(wh)'})
mes_df2.show(5)
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份| 车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)| 功率(wh)|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 50000| 92.2| 128.0| 1525.0| 5| 0.0839344262295082|
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 85000| 91.6| 128.0| 1525.0| 5| 0.0839344262295082|
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 30000| 94.3| 120.4| 1455.0| 5| 0.0827491408934708|
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 70000| 87.4| 120.4| 1455.0| 5| 0.0827491408934708|
| 2013|私人乘用车| 方形| 纯电动|磷酸铁锂| 75000| 87.5| 94.8| 1150.0| 4|0.08243478260869565|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows
2.3 删除指定列
mes_df3 = mes_df2.drop('电池类型','动力类型')
mes_df3.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份| 车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)| 功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
| 2013|私人乘用车| 方形| 50000| 92.2| 128.0| 1525.0| 5| 0.0839344262295082|
| 2013|私人乘用车| 方形| 85000| 91.6| 128.0| 1525.0| 5| 0.0839344262295082|
| 2013|私人乘用车| 方形| 30000| 94.3| 120.4| 1455.0| 5| 0.0827491408934708|
| 2013|私人乘用车| 方形| 70000| 87.4| 120.4| 1455.0| 5| 0.0827491408934708|
| 2013|私人乘用车| 方形| 75000| 87.5| 94.8| 1150.0| 4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows
2.4 修改数据
mes_f = mes_df3.replace({'私人乘用车':'私人'})
mes_f.show(3)
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|销售年份|车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)| 功率(wh)|
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
| 2013| 私人| 方形| 50000| 92.2| 128.0| 1525.0| 5|0.0839344262295082|
| 2013| 私人| 方形| 85000| 91.6| 128.0| 1525.0| 5|0.0839344262295082|
| 2013| 私人| 方形| 30000| 94.3| 120.4| 1455.0| 5|0.0827491408934708|
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
only showing top 3 rows
3 空值处理
3.1 丢弃空值
mes_df4 = mes_df3.dropna(how = 'all',subset=['车辆用途'])
mes_df4.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份| 车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)| 功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
| 2013|私人乘用车| 方形| 50000| 92.2| 128.0| 1525.0| 5| 0.0839344262295082|
| 2013|私人乘用车| 方形| 85000| 91.6| 128.0| 1525.0| 5| 0.0839344262295082|
| 2013|私人乘用车| 方形| 30000| 94.3| 120.4| 1455.0| 5| 0.0827491408934708|
| 2013|私人乘用车| 方形| 70000| 87.4| 120.4| 1455.0| 5| 0.0827491408934708|
| 2013|私人乘用车| 方形| 75000| 87.5| 94.8| 1150.0| 4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows
3.2 空值填充
mes_df5 = mes_df4.fillna({'销售年份':1970,'车辆用途':'私人乘用车'})
mes_df5.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份| 车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)| 功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
| 2013|私人乘用车| 方形| 50000| 92.2| 128.0| 1525.0| 5| 0.0839344262295082|
| 2013|私人乘用车| 方形| 85000| 91.6| 128.0| 1525.0| 5| 0.0839344262295082|
| 2013|私人乘用车| 方形| 30000| 94.3| 120.4| 1455.0| 5| 0.0827491408934708|
| 2013|私人乘用车| 方形| 70000| 87.4| 120.4| 1455.0| 5| 0.0827491408934708|
| 2013|私人乘用车| 方形| 75000| 87.5| 94.8| 1150.0| 4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows
4 聚合操作
4.1 分组聚合
g_df = mes_df.groupBy('销售年份').agg({'电量(KWh)':'sum','质量(kg)':'sum'})
# g_df = g_df.withColumnRenamed({'sum(质量(kg))':'总质量','sum(电量(KWh))':'总电量'})
g_df = g_df.withColumnRenamed('sum(质量(kg))','总质量').withColumnRenamed('sum(电量(KWh))','总电量')
g_df.show(3)
+--------+--------------------+-------------------+
|销售年份| 总质量| 总电量|
+--------+--------------------+-------------------+
| 2018|2.4133957670000014E8|3.040895040000008E7|
| 2015| 2551879.0| 240302.29999999897|
| 2013| 20373.6| 1867.0999999999992|
+--------+--------------------+-------------------+
only showing top 3 rows
g_df.show()
+--------+--------------------+-------------------+
|销售年份| sum(质量(kg))| sum(电量(KWh))|
+--------+--------------------+-------------------+
| 2018|2.4133957670000014E8|3.040895040000008E7|
| 2015| 2551879.0| 240302.29999999897|
| 2013| 20373.6| 1867.0999999999992|
| 2014| 220269.0| 20847.199999999968|
| 2019| 2.346486377000002E8|3.259483540000003E7|
| 2016| 4.531763910000002E7| 4402030.500000004|
| 2017|1.7762183689999998E8|1.887332539999987E7|
+--------+--------------------+-------------------+
5 用户自定义函数(UDF)
PySpark 有两种 UDF:传统UDF(非向量化UDF) 和 Pandas UDF(向量化UDF)
传统UDF(非向量化UDF) :通过Python函数逐行处理数据,使用pyspark.sql.functions.udf注册
- 优点:
- 适合简单逻辑(如字符串处理、数值转换)
- 在所有Spark版本(≥1.3)中均可使用
- 逐行调试方便,便于通过print或日志逐行调试逻辑。
- 缺点:
- 性能差,高延迟,尤其在大数据集上可能成为瓶颈。
- 需手动处理Spark数据类型与Python类型的映射,易因类型不匹配出错。
- 无法批量处理数据,无法利用现代CPU的SIMD指令加速。
Pandas UDF(向量化UDF):将整个列或分块数据转换为Pandas Series/DataFrame,批量处理。基于Apache Arrow的批量处理模式,使用pyspark.sql.functions.pandas_udf定义。
- 优点:
- 高性能,利用Pandas向量化操作,数据通过Arrow以零拷贝方式传输,减少序列化开销。
- 支持复杂操作,适合处理整列或分组数据(如时间窗口计算、分组聚合)
- Arrow优化内存布局,减少内存占用。
- 缺点:
- 依赖Arrow和Pandas。需要安装PyArrow和Pandas,且版本需与Spark兼容。
- 调试困难,批量处理逻辑出错时,难以定位具体行的问题。
- 型处理隐式,需熟悉Pandas与Spark类型的隐式转换规则,类型错误可能更隐晦。
5.1 传统UDF函数
from pyspark.sql.functions import udf # 传统udfdef powerCal(num,num2):return num/num2# udf创建
power_udf = udf(powerCal,DoubleType())
g_df2 = g_df.withColumn('功率',power_udf(g_df['总电量'],g_df['总质量']))
g_df2.show()
+--------+--------------------+-------------------+-------------------+
|销售年份| 总质量| 总电量| 功率|
+--------+--------------------+-------------------+-------------------+
| 2018|2.4133957670000014E8|3.040895040000008E7|0.12600067844570834|
| 2015| 2551879.0| 240302.29999999897|0.09416680806574253|
| 2013| 20373.6| 1867.0999999999992|0.09164310676561822|
| 2014| 220269.0| 20847.199999999968|0.09464427586269501|
| 2019| 2.346486377000002E8|3.259483540000003E7|0.13890911841419995|
| 2016| 4.531763910000002E7| 4402030.500000004|0.09713724252682886|
| 2017|1.7762183689999998E8|1.887332539999987E7|0.10625565937945705|
+--------+--------------------+-------------------+-------------------+
5.2 Pandas UDF(向量化UDF)
from pyspark.sql.functions import pandas_udf # 向量化udfdef powerCal2(num,num2):return num/num2# udf创建
power_pudf = pandas_udf(powerCal,DoubleType())
g_df3 = g_df.withColumn('功率',power_pudf(g_df['总电量'],g_df['总质量']))
g_df3.show()
+--------+--------------------+-------------------+-------------------+
|销售年份| 总质量| 总电量| 功率|
+--------+--------------------+-------------------+-------------------+
| 2018|2.4133957670000014E8|3.040895040000008E7|0.12600067844570834|
| 2015| 2551879.0| 240302.29999999897|0.09416680806574253|
| 2013| 20373.6| 1867.0999999999992|0.09164310676561822|
| 2014| 220269.0| 20847.199999999968|0.09464427586269501|
| 2019| 2.346486377000002E8|3.259483540000003E7|0.13890911841419995|
| 2016| 4.531763910000002E7| 4402030.500000004|0.09713724252682886|
| 2017|1.7762183689999998E8|1.887332539999987E7|0.10625565937945705|
+--------+--------------------+-------------------+-------------------+
spark.stop()
参考资料
《Python+Spark 2.0+Hadoop机器学习与大数据实战》, 林大贵,清华大学出版社,2017-12,9787302490739
