用户定义的聚合函数(UDAFs)<一个类="headerlink" href="//www.neidfyre.com/docs.gcp/spark/latest/spark-sql/language-manual/#user-defined-aggregate-functions-udafs" title="">

适用于:检查标记是的砖运行时

用户定义的聚合函数(UDAFs)可一次例程,作用于多个行,并返回一个聚合值。这个文档列出了所需的类创建和注册UDAFs。它还包含示例,演示如何定义和寄存器UDAFs在Scala中,在火花SQL调用它们。

聚合器<一个类="headerlink" href="//www.neidfyre.com/docs.gcp/spark/latest/spark-sql/language-manual/#aggregator" title="">

语法(——聚合器,缓冲区,出)

基类定义的聚合,可用于数据集操作所有的元素的一组,并减少一个值。

  • :输入类型的聚合。

  • 缓冲区:减少的中间值的类型。

  • :最终的输出结果的类型。

  • bufferEncoder:编码器(BUF)

    中间值类型的编码器。

  • 完成(减少:BUF):

    转换的输出。

  • 合并(b1:缓冲区,b2: BUF):缓冲区

    合并两个中间值。

  • outputEncoder:编码器(出)

    最终的输出值类型的编码器。

  • 减少(BUF,答:):缓冲区

    总输入值一个到目前的中间值。性能、功能修改b并返回它而不是构建新的对象b

  • 零:缓冲区

    的初始值的中间结果聚合。

例子<一个类="headerlink" href="//www.neidfyre.com/docs.gcp/spark/latest/spark-sql/language-manual/#examples" title="">

类型安全的用户定义的聚合函数<一个类="headerlink" href="//www.neidfyre.com/docs.gcp/spark/latest/spark-sql/language-manual/#type-safe-user-defined-aggregate-functions" title="">

强类型数据集围绕着用户定义的聚合聚合器抽象类。例如,一个用户定义类型安全的平均可以看起来像:

无类型定义的聚合函数<一个类="headerlink" href="//www.neidfyre.com/docs.gcp/spark/latest/spark-sql/language-manual/#untyped-user-defined-aggregate-functions" title="">

类型的聚合,如上所述,也可以注册为无类型的聚合与DataFrames udf使用。例如,一个用户定义的平均无类型DataFrames可以看起来像:

进口orgapache火花sql{。编码器,编码器,SparkSession}进口orgapache火花sql表达式聚合器进口orgapache火花sql功能情况下平均(var总和:,var:)对象MyAverage扩展聚合器(,平均,]{/ /一个零值聚合。应该满足属性+ 0 = b吗def:平均=平均(0 l,0 l)/ /合并两个值来产生一个新值。针对性能、功能修改“缓冲”/ /并返回它,而不是构造一个新对象def减少(缓冲:平均,数据:):平均={缓冲总和+ =数据缓冲+ =1缓冲}/ /合并两个中间值def合并(b1:平均,b2:平均):平均={b1总和+ =b2总和b1+ =b2b1}/ /转换的输出def完成(减少:平均):=减少总和toDouble/减少/ /中间值类型的编码器defbufferEncoder:编码器(平均]=编码器产品/ /最终的输出值类型的编码器defoutputEncoder:编码器(]=编码器scalaDouble}/ /注册函数来访问它火花udf注册(“myAverage”,功能udaf(MyAverage))瓦尔df=火花格式(“json”)。负载(“例子/ src / main /资源/ employees.json”)dfcreateOrReplaceTempView(“员工”)df显示()/ / + - - - - - - - - - - - - - +/ /工资| | |名称/ / + - - - - - - - - - - - - - +/ / |迈克尔| 3000 |/ / |安迪| 4500 |/ / |贾斯汀| 3500 |/ / | Berta | 4000 |/ / + - - - - - - - - - - - - - +瓦尔结果=火花sql(“选择myAverage(工资)作为average_salary从员工”)结果显示()/ / + - - - - - - - - - - - - - - - - +/ / | average_salary |/ / + - - - - - - - - - - - - - - - - +/ / | 3750.0 |/ / + - - - - - - - - - - - - - - - - +
进口java.io.Serializable;进口org.apache.spark.sql.Dataset;进口org.apache.spark.sql.Encoder;进口org.apache.spark.sql.Encoders;进口org.apache.spark.sql.Row;进口org.apache.spark.sql.SparkSession;进口org.apache.spark.sql.expressions.Aggregator;进口org.apache.spark.sql.functions;公共静态平均实现了可序列化的{私人总和;私人;/ /构造函数,getter、setter……}公共静态MyAverage扩展聚合器<,平均,>{/ /一个零值聚合。应该满足属性+ 0 = b吗公共平均(){返回平均(0l,0l);}/ /合并两个值来产生一个新值。针对性能、功能修改“缓冲”/ /并返回它,而不是构造一个新对象公共平均减少(平均缓冲,数据){newSum=缓冲getSum()+数据;newCount=缓冲getCount()+1;缓冲setSum(newSum);缓冲setCount(newCount);返回缓冲;}/ /合并两个中间值公共平均合并(平均b1,平均b2){mergedSum=b1getSum()+b2getSum();mergedCount=b1getCount()+b2getCount();b1setSum(mergedSum);b1setCount(mergedCount);返回b1;}/ /转换的输出公共完成(平均减少){返回(()减少getSum())/减少getCount();}/ /中间值类型的编码器公共编码器<平均>bufferEncoder(){返回编码器(平均);}/ /最终的输出值类型的编码器公共编码器<>outputEncoder(){返回编码器();}}/ /注册函数来访问它火花udf()。注册(“myAverage”,功能udaf(MyAverage(),编码器()));数据集<>df=火花()。格式(“json”)。负载(“例子/ src / main /资源/ employees.json”);dfcreateOrReplaceTempView(“员工”);df显示();/ / + - - - - - - - - - - - - - +/ /工资| | |名称/ / + - - - - - - - - - - - - - +/ / |迈克尔| 3000 |/ / |安迪| 4500 |/ / |贾斯汀| 3500 |/ / | Berta | 4000 |/ / + - - - - - - - - - - - - - +数据集<>结果=火花sql(“选择myAverage(工资)作为average_salary从员工”);结果显示();/ / + - - - - - - - - - - - - - - - - +/ / | average_salary |/ / + - - - - - - - - - - - - - - - - +/ / | 3750.0 |/ / + - - - - - - - - - - - - - - - - +
——编译和地方UDAF MyAverage在一个名为“MyAverage的JAR文件。jar在/ tmp。创建函数myAverage作为“MyAverage”使用JAR“/ tmp / MyAverage.jar”;显示用户功能;+- - - - - - - - - - - - - - - - - - +|函数|+- - - - - - - - - - - - - - - - - - +|默认的myAverage|+- - - - - - - - - - - - - - - - - - +创建临时视图员工使用orgapache火花sqljson选项(路径“例子/ src / main /资源/ employees.json”);选择*员工;+- - - - - - - - - - - - - + +|的名字|工资|+- - - - - - - - - - - - - + +|迈克尔|3000年||安迪|4500年||贾斯汀|3500年||贝尔塔|4000年|+- - - - - - - - - - - - - + +选择myAverage(工资)作为average_salary员工;+- - - - - - - - - - - - - - - - +|average_salary|+- - - - - - - - - - - - - - - - +|3750年0|+- - - - - - - - - - - - - - - - +