熊猫api函数

熊猫函数api使您能够直接应用一个Python本机函数,熊猫实例并将其输出到一个PySpark DataFrame。类似于熊猫用户定义函数、功能的api也使用Apache箭头传输数据和熊猫的数据;然而,Python类型提示在熊猫函数api中是可选的。

有三种类型的熊猫api函数:

  • 分组的地图

  • 地图

  • Cogrouped地图

熊猫熊猫函数api利用相同的内部逻辑执行使用UDF。他们分享PyArrow等特点,支持SQL类型和配置。

有关更多信息,请参见博文新的熊猫udf和Python类型提示即将发布的Apache 3.0火花

分组的地图

你改变你的分组数据使用.applyInPandas groupBy () ()实现“split-apply-combine”模式。Split-apply-combine包括三个步骤:

  • 把数据分成组使用DataFrame.groupBy

  • 应用一个函数在每个组。函数的输入和输出都是pandas.DataFrame。输入数据包含每组的所有行和列。

  • 结合成一个新的结果DataFrame

使用.applyInPandas groupBy () (),您必须定义如下:

  • 一个Python函数,定义了每一组计算

  • 一个StructType对象或一个字符串,该字符串定义了输出的模式DataFrame

返回的列标签pandas.DataFrame必须匹配输出模式如果定义的字段名称指定为字符串,或匹配字段数据类型的位置如果不是字符串,例如,整数指数。看到pandas.DataFrame当构建一个如何标签列pandas.DataFrame

所有数据一组应用函数之前被加载到内存中。这可能导致内存不足异常,特别是尺寸倾向。的配置maxRecordsPerBatch不应用于组织和它是由你来确保分组数据符合可用的内存。

下面的例子展示了如何使用苹果groupby () ()从每个值减去均值。

df=火花createDataFrame(((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))defsubtract_mean(pdf):# pdf pandas.DataFramev=pdfv返回pdf分配(v=v- - - - - -v的意思是())dfgroupby(“id”)applyInPandas(subtract_mean,模式=“id, v双”)显示()# + - - - + - - - +# | v | | id# + - - - + - - - +# | 1 | -0.5 |# | 1 | 0.5 |# | 2 | -3.0 |# | 2 | -1.0 |# | 2 | 4.0 |# + - - - + - - - +

详细的用法,看到pyspark.sql.GroupedData.applyInPandas

地图

你和熊猫执行映射操作实例DataFrame.mapInPandas ()以变换的迭代器pandas.DataFrame到另一个迭代器的pandas.DataFrame代表当前PySpark DataFrame并返回结果作为PySpark DataFrame。

的底层函数和输出迭代器pandas.DataFrame。它可以返回输出的任意长度与熊猫一些udf系列等系列。

下面的例子展示了如何使用mapInPandas ():

df=火花createDataFrame(((1,21),(2,30.)),(“id”,“年龄”))deffilter_func(迭代器):pdf迭代器:收益率pdf(pdfid= =1]dfmapInPandas(filter_func,模式=df模式)显示()# + - - - + - - - +# | | | id年龄# + - - - + - - - +# | 1 | | 21日# + - - - + - - - +

详细的用法,看到pyspark.sql.DataFrame.mapInPandas

Cogrouped地图

与熊猫cogrouped地图操作实例,使用.applyInPandas .cogroup DataFrame.groupby () () ()两个PySpark cogroupDataFrame年代,一个共同的关键,然后应用一个Python函数每个cogroup如图所示:

  • 洗牌,这样的数据组的每个DataFrame cogrouped在一起共享一个关键。

  • 一个函数应用于每个cogroup。函数的输入是2pandas.DataFrame(有一个可选的元组表示的关键)。的输出是一个函数pandas.DataFrame

  • 结合pandas.DataFrame从所有组织成一个新的PySpark年代DataFrame

使用.applyInPandas .cogroup groupBy () () (),您必须定义如下:

  • 一个Python函数,定义了每个cogroup计算。

  • 一个StructType对象或一个字符串,该字符串定义了模式的输出PySparkDataFrame

返回的列标签pandas.DataFrame必须匹配输出模式如果定义的字段名称指定为字符串,或匹配字段数据类型的位置如果不是字符串,例如,整数指数。看到pandas.DataFrame当构建一个如何标签列pandas.DataFrame

cogroup所有数据加载到内存中之前的函数。这可能导致内存不足异常,特别是尺寸倾向。的配置maxRecordsPerBatch不是应用,它是由你来确保cogrouped数据符合可用内存。

下面的例子展示了如何使用.applyInPandas .cogroup groupby () () ()执行一个asof加入两个数据集。

进口熊猫作为pddf1=火花createDataFrame(((20000101,1,1.0),(20000101,2,2.0),(20000102,1,3.0),(20000102,2,4.0)),(“时间”,“id”,“v1”))df2=火花createDataFrame(((20000101,1,“x”),(20000101,2,“y”)),(“时间”,“id”,“v2”))defasof_join(l,r):返回pdmerge_asof(l,r,=“时间”,通过=“id”)df1groupby(“id”)cogroup(df2groupby(“id”))applyInPandas(asof_join,模式=“时间int, int id, v1加倍,v2字符串“)显示()# + - - - - - - - - - - + - - - + - - - + - - - +# | | v1 v2 | | | id# + - - - - - - - - - - + - - - + - - - + - - - +# | 20000101 | 1 | 1.0 | | x# | 20000102 | 1 | 3.0 | | x# | 20000101 | 2 | 2.0 | |# | 20000102 | 2 | 4.0 | |# + - - - - - - - - - - + - - - + - - - + - - - +

详细的用法,看到pyspark.sql.PandasCogroupedOps.applyInPandas