工程的博客

即将到来的Apache Spark 3.0中的矢量R I/O

通过Hyukjin Kwon

2020年6月1日 工程的博客

分享这篇文章

R是数据科学中最流行的计算机语言之一,专门用于统计分析,并带有许多扩展,如RStudio插件和其他R包,用于数据处理和机器学习任务。此外,它使数据科学家能够轻松地可视化他们的数据集。

通过SparkR在Apache火花TM, R代码可以很容易地伸缩。要以交互方式运行作业,您可以通过运行R shell轻松地运行分布式计算。

当SparkR不需要与R进程交互时,性能几乎与其他语言api(如Scala、Java和Python)相同.然而,当SparkR作业与本机R函数或数据类型交互时,会发生显著的性能下降。

Databricks Runtime在SparkR中引入了矢量化来提高Spark和R之间的数据I/O性能。我们很高兴地宣布使用R api从Apache箭头在0.15.1版本中,向量化现在可以在即将到来的Apache Spark 3.0中使用,并获得了实质性的性能改进。

这篇文章概述了SparkR内部的Spark和R交互,当前的原生实现和SparkR中的向量化实现以及基准测试结果。

火花和R相互酌

SparkR不仅支持一组丰富的ML和类sql api,还支持一组通常用于直接与R代码交互的api——例如,Spark DataFrame从/到R DataFrame的无缝转换,以及在Spark DataFrame上以分布式方式执行R原生函数。

在大多数情况下,Spark中的其他语言api的性能实际上是一致的——例如,当用户代码依赖于Spark udf和/或SQL api时,执行完全发生在JVM内部,在I/O中没有性能损失。请看下面的例子,它们同样需要1秒。

//Scala API//1第二个sql("SELECT id FROM range(2000000000)")。过滤器("id > 10")。()
# r API1第二个过滤器sql("SELECT * FROM range(2000000000)"), "id > 10"))

然而,在需要执行R原生函数或将其从/转换为R原生类型的情况下,性能会有很大的不同,如下所示。

// Scala APIval ds = (1L到100000L).toDS// ~1秒ds.mapPartitions (iter= >iter.filter (_
              
              
# R APIdf
              虽然简单的情况下以上只是过滤低于的值50000每个分区,SparkR慢了15倍。
              < >之前// Scala API// ~0.2秒Val df = sql(SELECT * FROM range(1000000)) .collect ()
# r API# ~8秒-慢40倍df
              上面的例子更糟。它只是将相同的数据收集到驱动端,但是它40 x慢SparkR。因为需要交互的apiR原函数数据类型而且它的实现有很能干的人。有六种api具有显著的性能损失:
  • createDataFrame ()
  • 收集()
  • 有斑纹的()
  • dapplyCollect ()
  • 新闻出版总署()
  • gapplyCollect ()

简而言之,createDataFrame ()而且收集()要求(反)序列化和转换数据从JVM从/到R驱动端。例如,字符串在Java中变成字符在r中。有斑纹的()而且新闻出版总署(), JVM和R执行器之间的转换是必需的,因为它需要(反)序列化R本机函数和数据。如果dapplyCollect ()而且gapplyCollect (),它需要JVM和R之间的驱动程序和执行程序的开销。

本机实现

在Spark中没有矢量化的R本机实现,这需要低效的(反)序列化和数据从JVM到R驱动程序端的转换,导致显著的性能损失。

SparkR DataFrame上的计算分布在Spark集群上的所有可用节点上。如果它不需要作为R来收集数据,那么在驱动程序或执行程序端就不需要与上面的R进程进行通信data.frame或者执行R原生函数。当它需要R时data.frame或R原生函数的执行,它们在JVM和R驱动程序/执行器之间使用套接字进行通信。

它以一种低效的编码格式在JVM和R之间逐行(反)序列化和传输数据,这种编码格式没有考虑到现代的CPU设计,例如CPU流水线。

矢量化实现

在Apache Spark 3.0中,SparkR引入了一种新的向量化实现,它利用Apache Arrow直接在JVM和R驱动程序/执行器之间交换数据,从而使序列化成本最小化。

在Spark中使用矢量化实现R(在Spark 3.0中可用),其中数据在JVM和R执行器/驱动程序之间交换,通过Apache Arrow进行高效的(反)序列化,以获得更好的性能。

新的实现利用Apache Arrow以高效的柱状格式支持流水线和单指令多数据(SIMD),而不是在JVM和R之间使用低效格式逐行(反)序列化数据。

新的向量化SparkR api在默认情况下不启用,但可以通过设置启用spark.sql.execution.arrow.sparkr.enabled真正的在即将到来的Apache Spark 3.0中。注意这是向量化的dapplyCollect ()而且gapplyCollect ()尚未实现。鼓励用户使用有斑纹的()而且新闻出版总署()代替。

基准测试结果

基准测试是使用一个简单的数据集执行的500000条记录通过执行相同的代码并比较启用和禁用向量化时的总运行时间。我们的代码、数据集和笔记本都是可用的在GitHub上

SparkR与未进行向量化的SparkR之间的性能比较表明,前者的性能优于后者。

如果收集()而且createDataFrame ()使用R DataFrame,当启用向量化时,速度大约快了17倍和42倍。为有斑纹的()而且新闻出版总署()时,它分别比禁用向量化时快43倍和33倍。

当通过启用优化时,性能提高了17x-43xspark.sql.execution.arrow.sparkr.enabled真正的.数据越大,预期的性能就越高。详细信息请参见之前为Databricks Runtime执行的基准测试

结论

即将到来的Apache Spark 3.0,支持向量化api,有斑纹的()新闻出版总署()收集()而且createDataFrame ()与R DataFrame,利用Apache Arrow。在SparkR中启用向量化将性能提高了43倍,当数据的大小更大时,预期会有更大的提升。

至于未来的工作,Apache Arrow有一个持续的问题,箭- 4512.JVM和R之间的通信目前不是完全以流方式进行的。它必须(反)批量序列化,因为Arrow R API不支持开箱即用。此外,dapplyCollect ()而且gapplyCollect ()将在Apache Spark 3中得到支持。x版本。用户可以通过有斑纹的()而且收集(),新闻出版总署()而且收集()与此同时,单独的。

试试这些新的今天在Databricks上的功能通过DBR 7.0 Beta,其中包括即将发布的Spark 3.0版本的预览。BOB低频彩了解更多关于Spark 3.0的信息预览网络研讨会。

免费试用Databricks
看到所有工程的博客的帖子