介绍熊猫PySpark UDF
注意:火花3.0引入了一个新的熊猫UDF。你可以找到更多的细节在接下来的博文:新的熊猫udf和Python类型提示即将发布的Apache 3.0火花
这是一个客人从李进社区后,软件工程师在两个σ投资,LP在纽约。这个博客也在两个σ
更新:本博客更新2月22日,2018年,包括一些变化。
这篇文章介绍了大熊猫udf(也称为矢量化udf)功能即将到来的Apache火花2.3版本中,显著地提高了性能和可用性的Python用户定义函数(udf)。
在过去的几年中,Python已经成为了默认的语言数据科学家。包等熊猫,numpy,statsmodel,scikit-learn获得了伟大的收养,成为主流的工具包。与此同时,Apache火花已成为事实上的标准在处理大数据。使数据科学家利用大数据的价值,火花在version 0.7中,添加了一个Python API支持用户定义函数。这些用户定义函数操作one-row-at-a-time,因此遭受高序列化和调用开销。因此,许多数据管道定义在Java udf和Scala,然后从Python调用它们。
熊猫udf之上的Apache箭头带给你最好的两个世界的能力定义低开销,高性能udf完全在Python中。
在火花2.3中,将会有两种类型的熊猫udf:标量和分组的地图。接下来,我们使用四个示例程序说明它们的使用:+ 1,累积概率,减去的意思是,普通最小二乘线性回归。
标量熊猫udf
标量熊猫udf用于vectorizing标量操作。定义一个标量熊猫UDF,简单地使用@pandas_udf
注释一个Python函数pandas.Series
作为参数,并返回另一个pandas.Series
相同的大小。下面我们使用两个例子说明:+ 1和累积概率。
+ 1
计算v + 1是一个简单的示例演示差异row-at-a-time udf和标量熊猫udf。注意,内置列运营商可以执行在这个场景中快得多。
使用row-at-a-time udf:
从pyspark.sql.functions进口udf#使用udf来定义一个row-at-a-time udf@udf (“双”)#输入/输出都是一个双精度值defplus_one(v):返回v +1df.withColumn (“v2”plus_one (df.v))
使用熊猫udf:
从pyspark.sql.functions进口pandas_udf, PandasUDFType#使用pandas_udf定义一个熊猫UDF@pandas_udf (“双”,PandasUDFType.SCALAR)#输入/输出都是熊猫。一系列的双打defpandas_plus_one(v):返回v +1df.withColumn (“v2”pandas_plus_one (df.v))
上面的示例定义一个row-at-a-time UDF“plus_one”和一个标量熊猫UDF“pandas_plus_one”执行相同的计算“+ 1”。UDF定义是相同的除了函数修饰符:“UDF”与“pandas_udf”。
row-at-a-time版本,用户定义函数获取双“v”并返回的结果作为一个双“v + 1”。在熊猫的版本中,用户定义的函数pandas.Series
“v”,并返回“v + 1”的结果pandas.Series
。因为“v + 1”是矢量化pandas.Series
熊猫版本row-at-a-time版本要快得多。
请注意,有两个重要的需求使用标量熊猫udf时:
- 输入和输出序列必须具有相同的大小。
- 一列是如何分成多个
pandas.Series
是内部引发,因此用户定义函数的结果必须是独立的分裂。
累积概率
这个例子显示了一个更实用的使用标量熊猫UDF:计算累积概率一个值的正态分布N(0,1)使用scipy包中。
进口熊猫作为pd从scipy进口统计数据@pandas_udf (“双”)def提供(v):返回pd.Series (stats.norm.cdf (v))
df.withColumn (“cumulative_probability”,cdf实验组(df.v))
stats.norm.cdf
一个标量值和工作pandas.Series
,这个例子可以写row-at-a-time udf。与前面的示例相似,熊猫版本运行更快,见后面的“性能比较”部分。
分组地图熊猫udf
Python用户相当熟悉split-apply-combine模式在数据分析。分组地图熊猫udf设计对于这个场景,他们操作的所有数据组,例如,“对于每一个日期,应用此操作”。
分组地图熊猫udf第一次分裂火花DataFrame
分成小组根据groupby操作符中指定的条件,适用于一个用户定义的函数(pandas.DataFrame
- >pandas.DataFrame
每组),结合并返回结果作为一个新的火花DataFrame
。
分组地图熊猫udf使用相同的函数修饰符pandas_udf
作为标量熊猫udf,但是他们有一些差异:
- 输入的用户定义函数:
- 标量:
pandas.Series
- 分组的地图:
pandas.DataFrame
- 标量:
- 用户定义函数的输出:
- 标量:
pandas.Series
- 分组的地图:
pandas.DataFrame
- 标量:
- 语义分组:
- 标量:没有分组语义
- 分组地图:定义为“groupby”条款
- 输出大小:
- 标量:作为输入的大小相同
- 分组地图:任何大小
- 函数的返回类型修饰符:
- 数量:一个
数据类型
指定返回的类型pandas.Series
- 分组的地图:
StructType
返回指定每一列的名称和类型pandas.DataFrame
- 数量:一个
接下来,让我们走进两个例子来说明用例分组地图熊猫udf。
减去的意思
这个例子显示了一个简单的使用分组地图熊猫udf:从组中的每个值减去的意思。
@pandas_udf (df。模式,PandasUDFType.GROUPED_MAP)#输入/输出都是pandas.DataFramedefsubtract_mean(pdf):返回pdf.assign (v = pdf。v - pdf.v.mean ())df.groupby (“id”苹果(subtract_mean)
在这个例子中,我们从每个值减去均值v v为每个组。分组的语义是“groupby”定义的函数,我。e,每个输入pandas.DataFrame
用户定义的函数有相同的“id”价值。这个用户定义函数的输入和输出模式是相同的,所以我们通过“df。装饰模式”pandas_udf
用于指定模式。
分组地图熊猫udf也可以称为独立的Python函数驱动程序。这对调试非常有用,例如:
示例= df。过滤器(id= =1).toPandas ()#运行作为一个独立的函数在一个熊猫。DataFrame并验证结果subtract_mean.func(样本)现在#运行与火花df.groupby (“id”苹果(substract_mean)
在上面的示例中,我们首先将火花的一个小子集DataFrame
到一个pandas.DataFrame
,然后运行subtract_mean作为一个独立的Python函数。验证逻辑函数后,我们可以调用UDF与火花在整个数据集。
普通最小二乘线性回归
最后一个例子展示了如何使用statsmodels运行OLS线性回归为每个组。对于每一组,我们计算βb = (b1, b2) X = (x1, x2)根据统计模型Y = bX + c。
进口statsmodels.api作为sm# df有四个列:id, y, x1, x2group_column =“id”y_column =“y”x_columns = [x1的,“x2”]模式= df。选择(group_column * x_columns) . schema@pandas_udf (模式,PandasUDFType.GROUPED_MAP)#输入/输出都是pandas.DataFramedefols(pdf):group_key = pdf [group_column] .iloc [0]y = pdf (y_column)X = pdf (x_columns)X = sm.add_constant (X)= sm模型。OLS (y, X) .fit ()返回pd。DataFrame ([[group_key] + [model.params[我]为我在x_columns]],列= [group_column] + x_columns)
β= df.groupby (group_column)苹果(ols)
这个例子表明,分组地图熊猫udf可以用于任意python函数:pandas.DataFrame- >pandas.DataFrame
。返回的pandas.DataFrame
可以有不同的行和列数作为输入。
性能比较
最后,我们想要展示row-at-a-time udf和熊猫udf之间的性能比较。我们跑微基准的三个上面的示例(+ 1,累积概率和减去的意思)。
配置和方法论
我们跑的基准在单个节点上火花砖community edition的集群。
配置信息:
数据:10 m-row DataFrame Int列和两列
集群:6.0 GB内存,0.88内核,1 DBU
砖运行时Scala版本:最新RC (4.0, 2.11)
详细的实施的基准,检查熊猫UDF笔记本。
如图表所示,熊猫udf执行比row-at-a-time udf,从3 x / 100 x。
结论和未来的工作
即将到来的火花2.3版本放下大幅改善的基础在Python中用户自定义函数的功能和性能。在未来,我们打算引入支持大熊猫udf聚合和窗口函数。可以跟踪相关工作火星- 22216。
熊猫udf是一个很好的例子引发社会的努力。我们要感谢布赖恩•卡特勒Hyukjin Kwon杰夫贝克Liang-Chi谢长廷,列夫沃尔什,李进,雷诺鑫,Takuya Ueshin, Wenchen粉丝,韦斯·麦金尼,小李和许多其他人对他们的贡献。最后,特别感谢Apache箭头社区使这项工作成为可能。