新的熊猫udf和Python类型提示即将发布的Apache 3.0火花
2020年5月20日 在工程的博客
熊猫用户定义函数(udf)是一个最重要的增强Apache火花TM对数据的科学。他们带来很多好处,比如让用户使用熊猫api和提高性能。
然而,熊猫udf演变有机地随着时间的推移,这导致了一些不一致和在用户中制造混乱。Apache火花的完整版本3.0,预计很快就会公布),将引入一个新的接口利用的熊猫udfPython类型提示解决熊猫UDF的扩散,帮助他们变得更加神谕的,自描述的类型。
这篇文章介绍了新的熊猫udf Python类型提示,和新的熊猫函数api包括分组地图,地图,co-grouped地图。
熊猫udf
熊猫udf在火花2.3中引入的,见也介绍熊猫PySpark UDF。熊猫是众所周知的科学家和数据无缝集成了许多Python库和包等NumPy,statsmodel,scikit-learn,熊猫udf允许数据科学家不仅扩展他们的工作负载,还利用熊猫api在Apache火花。
用户定义的函数执行的:
- Apache箭头,JVM之间直接交换数据和Python驱动程序/执行人的(反)序列化成本几乎为零。
- 熊猫在函数内部,与熊猫实例和api。
熊猫熊猫udf使用api函数内部和Apache箭头交换数据。它允许矢量化操作,可以提高性能100 x,而row-at-a-time Python udf。
下面的例子展示了一个熊猫UDF之间简单地添加一个值,它与调用的函数定义pandas_plus_one
装饰的pandas_udf
与熊猫UDF类型指定为PandasUDFType.SCALAR
。
从pyspark.sql.functions进口pandas_udf, PandasUDFType@pandas_udf (“双”,PandasUDFType.SCALAR)defpandas_plus_one(v):#“v”是一个熊猫系列返回v.add (1)#输出一个熊猫系列火花。范围(10).select (pandas_plus_one (“id”)),告诉()
Python函数和输出一个熊猫系列。您可以执行一个矢量化操作添加一个值通过使用丰富的熊猫在这个函数api。(反)序列化也自动矢量化利用Apache箭头。
Python类型提示
Python类型提示被官方介绍PEP 484Python 3.5。静态类型提示是一个官方的方式显示在Python中值的类型。看下面的例子。
def问候(名称:str)- - - >str:返回“你好”+名字
名称:str
显示参数是str的类型和名称- >
语法表示问候()
函数返回一个字符串。
Python类型提示两个重要造福PySpark和熊猫UDF上下文。
- 它给一个明确的定义的函数应该做什么,让用户更容易理解的代码。例如,除非它是记录,用户无法知道
问候
可以采取没有一个
如果没有提示类型。它可以避免需要文档这样微妙的情况下的测试用例和/或为用户测试和自己弄清楚。 - 它能让我们更容易执行静态分析。ide,比如PyCharm和Visual Studio代码可以利用类型注解提供代码完成,显示错误,并支持更好的转向定义功能。
熊猫UDF扩散类型
Apache火花2.3发布以来,一些新熊猫udf已经实现,使得用户很难学习新规范和如何使用它们。例如,这里有三个熊猫udf,输出几乎相同的结果:
从pyspark.sql.functions进口pandas_udf, PandasUDFType@pandas_udf (“长”,PandasUDFType.SCALAR)defpandas_plus_one(v):#“v”是一个熊猫系列返回v +1#输出一个熊猫系列火花。范围(10).select (pandas_plus_one (“id”)),告诉()
从pyspark.sql.functions进口pandas_udf, PandasUDFType
#新型3.0熊猫UDF的火花。@pandas_udf (“长”,PandasUDFType.SCALAR_ITER)defpandas_plus_one(itr):#“迭代器”是一个熊猫系列的迭代器。返回地图(λv: +1itr)#熊猫系列的输出迭代器。火花。范围(10).select (pandas_plus_one (“id”)),告诉()
从pyspark.sql.functions进口pandas_udf, PandasUDFType@pandas_udf (“id”,PandasUDFType.GROUPED_MAP)defpandas_plus_one(pdf):#是一个熊猫DataFrame pdf返回pdf +1#输出一个熊猫DataFrame#“pandas_plus_one”_only_可以用于“groupby(…)苹果(…)”火花。范围(10).groupby (“id”苹果(pandas_plus_one),告诉()
尽管这些UDF类型有不同的目的,一些可以适用。在这个简单的例子中,你可以使用任何的三个。然而,每个熊猫udf预计不同的输入和输出类型,和工作在不同的方式不同的语义和不同的性能。它使用户感到困惑关于哪一个使用和学习,以及每个是如何运作的。
此外,pandas_plus_one
在第一和第二的情况下可以使用常规PySpark列。考虑的参数withColumn
或函数等其他表达式的组合pandas_plus_one (" id ") + 1
。然而,过去的pandas_plus_one
只能使用吗groupby(…)苹果(pandas_plus_one)
。
这种级别的复杂性与火花引发了很多讨论开发人员,和开车的努力推出新的熊猫通过一个api与Python类型提示正式的求婚。我们的目标是让用户自然地表达他们的熊猫udf使用Python类型提示如上问题情况下没有混乱。例如,上面的情况下可以写成如下:
defpandas_plus_one(v: pd.Series)- > pd.Series:返回v +1
defpandas_plus_one(itr:迭代器(pd.Series))- >迭代器(pd.Series):返回地图(λv: +1itr)
defpandas_plus_one(pdf: pd.DataFrame)- > pd.DataFrame:返回pdf +1
新的熊猫与Python api类型提示
为了解决老熊猫udf的复杂性,从Apache火花与Python 3.6和3.0以上,Python类型提示等pandas.Series
,pandas.DataFrame
,元组
,迭代器
可以用来表达新的熊猫UDF类型。
此外,老熊猫udf被分成两个API类:熊猫udf和熊猫的API函数。尽管他们在内部以类似的方式工作,有明显的差异。
你可以以同样的方式对待熊猫udf,你使用其他PySpark列实例。然而,您不能使用这些列的熊猫api函数实例。这里有两个例子:
#熊猫UDF进口熊猫作为pd从pyspark.sql.functions进口pandas_udf、log2坳@pandas_udf (“长”)defpandas_plus_one(s: pd.Series)- > pd.Series:返回s +1# pandas_plus_one (" id ")等于视为SQL expression_内部。#即可以结合其他列、函数和表达式。火花。范围(10).select (pandas_plus_one(坳(“id”)- - -1)+ log2 (“id”)+1),告诉()
#熊猫API函数从打字进口迭代器进口熊猫作为pd
defpandas_plus_one(迭代器:迭代器(pd.DataFrame))- >迭代器(pd.DataFrame):返回地图(λv: +1迭代器)
# pandas_plus_one只是一个常规的Python函数,mapInPandas#逻辑视为_a单独的SQL查询plan_而不是SQL表达式。#因此,直接互动与其他表达式是不可能的。火花。范围(10)。地图InPandas(pandas_plus_one, schema=“id”),告诉()
同时,注意,熊猫udf需要Python类型的暗示而类型提示在熊猫函数api目前可选的。类型提示计划在熊猫api和功能可能需要在未来。
新的熊猫udf
而不是手动定义和指定每个熊猫UDF类型,新的熊猫UDF推断熊猫UDF类型从给定的Python类型暗示Python函数。目前有四个支持Python类型的情况下在熊猫udf提示:
- 系列,系列
- 迭代器系列的迭代器系列
- 迭代器的多个系列的迭代器系列
- 系列标量(单个值)
我们深入了解每个案例之前,让我们看看三个重点工作与新熊猫udf。
- 尽管Python类型提示Python世界是可选的,您必须指定Python类型提示输入和输出,以使用新的熊猫udf。
- 用户仍然可以使用老方法通过手动指定熊猫UDF类型。然而,鼓励使用Python类型提示。
- 提示应该使用类型
pandas.Series
在所有情况下。然而,有一个变体pandas.DataFrame
应该用于其输入或输出类型提示:当输入或输出列的StructType。
看看下面的例子:进口熊猫作为pd从pyspark.sql.functions进口pandas_udf df = spark.createDataFrame ([[1,“字符串”,(“一个嵌套的字符串”)]],“long_col长string_col字符串,struct_col结构(字符串)”)@pandas_udf (“col1字符串,col2长”)defpandas_plus_len(s1: pd。系列,s2: pd。系列,pdf: pd.DataFrame)- > pd.DataFrame:#专栏系列,结构是DataFrame列。pdf (“col2”)= s1 + s2。str。len()返回pdf# struct列预计DataFrame返回df.select (pandas_plus_len (“long_col”,“string_col”,“struct_col”)),告诉()
系列,系列
系列,系列映射到标量熊猫UDF在Apache火花2.3中引入的。提示类型可以表示为pandas.Series,。。。- > pandas.Series
。将一个或多个给定的函数pandas.Series
和输出一个pandas.Series
。输出长度预计将与输入相同。
进口熊猫作为pd从pyspark.sql.functions进口pandas_udf@pandas_udf (“长”)defpandas_plus_one(s: pd.Series)- > pd.Series:返回s +1火花。范围(10).select (pandas_plus_one (“id”)),告诉()
上面的例子中可以映射到旧风格与标量熊猫UDF,如下。
从pyspark.sql.functions进口pandas_udf, PandasUDFType@pandas_udf (“长”,PandasUDFType.SCALAR)defpandas_plus_one(v):返回v +1火花。范围(10).select (pandas_plus_one (“id”)),告诉()
迭代器系列的迭代器系列
这是一种新型的熊猫UDF Apache火花3.0。系列系列的一个变种,提示类型可以表示为迭代器(pd。系列)- >迭代器(pd.Series)
。的函数和输出迭代器pandas.Series
。
整个输出的长度必须相同长度的输入。因此,它可以预取数据从输入迭代器,只要整个输入和输出的长度是相同的。给定的函数应该接受一个单独的列作为输入。
从打字进口迭代器进口熊猫作为pd从pyspark.sql.functions进口pandas_udf@pandas_udf (“长”)defpandas_plus_one(迭代器:迭代器(pd.Series))- >迭代器(pd.Series):返回地图(λs: s +1迭代器)火花。范围(10).select (pandas_plus_one (“id”)),告诉()
时也有用UDF执行需要昂贵的一些初始化状态。下面的伪代码演示了这种情况。
@pandas_udf (“长”)def计算(迭代器:迭代器(pd.Series))- >迭代器(pd.Series):#做一些昂贵的初始化状态状态= very_expensive_initialization ()为x在迭代器:#负责整个迭代器使用。收益率calculate_with_state (x,状态)df.select(计算(“价值”)),告诉()
迭代器的级数迭代器的级数也可以映射到老熊猫UDF的风格。看下面的例子。
从pyspark.sql.functions进口pandas_udf, PandasUDFType@pandas_udf (“长”,PandasUDFType.SCALAR_ITER)defpandas_plus_one(迭代器):返回地图(λs: s +1迭代器)火花。范围(10).select (pandas_plus_one (“id”)),告诉()
迭代器的多个系列的迭代器系列
这种类型的熊猫UDF将在Apache 3.0火花,还介绍了与迭代器系列的迭代器系列。提示类型可以表示为迭代器(元组(熊猫。系列中,…]]- >迭代器[pandas.Series]
。
它有类似的特征和限制的迭代器系列的迭代器系列。给定的函数迭代器的一个元组pandas.Series
和输出迭代器pandas.Series
。时也有用时使用一些州和预取输入数据。整个输出的长度也应与整个输入的长度相同。然而,给定的函数应该多个列作为输入,与迭代器系列的迭代器系列。
从打字进口迭代器,元组进口熊猫作为pd从pyspark.sql.functions进口pandas_udf@pandas_udf (“长”)defmultiply_two(迭代器:迭代器(元组(pd。系列,pd.Series]])- >迭代器(pd.Series):返回(a * b为a、b在迭代器)火花。范围(10).select (multiply_two (“id”,“id”)),告诉()
这也可以映射到老熊猫UDF样式如下。
从pyspark.sql.functions进口pandas_udf, PandasUDFType@pandas_udf (“长”,PandasUDFType.SCALAR_ITER)defmultiply_two(迭代器):返回(a * b为a、b在迭代器)火花。范围(10).select (multiply_two (“id”,“id”)),告诉()
系列标量
系列标量映射到分组聚合熊猫UDF在Apache火花2.4中引入的。提示表示为类型pandas.Series,。。。- >任何
。函数接受一个或多个熊猫。系列和输出原始数据类型。返回标量可以是一个Python原始类型,例如,int
,浮动
或NumPy数据类型等numpy.int64
,numpy.float64
等。任何
理想情况下应该相应特定的标量类型。
进口熊猫作为pd从pyspark.sql.functions进口pandas_udf从pyspark.sql进口窗口
df = spark.createDataFrame (((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0),(“id”,“v”))@pandas_udf (“替身”)defpandas_mean(v: pd.Series)- - - >浮动:返回v。总和()df.select (pandas_mean (df (“v”])),告诉()df.groupby (“id”).agg (pandas_mean (df (“v”])),告诉()df.select (pandas_mean (df (“v”]).over (Window.partitionBy (“id”))),告诉()
上面的例子可以转化成分组总熊猫UDF的例子可以看到:
进口熊猫作为pd从pyspark.sql.functions进口pandas_udf, PandasUDFType从pyspark.sql进口窗口
df = spark.createDataFrame (((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0),(“id”,“v”))@pandas_udf (“替身”,PandasUDFType.GROUPED_AGG)defpandas_mean(v):返回v。总和()df.select (pandas_mean (df (“v”])),告诉()df.groupby (“id”).agg (pandas_mean (df (“v”])),告诉()df.select (pandas_mean (df (“v”]).over (Window.partitionBy (“id”))),告诉()
新的熊猫函数api
这个新类别在Apache 3.0火花使您能够直接应用一个Python本机函数,以熊猫和输出实例对PySpark DataFrame。熊猫函数api支持Apache火花3.0:分组地图,地图,co-grouped地图。
注意分组地图熊猫UDF现在归类为一组地图熊猫API函数。如前所述,Python类型提示目前大熊猫api函数是可选的。
分组的地图
熊猫分组版图的API函数applyInPandas
在分组DataFrame,例如,df.groupby (…)
。这是映射到分组地图熊猫UDF在老熊猫UDF类型。它将每一组映射到每一个pandas.DataFrame
的函数。请注意,它不需要为输出是输入的长度相同。
进口熊猫作为pd
df = spark.createDataFrame (((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0),(“id”,“v”))defsubtract_mean(pdf: pd.DataFrame)- > pd.DataFrame:v = pdf.v返回pdf。作为sign(v=v - v.mean())df.groupby (“id”)。applyInPandas (subtract_mean、模式= df.schema),告诉()
分组的分组的映射类型映射到地图熊猫UDF支持从火花2.3,如下:
进口熊猫作为pd从pyspark.sql.functions进口pandas_udf, PandasUDFType
df = spark.createDataFrame (((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0),(“id”,“v”))@pandas_udf (df。模式,PandasUDFType.GROUPED_MAP)defsubtract_mean(pdf):v = pdf.v返回pdf。作为sign(v=v - v.mean())df.groupby (“id”苹果(subtract_mean),告诉()
地图
熊猫地图API函数mapInPandas
DataFrame。它在Apache火花3.0是新的。它在每个分区地图每一个批处理和转换。该函数的迭代器pandas.DataFrame
和输出迭代器pandas.DataFrame
。输出长度不需要匹配输入的大小。
从打字进口迭代器进口熊猫作为pddf = spark.createDataFrame (((1,21),(2,30.),(“id”,“年龄”))defpandas_filter(迭代器:迭代器(pd.DataFrame))- >迭代器(pd.DataFrame):为pdf在迭代器:收益率pdf [pdf。id= =1]
df。地图InPandas(pandas_filter, schema=df.schema).show()
Co-grouped地图
Co-grouped地图,applyInPandas
在一个co-grouped DataFrame等df.groupby (…) .cogroup (df.groupby (…))
,还将介绍Apache 3.0火花。类似于分组的地图,它将每一组映射到每一个pandas.DataFrame
函数,但组与另一个DataFrame通过常见的键(s),然后函数应用于每个cogroup。同样,没有长度限制输出。
进口熊猫作为pd
df1 = spark.createDataFrame (((1201年,1,1.0),(1201年,2,2.0),(1202年,1,3.0),(1202年,2,4.0)),(“时间”,“id”,“v1”))df2 = spark.createDataFrame (((1201年,1,“x”),(1201年,2,“y”),(“时间”,“id”,“v2”))defasof_join(左:pd。DataFrame吧:pd.DataFrame)- > pd.DataFrame:返回pd。merge_asof(左,右,=“时间”,=“id”)df1.groupby (“id”).cogroup (df2.groupby (“id”)).applyInPandas (asof_join“时间int, int id, v1加倍,v2字符串“),告诉()
结论和未来的工作
即将发布的Apache 3.0(火花阅读我们的预览博客了解详情)。将提供Python类型提示,使它更简单可供用户表达熊猫udf和熊猫的api函数。在未来,我们应该考虑添加支持其他类型提示组合在熊猫udf和熊猫的api函数。目前,许多可能的组合的支持的情况下只有几个Python类型的提示。还有其他正在进行的讨论Apache火花社区。访问一边讨论和未来的改进要学习BOB低频彩更多的知识。