用户定义的聚合函数- Scala
本文包含一个UDAF以及如何注册它使用Apache火花SQL。看到用户定义的聚合函数(UDAFs)为更多的细节。
实现一个UserDefinedAggregateFunction
进口org。apache。火花。sql。表达式。MutableAggregationBuffer进口org。apache。火花。sql。表达式。UserDefinedAggregateFunction进口org。apache。火花。sql。行进口org。apache。火花。sql。类型。_类GeometricMean扩展UserDefinedAggregateFunction{/ /这是聚合函数的输入字段。覆盖definputSchema:org。apache。火花。sql。类型。StructType=StructType(StructField(“价值”,倍增式)::零)/ /这是你保持的内部字段计算你的总。覆盖defbufferSchema:StructType=StructType(StructField(“数”,LongType)::StructField(“产品”,倍增式)::零)/ /这是你aggregatation函数的输出类型。覆盖def数据类型:数据类型=倍增式覆盖def确定的:布尔=真正的/ /这是缓冲模式的初始值。覆盖def初始化(缓冲:MutableAggregationBuffer):单位={缓冲(0)=0 l缓冲(1)=1.0}/ /这是如何更新给定一个输入缓冲模式。覆盖def更新(缓冲:MutableAggregationBuffer,输入:行):单位={缓冲(0)=缓冲。木屐(长)(0)+1缓冲(1)=缓冲。木屐(双)(1)*输入。木屐(双)(0)}/ /这是如何与bufferSchema合并两个对象类型。覆盖def合并(buffer1:MutableAggregationBuffer,buffer2:行):单位={buffer1(0)=buffer1。木屐(长)(0)+buffer2。木屐(长)(0)buffer1(1)=buffer1。木屐(双)(1)*buffer2。木屐(双)(1)}/ /这是你输出最终的价值,给你bufferSchema的最终值。覆盖def评估(缓冲:行):任何={数学。战俘(缓冲。用(1),1。toDouble/缓冲。getLong(0))}}
用你UDAF
/ /创建一个DataFrame并引发SQL表进口org。apache。火花。sql。功能。_瓦尔id=火花。范围(1,20.)id。createOrReplaceTempView(“id”)瓦尔df=火花。sql(“选择id, id % 3从ids group_id”)df。createOrReplaceTempView(“简单”)
——使用UDAF group_by声明和调用。选择group_id,通用汽车(id)从简单的集团通过group_id
/ /或者使用DataFrame语法调用聚合函数。/ /创建一个实例的UDAF GeometricMean。瓦尔通用汽车=新GeometricMean/ /显示的列值的几何平均数“id”。df。groupBy(“group_id”)。gg(通用汽车(上校(“id”))。作为(“GeometricMean”))。显示()/ /调用UDAF由其指定的名字。df。groupBy(“group_id”)。gg(expr(“通用汽车(id) GeometricMean”))。显示()