使用射线砖
预览
这个特性是在公共预览。
雷tripwire以上支持创建雷集群和射线应用程序运行在Apache引发集群数据砖。对射线开始使用机器学习的信息,包括教程和示例,请参见射线的文档。关于雷和Apache火花集成的更多信息,见射线引发API文档。
创建一个射线集群
创建一个射线集群,使用ray.util.spark.setup_ray_clusterAPI。
从ray.util.spark进口setup_ray_cluster,shutdown_ray_clustersetup_ray_cluster(num_worker_nodes=2,num_cpus_per_node=4,collect_log_to_path=“/ dbfs /道路/ / ray_collected_logs”)
的ray.util.spark.setup_ray_cluster
API创建一个射线簇火花。在内部,它创建一个背景火花工作。工作中的每个火花任务创建一个射线工作者节点,和雷头节点上创建驱动程序。这个论点num_worker_nodes
创建代表射线工作者节点的数量。指定数量的CPU或GPU核心分配给每个射线工作者节点,设置参数num_cpus_per_node
或num_gpus_per_node
。
雷集群创建之后,您就可以直接在你的笔记本运行任何射线应用程序代码。一个HTML链接在新选项卡中打开雷集群仪表板也显示,允许您查看集群的射线仪表板。
提示
如果您正在使用一个砖分配模式集群,您可以设置num_worker_nodes
来ray.util.spark.MAX_NUM_WORKER_NODES
为了使用所有可用的资源为你雷集群。
setup_ray_cluster (#……num_worker_nodes = ray.util.spark.MAX_NUM_WORKER_NODES,)
你可以设置参数collect_log_to_path
指定目的地的路径,你想收集射线集群日志。日志收集运行射线集群后关闭。砖建议你设置一个路径开始/ dbfs /
所以日志保存即使你终止集群的火花。
请注意
调用ray.util.spark.setup_ray_cluster
将设置RAY_ADDRESS
环境变量的地址创建集群射线,射线应用程序将自动使用这个雷集群。您可以指定一个替代集群地址使用地址
论点的ray.initAPI。
运行一线的应用程序
雷集群创建之后,您可以运行任何射线砖笔记本的应用程序代码。例如,您可以运行一个简单的射线应用程序在一个数据砖笔记本如下:
进口雷进口随机进口时间从分数进口分数雷。初始化()@ray。远程defpi4_sample(sample_count):”““pi4_sample sample_count实验运行,并返回部分时间内循环。”“”in_count=0为我在范围(sample_count):x=随机。随机()y=随机。随机()如果x*x+y*y< =1:in_count+ =1返回分数(in_count,sample_count)SAMPLE_COUNT=1000年*1000年开始=时间。时间()未来=pi4_sample。远程(sample_count=SAMPLE_COUNT)pi4=雷。得到(未来)结束=时间。时间()大调的=结束- - - - - -开始打印(f“运行{SAMPLE_COUNT}测试了{大调的}秒的)π=pi4*4打印(浮动(π))
从火花DataFrame加载数据
加载一个火花DataFrame射线数据集,首先你需要保存火花DataFrame DBFS使用镶花或δ格式。为了控制DBFS安全地访问、砖建议你山云DBFS对象存储。然后,您可以创建一个ray.data.Dataset
实例从保存的火花DataFrame路径使用以下辅助方法:
进口雷进口操作系统从urllib.parse进口urlparsedefcreate_ray_dataset_from_spark_dataframe(spark_dataframe,dbfs_tmp_path):spark_df。写。模式(“覆盖”)。拼花(dbfs_tmp_path)fuse_path=“/ dbfs”+urlparse(dbfs_tmp_path)。路径返回雷。数据。read_parquet(fuse_path)#例如,读表作为火花DataFrame三角洲spark_df=火花。读。表(“diviner_demo.diviner_pedestrians_data_500”)#提供dbfs位置写表data_location_2=(“dbfs: / home / example.user@www.neidfyre.com/data/ray_test/test_data_2”)#火花DataFrame转换为一线的数据集ray_dataset=create_ray_dataset_from_spark_dataframe(spark_dataframe=spark_df,dbfs_tmp_path=data_location_2)
关闭集群一线
关闭射线集群上运行数据砖,你可以调用ray.utils.spark.shutdown_ray_clusterAPI。
调优射线集群配置
每个射线工作者节点的推荐配置是:
每个雷工人最低4个CPU核心节点。
最低10 gb堆内存为每个射线工作者节点。
所以,当调用ray.util.spark.setup_ray_cluster
,砖建议设置num_cpus_per_node
一个值> = 4。
看到射线工作者节点的内存分配细节调优堆内存为每个射线工作者节点。
射线工作者节点的内存分配
每个射线工作者节点使用两种类型的内存:堆内存和对象存储内存。每种类型的分配的内存大小是决定如下所述。
总内存分配给每个射线工作者节点是:
RAY_WORKER_NODE_TOTAL_MEMORY=(SPARK_WORKER_NODE_PHYSICAL_MEMORY/MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES*0.8)
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES
是射线工作者节点的最大数量,可以引发工人节点上启动。这是由参数决定num_cpus_per_node
或num_gpus_per_node
。
如果你不设置参数object_store_memory_per_node
,然后堆内存大小和对象存储内存大小分配给每个射线工作者节点是:
RAY_WORKER_NODE_HEAP_MEMORY=RAY_WORKER_NODE_TOTAL_MEMORY*0.7OBJECT_STORE_MEMORY_PER_NODE=RAY_WORKER_NODE_TOTAL_MEMORY*0.3
如果你设置参数object_store_memory_per_node
:
RAY_WORKER_NODE_HEAP_MEMORY=RAY_WORKER_NODE_TOTAL_MEMORY- - - - - -argument_object_store_memory_per_node
此外,对象存储内存大小/射线工作者节点有限的共享内存操作系统。最大的价值是:
OBJECT_STORE_MEMORY_PER_NODE_CAP=(SPARK_WORKER_NODE_OS_SHARED_MEMORY/MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES*0.8)
SPARK_WORKER_NODE_OS_SHARED_MEMORY
是/dev/shm
磁盘容量配置的火花工作节点。