改变数据获取与三角洲生活表
预览
这个特性是在公共预览。
您可以使用变化数据捕获(CDC)三角洲生活表更新表根据源数据的变化。CDC在三角洲地区的生活表支持SQL和Python接口。三角洲生活与缓慢变化维度表支持更新表(SCD) 1型和2型:
直接使用的化合物1型更新记录。不保留历史记录更新。
使用SCD 2型保留的历史记录,在所有更新或更新一套指定的列。看到追踪历史只指定列SCD 2型
语法和其他参考资料,请参阅:
请注意
本文描述了如何更新表在三角洲住表管道基于源数据的变化。学习如何记录和查询为三角洲表行级变化信息,明白了使用三角洲湖变化数据以砖。
疾病预防控制中心是如何实现与达美住表吗?
您必须指定的源数据序列中的一列记录,三角洲生活表解释的单调递增表示适当的源数据的排序。三角洲生活表自动处理数据到达的顺序。化合物2型变化,δ住表传播适当的排序值__START_AT
和__END_AT
目标表的列。应该有一个不同的更新在每个排序值,每个键和零排序值是不支持的。
执行中心处理三角洲生活表,您首先创建一个流表,然后使用一个应用变化成
声明中指定源,钥匙,和测序改变饲料。创建目标流表,可以使用创建或刷新流媒体表
在SQL或声明create_streaming_table ()
在Python中,函数。创建语句定义疾病预防控制中心处理,使用应用变化
在SQL或声明apply_changes ()
在Python中,函数。语法细节,请参阅改变δ生活表中数据获取与SQL或改变δ生活表中数据获取与Python。
δ住表数据对象是用于什么疾病预防控制中心处理?
当你声明的目标表,在蜂巢metastore创建了两个数据结构:
一个视图使用分配给目标表的名称。
内部支持表用δ住表表管理中心处理。这张桌子是由将命名的
__apply_changes_storage_
到目标表名。
例如,如果你声明一个目标表命名dlt_cdc_target
,你会看到一个视图命名dlt_cdc_target
和一个表命名__apply_changes_storage_dlt_cdc_target
metastore。创建一个视图允许三角洲生活表过滤掉多余的信息(例如,墓碑和版本)需要处理无序的数据。将处理过的数据,查询目标视图。你也可以查询的原始数据__apply_changes_storage_
表删除记录和额外的版本列。如果你手动添加数据表,记录被认为是其他更改,因为之前的版本列失踪。
限制
度量目标表,如输出的行数,并不是可用的。
化合物2型更新将添加一个历史行对于每一个输入行,即使没有列已经改变了。
的目标
应用变化成
查询或apply_changes
不能使用函数作为一个流的源表。表读取的目标应用变化成
查询或apply_changes
函数必须是一个生活表。不支持在一个预期
应用变化成
查询或apply_changes ()
函数。使用预期的源或目标数据集:源数据上添加预期通过定义一个中间表所需的期望和使用这个数据集作为目标表的源。
添加预期与下游目标数据表,从目标表中读取输入数据。
化合物1型和SCD 2型砖
以下部分提供的例子,演示三角洲生活表SCD 1型和2型查询更新目标表基于源的事件:
创建新用户记录。
删除一个用户记录。
更新用户记录。在化合物1型的例子中,最后一次
更新
操作迟到,从目标表,展示了事件的处理。
所有下面的例子假设熟悉配置和更新三角洲住表管道。看到教程:管道运行第一个三角洲住表。
为了运行这些示例,您必须首先创建一个示例数据集。看到生成测试数据。
以下是输入记录这些例子:
用户标识 |
的名字 |
城市 |
操作 |
sequenceNum |
---|---|---|---|---|
124年 |
劳尔 |
瓦哈卡 |
插入 |
1 |
123年 |
伊莎贝尔 |
蒙特雷 |
插入 |
1 |
125年 |
梅塞德斯 |
提华纳 |
插入 |
2 |
126年 |
莉莉 |
坎昆 |
插入 |
2 |
123年 |
零 |
零 |
删除 |
6 |
125年 |
梅塞德斯 |
瓜达拉哈拉 |
更新 |
6 |
125年 |
梅塞德斯 |
墨西卡利 |
更新 |
5 |
123年 |
伊莎贝尔 |
吉娃娃 |
更新 |
5 |
如果你取消最后一行数据的示例中,将插入以下记录指定记录应该被截断的地方:
用户标识 |
的名字 |
城市 |
操作 |
sequenceNum |
---|---|---|---|---|
零 |
零 |
零 |
截断 |
3 |
请注意
下面的例子包括所有选项来指定删除
和截断
业务,但每一个都是可选的。
化合物1型更新过程
下面的代码示例演示了处理SCD 1型更新:
进口dlt从pyspark.sql.functions进口上校,expr@dlt。视图def用户():返回火花。readStream。格式(“δ”)。表(“cdc_data.users”)dlt。create_streaming_table(“目标”)dlt。apply_changes(目标=“目标”,源=“用户”,键=(“标识”),sequence_by=上校(“sequenceNum”),apply_as_deletes=expr(“=”删除“行动”),apply_as_truncates=expr(“=“截断”行动”),except_column_list=(“操作”,“sequenceNum”),stored_as_scd_type=1)
——创建和填充目标表。创建或刷新流媒体表目标;应用变化成生活。目标从流(cdc_data。用户)键(用户标识)应用作为删除当操作=“删除”应用作为截断当操作=“截断”序列通过sequenceNum列*除了(操作,sequenceNum)存储作为镜头分割类型1;
跑后SCD 1型的例子中,目标表包含以下记录:
用户标识 |
的名字 |
城市 |
---|---|---|
124年 |
劳尔 |
瓦哈卡 |
125年 |
梅塞德斯 |
瓜达拉哈拉 |
126年 |
莉莉 |
坎昆 |
后运行SCD 1型有额外的例子截断
记录,记录124年
和126年
截断的吗截断
操作在sequenceNum = 3
,目标表包含以下记录:
用户标识 |
的名字 |
城市 |
---|---|---|
125年 |
梅塞德斯 |
瓜达拉哈拉 |
2型更新过程的化合物
下面的代码示例演示了处理SCD 2型更新:
进口dlt从pyspark.sql.functions进口上校,expr@dlt。视图def用户():返回火花。readStream。格式(“δ”)。表(“cdc_data.users”)dlt。create_streaming_table(“目标”)dlt。apply_changes(目标=“目标”,源=“用户”,键=(“标识”),sequence_by=上校(“sequenceNum”),apply_as_deletes=expr(“=”删除“行动”),except_column_list=(“操作”,“sequenceNum”),stored_as_scd_type=“2”)
——创建和填充目标表。创建或刷新流媒体表目标;应用变化成生活。目标从流(cdc_data。用户)键(用户标识)应用作为删除当操作=“删除”序列通过sequenceNum列*除了(操作,sequenceNum)存储作为镜头分割类型2;
跑后SCD 2型的例子中,目标表包含以下记录:
用户标识 |
的名字 |
城市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123年 |
伊莎贝尔 |
蒙特雷 |
1 |
5 |
123年 |
伊莎贝尔 |
吉娃娃 |
5 |
6 |
124年 |
劳尔 |
瓦哈卡 |
1 |
零 |
125年 |
梅塞德斯 |
提华纳 |
2 |
5 |
125年 |
梅塞德斯 |
墨西卡利 |
5 |
6 |
125年 |
梅塞德斯 |
瓜达拉哈拉 |
6 |
零 |
126年 |
莉莉 |
坎昆 |
2 |
零 |
追踪历史只指定列SCD 2型
SCD 2型支持指定输出列的一个子集生成历史上那些列;更改其他列就地更新,而不是产生新的历史记录。
使用跟踪历史SCD 2型三角洲生活表,您必须显式地启用这个特性在管道通过添加以下配置三角洲生活表管道设置:
{“配置”:{“pipelines.enableTrackHistory”:“真正的”}}
如果pipelines.enableTrackHistory
没有设置或设置假
谈到2型查询使用的默认行为为每个输入行生成一个历史记录。
下面的例子演示了使用跟踪历史SCD 2型:
进口dlt从pyspark.sql.functions进口上校,expr@dlt。视图def用户():返回火花。readStream。格式(“δ”)。表(“cdc_data.users”)dlt。create_streaming_table(“目标”)dlt。apply_changes(目标=“目标”,源=“用户”,键=(“标识”),sequence_by=上校(“sequenceNum”),apply_as_deletes=expr(“=”删除“行动”),except_column_list=(“操作”,“sequenceNum”),stored_as_scd_type=“2”,track_history_except_column_list=(“城市”])
——创建和填充目标表。创建或刷新流媒体表目标;应用变化成生活。目标从流(cdc_data。用户)键(用户标识)应用作为删除当操作=“删除”序列通过sequenceNum列*除了(操作,sequenceNum)存储作为镜头分割类型2;跟踪历史在*除了(城市)
跑后的SCD 2型跟踪历史有额外的例子截断
记录,目标表包含以下记录:
用户标识 |
的名字 |
城市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123年 |
伊莎贝尔 |
吉娃娃 |
1 |
6 |
124年 |
劳尔 |
瓦哈卡 |
1 |
零 |
125年 |
梅塞德斯 |
瓜达拉哈拉 |
2 |
零 |
126年 |
莉莉 |
坎昆 |
2 |
零 |
生成测试数据
下面的代码生成提供了一个示例数据集用于本教程中的示例查询现在。假设你有适当的凭证来创建一个新的模式和创建一个新表,你可以用一个笔记本或执行这些语句砖SQL。下面的代码是不为了运行管道作为三角洲的一部分生活表:
创建模式如果不存在cdc_data;创建表cdc_data。用户作为选择col1作为用户标识,col2作为的名字,col3作为城市,col4作为操作,col5作为sequenceNum从(值——初始载荷。(124年,“劳尔”,“瓦哈卡”,“插入”,1),(123年,“伊莎贝尔”,“蒙特雷”,“插入”,1),——新用户。(125年,“梅赛德斯”,“提华纳”,“插入”,2),(126年,“莉莉”,“坎昆”,“插入”,2),——伊莎贝尔从系统中删除和奔驰搬到瓜达拉哈拉。(123年,零,零,“删除”,6),(125年,“梅赛德斯”,“瓜达拉哈拉”,“更新”,6),——这批更新到达的顺序。上面批sequenceNum 5将是最终的状态。(125年,“梅赛德斯”,“墨西卡利”,“更新”,5),(123年,“伊莎贝尔”,“吉娃娃”,“更新”,5)——取消测试截断。———(空,空,空,“截断”,3));