Flash Apache Spark Shuffle与中远

下载幻灯片

中远是一个高效可靠的“洗牌即服务”,为Spark提供了Facebook仓库规模的工作机会。Cosco建立在共享分布式内存池的内存聚合之上,提供比Spark内置shuffle更有效的磁盘使用。在这次演讲中,我们将介绍如何在Cosco中添加一点flash来进一步提高shuffle效率:flash减少内存使用,更大的预写(聚合)缓冲区进一步帮助减少磁盘IO。通过仔细的实验和分析,我们还演示了动态利用内存和闪存保护闪存的持久性,即使是在像shuffle这样的写一次/读一次工作负载下。最后,flash持久瓶颈的长时间尺度允许它优雅地吸收工作负载的短期峰值。我们讨论了flash如何适应中远在Facebook的架构和部署模型,从大规模生产部署中获得的经验教训,以及未来可能的工作。我们在2019年Spark+AI峰会上首次介绍了中远集团。

点击这里观看更多Spark + AI课程

免费试用Databricks

视频记录

嗨,我是Aaron,我在Cosco工作,这是一项服务,基本上是Facebook Spark引擎的一部分。

Flash为Spark Shuffle与中远

具体来说,它处理shuffle,即Spark阶段之间的数据传输。我将谈谈我们如何使用闪存硬件来提高中远的效率。

我将从一些动机开始,然后我将介绍shuffle架构的基础知识。我将描述使用flash来提高效率的基本思想。我会讲一些更高级的技巧。我将谈谈我们未来可以做的改进。我会讲一些测试和分析技术。

你为什么要关心这个?

所以,有些动力。我们在Facebook上使用Cosco的原因是它极大地提高了IO效率。因此,您可以将磁盘服务时间视为旋转磁盘用于服务shuffle数据的时间总量,而这个指标在Cosco上下降了大约3倍。也就是说,中国远洋确实会使用一些计算资源,特别是内存。通过使用闪存,我们可以用相对较少的闪存消耗更少的内存。因此,需要关注的两个原因是提高IO效率和提高计算效率。我们不太关注查询延迟,因为我们主要使用Spark处理批处理工作负载。我还会讲一些开发和分析的技术,这些技术通常是有用的,即使在中远之外也是如此。

这是一些基本的shuffle架构。

Spark Shuffle Recap

当Spark任务发送映射输出数据时,我将它称为映射器。并且,当Spark任务读取shuffle数据时,我将它称为reducer。因此,在没有Cosco的普通Spark中,映射器将其输出数据写入按reducer分区分组的本地磁盘。每个减速器都有一个分区。

然后,还原器从映射器的本地磁盘读取这些数据。

如果需要,还原器在处理应用程序逻辑之前对这些数据进行排序。

这里有两个关于IO效率的问题。其中之一是写放大。

在实践中,我们观察到大约3倍的写入放大,这意味着对于映射器逻辑上尝试转移到约化器的每个字节,我们实际上最终将三个字节写入本地磁盘,这有两个原因。一个在制图者这边。如果一个映射器有太多的数据,以至于它不适合映射器的内存,它需要写入到磁盘,然后产生更多的数据,将其写入磁盘,然后再读回来,以便按分区分组以获得最终的输出文件。

我说过这叫做映射端合并。类似地,在减速器一侧,如果需要排序,我们进行减速器一侧的归并。而且,如果数据太大,无法装入reducer内存,reducer同样需要进行某种外部合并。这就导致了写放大。另一个问题是小IOs。在实践中,我们看到,平均每次读取大约只有200kb,这对于旋转磁盘来说是非常小的。这样做的原因是读取的次数基本上是M乘以R,其中M是映射器的数量R是还原器的数量。而且,这种方法与作业的大小成二次比例,而shuffle数据的数量只成线性比例。这就是为什么我们看到小型IOs。

为了描述中远如何提高IO效率,我首先要简化这张图。

现在,Cosco进入了这个领域,因为映射器将直接将输出数据流到Cosco Shuffle Services,而不是将输出数据写入本地磁盘。

Cosco Shuffle for Spark

因此,我们可能有数千个Cosco Shuffle服务,每个Shuffle服务都有一个用于每个reducer分区的内存缓冲区。映射输出数据直接追加到这些缓冲区。当缓冲区已满时,我们将它们写入分布式文件系统,如HDFS。

然后,约简器从分布式文件系统读取这些数据。所以,减速机读取的每个文件都是完全专用于减速机的,这些文件有几十兆字节,这样就解决了小IO的问题。我们还解决了写入放大的问题,或者说我们改进了很多,因为没有映射端合并。同样,映射器将数据直接流到Shuffle Services。如果减速器一次打开的文件太多,可能会有一些减速器侧的合并,但这不是一个问题。

这是关于flash相关内容的shuffle。中远集团还有其他部分,尤其是元数据管理。我不会在这个演讲中详细讲,但是,如果你愿意,你可以看看我们去年的演讲,其中涉及了一些细节。这张幻灯片底部有一个链接。而且,我还要总结说,中远集团是一个可靠的、容错的、可扩展的分布式系统。

那么,让我们来谈谈如何使用flash让中远更有效率。

缓冲是追加

让我们仔细看看这些Shuffle服务内部发生了什么。数据从映射器中发送,我们称之为包,几十千字节数据被追加到内存中,用于Shuffle Services分区的内存缓冲区中。使用flash的基本思想是我们可以用一个flash内缓冲区来代替这个内存内缓冲区。

用Flash代替DRAM简单地用闪存代替内存

直接写入闪存,而不是写入内存。我观察到这是一个非常友好的使用flash的模式。使用flash的主要挑战是flash的持续时间有限。也就是说,如果您向驱动器写入太多内容,它将更快地磨损。但是有些模式写入闪存比其他模式更有利于写入耐力,特别是相对于写入的数据量而言,大量随机的小写入对驱动器不利。这种追加模式,写入几十kb,都附加到同一个文件中,对闪存非常友好,这就是我们减少驱动器损耗的方法。

在缓冲区满后,如果需要排序,我们将它读入主存进行排序。

我还会观察到flash比主存慢一点,但对于这个用例来说,这不是问题,因为所有这些对flash的读写通常都是无阻塞的,而且延迟比数据在内存中缓冲的总时间要短得多。

经验法则示例

让我们用经验法则来计算。我将使用的经验法则是,让我们假设您在集群中部署1gb的RAM,或者可以承受每天100gb写入的闪存数量。第一个要点有一些数据可以证明这一点,但我要指出的是,在决定部署多少DRAM和多少闪存时,可能有很多因素?而且,其中一个因素是DRAM比闪存消耗更多的能量。

因此,为了应用这个经验法则,让我们假设一个有10个节点的集群,每个节点使用100gb的DRAM进行缓冲。

基本评价

应用这个1 -100的规则,我们可以在使用这个集群和使用每个节点用1tb闪存替换100gb DRAM的集群之间进行比较。

因此,总的来说,如果使用闪存,集群每天可以承受100tb的写入,而如果使用内存,集群每天只能承受1tb的DRAM。

这是一张总结图。如果集群每天变换100tb,那么这两张图片是相等的。每个集群有100个服务。在左边,每个服务有100g的DRAM。在右边,它有1tb的闪存,因此每个服务可以承受10tb的写操作,集群总共可以承受100tb的写操作。

是的,如果你每天洗牌100tb,它们是相等的如果你洗牌更少,那么闪存更好。

让我们来讨论一些可以进一步提高这种效率的混合技术。

两种混合技术使用DRAM和flash进行缓冲的两种方法

第一种技术是先在DRAM中进行缓冲,仅在内存压力下进行刷新。而且,第二种技术将是对填充最快的分区使用DRAM,对填充最慢的分区使用闪存,因为较慢的分区对闪存的损耗较小。

混合技术#1

我们来谈谈第一个技巧。我们基本上会利用Shuffle工作负载随时间变化的优势。因此,在y轴上,我们有在任何给定时间缓冲在Shuffle Service内存中的字节数。因此,我们可以让集群中有1tb的DRAM处理这个工作负载,或者,再次使用我们的经验法则,我们可以让集群每天承受100tb的写操作。假设这是100tb。我们会用flash来缓冲。

但是,混合技术是我们可以只保留250g的DRAM,并先将缓冲区数据发送到DRAM。然后,如果我们用完了DRAM,我们会把它发送到闪存,也许这只会导致我们每天写入25tb到闪存。因此,总的来说,我们已经将这个纯DRAM集群替换为一个具有25% RAM和25%同等容量闪存的集群,但我们仍然支持整个工作负载,因此效率提高了大约2倍。我还要指出的是,使用flash将这个系统推到极限会更安全,因为如果你只使用内存,而内存用完,糟糕的事情就会立即发生。然而,如果你使用闪存,你超过了每天应该写入闪存的指导方针,这并不是一个大问题,只要在驱动器的生命周期内,你不写太多,所以你可以在以后弥补它,并在较短的时间内吸收小峰值。

这种混合方法的一个实现细节是我们需要决定何时刷新到flash。特别是,我们需要决定什么时候做这个和我们可以做的其他事情。因此,我们可以将分区重定向到另一个Shuffle Service。我们可以立即将其刷新到分布式文件系统以释放内存。或者,我们可以通过告诉它们慢下来并更慢地发送数据来反压绘图器。因此,中远已经在纯DRAM场景中处理了所有这些平衡决策。

我们如何修改这个来支持flash呢?其中一种方法是我们可以插入现有的平衡逻辑。我们可以引入一些阈值,并说如果我们在驱动器的工作集中的闪存数量,基本上是我们当前在文件中占用的闪存数量,如果这个数量小于某个阈值,那么我们可以刷新为闪存。如果它大于这个阈值,那么我们可以应用之前使用的相同的平衡逻辑。

并且,我们可以在生产中配置这个阈值,这样我们就可以保持在flash耐力限制之下。然后,不管这个阈值是多少,我们都可以用它来预测集群的性能。基本上,集群的性能应该与闪存阈值为DRAM时一样好,因为我们基本上只是以一种不超过闪存续航限制的方式将DRAM替换为闪存。这可能不是做出这种平衡决策的最佳方式,但它确实很好,因为它易于实现,也易于预测性能。只是插入现有的平衡逻辑。

因此,总而言之,我们可以利用这种首先使用DRAM和仅在内存压力下使用闪存的混合技术,利用随时间变化的shuffle工作负载。我们可以调整平衡逻辑。第二种混合技术利用了一些分区比其他分区填充得慢的优点。较慢的分区磨损速度较慢。因此,我们的想法是对较慢的分区使用闪存,对较快的分区使用DRAM。

在我们的纯DRAM集群中,使用这个小示例集群,我们有1tb的DRAM。这可以支持,我称它为,100,000个流,每个流缓冲高达10兆字节。所以,你可以把一个流想象成一个永恒的分区,一个永远存在的分区。我们只是把它作为一个假设来建模和分析。因此,这个集群支持100,000个这样的流,因为100,000乘以10mb等于1tb。另一方面,如果我们使用flash,那么为了获得100,000个流,我们希望每个流每秒写入小于12千字节,因为这将导致整个集群每天写入100tb,同样,根据我们的经验法则,这是使这些DRAM和flash集群在我们想要部署的方面等效的原因。

而且,如果流速度低于每秒12千字节,那么flash可以支持更多的流。所以,这些较慢的流在闪存上更好,然而,如果流更快,那么它们在DRAM上更好。

因此,技术是定期测量每个分区的填充率。如果我们发现它小于某个阈值,那么我们可以将该分区的未来数据发送到flash。否则,我们可以继续在DRAM中缓冲分区。这里有一些数学计算。根据您的工作负载,您可以计算使用这种技术可以节省多少费用。

这个技术,同样,它依赖于工作负载,依赖于现有的慢分区,让我们看看真实的工作负载是什么样的。

基本上,有一些分区非常快,但大多数都比较慢。这看起来像一个指数曲线,这对于使用这种混合技术来说是个好消息因为这意味着有很多分区我们可以有效地使用flash。我要指出的是,在替换DRAM方面,我们真正关心的是,通过将一个分区移动到闪存来替换多少DRAM。这取决于内存的大小乘以分区所消耗的时间。所以,我们也可以看相同的图,但是用每个分区所花费的缓冲时间来加权,我们看到这是一个非常相似的图,所以同样的结论适用。有很多分区可以发送到flash中。

结合两种混合技术

然后,也许我们有兴趣把这两种混合技术结合起来。我们将首先在DRAM中进行缓冲,然后,一旦DRAM满了,我们将把最慢的分区发送到闪存中。如何评估这个,如何估计效率,一种方法是使用离散偶模拟,这个我会在后面的演讲中讨论。首先,让我们谈谈未来可能的改进。

转型的查询

一类改进可以尝试获得更低的延迟查询。这对于交互式工作负载可能特别有用。一种方法是我们可以直接从flash中提供数据。从某种意义上说,将这些数据保存在闪存上直到驱动器被填满是一种自由,因为我们希望在闪存的续航能力上受到瓶颈,而不是闪存工作集的限制。所以,没有理由删除闪存中的数据,直到硬盘满了,或者至少满到开始更快地磨损它。

这是一种提高交互查询延迟的方法。

然后,另一种方法是,当我们从Cosco写入分布式文件系统时,flash允许我们有更大的块。我们称它为chunk。然后,更大的块意味着减速器需要归并来排序的机会更少。因此,这对于交互式查询也很有用,总体上降低了查询延迟。

进一步提高效率

另一类未来的胜利,我们可以实现的flash是更高效的胜利。一种方法是降低复制因子。所以,我注意到中远是一个持久的分布式系统。

我们实现持久性的一种方法是我们有R2复制。在数据被写入分布式文件系统之前,它被复制到两个不同的Cosco Shuffle服务上。但是,对于flash,因为flash是非易失性的,而内存是易失性的,我们也许可以减少复制因子。在实践中,我们看到大多数shuffle服务崩溃都是通过在同一台机器上重新启动进程在几分钟内解决的。因此,我们可以潜在地恢复这些数据,并摆脱较低的复制因子。然后,再一次,这里有更大的块,它允许我们通过在分布式文件系统上使用更有效的Reed-Solomon编码来提高效率。

那么,让我们来谈谈一些实用的评估技巧。其中一些技术将帮助我们预测效率的胜利,就像我在之前的演讲中所做的那样。然后,他们中的一些人还会帮助我们检查使用flash没有可靠性问题。

实用评估技术

这里有四种不同的技术,从最理论性到最类似于生产中的测试。我将重点介绍其中的两个,离散事件模拟和生产集群上的特殊金丝雀。

离散事件模拟

离散事件模拟是一种著名的模拟技术。在维基百科上有一个页面。

离散事件模拟的基本思想是模拟中的每一步都对应一个事件。这里,事件是包,从每个mapper到达的几十千字节,每个包在特定的时间到达它被附加到Shuffle Service中特定的内存缓冲区中。我们可以跟踪我们关心的指标,比如写入flash的数据总量。

另一种类型的事件是当缓冲区被排序并溢出到DFS时。我们感兴趣的另一个指标是分布式文件系统中的平均文件大小,所以我们也可以跟踪它。

我们可以用真实世界的数据来驱动这个模拟。我们有这个中远区块数据集。对于写入DFS的每个块文件,我们都写入一行,这一行包含了一些信息,比如块大小和块用于缓冲的时间。我们可以使用这两个数字来驱动块填充率,我们可以使用它,以及块开始时间,这是块开始缓冲的时间,我们可以使用它来驱动模拟,并决定我们应该在什么时候尝试模拟哪些离散事件。

同样,我们可以用这个来预测效率。特别是,您可以想象在每个模拟的Cosco Shuffle Service上都有一定的内存限制,一旦达到内存限制,我们将把最慢的分区发送到flash。我们可以用这个来算出,如果我们有这样那样的内存限制,我们最终会向flash写入这么多。如果我们有更多的内存,写入flash的空间会减少多少?而且,总的来说,我们可以预测我们需要多少闪存,放多少内存。

生产集群上的金丝雀

这就是离散事件模拟。另一种技术基本上是在生产集群中金丝雀,但这里有一些有趣的事情,因为您可以做的最简单的事情是您可以用一些纯闪存Shuffle Services替换一些纯内存Shuffle Services。或者,更确切地说,一些Shuffle服务有一些闪光。但是观察这个金丝雀的结果并不是特别容易,因为我们关心的许多指标都是在mapper任务上观察到的,每个mapper都可能与许多Shuffle Services进行通信。

在map任务的整个生命周期内,映射器甚至可以与不同的Shuffle Services通信。因为我们进行了这种动态平衡,所以可以将映射器重定向到特定分区的不同Shuffle Service。所以,这很难说。

如果我们看到映射器指标的变化,是由于这个吗

纯flash Shuffle服务或只是工作负载随时间的变化?是因为纯内存Shuffle服务吗?一种更好地理解在绘图仪上观察到的这些指标的方法是使用我们已经在Cosco中使用的这个功能,甚至在使用flash之前。我们称之为特征子集群。基本上,

所有Shuffle服务都被分成不同的组,我们称之为子集群,每个映射器只与来自一个子集群的Shuffle服务通信。在使用flash之前,我们使用它来限制故障域和其他一些原因。

但这对flash计算很好因为我们可以把金丝雀机器和一些flash放在一起,我们可以把它们都放在一个子集群中,然后我们可以比较使用flash机器的子集群的映射器和使用纯DRAM子集群的映射器之间的总指标。而且,这使我们免受工作负载随时间变化的影响。我们可以在生产环境中运行它,看看不同子集群上的映射器指标是什么。

我要感谢Chen和Serge为这个项目所做的所有工作。

这里有一些链接到我们在前几年关于shuffle的演讲。

现在,我期待听到大家的提问。

点击这里观看更多Spark + AI课程

免费试用Databricks
«回来
亚伦费尔德曼
关于Aaron Feldman

脸谱网

亚伦四年前加入Facebook的数据平台团队,在那里他设计bob体育客户端下载和实施了中远集团的核心组件,并帮助推动其在内部得到广泛采用。在Facebook之前,艾伦在加州理工学院读本科,学习数学和计算机科学。