开始
加载和管理数据
处理数据
政府
引用和资源
2023年8月1日更新
给我们反馈
本文向您展示了如何创建和部署一个端到端的数据处理管道,包括如何摄取原始数据,转换数据和运行分析处理数据。
请注意
尽管本文演示如何创建一个完整的数据管道使用砖<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/notebooks/index.html">笔记本电脑和一个砖<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/index.html">工作编排工作流,砖推荐使用<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/delta-live-tables/index.html">三角洲生活表声明接口为构建可靠、可维护、可测试的数据处理管道。
一个数据管道实现所需的步骤,将数据从源系统,基于需求的数据转换,将数据存储在一个目标系统。一个数据管道包括所有必要的流程将原始数据转化为准备数据,用户可以使用。例如,一个数据管道可能准备数据,数据分析师和数据科学家可以从数据中提取价值分析和报告。
提取、转换和加载(ETL)流程是一种常见的数据管道。在ETL处理中,数据从源系统摄取和书面暂存区域,改变了基于需求(确保数据质量,彻底删除记录,等等),然后写入到目标系统数据仓库或数据等。
来帮助你开始构建数据管道砖,本文中的例子包括走过创建一个数据处理流程:
使用砖特性探索原始数据集。
创建一个砖笔记本摄取原始源数据和原始数据写入目标表。
创建一个砖笔记本将原始的源数据,转换后的数据写入目标表。
创建一个砖笔记本查询转换后的数据。
自动化的数据管道砖的工作。
你登录到砖和数据科学与工程的工作区。
你有<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/clusters/clusters-manage.html">允许创建一个集群或<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/clusters/clusters-manage.html">访问一个集群。
(可选)发布表统一目录,你必须创建一个<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/data-governance/unity-catalog/create-catalogs.html">目录和<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/data-governance/unity-catalog/create-schemas.html">模式在统一目录。
在这个例子中使用的数据集的一个子集<一个class="reference external" href="http://labrosa.ee.columbia.edu/millionsong/">百万歌曲数据集,当代音乐特征和元数据的集合。这个数据是可用的<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/dbfs/databricks-datasets.html">样本数据集包括在你的砖工作区。
执行数据处理和分析在这个示例中,创建一个集群来提供所需的计算资源运行命令。
因为这个例子使用一个示例数据集存储在DBFS和建议坚持表<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/data-governance/unity-catalog/index.html">统一目录,您将创建一个集群配置单用户访问模式。单个用户访问提供完整访问DBFS同时启用访问统一目录。看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/dbfs/unity-catalog.html">最佳实践DBFS和统一目录。
点击计算在侧边栏。
在计算页面,点击创建集群。
在新的集群页面,输入惟一名称的集群。
在访问模式中,选择单用户。
在单用户或服务主体访问,请选择您的用户名。
保留剩余的值默认状态,然后单击创建集群。
更多地BOB低频彩了解砖集群,明白了<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/clusters/index.html">集群。
学习如何使用砖接口来探索原始源数据,看看<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/getting-started/data-pipeline-explore-data.html">探索数据管道的源数据。如果你想去直接摄取和准备数据,继续<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/getting-started/#ingest-prepare-data">步骤3:摄取原始数据。
在这个步骤中,您的原始数据加载到一个表,使其可用于进一步的处理。等砖平台来管理数据资产表、砖建议bob体育客户端下载<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/data-governance/unity-catalog/index.html">统一目录。然而,如果你没有权限创建所需的目录和模式发布表统一目录,你仍然可以完成以下步骤通过发布蜂巢metastore表。
摄取数据,数据砖推荐使用<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/ingestion/auto-loader/index.html">自动加载程序。自动加载程序自动检测和过程到达云对象存储新文件。
您可以配置自动加载程序加载数据的自动检测模式,允许您初始化表没有显式地声明数据模式和发展介绍了新列的表模式。这消除了需要手动跟踪和应用模式会随着时间而改变。砖建议模式推理在使用自动加载程序。然而,当看到在数据探索一步,歌曲的数据不包含标题信息。因为头没有存储的数据,你需要明确定义的模式,在下一个示例中所示。
在侧边栏中,单击新并选择笔记本从菜单中。的创建笔记本对话框出现了。
例如,输入一个名称的笔记本摄取歌曲数据。默认情况下:
摄取歌曲数据
Python是所选的语言。
笔记本连接到最后您使用集群。在这种情况下,集群中创建<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/getting-started/#create-a-cluster">步骤1:创建一个集群。
点击创建。
输入以下的第一个细胞笔记本:
从pyspark.sql.types进口倍增式,IntegerType,StringType,StructType,StructField#定义变量中使用以下代码file_path=“/ databricks-datasets /歌曲/数据- 001 /”table_name=“<表名称>”checkpoint_path=“/ tmp / pipeline_get_started / _checkpoint / song_data”模式=StructType((StructField(“artist_id”,StringType(),真正的),StructField(“artist_lat”,倍增式(),真正的),StructField(“artist_long”,倍增式(),真正的),StructField(“artist_location”,StringType(),真正的),StructField(“artist_name”,StringType(),真正的),StructField(“持续时间”,倍增式(),真正的),StructField(“end_of_fade_in”,倍增式(),真正的),StructField(“关键”,IntegerType(),真正的),StructField(“key_confidence”,倍增式(),真正的),StructField(“响度”,倍增式(),真正的),StructField(“发布”,StringType(),真正的),StructField(“song_hotnes”,倍增式(),真正的),StructField(“song_id”,StringType(),真正的),StructField(“start_of_fade_out”,倍增式(),真正的),StructField(“节奏”,倍增式(),真正的),StructField(“time_signature”,倍增式(),真正的),StructField(“time_signature_confidence”,倍增式(),真正的),StructField(“标题”,StringType(),真正的),StructField(“年”,IntegerType(),真正的),StructField(“partial_sequence”,IntegerType(),真正的)])(火花。readStream。格式(“cloudFiles”)。模式(模式)。选项(“cloudFiles.format”,“csv”)。选项(“9”,”\ t”)。负载(file_path)。writeStream。选项(“checkpointLocation”,checkpoint_path)。触发(availableNow=真正的)。toTable(table_name))
如果您使用的是统一目录,替换<表名称>与目录、模式和表名控制摄入记录(例如,data_pipelines.songs_data.raw_song_data)。否则,取代<表名称>与一个表的名称包含摄取记录,例如,raw_song_data。
<表名称>
data_pipelines.songs_data.raw_song_data
raw_song_data
取代< checkpoint-path >的路径的目录DBFS维持检查点文件,例如,/ tmp / pipeline_get_started / _checkpoint / song_data。
< checkpoint-path >
/ tmp / pipeline_get_started / _checkpoint / song_data
点击,并选择运行单元。这个例子定义了使用信息从数据模式自述、摄食的歌曲中包含的所有文件的数据file_path,并将数据写入指定的表table_name。
自述
file_path
table_name
准备原始数据进行分析,以下步骤,将原始的歌曲数据过滤掉不需要的列和添加一个新的字段,其中包含一个时间戳的创建新记录。
输入一个名称的笔记本。例如,准备歌曲数据。改变默认的语言SQL。
准备歌曲数据
第一个单元格输入以下的笔记本:
创建或取代表<表- - - - - -的名字>(artist_id字符串,artist_name字符串,持续时间双,释放字符串,节奏双,time_signature双,标题字符串,一年双,processed_time时间戳);插入成<表- - - - - -的名字>选择artist_id,artist_name,持续时间,释放,节奏,time_signature,标题,一年,current_timestamp()从<生- - - - - -歌曲- - - - - -表- - - - - -的名字>
如果您使用的是统一目录,替换<表名称>与目录、模式和表名包含过滤和转换记录(例如,data_pipelines.songs_data.prepared_song_data)。否则,取代<表名称>与一个表的名称包含过滤和转换记录(例如,prepared_song_data)。
data_pipelines.songs_data.prepared_song_data
prepared_song_data
取代< raw-songs-table-name >包含原始的歌曲的名字表记录前面步骤中摄取。
< raw-songs-table-name >
点击,并选择运行单元。
在这个步骤中,您将通过添加查询分析歌曲数据处理管道。这些查询使用前面步骤中创建的准备记录。
输入一个名称的笔记本。例如,分析歌曲数据。改变默认的语言SQL。
分析歌曲数据
——艺术家每年发布的大部分歌曲吗?选择artist_name,数(artist_name)作为num_songs,一年从<准备- - - - - -歌曲- - - - - -表- - - - - -的名字>在哪里一年>0集团通过artist_name,一年订单通过num_songsDESC,一年DESC
取代< prepared-songs-table-name >包含准备数据的表的名称。例如,data_pipelines.songs_data.prepared_song_data。
< prepared-songs-table-name >
点击在细胞操作菜单,选择添加下面的细胞并输入以下新细胞:
——为你的DJ列表找到歌曲选择artist_name,标题,节奏从<准备- - - - - -歌曲- - - - - -表- - - - - -的名字>在哪里time_signature=4和节奏之间的One hundred.和140年;
取代< prepared-songs-table-name >准备表的名称在上一步中创建的。例如,data_pipelines.songs_data.prepared_song_data。
运行查询和查看输出,点击运行所有。
您可以创建一个工作流自动化运行数据摄入,处理和分析步骤使用砖的工作。
在你的数据科学与工程工作区,做以下之一:
点击工作流在侧边栏,然后单击。
在侧边栏中,单击新并选择工作。
在任务对话框上任务选项卡中,取代添加一个名称为你的工作…对你的工作名称。例如,“歌工作流”。
在任务名称第一个任务,输入一个名称,例如,Ingest_songs_data。
Ingest_songs_data
在类型,选择笔记本任务类型。
在源中,选择工作空间。
使用文件浏览器找到数据摄入笔记本,点击笔记本名称,点击确认。
在集群中,选择Shared_job_cluster或集群中创建创建一个集群的一步。
创建一个集群
点击您刚才创建的任务并选择以下笔记本。
在任务名称任务,输入一个名称,例如,Prepare_songs_data。
Prepare_songs_data
使用文件浏览器找到数据准备笔记本,点击笔记本名称,点击确认。
在任务名称任务,输入一个名称,例如,Analyze_songs_data。
Analyze_songs_data
使用文件浏览器找到数据分析笔记本,点击笔记本名称,点击确认。
工作流运行,点击。查看<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/jobs/monitor-job-runs.html">细节的运行,单击链接开始时间列的运行<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/jobs/monitor-job-runs.html">工作运行视图。单击每个任务视图细节任务运行。
查看结果当工作流程完成后,点击最后的数据分析任务。的输出页面出现并显示查询结果。
演示使用砖的工作安排计划的工作流程,这个开始的例子将摄入,准备,和分析步骤为独立的笔记本,然后每个笔记本是用来创建一个任务的工作。如果所有的处理都包含在一个笔记本电脑,您可以轻松地安排直接从砖笔记本笔记本UI。看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/notebooks/schedule-notebook-jobs.html">创建和管理计划的笔记本工作。
一个常见需求是运行一个数据管道计划的基础上。定义一个时间表运行管道的工作:
点击工作流在侧边栏。
在的名字列,单击工作名称。侧板显示工作细节。
点击添加触发器在工作细节面板并选择计划在触发类型。
指定时间、起始时间和时区。可选的选择显示Cron语法复选框来显示和编辑的时间表<一个class="reference external" href="http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html">石英Cron语法。
点击保存。
更多地BOB低频彩了解砖笔记本,看到的<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/notebooks/index.html">介绍砖笔记本。
了解更BOB低频彩多关于砖工作,明白了<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/index.html">砖的工作是什么?。
了解更BOB低频彩多关于三角洲湖,看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/delta/index.html">三角洲湖是什么?。
学习更BOB低频彩多关于数据处理管道与达美住表,看看<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/delta-live-tables/index.html">δ生活是什么表?。