跳转到主要内容
工程的博客

介绍熊猫PySpark UDF

如何运行与PySpark本机Python代码,快。
通过李进

2017年10月30日 工程的博客

分享这篇文章
注意:火花3.0引入了一个新的熊猫UDF。你可以找到更多的细节在接下来的博文:新的熊猫udf和Python类型提示即将发布的Apache 3.0火花

这是一个客人从李进社区后,软件工程师在两个σ投资,LP在纽约。这个博客也在两个σ

在砖试试这个笔记本

更新:本博客更新2月22日,2018年,包括一些变化。

这篇文章介绍了大熊猫udf(也称为矢量化udf)功能即将到来的Apache火花2.3版本中,显著地提高了性能和可用性的Python用户定义函数(udf)。

在过去的几年中,Python已经成为了默认的语言数据科学家。包等熊猫,numpy,statsmodel,scikit-learn获得了伟大的收养,成为主流的工具包。与此同时,Apache火花已成为事实上的标准在处理大数据。使数据科学家利用大数据的价值,火花在version 0.7中,添加了一个Python API支持用户定义函数。这些用户定义函数操作one-row-at-a-time,因此遭受高序列化和调用开销。因此,许多数据管道定义在Java udf和Scala,然后从Python调用它们。

熊猫udf之上的Apache箭头带给你最好的两个世界的能力定义低开销,高性能udf完全在Python中。

在火花2.3中,将会有两种类型的熊猫udf:标量和分组的地图。接下来,我们使用四个示例程序说明它们的使用:+ 1,累积概率,减去的意思是,普通最小二乘线性回归。

标量熊猫udf

标量熊猫udf用于vectorizing标量操作。定义一个标量熊猫UDF,简单地使用@pandas_udf注释一个Python函数pandas.Series作为参数,并返回另一个pandas.Series相同的大小。下面我们使用两个例子说明:+ 1和累积概率。

+ 1

计算v + 1是一个简单的示例演示差异row-at-a-time udf和标量熊猫udf。注意,内置列运营商可以执行在这个场景中快得多。

使用row-at-a-time udf:

pyspark.sql.functions进口udf#使用udf来定义一个row-at-a-time udf@udf (“双”)#输入/输出都是一个双精度值defplus_one(v):返回v +1df.withColumn (“v2”plus_one (df.v))

使用熊猫udf:

pyspark.sql.functions进口pandas_udf, PandasUDFType#使用pandas_udf定义一个熊猫UDF@pandas_udf (“双”,PandasUDFType.SCALAR)#输入/输出都是熊猫。一系列的双打defpandas_plus_one(v):返回v +1df.withColumn (“v2”pandas_plus_one (df.v))

上面的示例定义一个row-at-a-time UDF“plus_one”和一个标量熊猫UDF“pandas_plus_one”执行相同的计算“+ 1”。UDF定义是相同的除了函数修饰符:“UDF”与“pandas_udf”。

row-at-a-time版本,用户定义函数获取双“v”并返回的结果作为一个双“v + 1”。在熊猫的版本中,用户定义的函数pandas.Series“v”,并返回“v + 1”的结果pandas.Series。因为“v + 1”是矢量化pandas.Series熊猫版本row-at-a-time版本要快得多。

请注意,有两个重要的需求使用标量熊猫udf时:

  • 输入和输出序列必须具有相同的大小。
  • 一列是如何分成多个pandas.Series是内部引发,因此用户定义函数的结果必须是独立的分裂。

累积概率

这个例子显示了一个更实用的使用标量熊猫UDF:计算累积概率一个值的正态分布N(0,1)使用scipy包中。

进口熊猫作为pdscipy进口统计数据@pandas_udf (“双”)def提供(v):返回pd.Series (stats.norm.cdf (v))
              df.withColumn (“cumulative_probability”,cdf实验组(df.v))

stats.norm.cdf一个标量值和工作pandas.Series,这个例子可以写row-at-a-time udf。与前面的示例相似,熊猫版本运行更快,见后面的“性能比较”部分。

分组地图熊猫udf

Python用户相当熟悉split-apply-combine模式在数据分析。分组地图熊猫udf设计对于这个场景,他们操作的所有数据组,例如,“对于每一个日期,应用此操作”。

分组地图熊猫udf第一次分裂火花DataFrame分成小组根据groupby操作符中指定的条件,适用于一个用户定义的函数(pandas.DataFrame- >pandas.DataFrame每组),结合并返回结果作为一个新的火花DataFrame

分组地图熊猫udf使用相同的函数修饰符pandas_udf作为标量熊猫udf,但是他们有一些差异:

  • 输入的用户定义函数:
    • 标量:pandas.Series
    • 分组的地图:pandas.DataFrame
  • 用户定义函数的输出:
    • 标量:pandas.Series
    • 分组的地图:pandas.DataFrame
  • 语义分组:
    • 标量:没有分组语义
    • 分组地图:定义为“groupby”条款
  • 输出大小:
    • 标量:作为输入的大小相同
    • 分组地图:任何大小
  • 函数的返回类型修饰符:
    • 数量:一个数据类型指定返回的类型pandas.Series
    • 分组的地图:StructType返回指定每一列的名称和类型pandas.DataFrame

接下来,让我们走进两个例子来说明用例分组地图熊猫udf。

减去的意思

这个例子显示了一个简单的使用分组地图熊猫udf:从组中的每个值减去的意思。

@pandas_udf (df。模式,PandasUDFType.GROUPED_MAP)#输入/输出都是pandas.DataFramedefsubtract_mean(pdf):返回pdf.assign (v = pdf。v - pdf.v.mean ())df.groupby (“id”苹果(subtract_mean)

在这个例子中,我们从每个值减去均值v v为每个组。分组的语义是“groupby”定义的函数,我。e,每个输入pandas.DataFrame用户定义的函数有相同的“id”价值。这个用户定义函数的输入和输出模式是相同的,所以我们通过“df。装饰模式”pandas_udf用于指定模式。

分组地图熊猫udf也可以称为独立的Python函数驱动程序。这对调试非常有用,例如:

示例= df。过滤器(id= =1).toPandas ()#运行作为一个独立的函数在一个熊猫。DataFrame并验证结果subtract_mean.func(样本)现在#运行与火花df.groupby (“id”苹果(substract_mean)

在上面的示例中,我们首先将火花的一个小子集DataFrame到一个pandas.DataFrame,然后运行subtract_mean作为一个独立的Python函数。验证逻辑函数后,我们可以调用UDF与火花在整个数据集。

普通最小二乘线性回归

最后一个例子展示了如何使用statsmodels运行OLS线性回归为每个组。对于每一组,我们计算βb = (b1, b2) X = (x1, x2)根据统计模型Y = bX + c

进口statsmodels.api作为sm# df有四个列:id, y, x1, x2group_column =“id”y_column =“y”x_columns = [x1的,“x2”]模式= df。选择(group_column * x_columns) . schema@pandas_udf (模式,PandasUDFType.GROUPED_MAP)#输入/输出都是pandas.DataFramedefols(pdf):group_key = pdf [group_column] .iloc [0]y = pdf (y_column)X = pdf (x_columns)X = sm.add_constant (X)= sm模型。OLS (y, X) .fit ()返回pd。DataFrame ([[group_key] + [model.params[我]x_columns]],列= [group_column] + x_columns)
              β= df.groupby (group_column)苹果(ols)

这个例子表明,分组地图熊猫udf可以用于任意python函数:pandas.DataFrame- >pandas.DataFrame。返回的pandas.DataFrame可以有不同的行和列数作为输入。

性能比较

最后,我们想要展示row-at-a-time udf和熊猫udf之间的性能比较。我们跑微基准的三个上面的示例(+ 1,累积概率和减去的意思)。

配置和方法论

我们跑的基准在单个节点上火花砖community edition的集群。

配置信息:
数据:10 m-row DataFrame Int列和两列
集群:6.0 GB内存,0.88内核,1 DBU
砖运行时Scala版本:最新RC (4.0, 2.11)

详细的实施的基准,检查熊猫UDF笔记本

如图表所示,熊猫udf执行比row-at-a-time udf,从3 x / 100 x。

结论和未来的工作

即将到来的火花2.3版本放下大幅改善的基础在Python中用户自定义函数的功能和性能。在未来,我们打算引入支持大熊猫udf聚合和窗口函数。可以跟踪相关工作火星- 22216

熊猫udf是一个很好的例子引发社会的努力。我们要感谢布赖恩•卡特勒Hyukjin Kwon杰夫贝克Liang-Chi谢长廷,列夫沃尔什,李进,雷诺鑫,Takuya Ueshin, Wenchen粉丝,韦斯·麦金尼,小李和许多其他人对他们的贡献。最后,特别感谢Apache箭头社区使这项工作成为可能。

接下来是什么

你可以试试的熊猫UDF笔记本现在这个功能是可用的砖4.0运行时β

免费试着砖

相关的帖子

看到所有工程的博客的帖子