熊猫api函数
熊猫函数api使您能够直接应用一个Python本机函数,熊猫实例并将其输出到一个PySpark DataFrame。类似于熊猫用户定义函数、功能的api也使用Apache箭头传输数据和熊猫的数据;然而,Python类型提示在熊猫函数api中是可选的。
有三种类型的熊猫api函数:
分组的地图
地图
Cogrouped地图
熊猫熊猫函数api利用相同的内部逻辑执行使用UDF。他们分享PyArrow等特点,支持SQL类型和配置。
有关更多信息,请参见博文新的熊猫udf和Python类型提示即将发布的Apache 3.0火花。
分组的地图
你改变你的分组数据使用.applyInPandas groupBy () ()
实现“split-apply-combine”模式。Split-apply-combine包括三个步骤:
把数据分成组使用
DataFrame.groupBy
。应用一个函数在每个组。函数的输入和输出都是
pandas.DataFrame
。输入数据包含每组的所有行和列。结合成一个新的结果
DataFrame
。
使用.applyInPandas groupBy () ()
,您必须定义如下:
一个Python函数,定义了每一组计算
一个
StructType
对象或一个字符串,该字符串定义了输出的模式DataFrame
返回的列标签pandas.DataFrame
必须匹配输出模式如果定义的字段名称指定为字符串,或匹配字段数据类型的位置如果不是字符串,例如,整数指数。看到pandas.DataFrame当构建一个如何标签列pandas.DataFrame
。
所有数据一组应用函数之前被加载到内存中。这可能导致内存不足异常,特别是尺寸倾向。的配置maxRecordsPerBatch不应用于组织和它是由你来确保分组数据符合可用的内存。
下面的例子展示了如何使用苹果groupby () ()
从每个值减去均值。
df=火花。createDataFrame(((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))defsubtract_mean(pdf):# pdf pandas.DataFramev=pdf。v返回pdf。分配(v=v- - - - - -v。的意思是())df。groupby(“id”)。applyInPandas(subtract_mean,模式=“id, v双”)。显示()# + - - - + - - - +# | v | | id# + - - - + - - - +# | 1 | -0.5 |# | 1 | 0.5 |# | 2 | -3.0 |# | 2 | -1.0 |# | 2 | 4.0 |# + - - - + - - - +
地图
你和熊猫执行映射操作实例DataFrame.mapInPandas ()
以变换的迭代器pandas.DataFrame
到另一个迭代器的pandas.DataFrame
代表当前PySpark DataFrame并返回结果作为PySpark DataFrame。
的底层函数和输出迭代器pandas.DataFrame
。它可以返回输出的任意长度与熊猫一些udf系列等系列。
下面的例子展示了如何使用mapInPandas ()
:
df=火花。createDataFrame(((1,21),(2,30.)),(“id”,“年龄”))deffilter_func(迭代器):为pdf在迭代器:收益率pdf(pdf。id= =1]df。mapInPandas(filter_func,模式=df。模式)。显示()# + - - - + - - - +# | | | id年龄# + - - - + - - - +# | 1 | | 21日# + - - - + - - - +
详细的用法,看到pyspark.sql.DataFrame.mapInPandas。
Cogrouped地图
与熊猫cogrouped地图操作实例,使用.applyInPandas .cogroup DataFrame.groupby () () ()
两个PySpark cogroupDataFrame
年代,一个共同的关键,然后应用一个Python函数每个cogroup如图所示:
洗牌,这样的数据组的每个DataFrame cogrouped在一起共享一个关键。
一个函数应用于每个cogroup。函数的输入是2
pandas.DataFrame
(有一个可选的元组表示的关键)。的输出是一个函数pandas.DataFrame
。结合
pandas.DataFrame
从所有组织成一个新的PySpark年代DataFrame
。
使用.applyInPandas .cogroup groupBy () () ()
,您必须定义如下:
一个Python函数,定义了每个cogroup计算。
一个
StructType
对象或一个字符串,该字符串定义了模式的输出PySparkDataFrame
。
返回的列标签pandas.DataFrame
必须匹配输出模式如果定义的字段名称指定为字符串,或匹配字段数据类型的位置如果不是字符串,例如,整数指数。看到pandas.DataFrame当构建一个如何标签列pandas.DataFrame
。
cogroup所有数据加载到内存中之前的函数。这可能导致内存不足异常,特别是尺寸倾向。的配置maxRecordsPerBatch不是应用,它是由你来确保cogrouped数据符合可用内存。
下面的例子展示了如何使用.applyInPandas .cogroup groupby () () ()
执行一个asof加入
两个数据集。
进口熊猫作为pddf1=火花。createDataFrame(((20000101,1,1.0),(20000101,2,2.0),(20000102,1,3.0),(20000102,2,4.0)),(“时间”,“id”,“v1”))df2=火花。createDataFrame(((20000101,1,“x”),(20000101,2,“y”)),(“时间”,“id”,“v2”))defasof_join(l,r):返回pd。merge_asof(l,r,在=“时间”,通过=“id”)df1。groupby(“id”)。cogroup(df2。groupby(“id”))。applyInPandas(asof_join,模式=“时间int, int id, v1加倍,v2字符串“)。显示()# + - - - - - - - - - - + - - - + - - - + - - - +# | | v1 v2 | | | id# + - - - - - - - - - - + - - - + - - - + - - - +# | 20000101 | 1 | 1.0 | | x# | 20000102 | 1 | 3.0 | | x# | 20000101 | 2 | 2.0 | |# | 20000102 | 2 | 4.0 | |# + - - - - - - - - - - + - - - + - - - + - - - +