Spark SQL中的窗口函数介绍
在这篇博文中,我们将介绍新添加的窗口函数特性Apache火花.窗口函数允许Spark SQL的用户计算结果,例如给定行的排名或输入行的范围内的移动平均值。它们极大地提高了Spark的SQL和DataFrame api的表达能力。本博客将首先介绍窗口函数的概念,然后讨论如何在Spark SQL和Spark的DataFrame API中使用它们。
什么是窗口函数?
在1.4之前,Spark SQL支持两种函数,可用于计算单个返回值。内置函数或udf,例如字符串的子串
或轮
,从单行中获取值作为输入,并为每个输入行生成一个返回值。聚合函数,如总和
或马克斯
,对一组行进行操作,并为每个组计算一个返回值。
虽然这两种方法在实践中都非常有用,但仍有许多操作不能单独使用这些类型的函数来表示。具体来说,没有办法既对一组行进行操作,又为每个输入行返回一个值。这种限制使得难以执行各种数据处理任务,如计算移动平均值、计算累计和或访问出现在当前行之前的行值。幸运的是,对于Spark SQL的用户来说,窗口函数填补了这一空白。
窗口函数的核心是根据一组名为的行计算表中每个输入行的返回值框架.每个输入行都可以有一个与之关联的唯一帧。窗口函数的这一特性使其比其他函数更强大,并允许用户以简洁的方式表达没有窗口函数很难(如果不是不可能的话)表达的各种数据处理任务。现在,让我们来看两个例子。
假设我们有一个productRevenue表如下所示。
我们想回答两个问题:
- 每个类别中最畅销和第二畅销的产品是什么?
- 每种产品的收入和同类别中最畅销产品的收入有什么区别?
回答第一个问题每个类别中最畅销和第二畅销的产品是什么?,我们需要根据产品的收入对产品进行分类排名,并根据排名选出最畅销和第二畅销的产品。下面是使用窗口函数回答这个问题的SQL查询dense_rank
(我们将在下一节解释使用窗口函数的语法)。
选择产品,类别,收入从(选择产品,类别,收入,dense_rank()在(分区通过类别订单通过收入DESC)作为排名从productRevenue) tmp在哪里排名的结果的这个查询是所示。没有使用窗口函数,它是非常困难的来表达查询在SQL,而且即使SQL查询可以表达,它是硬为底层引擎来高效地计算查询。
SQL | DataFrame API | |
排序功能 | 排名 | 排名 |
dense_rank | denseRank | |
percent_rank | percentRank | |
ntile | ntile | |
row_number | rowNumber | |
分析功能 | cume_dist | cumeDist |
first_value | firstValue | |
last_value | lastValue | |
滞后 | 滞后 | |
引领 | 引领 |
要使用窗口函数,用户需要标记一个函数被任何一方用作窗口函数
- 添加一个在在SQL中受支持的函数后面的子句。
平均收入超过…
;或 - 调用在方法在DataFrame API中支持的函数,例如:
排名().over(…)
.
函数被标记为窗口函数之后,下一个关键步骤是定义窗口函数窗口规范与此函数关联。窗口规范定义了与给定输入行相关的框架中包含哪些行。窗口规范包括三个部分:
- 分区规范:控制哪些行将与给定行的位于同一分区。此外,用户可能希望确保在排序和计算框架之前,将类别列中具有相同值的所有行收集到同一台机器。如果没有给出分区规范,那么所有数据都必须收集到一台机器上。
- 排序规范:控制分区中的行排序方式,确定给定行在其分区中的位置。
- 帧规范:根据与当前行的相对位置,声明当前输入行的帧将包含哪些行。例如,“当前行到当前行的前三行”描述了一个帧,包括当前输入行和当前行之前出现的三行。
在SQL中,分区的
而且命令
关键字分别用于为分区规范指定分区表达式和为排序规范指定排序表达式。SQL语法如下所示。
Over (partition by…订购…)
在DataFrame API中,我们提供了实用函数来定义窗口规范。以Python为例,用户可以按照如下方式指定分区表达式和排序表达式。
从pyspark.sql.window进口窗口windowSpec=\窗口\.partitionBy \(…).orderBy(…)
除了排序和分区之外,用户还需要定义帧的开始边界、帧的结束边界和帧的类型,这是帧规范的三个组成部分。
边界有五种类型,分别是无限的前
,无界后
,当前行
,
,
.无限的前
而且无界后
分别表示分区的第一行和最后一行。对于其他三种类型的边界,它们指定了与当前输入行的位置的偏移量,并且它们的具体含义是基于帧的类型定义的。有两种框架,行框架和范围框架。
行帧
ROW帧基于与当前输入行的位置的物理偏移,这意味着当前行
,
,或
指定物理偏移量。如果当前行
用作边界时,它表示当前输入行。
而且
描述在当前输入行之前和之后分别出现的行数。下面的图说明了一个ROW帧1前
作为起始边界和1后
作为结束边界(前1行和后1行之间的行
SQL语法)。
框架范围
RANGE帧基于当前输入行位置的逻辑偏移,与row帧具有类似的语法。逻辑偏移是当前输入行的排序表达式的值与帧的边界行的相同表达式的值之间的差值。由于这个定义,当使用RANGE帧时,只允许一个排序表达式。同样,对于一个RANGE帧,只要考虑边界计算,所有与当前输入行的排序表达式值相同的行都被认为是同一行。
现在,让我们看一个例子。在本例中,排序表达式为收入
;起始边界为2000年前
;终点边界是1000年
(这个框架被定义为范围在2000之前和1000之后
SQL语法)。下面的五幅图说明了如何随着当前输入行的更新而更新帧。基本上,对于每个当前的输入行,基于收入的值,我们计算收入范围[当期收入值- 2000,当期收入值+ 1000]
.收入值落在这个范围内的所有行都在当前输入行的框架中。
总之,要定义窗口规范,用户可以在SQL中使用以下语法。
Over (partition by…订购…frame_type BETWEEN start AND end)
在这里,frame_type
可以是ROWS(用于ROW帧)或RANGE(用于RANGE帧);开始
可以是任何一个无限的前
,当前行
,
,
;而且结束
可以是任何一个无界后
,当前行
,
,
在Python DataFrame API中,用户可以按如下方式定义窗口规范。
从pyspark.sql.window进口窗口#定义分区规范而且要求规范。windowSpec=\窗口\.partitionBy \(…).orderBy(…)#定义一个窗口规范与一个行框架。windowSpec.rowsBetween (开始,结束)#定义一个窗口规范与一个范围框架。windowSpec.rangeBetween (开始,结束)
接下来是什么?
自从火花1.4,我们一直在积极地与社区成员一起进行优化,以提高性能并减少操作符求值窗口函数的内存消耗。其中一些将在Spark 1.5中添加,另一些将在我们未来的版本中添加。除了性能改进之外,我们还将在不久的将来添加两个特性,使Spark SQL中的窗口函数支持更加强大。首先,我们一直致力于为日期和时间戳数据类型(火星- 8943).使用Interval数据类型,用户可以使用间隔作为中指定的值
而且
这使得用窗口函数进行各种时间序列分析变得更加容易。其次,我们一直致力于在Spark SQL中添加对用户定义的聚合函数的支持(火星- 3947).有了我们的窗口函数支持,用户可以立即使用用户自定义的聚合函数作为窗口函数来执行各种高级数据分析任务。
为了尝试Spark的这些功能,免费试用Databricks或使用社区版.
确认
Spark 1.4中支持窗口函数的开发是由Spark社区的许多成员共同完成的。我们要特别感谢魏国贡献了最初的补丁。