如何使用数据库管理端到端深度学习管道
2021年8月25日 在bob体育客户端下载平台的博客
深度学习(DL)模型正被应用于所有行业的用例——金融服务中的欺诈检测、媒体中的个性化、医疗保健中的图像识别等等。随着应用的日益广泛,今天使用DL技术比短短几年前要容易得多。流行的DL框架,如Tensorflow而且Pytorch已经成熟到他们表现良好和非常精确的地步。机器学习(ML)环境,如Databricks的Lakehouse平台bob体育客户端下载管理MLflow已经使得以分布式方式运行DL变得非常容易,使用像Horovod而且熊猫udf.
挑战
今天仍然存在的一个关键挑战是如何以可控和可重复的方式最好地自动化和操作DL机器学习管道。技术包括Kubeflow提供解决方案,但它们通常是重量级的,需要大量的特定知识,而且可用的托管服务很少——这意味着工程师必须自己管理这些复杂的环境。将DL管道的管理集成到数据和分析平台本身中会简单得多。bob体育客户端下载
这篇文章将概述如何在Databricks环境中轻松地管理DL管道Databricks作业编排,目前是一个公开预览功能。工作编制使管理多步ML管道,包括深度学习管道,易于按照设定的时间表构建、测试和运行。请注意,所有的代码都是可用的GitHub回购.有关如何访问它的说明,请参阅本博客的最后一节。
让我们看一个真实的业务用例。CoolFundCo是一家(虚构的)投资公司,每天分析数万张图片,以确定它们所代表的内容并对其进行分类。CoolFundCo以多种方式使用这种技术:例如,通过查看全国各地商场的图片来确定短期经济趋势。然后,该公司将其作为投资的数据点之一。CoolFundCo的数据科学家和ML工程师花费了大量的时间和精力来管理这个过程。CoolFundCo拥有大量现有映像,并且每天都有大量新映像发送到他们的云对象存储(在本例中微软Azure数据湖存储(ADLS)),但也有可能是AWS S3或谷歌云存储(GCS).
目前,管理这个过程是一场噩梦。每天,他们的工程师都会复制图像,运行他们的深度学习模型来预测图像类别,然后通过将模型的输出保存在CSV文件中来共享结果。DL模型必须定期进行验证和重新训练,以确保保持图像识别的质量,目前这也是团队在自己的开发环境中手动进行的过程。他们经常不知道底层ML模型的最新和最好的版本,以及他们使用哪些映像来训练当前的生产模型。管道的执行发生在外部工具中,它们必须管理不同的环境来控制端到端流程。
解决方案
为了给混乱带来秩序,CoolFundCo采用了Databricks来自动化这个过程。首先,他们将流程分为培训和评分工作流。
在培训流程中,他们需要:
- 从云存储中摄取标记图像到集中的湖屋
- 使用现有的标记图像来训练机器学习模型
- 在集中式存储库中注册新训练的模型
他们的每个工作流程都由一组任务组成,以实现预期的结果。每个任务使用不同的工具和功能集,因此需要不同的资源配置(集群大小、实例类型、CPU vs. GPU等)。他们决定在一个单独的Databricks笔记本中实现这些任务。解析体系结构如图2所示:
评分流程由以下步骤组成:
- 从云存储中摄取新的图像到集中的湖屋
- 尽可能快地使用存储库中的最新模型对每个图像进行评分
- 将评分结果存储在集中的湖屋中
- 将图像的子集发送到手动标记服务以验证准确性
DL培训管道
让我们分别来看一下培训管道的每个任务:
从云存储摄取标记图像到集中的数据湖[所需的基础设施:大型CPU集群]
该过程的第一步是将图像数据加载为模型训练可用的格式。他们使用Databricks加载所有的训练数据(即新图像)自动加载程序当新数据文件到达云存储时,它会增量地、高效地处理这些文件。自动加载功能有助于数据管理和自动处理不断到达的新图像。CoolFundCo的团队决定使用Auto Loader的“触发一次”功能,该功能允许Auto Loader流作业启动,检测自上次训练作业运行以来的任何新图像文件,只加载这些新文件,然后关闭流。它们使用Apache Spark的™binaryFile读取器加载所有图像,并从文件名解析标签,并将其存储为自己的列。binaryFile阅读器将每个图像文件转换为DataFrame中的单个记录,其中包含原始内容以及文件的元数据。DataFrame将有以下列:
- path (StringType):文件的路径。
- modificationTime (TimestampType):文件修改时间。在某些Hadoop文件系统实现中,此参数可能不可用,并且该值将被设置为默认值。
- length (LongType):以字节为单位的文件长度。
- content (BinaryType):文件的内容。
raw_image_df=spark.readStream.format \(“cloudFiles”).option(“cloudFiles。format", "binaryFile") \.option("recursiveFileLookup", "true") \.option("pathGlobFilter", "*.jpg") \.load (caltech_256_path)
image_df=raw_image_df.withColumn(“标签”,子字符串(element_at(分裂(raw_image_df [“路径”],' / '),-2),1,3.).投(IntegerType ())) \.withColumn(“load_date”,当前日期())
然后将所有数据写入a三角洲湖表格,他们可以在其余的训练和评分管道中访问和更新。Delta Lake为数据湖增加了可靠性、可伸缩性、安全性和性能,并允许使用标准SQL查询进行类似数据仓库的访问——这就是为什么这种类型的体系结构也被称为lakehouse。Delta Tables自动添加版本控制,因此每当该表更新时,一个新版本将指示已添加的图像。
使用现有的标记图像来训练机器学习模型[所需的基础设施:GPU集群]
该过程的第二步是使用预先标记的数据来训练模型。他们可以使用Petastorm,一个开源bob下载地址的数据访问库,允许直接从Parquet文件和Spark DataFrames训练深度学习模型。他们将图像的Delta表直接读入Spark Dataframe,将每张图像处理为正确的形状和格式,然后使用Petastorm的Spark Converter为他们的模型生成输入特征。
Converter_train = make_spark_converter(df_train)Converter_val = make_spark_converter(df_val)deftransform_row(pd_batch):pd_batch [“特性”] = pd_batch[“内容”]。地图(λx:预处理(x))Pd_batch = Pd_batch .drop(标签=“内容”轴=1)返回pd_batch
transform_spec_fn = TransformSpec(transform_row,edit_fields = [(“特性”, np。float32 IMG_SHAPE,假)),selected_fields = [“特性”,“标签”])与converter_train.make_tf_dataset (transform_spec = transform_spec_fn,cur_shard = hvd.rank (), shard_count = hvd.size (),batch_size = batch_size)作为train_reader, \converter_val.make_tf_dataset (transform_spec = transform_spec_fn,cur_shard = hvd.rank (), shard_count = hvd.size (),batch_size = batch_size)作为test_reader:#特遣部队。Keras只接受元组,不接受命名元组Train_dataset = train_reader。地图(λX:(X .features, X .label))steps_per_epoch =len(converter_train) // (BATCH_SIZE * hvd.size())Test_dataset = test_reader。地图(λX:(X .features, X .label))
为了扩大深度学习训练的规模,他们想要利用的不仅仅是单个大型GPU,而是GPU集群。在Databricks上,这可以通过导入和使用简单地完成HorovodRunner这是一个通用的API,可以使用Uber在Spark Cluster上运行分布式深度学习工作负载Horovod框架。
使用MLflow,团队能够跟踪整个模型训练过程,包括超参数、训练持续时间、损失和准确性指标,以及模型工件本身,直到MLflow实验。MLflow API有自动对数最常见的ML库的功能,包括Spark MLlib, Keras, Tensorflow, SKlearn和XGBoost。该特性自动记录特定于模型的度量、参数和模型工件。在Databricks上,当使用Delta训练数据源时,自动日志记录还跟踪用于训练模型的数据版本,这使得在原始数据集上运行的任何训练都可以轻松再现。
- 在MLflow注册表中注册新训练的模型-[所需的基础设施:单节点CPU集群]
模型训练管道的最后一步是将新训练的模型注册到Databricks模型注册表。使用前一个训练步骤中存储的工件,他们可以创建一个新版本的图像分类器。随着模型从一个新的模型版本过渡到阶段,然后是生产,他们可以开发和运行其他可以验证模型性能、可伸缩性等的任务。Databricks Models UI显示了模型的最新状态(见下文)。
得分管道
接下来,我们可以看看CoolFundCo评分流程中的步骤:
从云存储中摄取新的未标记的映像到集中的数据湖[所需的基础设施:大型CPU集群]
评分过程的第一步是将新着陆的图像数据加载为可供模型分类的可用格式。他们使用Databricks Auto Loader加载所有的新图像。CoolFundCo的团队再次决定使用Auto Loader的触发一次功能,该功能允许Auto Loader流作业启动,检测自上次评分作业运行以来的任何新图像文件,只加载这些新文件,然后关闭流。将来,他们可以选择将此作业更改为连续流。在这种情况下,登陆云存储的新图像将在到达后立即被拾取并发送给模型进行评分。
作为最后一步,所有未标记的图像存储在三角洲湖表,可以访问和更新在他们的评分管道的其余部分。
对新图像进行评分,并在Delta表中更新它们的预测标签[所需的基础设施:GPU集群]
一旦新图像加载到Delta表中,它们就可以运行我们的模型评分笔记本。这个笔记本取表中还没有标签或预测标签的所有记录(图像),加载在我们的训练管道中训练的分类器模型的生产版本,使用模型对每个图像进行分类,然后用预测标签更新Delta表。因为我们使用的是Delta格式,所以我们可以使用合并成命令更新表中有新预测的所有记录。
%sql合并成image_data我使用仅仅p在i.path=p.path当匹配然后更新集*当不匹配然后插入*
发送由Azure手动标记的图像[所需的基础设施:单节点CPU]
CoolFundCo使用Azure机器学习标签服务手动标记新图像的子集。具体来说,他们对DL模型不能做出非常自信的决定的图像进行采样——对标签的确定程度不到95%。然后,他们可以轻松地从Delta表中选择这些图像,其中所有的图像、图像元数据和标签预测都是作为评分管道的结果存储的。然后,这些图像被写入一个位置,用作标记服务的数据存储。通过标签服务的增量刷新,标签项目可以找到要标记的图像并进行标记。然后,标记服务的输出可以被Databricks重新处理并合并到Delta表中,为图像填充标签字段。
工作流部署
一旦训练、评分和标记任务笔记本测试成功,它们就可以投入生产管道。这些管道将根据团队期望的时间表定期运行培训、评分和标记过程(例如,每天、每周、两周或每月)。对于这个功能,Databricks的新岗位编制特性是理想的解决方案,因为它使您能够可靠地调度和触发包含具有依赖关系的多个任务的job序列。每个笔记本都是一个任务,因此,它们的整个训练管道创建了一个有向无环图(DAG)。这与开源工具类似bob下载地址Apache气流创建;然而,好处是整个端到端流程完全嵌入在Databricks环境中,因此非常容易在一个地方管理、执行和监视这些流程。
设置任务
工作流中的每个步骤或“任务”都有自己分配的Databricks Notebook和集群配置。这允许工作流中的每个步骤在不同的集群上执行,具有不同数量的实例、实例类型(内存vs优化的计算、CPU vs GPU)、预安装的库、自动伸缩设置等等。它还允许配置参数并将其传递给各个任务。
为了使用Jobs Orchestration公共预览功能,它必须是在Databricks工作区中启用通过工作区管理员。它将取代现有的(单个任务)作业特性,并且不能被反转。因此,如果可能的话,最好在单独的Databricks工作空间中尝试,因为可能存在与以前定义的单个任务作业的兼容性问题。
{“email_notifications”: {},“名称”:“Pipeline_DL_Image_Train”,“max_concurrent_runs”:1,“任务”:【{“existing_cluster_id”:“0512 - 123048 hares793”,“notebook_task”: {“notebook_path”:”奥利弗/回购。koernig/databricks_dl_demo/Deep Learning Image Prep -初始数据加载,“base_parameters”: {“image_path”:“/ tmp / 256 _objectcategories /”}},“email_notifications”: {},“task_key”:“Load_Images_for_Training”},...}
图像评分工作流是一个独立的Jobs Orchestration管道,每天执行一次。由于gpu可能无法提供足够的图像评分优势,所有节点都使用常规的基于cpu的计算集群。
最后,为了进一步提高和验证分类的准确性,评分工作流选择图像的一个子集,并将其提供给手动图像标记服务。在这个例子中,我们使用Azure ML的手动标签服务.其他云提供商也提供类似的服务。
执行和监视作业编排管道
当Jobs Orchestration管道执行时,用户可以在Jobs查看器中实时查看进度。这样可以方便地检查管道是否正确运行,以及经过了多少时间。
有关如何管理作业编排管道的详细信息,请参阅在线文档.
结论
在Databricks中实现DL管道后,CoolFundCo能够解决他们的主要挑战:
- 所有图像及其标签都存储在一个集中和管理的位置,易于工程师、数据科学家和分析师访问。
- 模型的新版本和改进版本在中央存储库(MLflow注册中心)中进行管理和访问。对于哪些模型是正确测试的,哪些模型是最新的,哪些模型可以用于生产,不再有困惑。
- 不同的管道(训练和评分)可以在不同的时间运行,同时使用不同的计算资源,甚至在相同的工作流中。
- 通过使用Databricks作业编排,管道的执行发生在相同的Databricks环境中,易于调度、监视和管理。
使用这种新的改进流程,数据科学家和机器学习工程师现在可以专注于真正重要的事情——获得深入的见解——而不是浪费时间争论与机器学习操作相关的问题。
开始
本博客的所有代码都可以在以下GitHub存储库中找到
https://GitHub.com/koernigo/databricks_dl_demo
方法将回购复制到您的工作区Databricks回购功能.
笔记
演示中使用的图像基于Caltech256数据集,可以使用Kaggle访问,例如数据集存储在/tmp/256_ObjectCategories/下的Databricks文件系统(DBFS)中。在repo中提供了如何使用Databricks Notebook下载和安装数据集的示例:
https://github.com/koernigo/databricks_dl_demo/blob/main/Create%20Sample%20Images.py
在repo中还提供了一个设置笔记本。它包含整个管道中使用的Delta表的DDL。它还将上面步骤中从Kaggle下载的图像数据的子集分离到单独的评分文件夹中。该文件夹位于DBFS目录下/tmp/unlabeled_images/256_ObjectCategories/,它将表示当模型需要对未标记的图像进行评分时,未标记图像的位置
这个笔记本可以在这里找到回购:
https://github.com/koernigo/databricks_dl_demo/blob/main/setup.py
训练和评分作业也包含在回购中,以JSON文件表示。
Jobs Orchestration UI目前不允许使用该UI通过JSON创建作业。如果您想使用来自回购的JSON,则需要安装砖CLI.
安装并配置CLI后,请按照以下步骤在Databricks工作区中复制作业:
- 本地克隆回购(命令行):
git克隆https://github.com/koernigo/databricks_dl_demo
cd databricks_dl_demo - 创建GPU和非GPU (CPU集群:
对于这个演示,我们使用一个支持gpu的集群和一个基于cpu的集群。请创建两个集群。集群规格示例可以在这里找到:
https://github.com/koernigo/databricks_dl_demo/blob/main/dl_demo_cpu.json
https://github.com/koernigo/databricks_dl_demo/blob/main/dl_demo_ml_gpu.json
请注意,这个集群规范中的某些特性是Azure Databricks特定的(例如,节点类型)。如果在AWS或GCP上运行代码,则需要使用等效的GPU/CPU节点类型。 - 编辑JSON作业规范
选择您想要创建的工作JSON规范,例如培训管道(https://github.com/koernigo/databricks_dl_demo/blob/main/Pipeline_DL_Image_train.json).您需要将所有集群的cluster-ids替换为在上一步中创建的集群(CPU和GPU)。 - 编辑Notebook路径
在现有的JSON中,回购路径是/Repos/(电子邮件保护)/…请在您的工作空间中找到并将它们替换为repos路径(通常是/ repos / - 使用Databricks CLI创建作业
——json文件Pipeline_DL_Image_train。json -概要 - 在Jobs UI中验证作业是否已成功创建