使用Delta Live表更改数据捕获
请注意
本文描述如何根据源数据的更改更新Delta Live tables管道中的表。要了解如何记录和查询Delta表的行级更改信息,请参见在Databricks上使用Delta Lake更改数据提要.
预览
Delta Live Tables支持SCD类型2公共预览.
可以在Delta Live Tables中使用更改数据捕获(CDC)根据源数据的更改更新表。Delta Live Tables SQL和Python接口支持CDC。Delta Live Tables支持更新慢变维(SCD)类型1和类型2的表:
使用SCD类型1直接更新记录。更新的记录不保留历史记录。
使用SCD类型2来保留对记录的所有更新的历史。在使用SCD类型2时,还可以保留对指定列的更新历史记录。看到仅跟踪具有SCD类型2的指定列的历史记录
为了表示变更的有效期间,SCD Type 2将每个变更与生成的变更一起存储__START_AT
而且__END_AT
列。的列指定序列通过
使用SQL或sequence_by
来生成__START_AT
而且__END_AT
列。
请注意
的数据类型
__START_AT
而且__END_AT
列的值与指定的数据类型相同序列通过
字段。要查询APPLY CHANGES目标表,必须发布你的表。
当您发布APPLY CHANGES表时,以
__apply_changes_storage_
也是在包含APPLY CHANGES命令的底层内部状态的metastore中创建的。
SQL
使用应用变化成
语句使用Delta Live Tables CDC功能:
将更改应用到活动中。表格_name FROM source KEYS (keys) [WHERE condition] [IGNORE NULL UPDATES] [APPLY AS DELETE WHEN condition] [APPLY AS TRUNCATE WHEN condition] SEQUENCE BY orderByColumn [COLUMNS {columnList | * EXCEPT (exceptColumnList)}] [STORED AS {SCD TYPE 1 | SCD TYPE 2}] [TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
条款 |
---|
键 在源数据中唯一标识一行的列或列的组合。这用于确定哪些CDC事件应用于目标表中的特定记录。 这一条款是必需的。 |
在哪里 应用于源和目标的条件,以触发优化,如分区修剪。此条件不能用于删除源行;源中的所有CDC行必须满足此条件,否则将抛出错误。使用WHERE子句是可选的,应该在处理需要特定优化时使用。 这个条款是可选的。 |
忽略空更新 允许摄取包含目标列子集的更新。当CDC事件匹配现有行并且指定了IGNORE NULL UPDATES时,带有 这个条款是可选的。 的默认值是覆盖现有列 |
应用为delete时 指定CDC事件何时应作为事件处理 这个条款是可选的。 |
应用为截断时 指定CDC事件何时应作为满表处理 的 这个条款是可选的。 |
序列由 指定源数据中CDC事件逻辑顺序的列名。Delta Live Tables使用此排序来处理无序到达的更改事件。 这一条款是必需的。 |
列 指定要包含在目标表中的列的子集。你可以:
这个条款是可选的。 属性时,默认值是在目标表中包含所有列 |
存储为 是否将记录存储为SCD类型1或SCD类型2。 这个条款是可选的。 默认为SCD类型1。 |
航迹历史记录 当
这个条款是可选的。当有任何更改时,默认为所有输出列的跟踪历史记录,相当于 要使用此子句,必须设置 |
的默认行为插入
而且更新
事件是插入来自源的CDC事件:更新目标表中与指定键匹配的任何行,或者在目标表中不存在匹配记录时插入新行。处理的删除
属性可以指定事件应用作为删除当
条件。
Python
使用apply_changes ()
函数中使用Delta Live Tables CDC功能。Delta Live Tables Python CDC接口还提供create_streaming_live_table ()函数。类所需的目标表可以使用此函数创建apply_changes ()
函数。看到示例查询.
应用更改函数
apply_changes(目标=“<目标表>”,源=“<数据源>”,键=[“key1”,“key2”,“keyN”],sequence_by=“< sequence-column >”,ignore_null_updates=假,apply_as_deletes=没有一个,apply_as_truncates=没有一个,column_list=没有一个,except_column_list=没有一个,stored_as_scd_type=<类型>,track_history_column_list=没有一个,track_history_except_column_list=没有一个)
参数 |
---|
目标 类型: 要更新的表的名称。您可以使用create_streaming_live_table ()函数在执行 必选参数。 |
源 类型: 包含CDC记录的数据源。 必选参数。 |
键 类型: 在源数据中唯一标识一行的列或列的组合。这用于确定哪些CDC事件应用于目标表中的特定记录。 你可以指定:
参数 必选参数。 |
sequence_by 类型: 指定源数据中CDC事件逻辑顺序的列名。Delta Live Tables使用此排序来处理无序到达的更改事件。 你可以指定:
参数 必选参数。 |
ignore_null_updates 类型: 允许摄取包含目标列子集的更新。当CDC事件与现有行和 可选参数。 默认为 |
apply_as_deletes 类型: 指定CDC事件何时应作为事件处理 你可以指定:
可选参数。 |
apply_as_truncates 类型: 指定CDC事件何时应作为满表处理 的 你可以指定:
可选参数。 |
column_listexcept_column_list 类型: 要包含在目标表中的列的子集。使用
参数 可选参数。 默认情况下是在目标表中包含所有列 |
stored_as_scd_type 类型: 是否将记录存储为SCD类型1或SCD类型2。 设置为 这个条款是可选的。 默认为SCD类型1。 |
track_history_column_listtrack_history_except_column_list 类型: 要在目标表中跟踪历史记录的输出列的子集。当 参数 可选参数。 默认情况下是在目标表中包含所有列 要使用这些参数,必须设置 |
的默认行为插入
而且更新
事件是插入来自源的CDC事件:更新目标表中与指定键匹配的任何行,或者在目标表中不存在匹配记录时插入新行。处理的删除
属性可以指定事件apply_as_deletes
论点。
为输出记录创建一个目标表
使用create_streaming_live_table ()
方法创建目标表apply_changes ()
输出记录。
请注意
的create_target_table ()
函数已弃用。Databricks建议更新现有代码以使用create_streaming_live_table ()
函数。
create_streaming_live_table(的名字=“<表名称>”,评论=“< >评论”spark_conf={“< >键”:“<价值”,“<键”:“< >价值”},table_properties={“< >键”:“< >价值”,“< >键”:“< >价值”},partition_cols=[“<划分字段>”,“<划分字段>”],路径=“< storage-location-path >”,模式=“模式定义”)
参数 |
---|
的名字 类型: 表名。 必选参数。 |
评论 类型: 表的可选描述。 |
spark_conf 类型: 用于执行此查询的可选Spark配置列表。 |
table_properties 类型: 可选的表属性在桌子上。 |
partition_cols 类型: 用于对表进行分区的一个或多个列的可选列表。 |
路径 类型: 表数据的可选存储位置。如果不设置,系统将默认为管道存储位置。 |
模式 类型: 表的可选模式定义。模式可以定义为SQL DDL字符串,也可以定义为Python |
类的模式时apply_changes
目标表中,还必须包含__START_AT
而且__END_AT
类具有相同数据类型的列sequence_by
字段。例如,如果目标表有列键,字符串
,值,字符串
,测序,长
:
create_streaming_live_table(的名字=“目标”,评论="目标为疾病控制中心摄入。",partition_cols=[“价值”],路径=“tablePath美元”,模式=StructType([StructField(“关键”,StringType()),StructField(“价值”,StringType()),StructField(“排序”,LongType()),StructField(“__START_AT”,LongType()),StructField(“__END_AT”,LongType())]))
请注意
方法之前,必须确保已创建目标表
应用变化成
查询或apply_changes
函数。看到示例查询.目标表的指标(如输出行数)不可用。
SCD类型2更新将为每个输入行添加历史记录行,即使没有列发生更改。
的目标
应用变化成
查询或apply_changes
函数不能用作流式直播表的源。对象的目标中读取的表应用变化成
查询或apply_changes
函数必须是活动表。类中不支持期望
应用变化成
查询或apply_changes ()
函数。要对源数据集或目标数据集使用期望:通过定义具有所需期望的中间表,在源数据上添加期望,并使用此数据集作为目标表的源。
使用从目标表读取输入数据的下游表在目标数据上添加期望。
表属性
添加以下表属性以控制墓碑管理的行为删除
事件:
表属性 |
---|
pipelines.cdc.tombstoneGCThresholdInSeconds 将此值设置为匹配无序数据之间的最高预期间隔。 缺省值:5分钟 |
pipelines.cdc.tombstoneGCFrequencyInSeconds 控制墓碑清理检查的频率。 缺省值:60秒 |
仅跟踪具有SCD类型2的指定列的历史记录
SCD类型2支持指定输出列的子集,仅在这些列上生成历史记录;对其他列的更改将就地更新,而不是生成新的历史记录。
要在Delta Live Tables SCD type 2中使用跟踪历史,必须通过将以下配置添加到Delta Live Tables管道设置中来显式启用管道中的功能:
{“配置”:{“pipelines.enableTrackHistory”:“真正的”}}
如果pipelines.enableTrackHistory
是未设置还是设置为假
, SCD类型2查询使用为每个输入行生成历史记录的默认行为。
数据库上的SCD类型1和SCD类型2
这些示例演示了Delta Live Tables SCD类型1和类型2查询,这些查询基于以下源事件更新目标表:
创建新的用户记录。
删除用户记录。
更新用户记录。在SCD类型1的示例中,最后一个
更新
操作延迟到达并从目标表中删除,演示了无序事件的处理。
以下是这些例子的输入记录:
用户标识 |
的名字 |
城市 |
操作 |
sequenceNum |
---|---|---|---|---|
124 |
劳尔 |
瓦哈卡 |
插入 |
1 |
123 |
伊莎贝尔 |
蒙特雷 |
插入 |
1 |
125 |
梅塞德斯 |
提华纳 |
插入 |
2 |
126 |
莉莉 |
坎昆 |
插入 |
2 |
123 |
零 |
零 |
删除 |
6 |
125 |
梅塞德斯 |
瓜达拉哈拉 |
更新 |
6 |
125 |
梅塞德斯 |
墨西卡利 |
更新 |
5 |
123 |
伊莎贝尔 |
吉娃娃 |
更新 |
5 |
运行SCD type 1示例后,目标表包含以下记录:
用户标识 |
的名字 |
城市 |
---|---|---|
124 |
劳尔 |
瓦哈卡 |
125 |
梅塞德斯 |
瓜达拉哈拉 |
126 |
莉莉 |
坎昆 |
属性的附加记录包含以下输入记录截断
操作,可与SCD类型1示例代码一起使用:
用户标识 |
的名字 |
城市 |
操作 |
sequenceNum |
---|---|---|---|---|
124 |
劳尔 |
瓦哈卡 |
插入 |
1 |
123 |
伊莎贝尔 |
蒙特雷 |
插入 |
1 |
125 |
梅塞德斯 |
提华纳 |
插入 |
2 |
126 |
莉莉 |
坎昆 |
插入 |
2 |
123 |
零 |
零 |
删除 |
6 |
125 |
梅塞德斯 |
瓜达拉哈拉 |
更新 |
6 |
125 |
梅塞德斯 |
墨西卡利 |
更新 |
5 |
123 |
伊莎贝尔 |
吉娃娃 |
更新 |
5 |
零 |
零 |
零 |
截断 |
3. |
在运行带有附加的SCD类型1示例之后截断
记录,记录124
而且126
会被截断,因为截断
操作在sequenceNum = 3
,目标表包含以下记录:
用户标识 |
的名字 |
城市 |
---|---|---|
125 |
梅塞德斯 |
瓜达拉哈拉 |
在没有附加的情况下运行SCD类型2的示例之后截断
记录,目标表包含以下记录:
用户标识 |
的名字 |
城市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123 |
伊莎贝尔 |
蒙特雷 |
1 |
5 |
123 |
伊莎贝尔 |
吉娃娃 |
5 |
6 |
124 |
劳尔 |
瓦哈卡 |
1 |
零 |
125 |
梅塞德斯 |
提华纳 |
2 |
5 |
125 |
梅塞德斯 |
墨西卡利 |
5 |
6 |
125 |
梅塞德斯 |
瓜达拉哈拉 |
6 |
零 |
126 |
莉莉 |
坎昆 |
2 |
零 |
运行后的SCD类型2跟踪历史的例子没有额外的截断
记录,目标表包含以下记录:
用户标识 |
的名字 |
城市 |
__START_AT |
__END_AT |
---|---|---|---|---|
123 |
伊莎贝尔 |
吉娃娃 |
1 |
6 |
124 |
劳尔 |
瓦哈卡 |
1 |
零 |
125 |
梅塞德斯 |
瓜达拉哈拉 |
2 |
零 |
126 |
莉莉 |
坎昆 |
2 |
零 |
生成测试数据
为这个例子创建测试记录:
转到Databricks登录页并选择创建一个笔记本,或按新在侧栏中选择笔记本.的创建笔记本对话框出现了。
在创建笔记本对话,给你的笔记本起个名字;例如,生成测试CDC记录.选择SQL从默认的语言下拉菜单。
如果有正在运行的集群,则集群下拉显示。选择要将笔记本附加到的集群。你也可以创建创建笔记本后要附加到的新集群。
点击创建.
复制以下查询并将其粘贴到新笔记本的第一个单元格中:
创建模式如果不存在cdc_data;创建表格cdc_data.用户作为选择col1作为用户标识,col2作为的名字,col3作为城市,col4作为操作,col5作为sequenceNum从(值——初始负载。(124,“劳尔”,“瓦哈卡”,“插入”,1),(123,“伊莎贝尔”,“蒙特雷”,“插入”,1),——新用户。(125,“梅赛德斯”,“提华纳”,“插入”,2),(126,“莉莉”,“坎昆”,“插入”,2),伊莎贝尔被从系统中移除,梅赛德斯被转移到瓜达拉哈拉。(123,零,零,“删除”,6),(125,“梅赛德斯”,“瓜达拉哈拉”,“更新”,6),—这批更新是不按顺序来的。上述位于sequenceNum 5的批处理将是最终状态。(125,“梅赛德斯”,“墨西卡利”,“更新”,5),(123,“伊莎贝尔”,“吉娃娃”,“更新”,5)—取消注释以测试TRUNCATE。——,(null, null, null, "TRUNCATE", 3));
要运行笔记本并填充测试记录,请在单元格操作菜单中在最右侧,单击并选择运行单元,或按shift + enter.
示例查询
进口dlt从pyspark.sql.functions进口上校,expr@dlt.视图def用户():返回火花.readStream.格式(“δ”).表格(“cdc_data.users”)dlt.create_streaming_live_table(“目标”)dlt.apply_changes(目标=“目标”,源=“用户”,键=[“标识”],sequence_by=上校(“sequenceNum”),apply_as_deletes=expr("operation = 'DELETE'"),apply_as_truncates=expr("操作= 'TRUNCATE'"),except_column_list=[“操作”,“sequenceNum”],stored_as_scd_type=1)
——创建并填充目标表。创建或刷新流媒体生活表格目标;应用变化成生活.目标从流(cdc_data.用户)键(用户标识)应用作为删除当操作=“删除”应用作为截断当操作=“截断”序列通过sequenceNum列*除了(操作,sequenceNum)存储作为镜头分割类型1;
示例查询
进口dlt从pyspark.sql.functions进口上校,expr@dlt.视图def用户():返回火花.readStream.格式(“δ”).表格(“cdc_data.users”)dlt.create_streaming_live_table(“目标”)dlt.apply_changes(目标=“目标”,源=“用户”,键=[“标识”],sequence_by=上校(“sequenceNum”),apply_as_deletes=expr("operation = 'DELETE'"),except_column_list=[“操作”,“sequenceNum”],stored_as_scd_type=“2”)
——创建并填充目标表。创建或刷新流媒体生活表格目标;应用变化成生活.目标从流(cdc_data.用户)键(用户标识)应用作为删除当操作=“删除”序列通过sequenceNum列*除了(操作,sequenceNum)存储作为镜头分割类型2;
带有跟踪历史的示例查询
进口dlt从pyspark.sql.functions进口上校,expr@dlt.视图def用户():返回火花.readStream.格式(“δ”).表格(“cdc_data.users”)dlt.create_streaming_live_table(“目标”)dlt.apply_changes(目标=“目标”,源=“用户”,键=[“标识”],sequence_by=上校(“sequenceNum”),apply_as_deletes=expr("operation = 'DELETE'"),except_column_list=[“操作”,“sequenceNum”],stored_as_scd_type=“2”,track_history_except_column_list=[“城市”])
——创建并填充目标表。创建或刷新流媒体生活表格目标;应用变化成生活.目标从流(cdc_data.用户)键(用户标识)应用作为删除当操作=“删除”序列通过sequenceNum列*除了(操作,sequenceNum)存储作为镜头分割类型2;跟踪历史在*除了(城市)