即将到来的Apache Spark 3.0中的矢量R I/O
2020年6月1日 在工程的博客
R是数据科学中最流行的计算机语言之一,专门用于统计分析,并带有许多扩展,如RStudio插件和其他R包,用于数据处理和机器学习任务。此外,它使数据科学家能够轻松地可视化他们的数据集。
通过SparkR在Apache火花TM, R代码可以很容易地伸缩。要以交互方式运行作业,您可以通过运行R shell轻松地运行分布式计算。
当SparkR不需要与R进程交互时,性能几乎与其他语言api(如Scala、Java和Python)相同.然而,当SparkR作业与本机R函数或数据类型交互时,会发生显著的性能下降。
Databricks Runtime在SparkR中引入了矢量化来提高Spark和R之间的数据I/O性能。我们很高兴地宣布使用R api从Apache箭头在0.15.1版本中,向量化现在可以在即将到来的Apache Spark 3.0中使用,并获得了实质性的性能改进。
这篇文章概述了SparkR内部的Spark和R交互,当前的原生实现和SparkR中的向量化实现以及基准测试结果。
火花和R相互酌
SparkR不仅支持一组丰富的ML和类sql api,还支持一组通常用于直接与R代码交互的api——例如,Spark DataFrame从/到R DataFrame的无缝转换,以及在Spark DataFrame上以分布式方式执行R原生函数。
在大多数情况下,Spark中的其他语言api的性能实际上是一致的——例如,当用户代码依赖于Spark udf和/或SQL api时,执行完全发生在JVM内部,在I/O中没有性能损失。请看下面的例子,它们同样需要1秒。
//Scala API//~1第二个sql("SELECT id FROM range(2000000000)")。过滤器("id > 10")。数()
# r API#~1第二个数(过滤器(sql("SELECT * FROM range(2000000000)"), "id > 10"))
然而,在需要执行R原生函数或将其从/转换为R原生类型的情况下,性能会有很大的不同,如下所示。
// Scala APIval ds = (1L到100000L).toDS// ~1秒ds.mapPartitions (iter= >iter.filter (_
# R APIdf
虽然这简单的情况下以上只是过滤低于的值50,000为每个分区,SparkR慢了15倍。
< >之前// Scala API// ~0.2秒Val df = sql(SELECT * FROM range(1000000)) .collect ()
# r API# ~8秒-慢40倍df
上面的例子是更糟。它只是将相同的数据收集到驱动端,但是它是40 x慢在SparkR。这是因为需要交互的api与R原函数或数据类型而且它的实现有不很能干的人。有六种api具有显著的性能损失:
createDataFrame ()
收集()
有斑纹的()
dapplyCollect ()
新闻出版总署()
gapplyCollect ()
简而言之,createDataFrame ()
而且收集()
要求(反)序列化和转换数据从JVM从/到R驱动端。例如,字符串
在Java中变成字符
在r中。有斑纹的()
而且新闻出版总署()
, JVM和R执行器之间的转换是必需的,因为它需要(反)序列化R本机函数和数据。如果dapplyCollect ()
而且gapplyCollect ()
,它需要JVM和R之间的驱动程序和执行程序的开销。
本机实现
SparkR DataFrame上的计算分布在Spark集群上的所有可用节点上。如果它不需要作为R来收集数据,那么在驱动程序或执行程序端就不需要与上面的R进程进行通信data.frame
或者执行R原生函数。当它需要R时data.frame
或R原生函数的执行,它们在JVM和R驱动程序/执行器之间使用套接字进行通信。
它以一种低效的编码格式在JVM和R之间逐行(反)序列化和传输数据,这种编码格式没有考虑到现代的CPU设计,例如CPU流水线。
矢量化实现
在Apache Spark 3.0中,SparkR引入了一种新的向量化实现,它利用Apache Arrow直接在JVM和R驱动程序/执行器之间交换数据,从而使序列化成本最小化。
新的实现利用Apache Arrow以高效的柱状格式支持流水线和单指令多数据(SIMD),而不是在JVM和R之间使用低效格式逐行(反)序列化数据。
新的向量化SparkR api在默认情况下不启用,但可以通过设置启用spark.sql.execution.arrow.sparkr.enabled
来真正的
在即将到来的Apache Spark 3.0中。注意这是向量化的dapplyCollect ()
而且gapplyCollect ()
尚未实现。鼓励用户使用有斑纹的()
而且新闻出版总署()
代替。
基准测试结果
基准测试是使用一个简单的数据集执行的500000条记录通过执行相同的代码并比较启用和禁用向量化时的总运行时间。我们的代码、数据集和笔记本都是可用的在GitHub上.
如果收集()
而且createDataFrame ()
使用R DataFrame,当启用向量化时,速度大约快了17倍和42倍。为有斑纹的()
而且新闻出版总署()
时,它分别比禁用向量化时快43倍和33倍。
当通过启用优化时,性能提高了17x-43xspark.sql.execution.arrow.sparkr.enabled
来真正的
.数据越大,预期的性能就越高。详细信息请参见之前为Databricks Runtime执行的基准测试.
结论
即将到来的Apache Spark 3.0,支持向量化api,有斑纹的()
,新闻出版总署()
,收集()
而且createDataFrame ()
与R DataFrame,利用Apache Arrow。在SparkR中启用向量化将性能提高了43倍,当数据的大小更大时,预期会有更大的提升。
至于未来的工作,Apache Arrow有一个持续的问题,箭- 4512.JVM和R之间的通信目前不是完全以流方式进行的。它必须(反)批量序列化,因为Arrow R API不支持开箱即用。此外,dapplyCollect ()
而且gapplyCollect ()
将在Apache Spark 3中得到支持。x版本。用户可以通过有斑纹的()
而且收集()
,新闻出版总署()
而且收集()
与此同时,单独的。
试试这些新的今天在Databricks上的功能通过DBR 7.0 Beta,其中包括即将发布的Spark 3.0版本的预览。BOB低频彩了解更多关于Spark 3.0的信息预览网络研讨会。