简化变化数据获取与数据砖三角洲
得到的早期预览O ' reilly的新电子书一步一步的指导你需要开始使用三角洲湖
注意:我们也推荐你阅读高效的插入到数据与砖湖泊三角洲这解释了使用MERGE命令做高效的插入和删除。
一个常见的用例,我们遇到在砖是顾客执行变化数据捕获(CDC)从一个或多个来源为一组砖三角洲的表。这些来源可能是本地或在云中,操作性事务存储,或者数据仓库。普通胶水,将他们都是变更集生成:
- 使用ETL工具像Oracle GoldenGate或Informatica PowerExchange,
- 从供应商提供的变化表(例如,甲骨文变化数据捕获),或
- 用户维护数据库表,捕获变更集使用插入/更新/删除触发器
和他们希望合并这些变更集成砖三角洲。基于我们的经验执行这个用例在公共和私人部门的客户,我们提供一个参考体系结构来执行中心使用特性今天在砖三角洲。
背景
变化数据捕获或疾病预防控制中心,简言之,是指捕捉变化的过程一组数据源和合并在一组目标表,通常在一个数据仓库。这些通常是刷新每晚每小时,或者在某些情况下,设置(例如,每15分钟)。我们称这一时期为刷新周期。
给定表的组改变记录在一个刷新周期称为一个变更集。最后,我们将记录在一个变更集的集合作为一个记录集具有相同的主键。直觉这是指不同的变化最终表中相同的记录。
国旗 | ID | 价值 | CDC_TIMESTAMP |
---|---|---|---|
我 | 1 | 10 | 2018-01-01 16:02:00 |
U | 1 | 11 | 2018-01-01 16:02:01 |
D | 1 | 11 | 2018-01-01 16:02:03 |
U | 2 | 20. | 2018-01-01 16:02:00 |
D | 3 | 30. | 2018-01-01 16:02:00 |
表1:更改集C表2018-01-01 17:00:00时刻T
表1显示了一个示例更改集C表T在给定的一段时间。更改集C有四个列:
- 一个标志指示是否变化类型的I / U / D(插入/更新/删除),
- 一个ID列惟一地标识记录集,
- 一个值列更改记录被更新时,和
- CDC_TIMESTAMP显示插入/更新/删除记录时。目标表T除了国旗列有相同的模式。
在这种变更集,记录ID 1插入、更新和删除(行1、2和3)。这样的记录设置为ID = 1有三个记录。记录ID 2只是更新,记录ID 3被删除。它是安全的假设插入记录ID 2和3在某种程度上。
CDC砖前三角洲
前三角洲,样本疾病预防控制中心管道我们的一些客户是:甲骨文Informatica = > = >引发夜间批处理作业= >砖。
在这个场景中,Informatica推动变更集30多个不同的数据来源并巩固他们在Oracle数据仓库。大约一天一次,砖的工作从Oracle检索这些变更集,通过JDBC、砖和刷新表。虽然这计划成功productionized,它有两个主要的缺点:
- 它已经超载的Oracle实例添加载荷,导致限制,这些ETL作业如何运行时,和
- 刷新率最好是在夜间,由于并发局限性的香草拼花表(砖前三角洲)。
疾病预防控制中心与砖δ
与砖三角洲,CDC管道现在是流线型的,可以更频繁地刷新:Informatica = > S3 = >δ= >火花每批工作。在这个场景中,Informatica写变更集直接使用Informatica S3的镶花的作家。砖工作运行所需的sub-nightly刷新率(例如,每隔15分钟,每小时,每3小时,等等)来读取这些变更集和更新目标砖三角洲的表。
微小的变化,这管道也适应阅读来自卡夫卡疾控中心记录,因此,管道就像卡夫卡= > = >δ火花。在本节的其余部分,我们详细说明这个过程,以及我们如何使用砖三角洲作为沉疾控中心工作流程。
与我们的一个客户,我们实现了这些疾病预防控制中心技术最大和最频繁刷新ETL管道。在这个客户场景中,Informatica写道一个变更集S3为65表,有任何变化每15分钟。而变更集本身是相当小的(使用插入覆盖
这种方法的基本思想是保持暂存表,对于一个给定的记录集的所有更新和积累最后表,其中包含当前最新的快照,用户可以查询。
图1:插入覆盖流从源Informatica砖三角洲的云存储
对于每一个刷新周期,火花作业将运行两个INSERT语句。
- 插入(插入1):阅读S3的变更集或卡夫卡在这个刷新周期,这些更改并将其插入暂存表。
- 插入覆盖(插入2):获取当前版本的每一个从暂存表记录集和最终的表中覆盖这些记录。
图2:插入覆盖流从源到卡夫卡结构化流砖三角洲
一个熟悉的分类方案中心从业人员是不同的类型ala处理更新缓慢变化维度(SCDs)。我们暂存表映射接近SCD 2型方案而最终表地图最接近一个SCD 1型方案。
实现
让我们深入了解两个步骤,首先第一个插入。
%scalaval变更集=数组(file1 file2,…)spark.read。拼花(变更集:_*).createOrReplaceTempView(“增量”)%sql插入成T_STAGING分区(CREATE_DATE_YEAR)选择ID,价值,CDC_TIMESTAMP从增量
这里,第一个单元格定义一个临时视图在美联储的变更集插入
在第二单元。的插入
相当简单的异常分区
条款,让我们花一些时间来打开。
回想一下,在云数据存储和HDFS,记录存储在文件和一个更新的单位为一个文件。砖的δ,这是检查机关文件,提出了帖子。记录需要更新时,火花需要读取和改写整个文件。因此,重要的是本地化更新尽可能少的文件。因此,我们分区分段和最后一个表的列,最小化的行数感动在疾控中心,并提供分区的分区列规范(Azure|AWS),这样砖三角洲可以插入的记录在正确的分区T_STAGING
。
接下来,我们看看第二个插入。
%sql插入覆盖表T_FINAL分区(CREATE_DATE_YEAR)选择ID,价值,CDC_TIMESTAMP从(选择一个。*,排名()在(分区通过ID订单通过CDC_TIMESTAMPDESC)作为RNK从T_STAGING。*在哪里CREATE_DATE_YEAR在(2018年,2016年,2015年)B)在哪里B.RNK=1和B.FLAG' D '
让我们从内部开始查询读取T_STAGING
。回想一下,staging表可能有任意数量的插入、更新和删除操作对于一个给定的记录集。这些变化可能来自一个给定的变更集(例如,ID = 1
表1中有3个变化),或者它可能遇到变更集,因为它们插入分级表跨多个刷新时间。内排名
随着外过滤器B.RNK=1和B.FLAG' D '
确保:
- 我们只选择最近的变化对于给定的记录集,和
- 最近的变化是一个在哪里
' D '
,我们排除整个纪录被插入在最后的表,从而实现删除记录的目的。
接下来,注意到在CREATE_DATE_YEAR (…)
条款。这与分区(CREATE_DATE_YEAR)
在外层查询确保砖δ只会覆盖这些分区,即2018年,2016年,2015年,其余被搁浅。值得一提的是,虽然我们提供硬编码值以上分区清醒,在实际的实现中,这些分区提供了一个Scala从查询列表动态生成的变更集,等
val partitionsToOverwrite = spark.sql (“选择年(to_date (create_date。”MM / dd / yyyy“从增量))”)…spark.sql (s”“”插入覆盖T_FINAL…在($ {CREATE_DATE_YEAR partitionsToOverwrite.mkString (", "))…”“”)
性能
正如上面提到的,砖三角洲CDC管道可以并发运行与用户查询一致的数据视图。在这里,我们显示两个特性在砖三角洲,可用于优化读者和作家。
- 分区修剪:在第二个插入(即以上。,the writers), the query optimizer in Databricks Delta looks at the
分区
规范和在
列表中在哪里
条款只读取和改写那些需要更新的分区。在实践中,这可以很容易地减少表的部分接触到一半或者,通常情况下,低得多,从而帮助第二插入,同时,本地化更新T_FINAL
,选择
查询T_STAGING
。 - 数据跳/ ZORDER索引:用户查询
T_FINAL
的范围可以从BI工具特别的SQL查询。在这里,可能有也可能没有分区列的查询CREATE_DATE_YEAR
在在哪里
条款。例如,
%sql选择…从T_FINAL在哪里COL1=瓦尔和COL2=瓦尔
在这种情况下,既不COL1
也不COL2
是分区规范的一部分。然而,用户可以创建一个z值指数这两个列:
优化T_FINAL ZORDER (COL1, COL2)
下面,砖三角洲集群镶花的文件的z值,查询等上面的联系只有那些文件,可能包含COL1 =瓦尔
和COL2 =瓦尔
。
我们注意到两个细节的z值指数扩大查询的列表,他们可以使用
- 在上面的案例中,只有过滤查询
COL1
(或者,,COL2
)也可以受益于该指数以来,不同,复合索引的RDBMS, z值指数并不偏向查询过滤器在前缀索引列的列表。 - 不像上面,如果查询分区列上也有一个过滤器,然后分区修剪和z顺序索引可以大大减少文件数量在查询时。
我们称这个优秀的读者帖子有关数据为什么以及如何跳过和z值索引有或没有分区修剪工作。
并发性
提出了在早些时候帖子,砖三角洲将事务支持添加到云存储。我们依靠这种支持在以下方式。而覆盖分区,砖三角洲将确保除了创建新的镶花文件,离开旧铺文件在用户查询并发运行数据。查询开始覆盖完成后将新数据。三角洲可靠地使用事务日志查询指向一致的数据版本。
压实和清理
随着时间的推移,这两个T_STAGING
和T_FINAL
积累过期和未使用的记录。例如,任何记录T_STAGING
在哪里等级> 1
,或任何文件T_FINAL
标志着陈旧的覆盖该文件。而这并不影响查询的正确性,降低疾病预防控制中心和查询性能。值得庆幸的是,这些维护任务在砖三角洲的简化。清除旧的文件T_FINAL
例如,是那么简单
%sql真空T_FINAL
没有保留参数(见真空文档:Azure|AWS),该清洗所有过期文件,不再是在事务日志和7天以上,这是足够的时间来确保没有并发读者访问这些文件。
打扫屋子上T_STAGING
,另一方面,包括删除所有记录等级> 1
。最简单的方法就是复制T_FINAL
成T_STAGING
%sql插入覆盖T_STAGING选择*从T_FINAL
上面的命令和前面所示优化
命令可以组织成一个笔记本维护任务和调度运行砖的工作。
作为即将到来的砖运行时的5.0发行版的一部分,使用并入可能成为另一个伟大的方法由于砖三角洲的性能改进和支持删除(D)记录。
Productionizing管道
砖作为一个平台不仅有助于开发和构bob体育客户端下载建ETL productionizing这些管道管道也加速时间。在这里,我们描述了两个特性和使能技术在Apache火花帮助我们productionize CDC管道。
配置驱动编程
构建大型应用程序中的一个常见设计模式是使用配置驱动软件行为(例如,YAML或基于json配置文件)。火花支持SQL +通用编程语言像Scala和Python是适合这个设计模式,因为配置可以存储在表和动态SQL构建使用它。让我们看看这将在疾控中心工作背景。
首先,回想一下,我们疾控中心管道有65表。我们保持一个配置
表,每一行是一个65年的表,和领域帮助我们构建CDC SQL语句。
表 | PARTITION_COLUMN_EXPRESSION | PARTITION_COLUMN_ALIAS | RANK_EXPRESSION | IS_INSERT_ONLY |
---|---|---|---|---|
T1 | (to_date (create_date MM / dd / yyyy)) | create_date_year | 分区通过ID CDC_TIMESTAMP DESC秩序 | N |
T2 | (to_date (transaction_date MM / dd / yyyy)) | transaction_date_year | 分区通过ID1、ID2 CDC_TIMESTAMP DESC秩序 | N |
T3 | 零 | 零 | 零 | Y |
T4 | … | … | … | … |
表2 -配置表为推动疾控中心管道的一组表
为一个特定的配置信息表和执行CDC逻辑表,使用以下代码。
val hiveDb =“mydb”val CONFIG_TABLE =“配置”/ /表是一个笔记本输入小部件val表=年代" " " $ {dbutils.widgets.get (wTable)}“”“瓦尔(partitionColumnExpression partitionColumnAlias、rankExpression isInsertOnly) = spark.sql (s”“”选择PARTITION_COLUMN_EXPRESSION、PARTITION_COLUMN_ALIAS RANK_EXPRESSION IS_INSERT_ONLY从$ {hiveDb}。$ {CONFIG_TABLE}TABLE_NAME =低(“美元表”)”“”)。作为[(字符串,字符串,字符串,布尔)].head
…
/ **插入1上面的样子。在这里,表*变量是集对T1或T2从配置表* /spark.sql (s”“”表插入$ {}_STAGING分区($ {partitionColumnAlias)选择$ {projectListFromIncremental}从增量”“”)
…/ /插入2可能看起来像val partitionsToOverwrite = spark.sql (s”“”选择不同的$ {partitionColumnExpression}从增量”“)。作为[String] .collectspark.sql (s”“”表插入覆盖表$ {}_FINAL分区($ {partitionColumnAlias})选择$ {projectListFromIncremental}(选择一个。*,排名()在(${rankExpression}) AS RNK从${表}_STAGING a *在$ {partitionColumnAlias} ($ {partitionsToOverwrite.mkString (", "))B)在B。RNK=1和B.FLAG' D '”“”)
笔记本工作流程和工作
说,上面的是实现一个笔记本ProcessIncremental
。我们可以使用笔记本的工作流和有一个控制器笔记本通过的65个表,发现优秀的变更集,和电话ProcessIncremental
在他们身上。
val startDate可以=“20180101”val表= spark.sql (s”“”选择TABLE_NAME从hiveDb。CONFIG_TABLE美元”“”)。作为[String] .collect。地图(_.toLowerCase)
表。foreach{台= >val processTheseChangeSets = dbutils.notebook.run (“GetNextChangeSets”,0地图(“wHiveDb”- > hiveDb,“wTable”- >资源,“wStartDate”- > startDate可以))如果(! processTheseChangeSets.isEmpty) {val统计= dbutils.notebook.run (“ProcessIncremental”,0地图(“wHiveDb”- > hiveDb,“wIncrFiles”- > processTheseChangeSets,“wTable”- >(资源)))}
控制器的笔记本可以很容易地计划作为一个工作在砖CDC管道运行所需的频率。最后,尽管上述循环系列,它可以很容易地更改为一个平行的循环使用,说,.par成语把串行的收藏并行集合,或者使用Scala的未来。
结论
在这个博客中,我们提出了一个参考架构合并成砖三角洲,变更集捕获通过疾控中心的工具(例如,Oracle GoldenGate或Informatica PowerExchange),或通过改变表维护的供应商(如Oracle变化数据捕获),或通过改变表维护的用户使用插入/更新/删除触发器。我们引入了火花SQL用于反映这些记录在砖三角洲,两个性能注意事项(分区和z顺序索引),和辅助因素,如压实和清理,以确保终端用户查询的表优化读取。然后我们看到砖有助于加速的发展这两个ETL管道通过支持配置驱动的编程,并使用笔记本productionizing这些工作流工作流和工作。
访问在线三角洲湖中心要了解BOB低频彩更多,请下载最新的代码,并加入三角洲湖社区。