Apache Spark中的用户定义聚合:一个爱情故事

下载幻灯片

定义定制的可伸缩聚合逻辑是Apache Spark最强大的特性之一。用户定义聚合函数(UDAF)是一种灵活的机制,用于扩展Spark数据帧和结构化流,具有从专门的摘要技术到用于探索性数据分析的构建块等新功能。尽管udaf功能强大,但Spark 3.0之前的udaf也存在一些微妙的缺陷,会损害性能和可用性。

在这次演讲中,Erik将讲述他如何遇到UDAFs并爱上它们强大的功能的故事。他将描述他如何面对UDAF设计及其性能属性的挑战,以及如何在Apache Spark社区的帮助下,最终在Spark 3.0中修复了UDAF设计,并再次爱上了UDAF。在此过程中,您将了解用户定义聚合如何在Spark中工作,如何编写自己的UDAF库,以及Spark最新的UDAF特性如何提高可用性和性能。您还将听到Spark的代码评审过程如何使这些新特性变得更好,并学习如何将大型特性成功引入Apache Spark上游社区的技巧。

点击这里观看更多Spark + AI课程

免费试用Databricks

视频记录

-好了,大家好,谢谢你们来听我的演讲。今天,我要给大家讲一个关于Spark中用户定义聚合的故事。它是如何工作的,如何使用它,以及它在Spark 3.0中的变化。

自定义聚合

我讲的不仅仅是一个故事,还是一个爱情故事。所有的爱情故事都是人类最古老的故事之一,它们基本上都是一样的。英雄满足聚合API,英雄为性能错误提交Spark JIRA,最后,英雄合并一个Spark 3.0 pull请求。

所有的好故事都应该从建立情节开始。为了确保我们都在同一页上,我将总结Spark的数据和计算模型。

Spark的数据模型是向用户展示其数据的逻辑视图,即简单的数据元素序列。

但实际上,它的物理模型是将数据存储在可能的多个物理分区中。在这里,您可以看到我们的数据在示例中被分割为三个物理分区。

用这种数据进行计算是什么样子的?作为一个例子,让我们来看看一些数字的总和。首先,我们将某个累加器设为0,然后用我们在分区中看到的值更新累加器,然后循环遍历整个分区,直到更新,在这种情况下,对分区中的所有数据求和。

当我们完成后,我们得到了部分结果,在这种情况下,这是一个物理分区中数据的部分和。

同时,Spark对数据集中的每个分区执行相同的过程,直到它处理完数据的所有分区。一旦它得到这些部分结果,它就开始合并它们,直到得到最终答案,这是我们所有逻辑数据的和。

引发聚合器

sum是Spark运算符的一个例子。

不好意思,是Spark聚合器。聚合器都具有相同类型的属性。例如,sum操作类型为number的输入数据。它还有一个累加器,或聚合器,也是数值的。它定义了初始值为0,还定义了如何更新累加器或聚合器,以及在计算部分结果时如何合并它。现在,Spark附带了一整套预定义的聚合器,sum只是一个。另一个是取最大值。你可以在这里看到,maximum和sum填充了聚合器的所有相同属性,但它的方式不同。它的初值概念是负无穷。

它在更新和合并时使用最大值而不是加法。

另一种常见的聚合器是average,它也对数值数据进行操作,但它的累加器或聚合器是一对数字、一个和和和元素的计数。您可以看到更新和合并操作的是这个数字对,而不仅仅是单个数字。他们还定义了一个最终的表示函数来向用户返回一个值。在这种情况下,就是对计数的和。

现在我们已经建立了情节,是时候把男主角介绍给他们喜欢的对象了。

数据草图:T-Digest

作为一名数据科学家,我的专业兴趣之一是高效的数据草图。我从一个特定的数据草图中得到了很多好处,它被称为T-Digest。t -摘要是对数据分布的一个非常小且非常快的近似。

使用T-Digest可以做的事情的一个例子是非常容易地从数据中计算分位数值,而实际上不必存储所有数据。在这个示例中,您可以看到我们如何使用查找分布的紧凑草图来计算数据的第90百分位数。

今天我不会详细讨论T-Digest是如何工作的,但是如果您有兴趣了解更多关于它们是如何运作的以及可以用它们做什么,左边的这个链接将带您到我之前在Spark峰会上关于实现和使用T-Digest的演讲。

T-Digest是一个聚合器吗?

所以我们可以问一个问题,T-Digest的行为像一个聚合器吗?在不深入所有细节的情况下,我们可以为T-Digest定义一个累加器类型,可以定义一个充当零的空状态,还可以定义使用单个数据元素更新其中一个对象或合并其中两个对象的方法,如果愿意,还可以定义一个最终表示函数。例如,从结构中提取单个分位数。

所以答案是肯定的,T-Digest绝对可以像一个聚合器一样工作。

Romantid化学

这为什么令人兴奋呢?Spark不仅为我们提供了预定义的聚合器,它还允许我们定义自己的自定义聚合数据结构。在这个例子中,我为我的T-Digest创建了一个用户定义的聚合器函数,然后我还声明了两个普通的用户定义函数,以从该结构中提取中位数和第90百分位数。

我能用它做什么?如果您定义了其中一个用户定义的聚合函数,您将被允许在Spark结构化流中使用它,就像使用任何预定义的操作符一样。在这个例子中,我有一些流数值数据我使用了用户定义的聚合器,我在上一张幻灯片中声明过,直接在结构化流表达式中获得流中值和流数据的90百分位并通过窗口聚合器进行分组。这是一个非常强大的概念。

浪漫的蒙太奇

我发现数据草图、用户定义聚合器和结构化流的内部部分是如此强大,以至于我最终在Spark峰会上做了几次演讲,讨论了这些主题的各个交叉点。这张幻灯片,每一张都有我演讲的链接。

我们都知道,爱情永远不会一帆风顺。

UDAF解剖学

为了解释这一切是如何开始出错的,我必须回顾用户定义的聚合函数的实际剖析。任何实现的核心都是初始化方法(初始化方法是空状态或零状态)和更新或合并方法的实现。

我想特别提醒你们注意更新方法。您可以从函数签名中看到,它将实际的聚合器结构存储在聚合缓冲区中,而这实际上只是我们都熟悉的标准数据帧行的另一种子类。

为了告诉Spark如何做到这一点,你必须为你的聚合器定义一个用户定义的类型,你可以在底部看到定义的bufferSchema方法。

用户定义类型解剖

让我们看看这个用户定义的类型。

用户定义类型的核心是一个模式,它向Spark描述了您的对象如何实际存储在聚合缓冲行中,然后是将结构打包到该行并随后解包的方法,这些方法是序列化和反序列化。

从这里得到的关键信息是,打包和解包这些结构通常是一项昂贵的操作。特别是对于T-Digest结构,它是相当昂贵的。T-Digest是一种相当结构化的数据类型,从聚合缓冲区打包和解包它需要真正的计算。

会出什么问题?

有一天,像传统一样,我在调试这些序列化器,并放入一些print语句,看看我对数据结构实际做了什么。

让我暂停一下,解释一下我对这个实验的期待。

理论上,惟一需要序列化其中一个聚合器的时间是在从单个分区完成更新之后,这样您就可以通过网络将它们发送到正在进行最后合并操作的任何节点上收集它们。你可以在右边看到这些序列化。在这个示例图中,我希望打印语句运行三次。

现在,当我在Spark中运行这个测试数据帧时,有1000行数据,实际发生的是,我看到我的打印语句执行了1000次。所以它们对我的每一行数据执行一次,而不是每个分区执行一次。

这让我非常困惑,所以我开始深入Spark代码,我发现这个更新方法和用户定义的聚合器有一些特殊之处。为了解释发生了什么,让我们把这个定义放在最上面,重写它来揭示发生了什么。首先要做的是,它必须从聚合缓冲区反序列化我的对象。

然后它进行实际的数据更新,这是这个操作中我们真正关心的唯一部分,而且通常非常快。

最后,它必须将聚合器存储回聚合缓冲区,因此它必须重新序列化结构。

现在,这正在发生,正如你所看到的,每一行的数据你还可以看到做所有序列化的必要性已经被烤到结构的定义中。它不能简单地通过调整一些内部代码来修复。这是机制本身设计的基础。

我提交了一份Spark JIRA来描述我所看到的问题,然后我们都在Spark开发邮件列表上讨论了它,正如我们在前面的幻灯片中看到的,这个问题不容易解决。如果这个问题是可以解决的,那就必须通过重新设计整个机制来解决。

我们永远不想放弃我们爱的人,所以去年夏天我为Spark提交了一个pull请求,在那里我提出了一种新的机制来进行用户定义的聚合,而不需要序列化。

聚合器解剖

现在我将解释它是如何工作的。为了做到这一点,我首先应该解释Spark实际上提供了两种完全不同的机制来实现用户定义的聚合。第一个是我们已经见过的,用户定义的聚合器。第二个就是你们在幻灯片上看到的,它被简单地称为聚合器。这张幻灯片在这个新的聚合器接口中实现了t - digest。我要指出的第一件事是,你们可以看到这个例子中的实现完全符合幻灯片,没有我需要遗漏的代码。因此,聚合器界面是一个更简单、更清晰的界面。

我想指出的第二点是,如果你看reduce方法,这就像我们之前幻灯片上的update方法,它不使用聚合缓冲区,它直接操作我的T-Digest数据类型。所以不需要序列化或反序列化。

第三,您可以在底部看到,它使用编码器接口来描述序列化。这是一个更好、功能更强的界面,用途更广。

最后,主要的一点是,在这个新机制中,这个接口,聚合器接口,是唯一向前发展的接口,因此将不再有两个单独的接口,这一直是设计混乱的根源。

正如我所提到的,这个接口中的新更新方法不需要对每个数据元素进行序列化,因此我们现在回到了我们想要的直观的序列化行为,也就是说,它只序列化每个数据分区一次,正如我们所期望的那样。

Spark 3.0中的自定义聚合

在代码中使用它是什么样子的?下面是一个简短的示例,其中我声明了一个T-Digest聚合器,然后如果我想在动态类型的数据帧中使用它,我调用一个新的SQL函数,即UDAF,您可以在第二行中看到,它将这个聚合器注册为专门的用户定义函数。当我有了这个,我可以在标准数据帧或结构化流接口中使用这个函数。

这个新界面在性能方面给我们带来了什么?这是Spark shell的一个简短示例,在顶部,我使用旧的用户定义聚合函数样式进行聚合,然后使用新样式进行完全相同的聚合。在粗体部分,您可以看到结果计时。

如果我们计算一下这些数字,就会发现新接口比旧接口快70倍。我想强调的是,这是对完全相同的数据进行完全相同的聚合。唯一不同的是,新接口允许我们只在分区边界处进行聚合序列化,这就是70倍的速度提升的原因。

这是一个很好的地方来说明这是如此大的差异的原因之一

与T-Digest的不同之处在于,正如我前面提到的,T-Digest是一种执行序列化的成本较高的数据结构。

如果您正在使用您编写的具有较低串行化的聚合器,您可能会看到不到70倍的加速,可能只有10倍或5倍。我用非常非常简单的聚合器做过实验,差异几乎为零。

然而,一般来说,Spark已经预定义了所有简单的聚合器。如果您正在使用自定义聚合器,那么您的数据结构很可能不是微不足道的东西,因此这应该对您有很大帮助。

这就是主要故事的结尾。这不仅仅是一个关于聚合和Spark的爱情故事,它也是一个开源的爱情故事。bob下载地址我想分享一些我在开源上游工作中学到的东西,特别是非常复杂的拉请求,就像这个。bob下载地址

第一是永不放弃。

当我提交这个pull请求时,我最终重写了三次解决方案,直到我们得到一个我们都满意的解决方案并准备合并。

当你不得不像那样回到画板上时,很容易气馁,但最终,如果你不放弃,你会得到很好的结果,你总是会从这个过程中学到一些东西。

第二,要有耐心。编写这些实现并正确地测试它们需要花费大量时间,而且审查人员也需要花费很长时间来正确地审查它们。

从我提交最初的拉请求到我们最终合并它,基本上花了6个月。

为了正确地完成工作,为了得到社区的支持,得到你想要合并到代码中的结果,等待是值得的。

最后,非常重要的是,我们总是互相尊重。我收到了大量关于这个拉请求的非常好的反馈,我们最终合并的结果比我去年夏天最初提出的解决方案更清晰,更容易使用。

尊重评审人员社区,尊重所有对您的pull请求做出评论的人,对于成功地提交软件改进是至关重要的。

这就是我的故事的结尾。

点击这里观看更多Spark + AI课程

免费试用Databricks
«回来
关于Erik Erlandson

红色的帽子

Erik Erlandson是Red Hat的软件工程师,在那里他研究了Apache Spark在云中的分析用例和可扩展部署。他还为内部数据科学和分析项目提供咨询。Erik是Apache Spark和Spark生态系统中其他开源项目的贡献者,包括Spark obob下载地址n Kubernetes社区项目、Algebird和Scala。