三角洲生活表Python语言参考<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#delta-live-tables-python-language-reference" title="">

本文提供了细节的三角洲生活表Python编程接口。

SQL API的信息,请参阅<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/sql-ref.html">三角洲生活表SQL语言参考

具体配置自动加载程序的详细信息,请参见<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/ingestion/auto-loader/index.html">自动加载器是什么?

限制<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#limitations" title="">

三角洲住Python接口表有以下限制:

  • Python视图函数必须返回一个DataFrame。一些在DataFrames操作的函数不返回DataFrames,不应使用。因为DataFrame转换执行完整的数据流图已经解决,使用这样的操作可能会产生意想不到的副作用。这些操作包括等功能收集(),count (),toPandas (),save (),saveAsTable ()。然而,您可以包括这些功能之外的视图函数定义,因为这段代码运行一次初始化阶段在图。

  • 主()不支持功能。的操作的火花需要立即加载输入数据来计算输出的模式。不支持此功能在三角洲住表。

导入dltPython模块<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#import-the-dlt-python-module" title="">

三角洲住表的Python函数中定义dlt模块。你的管道用Python API必须进口这个模块:实现

进口dlt

创建一个三角洲住表物化视图或流表<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-a-delta-live-tables-materialized-view-or-streaming-table" title="">

在Python中,δ住表确定更新数据集作为一个物化视图或流表的基础上,定义查询。的@ table修饰符是用来定义物化视图和流表。

定义一个物化视图在Python中,适用@ table查询执行静态读对一个数据源。定义一个流表,适用@ table查询执行流读取数据源。这两个数据集类型有相同的语法规范如下:

进口dlt@dlt(的名字=“<名称>”,评论=“< >评论”,spark_conf={“<键>”:“<价值”,“<键”:“< >价值”},table_properties={“<键>”:“< >价值”,“<键>”:“< >价值”},路径=“< storage-location-path >”,partition_cols=(“<划分字段>”,“<划分字段>”),模式=“模式定义”,临时=)@dlt预计@dltexpect_or_fail@dltexpect_or_drop@dltexpect_all@dltexpect_all_or_drop@dltexpect_all_or_faildef<函数- - - - - -的名字>():返回(<查询>)

创建一个三角洲住表视图<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-a-delta-live-tables-view" title="">

定义一个视图在Python中,应用@view装饰。就像@ table修饰符,您可以使用视图在三角洲住表静态或流媒体数据集。下面是用Python语法定义视图:

进口dlt@dlt视图(的名字=“<名称>”,评论=“< >评论”)@dlt预计@dltexpect_or_fail@dltexpect_or_drop@dltexpect_all@dltexpect_all_or_drop@dltexpect_all_or_faildef<函数- - - - - -的名字>():返回(<查询>)

例如:定义表和视图<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#example-define-tables-and-views" title="">

定义一个表或视图在Python中,应用@dlt.view@dlt.table装饰功能。您可以使用函数名或的名字参数指定表或视图的名称。下面的例子定义了两个不同的数据集:一个视图taxi_raw这需要JSON文件作为输入源和一个表filtered_data这需要的taxi_raw认为输入:

进口dlt@dlt视图deftaxi_raw():返回火花格式(“json”)负载(“/ databricks-datasets / nyctaxi /样本/ json /”)#使用函数名作为表名@dltdeffiltered_data():返回dlt(“taxi_raw”)在哪里()#使用名称参数作为表名@dlt(的名字=“filtered_data”)defcreate_filtered_data():返回dlt(“taxi_raw”)在哪里()

例如:访问数据集定义在相同的管道<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#example-access-a-dataset-defined-in-the-same-pipeline" title="">

除了阅读从外部数据源,您可以访问数据集定义在相同的管道与三角洲生活表read ()函数。下面的例子演示了创建一个customers_filtered数据集使用read ()功能:

@dltdefcustomers_raw():返回火花格式(“csv”)负载(“/数据/ customers.csv”)@dltdefcustomers_filteredA():返回dlt(“customers_raw”)在哪里()

您还可以使用spark.table ()函数来访问数据集定义在相同的管道。当使用spark.table ()函数来访问数据集定义的管道,在函数参数预先考虑生活数据集名称关键字:

@dltdefcustomers_raw():返回火花格式(“csv”)负载(“/数据/ customers.csv”)@dltdefcustomers_filteredB():返回火花(“LIVE.customers_raw”)在哪里()

例如:在metastore读取注册表<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#example-read-from-a-table-registered-in-a-metastore" title="">

读取数据从一个表在蜂巢metastore注册,在函数参数省略了生活关键字选择限定表名和数据库名称:

@dltdef客户():返回火花(“sales.customers”)在哪里()

例如阅读从统一编目表,看看<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/unity-catalog.html">数据摄取到统一目录管道

例如:访问数据集使用spark.sql

您也可以返回一个数据集使用spark.sql表达一个查询功能。阅读从一个内部数据集,预谋生活。数据集名称:

@dltdefchicago_customers():返回火花sql(“SELECT * FROM生活。客户_cleaned WHERE city = 'Chicago'")

控制表是如何实现的<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#control-how-tables-are-materialized" title="">

表也提供额外的控制他们的具体化:

  • 指定如何表<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#schema-partition-example">分区使用partition_cols。您可以使用分区加快查询速度。

  • 你可以设置表属性定义一个视图或表。看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/properties.html">三角洲生活表属性表

  • 设置表数据使用的存储位置路径设置。默认情况下,管道表数据存储在存储位置路径不设置。

  • 您可以使用<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta/generated-columns.html">生成的列在你的模式定义。看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#schema-partition-example">例如:指定一个模式和分区列

请注意

表的大小小于1 TB,砖建议让三角洲生活表控制数据的组织。除非你希望你表超出tb,通常不应当指定分区列。

例如:指定一个模式和分区列<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#example-specify-a-schema-and-partition-columns" title="">

您可以指定一个表模式使用PythonStructType或一个SQL DDL字符串。指定一个DDL字符串时,可以包括定义<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta/generated-columns.html">生成的列

下面的示例创建一个表销售指定一个模式使用PythonStructType:

sales_schema=StructType([StructField(“customer_id”,StringType(),真正的),StructField(“customer_name”,StringType(),真正的),StructField(“number_of_line_items”,StringType(),真正的),StructField(“order_datetime”,StringType(),真正的),StructField(“order_number”,LongType(),真正的)])@dlt(评论=“销售原始数据”,模式=sales_schema)def销售():返回(“…”)

下面的示例指定的模式使用DDL字符串表,定义了一个生成的列,并定义一个分区列:

@dlt(评论=“销售原始数据”,模式=”“”customer_id字符串,customer_name字符串,number_of_line_items字符串,order_datetime字符串,order_number长,order_day_of_week字符串生成总是像(dayofweek (order_datetime))”“”,partition_cols=(“order_day_of_week”])def销售():返回(“…”)

默认情况下,三角洲生活表推断的模式如果你不指定一个模式定义。

配置一个流表忽略流源表的变化<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#configure-a-streaming-table-to-ignore-changes-in-a-source-streaming-table" title="">

请注意

  • 使用skipChangeCommits国旗,你必须选择预览在管道设置通道。

  • skipChangeCommits国旗只能与spark.readStream使用选择()函数。你不能使用这个标志dlt.read_stream ()函数。

默认情况下,流表需要扩展来源。当一个流表使用另一个流表源,源流表需要更新或者删除,例如,GDPR“被遗忘”处理skipChangeCommits国旗可以设置在目标流表无视这些变化。更多信息关于这个国旗,看到的<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/structured-streaming/delta-lake.html">忽略更新和删除

@ tabledefb():返回火花readStream选项(“skipChangeCommits”,“真正的”)(“LIVE.A”)

Python三角洲生活表属性<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#python-delta-live-tables-properties" title="">

下面的表描述的选项和属性可以指定与δ定义表和视图时生活表:

@ table或@view

的名字

类型:str

一个可选的表或视图的名称。如果没有定义,函数名称用作表或视图名称。

评论

类型:str

一个可选描述表。

spark_conf

类型:dict

一个可选的火花配置列表这个查询的执行。

table_properties

类型:dict

一个可选列表<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/properties.html">表属性为表。

路径

类型:str

一个可选的表数据的存储位置。如果没有设置,系统将默认存储位置。

partition_cols

类型:一个集合str

一个可选的集合,例如,一个列表的一个或多个列用于分区表。

模式

类型:strStructType

一个可选的模式定义为表。模式可以被定义为一个SQL DDL字符串,或PythonStructType

临时

类型:bool

创建一个临时表。不持续这个表的元数据。

默认值是“假”。

表或视图的定义

def<函数名> ()

一个Python函数,定义了数据集。如果的名字参数没有设置,那么<函数名>用作目标数据集名称。

查询

火花SQL语句返回一个火花数据集或考拉DataFrame。

使用dlt.read ()spark.table ()执行一个完整的阅读从一个数据集定义在相同的管道。当使用spark.table ()函数来读取数据集定义在相同的管道,预谋生活数据集名称的关键字函数参数。例如,阅读从一个数据集命名客户:

spark.table (“LIVE.customers”)

您还可以使用spark.table ()函数来读取注册表的metastore省略生活关键字,选择符合条件的表名与数据库名称:

spark.table (“sales.customers”)

使用dlt.read_stream ()执行流读取数据集定义在相同的管道。

使用spark.sql函数定义创建SQL查询返回的数据集。

使用<一个class="reference external" href="//www.neidfyre.com/api-docs/python/pyspark/latest/pyspark.sql/dataframe.html">PySpark与Python语法来定义三角洲生活表查询。

预期

@expect(“描述”,“约束”)

声明一个数据质量约束了描述。如果违反了期望,一行包含目标数据集的行。

@expect_or_drop(“描述”,“约束”)

声明一个数据质量约束了描述。如果连续违反了期望,把目标数据集的行。

@expect_or_fail(“描述”,“约束”)

声明一个数据质量约束了描述。如果连续违反了期望,立即停止执行。

@expect_all(期望)

声明一个或多个数据质量约束。预期Python字典,关键是期望描述和价值期望约束。如果连续违反任何预期,包括在目标数据集的行。

@expect_all_or_drop(期望)

声明一个或多个数据质量约束。预期Python字典,关键是期望描述和价值期望约束。如果连续违反任何预期,从目标数据集的行。

@expect_all_or_fail(期望)

声明一个或多个数据质量约束。预期Python字典,关键是期望描述和价值期望约束。如果连续违反任何期望,立即停止执行。

改变δ生活表中数据获取与Python<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#change-data-capture-with-python-in-delta-live-tables" title="">

使用apply_changes ()函数在Python API使用三角洲住表疾病预防控制中心的功能。三角洲生活表Python疾控中心还提供了接口<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-target-fn">create_streaming_table ()函数。您可以使用此函数创建所需的目标表apply_changes ()函数。

apply_changes(目标=“<目标表>”,=“<数据源>”,=(“key1”,“key2”,“keyN”),sequence_by=“< sequence-column >”,ignore_null_updates=,apply_as_deletes=没有一个,apply_as_truncates=没有一个,column_list=没有一个,except_column_list=没有一个,stored_as_scd_type=<类型>,track_history_column_list=没有一个,track_history_except_column_list=没有一个)

请注意

的默认行为插入更新事件是插入疾控中心事件从源:更新任何目标表中的行匹配指定的键(s)或时插入一个新行匹配的目标表中的记录不存在。处理的删除可以指定的事件应用作为删除条件。

重要的

你必须声明一个目标流表应用更改。您可以选择指定目标表的模式。当指定的模式apply_changes目标表,你还必须包括__START_AT__END_AT列有相同的数据类型sequence_by字段。

看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/cdc.html">改变数据获取与三角洲生活表

参数

目标

类型:str

要更新的表的名称。您可以使用<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-target-fn">create_streaming_table ()函数执行之前创建目标表apply_changes ()函数。

这个参数是必需的。

类型:str

数据源包含疾病预防控制中心记录。

这个参数是必需的。

类型:列表

列或列的组合唯一地标识源数据中的一行。这是用来确定哪些疾病预防控制中心事件适用于目标表中的特定记录。

您可以指定:

  • 字符串的列表:["标识",“orderId”)

  • 火花SQL的列表坳()功能:[坳(“标识”),坳(“orderId”)

参数坳()函数不能包括限定符。例如,您可以使用坳(标识),但你不能使用坳(source.userId)

这个参数是必需的。

sequence_by

类型:str坳()

疾控中心的列名称指定逻辑顺序事件源数据。三角洲生活表使用这个序列处理变更的事件到达的顺序。

您可以指定:

  • 一个字符串:“sequenceNum”

  • 一个火花的SQL坳()功能:坳(“sequenceNum”)

参数坳()函数不能包括限定符。例如,您可以使用坳(标识),但你不能使用坳(source.userId)

这个参数是必需的。

ignore_null_updates

类型:bool

允许摄入更新包含目标列的一个子集。当一个事件匹配现有的行和疾控中心ignore_null_updates真正的,列将保留现有的价值目标。这也适用于嵌套列的值。当ignore_null_updates,现有的值将被覆盖值。

这个参数是可选的。

默认值是

apply_as_deletes

类型:strexpr ()

指定当事件应该被视为一个疾控中心删除而不是插入。处理无序的数据,删除行暂时保留作为一个墓碑在底层三角洲表,并创建一个视图metastore过滤掉这些墓碑。保留时间间隔可以配置了pipelines.cdc.tombstoneGCThresholdInSeconds表属性

您可以指定:

  • 一个字符串:”操作=“删除”

  • 一个火花的SQLexpr ()功能:expr(“操作=“删除”)

这个参数是可选的。

apply_as_truncates

类型:strexpr ()

指定当一个中心事件应该被视为一个完整的表截断。因为这一条款触发目标表的完整截断,它应该只用于特定的用例需要此功能。

apply_as_truncates支持参数只对SCD 1型。化合物2型不支持截断。

您可以指定:

  • 一个字符串:”操作=“截断”

  • 一个火花的SQLexpr ()功能:expr(“操作=“截断”)

这个参数是可选的。

column_list

except_column_list

类型:列表

列的一个子集,包括在目标表中。使用column_list指定列的完整列表包括。使用except_column_list排除指定列。你可以申报价值作为一个字符串列表,或引发SQL坳()功能:

  • column_list=["标识",“名称”,“城市”)

  • column_list=[坳(“标识”),坳(“名字”),坳(“城市”)]

  • except_column_list=["操作",“sequenceNum”)

  • except_column_list=[坳(“操作”),坳(“sequenceNum”)

参数坳()函数不能包括限定符。例如,您可以使用坳(标识),但你不能使用坳(source.userId)

这个参数是可选的。

默认是在目标表时没有包含所有列column_listexcept_column_list参数传递给函数。

stored_as_scd_type

类型:strint

是否保存记录SCD 1型或SCD 2型。

设置为1化合物1型或2化合物2型。

这一条款是可选的。

缺省值是SCD 1型。

track_history_column_list

track_history_except_column_list

类型:列表

输出列的子集被跟踪目标表的历史。当pipelines.enableTrackHistory设置、使用track_history_column_list指定列被跟踪的完整列表。使用track_history_except_column_list指定的列被排除在跟踪。你可以申报价值作为一个字符串列表,或引发SQL坳()功能:-track_history_column_list=["标识",“名称”,“城市”)。- - - - - -track_history_column_list=[坳(“标识”),坳(“名字”),坳(“城市”)]- - - - - -track_history_except_column_list=["操作",“sequenceNum”)- - - - - -track_history_except_column_list=[坳(“操作”),坳(“sequenceNum”)

参数坳()函数不能包括限定符。例如,您可以使用坳(标识),但你不能使用坳(source.userId)

这个参数是可选的。

默认是在目标表时没有包含所有列track_history_column_listtrack_history_except_column_list参数传递给函数。

使用这些参数,必须设置pipelines.enableTrackHistory在管道设置。否则,就会抛出一个异常。当pipelines.enableTrackHistory没有设置,为每个输入行生成一个历史记录。

为疾病预防控制中心创建一个目标表输出<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-a-target-table-for-cdc-output" title="">

使用create_streaming_table ()函数创建的目标表apply_changes ()输出记录。

请注意

create_target_table ()create_streaming_live_table ()函数是弃用。砖建议更新现有代码使用create_streaming_table ()函数。

create_streaming_table(的名字=“<表名称>”,评论=“< >评论”spark_conf={“<键>”:“<价值”,“<键”:“< >价值”},table_properties={“<键>”:“< >价值”,“<键>”:“< >价值”},partition_cols=(“<划分字段>”,“<划分字段>”),路径=“< storage-location-path >”,模式=“模式定义”)

参数

的名字

类型:str

表名。

这个参数是必需的。

评论

类型:str

一个可选描述表。

spark_conf

类型:dict

一个可选的火花配置列表这个查询的执行。

table_properties

类型:dict

一个可选列表<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/properties.html">表属性为表。

partition_cols

类型:数组

一个可选的一列或多列列表用于分区表。

路径

类型:str

一个可选的表数据的存储位置。如果没有设置,系统将默认存储位置。

模式

类型:strStructType

一个可选的模式定义为表。模式可以被定义为一个SQL DDL字符串,或PythonStructType