关于改进Apache Spark SQL中的广播连接

下载幻灯片

广播连接是Spark SQL执行引擎的重要组成部分。当使用时,它通过将较小的关系广播给所有Spark执行程序来执行两个关系的连接,然后用每个执行程序的另一个关系的分区计算连接标准。当广播关系足够小时,广播连接速度很快,因为它们需要最少的数据变换。然而,超过一定的阈值,由于网络和内存使用的瓶颈,广播连接的可靠性或性能往往不如基于shuffle的连接算法。这次演讲分享了Workday为提高关系大小阈值所做的改进,在此阈值下,Spark中的广播连接是可行的。我们将讨论诸如执行程序端广播(数据直接在执行程序之间广播,而不是首先收集到驱动程序)等主题,以及我们在Spark的全阶段代码生成器中所做的更改,以适应广播的增加阈值。具体来说,我们强调了如何限制执行程序中广播连接的内存使用,这样我们就可以在不增加执行程序内存的情况下提高阈值。此外,我们还展示了在生产中运行这些改进的结果,这些改进是在从复杂的ETL管道编译的大规模作业上进行的。从这个环节,观众将能够窥探Spark的连接基础设施的内部,并利用我们的学习来更好地调整他们自己的工作负载。

点击这里观看更多Spark + AI课程

免费试用Databricks

视频记录

大家好,欢迎大家的到来,我叫Jianneng,是Workday的软件工程师。今天我将讨论Broadcast Joins。

所以,撇开法律人员不谈,今天的议程首先是关于Spark在我的组织中是如何工作的,然后是广播连接,我们是如何改进它的,最后是一个生产案例研究。

Workday Prism Analytics中的Spark

所以,Spark是如何与我们合作的。Spark是我们用作数据处理主干的引擎。

我所在的组织正在开发一款名为Prism Analytics的产品,它以多种方式为客户提供服务,但其核心是,客户使用我们的自助服务产品来构建数据转换管道,然后将其编译为dataframe,最后由Spark执行。我们有财务和人力资源用例。财务方面,比如,我们试图管理你的信贷和借贷,以及人力资源用例,我们试图从你的劳动力中收集见解。你可以想象,金融用例会有传统的大数据量,每天都有很多不同的交易,而人力资源用例可能没有那么多数据,但关于人力资源的有趣之处在于,计划要复杂得多,这次演讲将集中在人力资源用例上。所以如果你看到我使用的硬件我使用的数据并不大重点更多的是计划的复杂性。为了让你们了解事情有多复杂,在右边,我们有一个图表是我们的人力资源用例的物理计划。因此,如果您习惯了在Spark UI中显示的物理计划,那么图中左上方的apache Spark开源徽标就只有几个像素。bob下载地址只是为了让你们了解一下这个计划有多大。

通常可以看到多达数百甚至数千个操作符。所以通常很难分析。

如果您有兴趣了解Spark如何集成到我们的工作负载中,那么请随意查看我们去年的演示文稿。

广播加入审查

好了,接下来是Spark中的广播连接。首先,稍微回顾一下广播连接的含义。假设你有两个节点有两个数据集,蓝色表和红色表你想把它们连接起来。因此,广播连接将广播表的较小一侧,以便表在两个节点中完整地存在。在这种情况下,红色表被广播到两个节点然后你可以执行连接,这很快,因为你不需要洗牌表的大一边每个分区都可以单独执行一个。

比较一下这个和另一种流行的合并策略shuffle join, broadcast join更好因为它避免了大的一边洗牌而shuffle join是两边都洗牌。因为没有涉及到洗牌,广播连接自然地处理数据倾斜,因为您不必担心倾斜的数据进入一个分区,使得该分区进程的处理时间比其他分区要长一些,相反,洗牌也会有这个问题,因为您正在进行洗牌。在类似的情况下,广播连接更适合选择性连接。假设你有多个连接,但在最后一个连接中,你恰好有一个非常有选择性的连接,它产生的行很少。在Broadcast情况下,你可以一次性执行所有的连接,这不会对输出产生太大影响。而在shuffle连接中,对于每个连接,在你完成后,你必须将它洗牌到磁盘,这可能会产生许多不必要的中间结果。因此,对于广播带来的其他好处,也有一些缺点,即我们必须广播数据,这意味着数据需要适合内存。最后,广播连接不能总是用于所有的连接,例如,如果你在使用左外连接,那么就不可能广播左边,因为这样就不可能知道左边是否存在,而右边是否不存在。所以对于左外连接你只能广播右边。对于外部连接,您根本不能使用广播连接。但是shuffle join在这方面是万能的。

广播加入vs. Shuffle加入

因此,考虑到所有这些,当内存不是问题并且可以计划时,广播连接确实应该比shuffle连接更快。

在Spark中广播

然后让我们更多地讨论如何在Spark中进行广播。

在Spark中,广播是通过将数据从执行程序广播到驱动程序,然后由驱动程序广播回执行程序来实现的。

换句话说,驱动程序收集所有数据然后广播数据给执行程序。所以你可以看到,这有点低效,因为驱动程序现在成为瓶颈,在我们的工作负载中,驱动程序通常没有很多内存,因为它真的没有做很多事情。但是当涉及到广播时,我们必须确保驱动程序有足够的内存。

然后再进一步,broadcast加入Spark在这个broadcast机制之上工作。

它使用广播变量将数据广播回驱动程序,首先将数据收集到驱动程序,然后使用广播变量将其广播给执行程序。然后,在计划使用哪种连接策略时,它将使用此配置阈值根据预连接基础上的大小估计进行计划。

在Spark中有两种类型的广播连接,一种是广播哈希连接,驱动程序在内存中构建哈希表并将其分发给执行器。另一个是broadcast嵌套循环连接基本上是一个嵌套for循环。

广播嵌套循环对于非equi连接或coalescian连接非常好,但出于稳定性考虑,它在我们的工作负载中被禁用了,主要原因是,我们不知道客户是否会这样做。众所周知,客户内置管道和广播嵌套循环连接在大型输入数据集上表现不佳。出于稳定性考虑,这里我们更倾向于使用sort, merge, join。

目标:更多的广播连接

好了,接下来是如何改进广播连接。

我们的目标当然是有更多的广播连接因为我们刚说过,如果可能的话我们真的想做更多的广播连接因为这样性能更好。

那么问题是,广播连接是否真的更好,只要它适合内存?因为这似乎是这里真正的限制而不是连接类型而答案实际上是,我们很快就会看到,这取决于情况。

我们将使用的方法是,我们将提高阈值,看看什么会出错。

作为一个剧透警告,在驱动程序耗尽内存之前,许多事情实际上都出了问题,我们很快就会看到原因。因此,从简单开始,让我们只考虑一个连接。

实验设置

我们的实验设置是我们使用TPC-H数据集如果你熟悉它,它的10gb数据集版本。事实表lineitem有60分钟的行它连接到第二大表orders它有1500万行,连接在连接键上。驱动程序有一个核心和12g内存,执行程序有18个核心和102g内存。

单个连接结果

这是结果,我们只是简单地进行了连接,我们比较了行项的结果,你会看到排序合并连接实际上比广播连接快。为什么会这样呢?好吧,我之前提到过,如果你深入挖掘日志,你会看到驱动程序花时间收集所有1500万行订单返回到它的tone GVM,构建哈希表,然后通过广播变量将哈希表广播给所有的执行程序,在这种情况下,只有一个执行程序,执行程序反序列化哈希表。这有点低效,因为执行程序有数据,但它必须把数据发送给驱动程序,这样驱动程序才能把数据发送回执行程序。所以我们给了一个慢速合并连接所有的好处因为只有一个节点所以你不用担心网络但在这种情况下,广播连接实际上会导致更多的网络使用比排序合并连接要多。

为什么BHJ速度较慢?

那么解决方案是什么呢?自然解决的办法是,不做驱动集广播。

答案是executor side broadcast。

有了它,这是基于已经存在的Spark原型(听不清),然后添加了我们自己的改进,因为我们看到了比较原型的差异。这个概念很简单,基本上数据不是被发回给驱动程序,而是直接从执行程序发送到执行程序驱动程序所做的唯一一件事就是跟踪数据的位置。

执行方广播

执行程序BHJ vs.驱动程序

比较两种实现,驱动端广播必须处理它自己的哈希表,而执行端则不需要,这样更好,也更少的数据在网络中移动,因为执行程序之间进行通信,而不必与驱动程序对话以获取数据工作负载。

我想,执行程序端广播的另一个缺点是,由于数据完全存在于执行程序中,因此获得广播实际要在OI中显示的大小信息有点棘手。但综合考虑,我认为可以肯定地说,执行程序端的广播散列连接比驱动程序端的要好。好吧,让我们再运行一次结果,看看会发生什么。我们得到了和之前一样的图但是现在是执行程序端广播,你可以看到,是的,执行程序端广播比驱动程序端快但是,它还是没有排序归并连接快。为什么会这样呢?

新结果

为了理解这里到底发生了什么,让我们比较一下连接的代价模型。首先假设我们有n个核,我们有两个表A和B B是较小的表,这意味着我们想要传播它。

如果我们考虑成本,我们需要的步骤是,首先我们需要用n个核排序读取A,然后再用n个核写A。

我们需要对B做同样的事情,最后我们用n个核读取A和B,然后执行连接。

如果你认为在这种情况下,只有I/O是最昂贵的操作,通常情况就是这样,如果你消耗读和写大约相同的时间,那么总的I/O成本大约是3个A / n核和3个B / n核,因为我们总共没有写三次。

广播哈希连接也类似,假设有n个核,我们有A和B,它现在做的是用n个核读取B建立哈希表,然后用一个分区把它写出来。我们可以先把它写出来,然后用类似的代价构建一个哈希表。然后第二步,你只需用n个分区读取A,对于每个分区,你用一个分区读取B,然后执行连接。

同样,如果只考虑I/O成本,那么成本大致是使用n个核读取A,使用n个核读取B,然后重新写入B。

比较SMJ和成本

如果你把这两项成本放在一起比较,你要减去它们。然后你可以看到,在这些抵消之后,它大约是使用n个核两次读取A和B所花费的时间,而不是使用单个分区重写B所花费的时间。如果你消去更多的部分,那么你基本上就得到了比较排序归并联接和广播联接的成本,作为读取A和B的成本,使用n个核进行组合,与使用单个核读取B的成本进行比较。所以如果你稍微看一下,假设B的大小是1,那么我现在所做的分析基本上告诉你,如果a + 1大于n那么排序归并连接会慢一些,因为在这种情况下,如果a + 1 / n大于1那么排序归并连接会慢一些。

从这个公式中你可以推断出两个观点,一是核数越多,排序合并连接的性能就越好因为它受益于并行性,二是a越大,广播连接的性能就越好因为它不需要多次读取a。好了,现在让我们再次尝试基准测试,这一次,我改变了用于维护内存和执行器的内核数量,但只改变了运行完全相同的查询的内核数量。这就是不同之处,执行程序端广播,甚至驱动程序端广播两者都与排序归并连接相比。如果你回想一下我们做的粗略的数学计算,在这种情况下,大表的大小是小表的四倍,所以四加一等于五,这意味着如果我们有超过五个核,那么广播连接读起来会更好。如果内核少于5个排序归并连接会更好。这个计算在这个图中非常有效因为你可以看到6个核是我们的收支平衡点当你减少核数时,广播连接开始失效当你增加核数时,排序合并连接开始失效。

不同的核心- SMJ更好与更多的核心

改变A- BHJ的尺寸效果更好,差异越大

然后我们看另一个,如果我们增加A的大小会怎样?

我们还看到,当大小差异变大时,广播连接会更好因为A变大,然后除以相同大小的B会得到更大的数这意味着排序归并连接会变慢。

增加B -驱动器BHJ的大小失败,执行器BHJ最好

好了,这张钱的幻灯片展示了遗嘱执行人侧广播的好处。在这种情况下,我不仅增加了A,也增加了B,所以B现在是1.5亿行,而不是1500万行。如预期的那样,排序合并连接需要很长时间在A行上进行洗牌,但执行程序侧广播并不是因为它不需要再次洗牌A,它要快得多。但是驱动端完全失败了,因为它不能广播,这很有趣,因为如果你深入挖掘并理解统计数据。绘图键是一个整数。如果你有1亿5千万个实数,那就是1g左右的空间,但一旦它在内存中,它实际上就变成了高千兆字节。

哈希表需要8到15g的空间。驱动程序因为我们只配置了12g,你不能广播那么多数据。我猜,你甚至不能把数据收集回驱动程序,因为它没有足够的内存来容纳它。

因此驱动程序的广播不能正常工作而执行程序的广播显然是赢家。这就是这项技术想要展示的东西我们可以在这个实验中证明这一点。

其他广播连接改进

我们还做了一些其他的实验,也做了一些其他的改进,我在这里只提到其中的一些。调整初始JVM大小非常重要,也就是Xms,调整元空间大小也非常重要,元空间大小是JVM保存分配给它的所有类的地方,因为如果你一开始没有将它们设置得足够大,那么JVM会在扩展到占用所有分配的空间之前保守地先进行GC。因为我们不希望GC影响性能我们希望GC设置得更大。

对于我的用例,我将Metaspace大小设置为1gb以确保安全。下一次,我总是将它设置为与Xms相同的大小,这是您可以用于JVM的最大大小。

你注意到的另一件事是广播变量都是同步获取的,因为在编码原期间,编码原将决定先读取哪些变量,因为所有的线程都在相同的代码中执行,所有线程都在一次阻塞三分之一来读取广播变量。所以我们没有那样做,而是让它是并发的。

我们还做了其他内存改进,这些改进与广播连接上下文中的全阶段代码原有关,但由于它们没有直接关系,我不会在这里讨论它,但我们确实贡献了,我们确实计划将所有这些都贡献给开源。bob下载地址如果你点击链接,它会把你带到一个页面,告诉你我们面临的所有问题,以及我们计划如何解决这些问题。

好的,继续,我们将讨论一个生产案例研究。

加入人力资源客户管道中的类型

所以,在我们深入研究这个案例之前,我们已经运行了一些统计数据来理解我们在生产环境中运行的连接类型正如你在图表中看到的,大约98%的连接不是内连接就是左外连接这是个好消息,因为我们没有完全的外部连接这意味着我们的大多数情况,绝大多数情况都有希望从广播连接中受益。

在人力资源客户管道中广播估算

为了理解,因为有可能有join选项卡我们需要看到阈值被满足了。在这个图中,我们展示的是广播较小一侧的阈值估计,看看我们需要设置的阈值是什么,以便计划广播连接。在这种情况下,如果我们设置默认值,如果你从默认值设置阈值,从10mb到100mb那么80%的连接可以被广播。

你可能会想,这真的是大数据吗我想提醒你,我们关注的是非常复杂的管道。所以数据可能不是很大,但它们需要很长时间,因为我们需要通过大量的物理操作。

好,回到我之前暗示过的客户管道,这是一张有数百个物理操作员的图片。它可能看起来非常复杂,但如果你深入研究,你会发现实际上只有30个表。所以我们用例子数据来填充它其中29个是10000行其中一个是300万行。

人力资源用例管道

如果你执行这个连接,执行这个管道你会看到大约160个连接。连接的数量实际上取决于阈值。它们可以增加一点,也可以减少一点,我们用相同的设置运行这个,所以有18个执行器核心,假设内存在这里不是问题。

我们想问的问题是,广播连接能让管道运行得更快吗?换句话说,如果我们提高阈值,它能运行得更快吗?

不同广播MB, 10MB, 1GB)

这是我们运行的基准,我们将阈值设置为0兆,10兆,然后是1gb。0表示没有任何广播,10是默认值,1gb表示我们假设基本上所有可以广播的都被广播了。

如你所见,排序归并连接的情况不是很好但当你在默认情况下使用广播连接时它会有20%到30%的改善。如前所述,执行程序端的广播总是更快但一旦我们将广播阈值提高到1gb,你看到的实际上并不是很明显有一个性能关键。

那么这真的是,好吧,也许这不是(听不清)分布。如果我们再增加300万的表格,看看会发生什么?

大桌子排50米

故事其实是相似的。10mb的速度会更好,但是1gb的速度会更慢,你能猜到为什么吗?

为什么更多的广播连接更慢?

事实上,答案是我之前提到过的。

它与连接有关如果你回想一下我之前提到的公式广播连接的成本实际上取决于两个表之间的相对大小,在我们的例子中,我们的管道有很多自连接而不是左外连接左边实际上是较小的一边这意味着当你把阈值增加到最大时,大的表实际上会被广播,这不是很理想,因为这样你就需要做哈希表,通过网络发送,广播连接占用了最后一个连接的单个线程。这样就减少了并行度。最重要的是,它会占用存储内存因为哈希表需要留在内存中因此,自连接,广播连接的性能没有达到应有的水平。这些是我们真正需要研究的东西看看是否有可以改进的地方就使用广播连接来提高大型表的自连接的性能而言。

以上就是生产用例的全部内容,让我给你们一些总结。那么,我们得出的结论是什么呢?广播连接更好,但有一些注意事项。我们可以肯定的一点是,执行程序端的广播比驱动程序端的广播好,因为在我们看到的所有图表中,执行程序端的表现与驱动程序端的表现相当或更好,这很好。但是当你评估广播连接是否更好时,我们需要考虑,正如我们在公式中看到的,可用的核数,大表和小表的相对边,当然,你要广播的表需要适合你的内存。最后,你需要注意自连接和外部连接,你不能传播的实际上是更小的表。

广播连接更好,但需要注意

广播连接的未来改进

展望和思考未来的改进,我认为Spark 3.0中的自适应查询执行将会有极大的帮助,因为一方面,它将使大小估计更好。它将在查询运行时查看大小估计值,并使用新的统计信息和更准确的统计信息来决定后续连接是否被广播或用于排序归并。另一件事是,当你广播1.5亿行哈希表时,在我的基准测试中,它实际上需要50秒来为1.5亿行构建哈希表。对于大规模的工作负载,这可能不是很糟糕,但是当工作负载的总运行时间在较低的几分钟内时,时间就变得很重要了。所以如果你能使用多核,那就太好了。虽然这与每个任务使用一个线程的执行器模型相违背,但也许我们可以在模型中加入一些东西来适应这一点。

由于哈希表占用太多空间,我们应该努力使其更有效。哈希表的实现中,如果哈希键是沿着的,它会使用一个更有效的哈希表。在我的例子中,它节省了大约30%到40%的内存,但我不认为这在更大的环境中是足够的。最后,稍微展望一下未来。

实际上,我们可以让sortmerge join更好,让它处理倾斜,你可以通过传播倾斜值来做到这一点如果你洗牌的话。如果你在广播中发送,如果你在所有其他节点上发送倾斜值,那么倾斜就会被有效地处理。本质上,自适应查询执行会更好通过传播倾斜值,它会让排序归并连接更快。只是为了让连接运行得更快。我的演讲到此结束。我想花几秒钟感谢我的同事们让这次演讲成为可能。谢谢凯文,谢谢迈克,谢谢亚历克斯,谢谢安德烈和所有给我反馈的同事。现在,我可以回答提问了。谢谢你!

点击这里观看更多Spark + AI课程

免费试用Databricks
«回来
关于李建能

工作日

Jianneng Li是Workday的软件开发工程师(此前任职于Platfora,于2016年底被Workday收购)。他在Prism Analytics工作,这是一个端到端数据分析产品,是Workday生态系统的一部分,帮助企业更好地了解他们的财务和人力资源数据。作为Analytics组织Spark团队的一员,Jianneng擅长分布式系统和数据处理。他喜欢深入研究Spark内部,并发表了几篇关于Apache Spark和分析的博客文章。Jianneng拥有加州大学伯克利分校的EECS硕士学位和康奈尔大学的CS学士学位。