熊猫用户定义函数
一个熊猫用户定义函数(UDF)也称为矢量化UDF是一个用户定义函数,使用Apache箭头传输数据和熊猫来处理数据。熊猫udf允许矢量化操作,可以提高性能比row-at-a-time 100 xPython udf。
博客文章的背景信息,请参阅新的熊猫udf和Python类型提示即将发布的Apache 3.0火花。
你定义一个熊猫UDF使用关键字pandas_udf
作为装饰和包装的功能Python类型提示。本文描述了不同类型的熊猫熊猫udf和展示了如何使用udf型提示。
系列,系列UDF
使用一系列系列熊猫UDF标量进行向量化操作。您可以使用api等选择
和withColumn
。
Python函数应该采取一个熊猫系列作为输入并返回一个熊猫系列相同的长度,和你应该在Python中指定这些类型提示。火花运行一个熊猫UDF列分割到批次,调用函数为每个批处理数据的一个子集,然后连接结果。
下面的例子显示了如何创建一个熊猫UDF,计算2列的产物。
进口熊猫作为pd从pyspark.sql.functions进口上校,pandas_udf从pyspark.sql.types进口LongType#声明函数和创建UDFdefmultiply_func(一个:pd。系列,b:pd。系列)- >pd。系列:返回一个*b乘=pandas_udf(multiply_func,returnType=LongType())# pandas_udf应该能够执行的功能与当地大熊猫数据x=pd。系列([1,2,3])打印(multiply_func(x,x))# 0 14 # 1# 2 9# dtype: int64#创建一个火花DataFrame,“火花”是现有SparkSessiondf=火花。createDataFrame(pd。DataFrame(x,列=(“x”)))#执行函数作为引发矢量化UDFdf。选择(乘(上校(“x”),上校(“x”)))。显示()# + - - - - - - - - - - - - - - - - - - - +# | multiply_func (x, x) |# + - - - - - - - - - - - - - - - - - - - +# | 1 |# | 4 |9 # | |# + - - - - - - - - - - - - - - - - - - - +
迭代器系列的迭代器系列的UDF
迭代器UDF是一样的一个标量熊猫UDF除外:
Python函数
需要一个迭代器的批次,而不是单个输入批处理作为输入。
返回一个迭代器的输出批次而不是单个批处理输出。
整个输出迭代器的长度应该与整个输入的长度相同。
包装的熊猫UDF星星之火列作为输入。
你应该指定Python类型提示迭代器(pandas.Series)
- >迭代器(pandas.Series)
。
这熊猫UDF UDF执行需要初始化时有用的一些国家,例如,加载机器学习模型推理应用于每个输入批处理文件。
下面的例子显示了如何创建一个熊猫UDF与迭代器支持。
进口熊猫作为pd从打字进口迭代器从pyspark.sql.functions进口上校,pandas_udf,结构体pdf=pd。DataFrame([1,2,3),列=(“x”])df=火花。createDataFrame(pdf)#当调用UDF的列,#输入pd.Series底层函数迭代器。@pandas_udf(“长”)defplus_one(batch_iter:迭代器(pd。系列])- >迭代器(pd。系列]:为x在batch_iter:收益率x+1df。选择(plus_one(上校(“x”)))。显示()# + - - - - - - - - - - - - +# | plus_one (x) |# + - - - - - - - - - - - - +# | 2 |# | 3 |# | 4 |# + - - - - - - - - - - - - +# UDF,您可以初始化一些国家在处理之前批次。#包装代码使用try / finally或使用上下文管理器来确保#最后释放资源。y_bc=火花。sparkContext。广播(1)@pandas_udf(“长”)defplus_y(batch_iter:迭代器(pd。系列])- >迭代器(pd。系列]:y=y_bc。价值#初始化状态试一试:为x在batch_iter:收益率x+y最后:通过#释放资源,如果任何df。选择(plus_y(上校(“x”)))。显示()# + - - - - - - - - - - - - +# | plus_y (x) |# + - - - - - - - - - - - - +# | 2 |# | 3 |# | 4 |# + - - - - - - - - - - - - +
多个系列的迭代器系列UDF的迭代器
迭代器的多个系列的迭代器系列UDF和限制也有类似的特征迭代器系列的迭代器系列的UDF。指定的函数迭代器批次的批次和输出迭代器。UDF执行需要初始化时也很有用的一些状态。
的差异是:
底层的Python函数的迭代器元组熊猫系列的。
包装的熊猫UDF多个火花列作为输入。
你指定类型的暗示迭代器(元组(pandas.Series,……]
- >迭代器(pandas.Series)
。
从打字进口迭代器,元组进口熊猫作为pd从pyspark.sql.functions进口上校,pandas_udf,结构体pdf=pd。DataFrame([1,2,3),列=(“x”])df=火花。createDataFrame(pdf)@pandas_udf(“长”)defmultiply_two_cols(迭代器:迭代器(元组(pd。系列,pd。系列]])- >迭代器(pd。系列]:为一个,b在迭代器:收益率一个*bdf。选择(multiply_two_cols(“x”,“x”))。显示()# + - - - - - - - - - - - - - - - - - - - - - - - - +# | multiply_two_cols (x, x) |# + - - - - - - - - - - - - - - - - - - - - - - - - +# | 1 |# | 4 |9 # | |# + - - - - - - - - - - - - - - - - - - - - - - - - +
系列标量UDF
系列标量熊猫udf类似于引发聚合函数。一系列标量熊猫UDF定义了一个聚合从一个或多个熊猫系列一个标量值,其中每个熊猫系列代表了一个火花列。你用一系列与api等标量熊猫UDF选择
,withColumn
,groupBy.agg
,pyspark.sql.Window。
你表达类型提示pandas.Series,…
- >任何
。返回类型应该是一个原始的数据类型,并返回标量可以是一个Python原始类型,例如,int
或浮动
或NumPy数据类型等numpy.int64
或numpy.float64
。任何
最好是一个特定的标量类型。
这种类型的UDF不每组支持部分聚合和所有数据加载到内存中。
下面的例子展示了如何使用这种类型的UDF来计算的意思选择
,groupBy
,窗口
操作:
进口熊猫作为pd从pyspark.sql.functions进口pandas_udf从pyspark.sql进口窗口df=火花。createDataFrame(((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))#声明函数和创建UDF@pandas_udf(“替身”)defmean_udf(v:pd。系列)- >浮动:返回v。的意思是()df。选择(mean_udf(df(“v”)))。显示()# + - - - - - - - - - - - - +# | mean_udf (v) |# + - - - - - - - - - - - - +# | 4.2 |# + - - - - - - - - - - - - +df。groupby(“id”)。gg(mean_udf(df(“v”)))。显示()# + - - - + - - - - - - - - - - - - +# | | id mean_udf (v) |# + - - - + - - - - - - - - - - - - +# | 1 | 1.5 |# | 2 | 6.0 |# + - - - + - - - - - - - - - - - - +w=窗口\。partitionBy(“id”)\。rowsBetween(窗口。unboundedPreceding,窗口。unboundedFollowing)df。withColumn(“mean_v”,mean_udf(df(“v”])。在(w))。显示()# + - - - + - - - + - - - +# | | id v | mean_v |# + - - - + - - - + - - - +# | 1 | 1.0 | 1.5 |# | 1 | 2.0 | 1.5 |# | 2 | 3.0 | 6.0 |# | 2 | 5.0 | 6.0 |# | 2 | 10.0 | 6.0 |# + - - - + - - - + - - - +
详细的用法,看到pyspark.sql.functions.pandas_udf。
使用
设置箭头批大小
引发的数据分区转换成箭头批次记录,可以暂时导致高JVM的内存使用。为了避免可能的内存不足异常,您可以调整大小的箭头记录批次通过设置spark.sql.execution.arrow.maxRecordsPerBatch
配置一个整数决定为每个批处理的最大行数。默认值是10000每批记录。如果列数很大,应相应调整。使用这个极限,每个数据分区分为1或多个记录批次进行处理。
时间戳与时区语义
引发内部商店UTC时间戳的值,没有指定的时区和时间戳数据转换为本地时间与微秒UTC决议。
时间戳数据导出或显示在火花时,会话时区是用来定位的时间戳值。会话时区设置的spark.sql.session.timeZone
配置和JVM系统默认为本地时区。大熊猫使用datetime64
纳秒精度的类型,datetime64 (ns)
在每列的基础上,可选的时区。
当时间戳数据从火花转移到熊猫它转换为纳秒,每列转换为引发会话时区的时区然后本地化,消除了时区和显示当地时间值。这发生在调用toPandas ()
或pandas_udf
时间戳列。
当时间戳数据从熊猫转移到火花,它转化为UTC微秒。这发生在调用createDataFrame
熊猫DataFrame或当熊猫UDF返回一个时间戳。这些转换是自动完成,以确保火花在预期的数据格式,所以没有必要做这些转换自己。纳秒值截断。
标准的UDF加载时间戳数据作为Python datetime对象,这是不同于一个熊猫时间戳。为了获得最佳的性能,我们建议您使用熊猫时间序列功能在处理时间戳在熊猫UDF。有关详细信息,请参见时间序列/日期功能。