用户定义的聚合函数(UDAFs)<一个类="headerlink" href="//www.neidfyre.com/docs/spark/latest/spark-sql/language-manual/#user-defined-aggregate-functions-udafs" title="">
适用于:砖运行时
用户定义的聚合函数(UDAFs)可一次例程,作用于多个行,并返回一个聚合值。这个文档列出了所需的类创建和注册UDAFs。它还包含示例,演示如何定义和寄存器UDAFs在Scala中,在火花SQL调用它们。
聚合器<一个类="headerlink" href="//www.neidfyre.com/docs/spark/latest/spark-sql/language-manual/#aggregator" title="">
语法(——聚合器,缓冲区,出)
基类定义的聚合,可用于数据集操作所有的元素的一组,并减少一个值。
在:输入类型的聚合。
缓冲区:减少的中间值的类型。
出:最终的输出结果的类型。
bufferEncoder:编码器(BUF)
中间值类型的编码器。
完成(减少:BUF):
转换的输出。
合并(b1:缓冲区,b2: BUF):缓冲区
合并两个中间值。
outputEncoder:编码器(出)
最终的输出值类型的编码器。
减少(BUF,答:):缓冲区
总输入值
一个
到目前的中间值。性能、功能修改b
并返回它而不是构建新的对象b
。零:缓冲区
的初始值的中间结果聚合。
例子<一个类="headerlink" href="//www.neidfyre.com/docs/spark/latest/spark-sql/language-manual/#examples" title="">
类型安全的用户定义的聚合函数<一个类="headerlink" href="//www.neidfyre.com/docs/spark/latest/spark-sql/language-manual/#type-safe-user-defined-aggregate-functions" title="">
强类型数据集围绕着用户定义的聚合聚合器
抽象类。例如,一个用户定义类型安全的平均可以看起来像:
无类型定义的聚合函数<一个类="headerlink" href="//www.neidfyre.com/docs/spark/latest/spark-sql/language-manual/#untyped-user-defined-aggregate-functions" title="">
类型的聚合,如上所述,也可以注册为无类型的聚合与DataFrames udf使用。例如,一个用户定义的平均无类型DataFrames可以看起来像:
进口org。apache。火花。sql{。编码器,编码器,SparkSession}进口org。apache。火花。sql。表达式。聚合器进口org。apache。火花。sql。功能情况下类平均(var总和:长,var数:长)对象MyAverage扩展聚合器(长,平均,双]{/ /一个零值聚合。应该满足属性+ 0 = b吗def零:平均=平均(0 l,0 l)/ /合并两个值来产生一个新值。针对性能、功能修改“缓冲”/ /并返回它,而不是构造一个新对象def减少(缓冲:平均,数据:长):平均={缓冲。总和+ =数据缓冲。数+ =1缓冲}/ /合并两个中间值def合并(b1:平均,b2:平均):平均={b1。总和+ =b2。总和b1。数+ =b2。数b1}/ /转换的输出def完成(减少:平均):双=减少。总和。toDouble/减少。数/ /中间值类型的编码器defbufferEncoder:编码器(平均]=编码器。产品/ /最终的输出值类型的编码器defoutputEncoder:编码器(双]=编码器。scalaDouble}/ /注册函数来访问它火花。udf。注册(“myAverage”,功能。udaf(MyAverage))瓦尔df=火花。读。格式(“json”)。负载(“例子/ src / main /资源/ employees.json”)df。createOrReplaceTempView(“员工”)df。显示()/ / + - - - - - - - - - - - - - +/ /工资| | |名称/ / + - - - - - - - - - - - - - +/ / |迈克尔| 3000 |/ / |安迪| 4500 |/ / |贾斯汀| 3500 |/ / | Berta | 4000 |/ / + - - - - - - - - - - - - - +瓦尔结果=火花。sql(“选择myAverage(工资)作为average_salary从员工”)结果。显示()/ / + - - - - - - - - - - - - - - - - +/ / | average_salary |/ / + - - - - - - - - - - - - - - - - +/ / | 3750.0 |/ / + - - - - - - - - - - - - - - - - +
进口java.io.Serializable;进口org.apache.spark.sql.Dataset;进口org.apache.spark.sql.Encoder;进口org.apache.spark.sql.Encoders;进口org.apache.spark.sql.Row;进口org.apache.spark.sql.SparkSession;进口org.apache.spark.sql.expressions.Aggregator;进口org.apache.spark.sql.functions;公共静态类平均实现了可序列化的{私人长总和;私人长数;/ /构造函数,getter、setter……}公共静态类MyAverage扩展聚合器<长,平均,双>{/ /一个零值聚合。应该满足属性+ 0 = b吗公共平均零(){返回新平均(0l,0l);}/ /合并两个值来产生一个新值。针对性能、功能修改“缓冲”/ /并返回它,而不是构造一个新对象公共平均减少(平均缓冲,长数据){长newSum=缓冲。getSum()+数据;长newCount=缓冲。getCount()+1;缓冲。setSum(newSum);缓冲。setCount(newCount);返回缓冲;}/ /合并两个中间值公共平均合并(平均b1,平均b2){长mergedSum=b1。getSum()+b2。getSum();长mergedCount=b1。getCount()+b2。getCount();b1。setSum(mergedSum);b1。setCount(mergedCount);返回b1;}/ /转换的输出公共双完成(平均减少){返回((双)减少。getSum())/减少。getCount();}/ /中间值类型的编码器公共编码器<平均>bufferEncoder(){返回编码器。豆(平均。类);}/ /最终的输出值类型的编码器公共编码器<双>outputEncoder(){返回编码器。双();}}/ /注册函数来访问它火花。udf()。注册(“myAverage”,功能。udaf(新MyAverage(),编码器。长()));数据集<行>df=火花。读()。格式(“json”)。负载(“例子/ src / main /资源/ employees.json”);df。createOrReplaceTempView(“员工”);df。显示();/ / + - - - - - - - - - - - - - +/ /工资| | |名称/ / + - - - - - - - - - - - - - +/ / |迈克尔| 3000 |/ / |安迪| 4500 |/ / |贾斯汀| 3500 |/ / | Berta | 4000 |/ / + - - - - - - - - - - - - - +数据集<行>结果=火花。sql(“选择myAverage(工资)作为average_salary从员工”);结果。显示();/ / + - - - - - - - - - - - - - - - - +/ / | average_salary |/ / + - - - - - - - - - - - - - - - - +/ / | 3750.0 |/ / + - - - - - - - - - - - - - - - - +
——编译和地方UDAF MyAverage在一个名为“MyAverage的JAR文件。jar在/ tmp。创建函数myAverage作为“MyAverage”使用JAR“/ tmp / MyAverage.jar”;显示用户功能;+- - - - - - - - - - - - - - - - - - +|函数|+- - - - - - - - - - - - - - - - - - +|默认的。myAverage|+- - - - - - - - - - - - - - - - - - +创建临时视图员工使用org。apache。火花。sql。json选项(路径“例子/ src / main /资源/ employees.json”);选择*从员工;+- - - - - - - - - - - - - + +|的名字|工资|+- - - - - - - - - - - - - + +|迈克尔|3000年||安迪|4500年||贾斯汀|3500年||贝尔塔|4000年|+- - - - - - - - - - - - - + +选择myAverage(工资)作为average_salary从员工;+- - - - - - - - - - - - - - - - +|average_salary|+- - - - - - - - - - - - - - - - +|3750年。0|+- - - - - - - - - - - - - - - - +