三角洲生活表Python语言参考<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#delta-live-tables-python-language-reference" title="">
本文提供了细节的三角洲生活表Python编程接口。
SQL API的信息,请参阅<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/sql-ref.html">三角洲生活表SQL语言参考。
具体配置自动加载程序的详细信息,请参见<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/ingestion/auto-loader/index.html">自动加载器是什么?。
限制<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#limitations" title="">
三角洲住Python接口表有以下限制:
Python
表
和视图
函数必须返回一个DataFrame。一些在DataFrames操作的函数不返回DataFrames,不应使用。因为DataFrame转换执行后完整的数据流图已经解决,使用这样的操作可能会产生意想不到的副作用。这些操作包括等功能收集()
,count ()
,toPandas ()
,save ()
,saveAsTable ()
。然而,您可以包括这些功能之外的表
或视图
函数定义,因为这段代码运行一次初始化阶段在图。的
主()
不支持功能。的主
操作的火花需要立即加载输入数据来计算输出的模式。不支持此功能在三角洲住表。
导入dlt
Python模块<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#import-the-dlt-python-module" title="">
三角洲住表的Python函数中定义dlt
模块。你的管道用Python API必须进口这个模块:实现
进口dlt
创建一个三角洲住表物化视图或流表<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-a-delta-live-tables-materialized-view-or-streaming-table" title="">
在Python中,δ住表确定更新数据集作为一个物化视图或流表的基础上,定义查询。的@ table
修饰符是用来定义物化视图和流表。
定义一个物化视图在Python中,适用@ table
查询执行静态读对一个数据源。定义一个流表,适用@ table
查询执行流读取数据源。这两个数据集类型有相同的语法规范如下:
进口dlt@dlt。表(的名字=“<名称>”,评论=“< >评论”,spark_conf={“<键>”:“<价值”,“<键”:“< >价值”},table_properties={“<键>”:“< >价值”,“<键>”:“< >价值”},路径=“< storage-location-path >”,partition_cols=(“<划分字段>”,“<划分字段>”),模式=“模式定义”,临时=假)@dlt。预计@dlt。expect_or_fail@dlt。expect_or_drop@dlt。expect_all@dlt。expect_all_or_drop@dlt。expect_all_or_faildef<函数- - - - - -的名字>():返回(<查询>)
创建一个三角洲住表视图<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-a-delta-live-tables-view" title="">
定义一个视图在Python中,应用@view
装饰。就像@ table
修饰符,您可以使用视图在三角洲住表静态或流媒体数据集。下面是用Python语法定义视图:
进口dlt@dlt。视图(的名字=“<名称>”,评论=“< >评论”)@dlt。预计@dlt。expect_or_fail@dlt。expect_or_drop@dlt。expect_all@dlt。expect_all_or_drop@dlt。expect_all_or_faildef<函数- - - - - -的名字>():返回(<查询>)
例如:定义表和视图<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#example-define-tables-and-views" title="">
定义一个表或视图在Python中,应用@dlt.view
或@dlt.table
装饰功能。您可以使用函数名或的名字
参数指定表或视图的名称。下面的例子定义了两个不同的数据集:一个视图taxi_raw
这需要JSON文件作为输入源和一个表filtered_data
这需要的taxi_raw
认为输入:
进口dlt@dlt。视图deftaxi_raw():返回火花。读。格式(“json”)。负载(“/ databricks-datasets / nyctaxi /样本/ json /”)#使用函数名作为表名@dlt。表deffiltered_data():返回dlt。读(“taxi_raw”)。在哪里(…)#使用名称参数作为表名@dlt。表(的名字=“filtered_data”)defcreate_filtered_data():返回dlt。读(“taxi_raw”)。在哪里(…)
例如:访问数据集定义在相同的管道<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#example-access-a-dataset-defined-in-the-same-pipeline" title="">
除了阅读从外部数据源,您可以访问数据集定义在相同的管道与三角洲生活表read ()
函数。下面的例子演示了创建一个customers_filtered
数据集使用read ()
功能:
@dlt。表defcustomers_raw():返回火花。读。格式(“csv”)。负载(“/数据/ customers.csv”)@dlt。表defcustomers_filteredA():返回dlt。读(“customers_raw”)。在哪里(…)
您还可以使用spark.table ()
函数来访问数据集定义在相同的管道。当使用spark.table ()
函数来访问数据集定义的管道,在函数参数预先考虑生活
数据集名称关键字:
@dlt。表defcustomers_raw():返回火花。读。格式(“csv”)。负载(“/数据/ customers.csv”)@dlt。表defcustomers_filteredB():返回火花。表(“LIVE.customers_raw”)。在哪里(…)
例如:在metastore读取注册表<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#example-read-from-a-table-registered-in-a-metastore" title="">
读取数据从一个表在蜂巢metastore注册,在函数参数省略了生活
关键字选择限定表名和数据库名称:
@dlt。表def客户():返回火花。表(“sales.customers”)。在哪里(…)
例如阅读从统一编目表,看看<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/unity-catalog.html">数据摄取到统一目录管道。
例如:访问数据集使用spark.sql
您也可以返回一个数据集使用spark.sql
表达一个查询功能。阅读从一个内部数据集,预谋生活。
数据集名称:
@dlt。表defchicago_customers():返回火花。sql(“SELECT * FROM生活。客户_cleaned WHERE city = 'Chicago'")
控制表是如何实现的<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#control-how-tables-are-materialized" title="">
表也提供额外的控制他们的具体化:
指定如何表<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#schema-partition-example">分区使用
partition_cols
。您可以使用分区加快查询速度。你可以设置表属性定义一个视图或表。看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/properties.html">三角洲生活表属性表。
设置表数据使用的存储位置
路径
设置。默认情况下,管道表数据存储在存储位置路径
不设置。您可以使用<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta/generated-columns.html">生成的列在你的模式定义。看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#schema-partition-example">例如:指定一个模式和分区列。
请注意
表的大小小于1 TB,砖建议让三角洲生活表控制数据的组织。除非你希望你表超出tb,通常不应当指定分区列。
例如:指定一个模式和分区列<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#example-specify-a-schema-and-partition-columns" title="">
您可以指定一个表模式使用PythonStructType
或一个SQL DDL字符串。指定一个DDL字符串时,可以包括定义<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta/generated-columns.html">生成的列。
下面的示例创建一个表销售
指定一个模式使用PythonStructType
:
sales_schema=StructType([StructField(“customer_id”,StringType(),真正的),StructField(“customer_name”,StringType(),真正的),StructField(“number_of_line_items”,StringType(),真正的),StructField(“order_datetime”,StringType(),真正的),StructField(“order_number”,LongType(),真正的)])@dlt。表(评论=“销售原始数据”,模式=sales_schema)def销售():返回(“…”)
下面的示例指定的模式使用DDL字符串表,定义了一个生成的列,并定义一个分区列:
@dlt。表(评论=“销售原始数据”,模式=”“”customer_id字符串,customer_name字符串,number_of_line_items字符串,order_datetime字符串,order_number长,order_day_of_week字符串生成总是像(dayofweek (order_datetime))”“”,partition_cols=(“order_day_of_week”])def销售():返回(“…”)
默认情况下,三角洲生活表推断的模式表
如果你不指定一个模式定义。
配置一个流表忽略流源表的变化<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#configure-a-streaming-table-to-ignore-changes-in-a-source-streaming-table" title="">
请注意
使用
skipChangeCommits
国旗,你必须选择预览在管道设置通道。的
skipChangeCommits
国旗只能与spark.readStream
使用选择()
函数。你不能使用这个标志dlt.read_stream ()
函数。
默认情况下,流表需要扩展来源。当一个流表使用另一个流表源,源流表需要更新或者删除,例如,GDPR“被遗忘”处理skipChangeCommits
国旗可以设置在目标流表无视这些变化。更多信息关于这个国旗,看到的<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/structured-streaming/delta-lake.html">忽略更新和删除。
@ tabledefb():返回火花。readStream。选项(“skipChangeCommits”,“真正的”)。表(“LIVE.A”)
Python三角洲生活表属性<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#python-delta-live-tables-properties" title="">
下面的表描述的选项和属性可以指定与δ定义表和视图时生活表:
@ table或@view |
---|
类型: 一个可选的表或视图的名称。如果没有定义,函数名称用作表或视图名称。 |
类型: 一个可选描述表。 |
类型: 一个可选的火花配置列表这个查询的执行。 |
类型: 一个可选列表<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/properties.html">表属性为表。 |
类型: 一个可选的表数据的存储位置。如果没有设置,系统将默认存储位置。 |
类型: 一个可选的集合,例如,一个 |
类型: 一个可选的模式定义为表。模式可以被定义为一个SQL DDL字符串,或Python |
类型: 创建一个临时表。不持续这个表的元数据。 默认值是“假”。 |
表或视图的定义 |
---|
一个Python函数,定义了数据集。如果 |
火花SQL语句返回一个火花数据集或考拉DataFrame。 使用
您还可以使用
使用 使用 使用<一个class="reference external" href="//www.neidfyre.com/api-docs/python/pyspark/latest/pyspark.sql/dataframe.html">PySpark与Python语法来定义三角洲生活表查询。 |
预期 |
---|
声明一个数据质量约束了 |
声明一个数据质量约束了 |
声明一个数据质量约束了 |
声明一个或多个数据质量约束。 |
声明一个或多个数据质量约束。 |
声明一个或多个数据质量约束。 |
改变δ生活表中数据获取与Python<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#change-data-capture-with-python-in-delta-live-tables" title="">
使用apply_changes ()
函数在Python API使用三角洲住表疾病预防控制中心的功能。三角洲生活表Python疾控中心还提供了接口<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-target-fn">create_streaming_table ()函数。您可以使用此函数创建所需的目标表apply_changes ()
函数。
apply_changes(目标=“<目标表>”,源=“<数据源>”,键=(“key1”,“key2”,“keyN”),sequence_by=“< sequence-column >”,ignore_null_updates=假,apply_as_deletes=没有一个,apply_as_truncates=没有一个,column_list=没有一个,except_column_list=没有一个,stored_as_scd_type=<类型>,track_history_column_list=没有一个,track_history_except_column_list=没有一个)
请注意
的默认行为插入
和更新
事件是插入疾控中心事件从源:更新任何目标表中的行匹配指定的键(s)或时插入一个新行匹配的目标表中的记录不存在。处理的删除
可以指定的事件应用作为删除当
条件。
重要的
你必须声明一个目标流表应用更改。您可以选择指定目标表的模式。当指定的模式apply_changes
目标表,你还必须包括__START_AT
和__END_AT
列有相同的数据类型sequence_by
字段。
看到<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/cdc.html">改变数据获取与三角洲生活表。
参数 |
---|
类型: 要更新的表的名称。您可以使用<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-target-fn">create_streaming_table ()函数执行之前创建目标表 这个参数是必需的。 |
类型: 数据源包含疾病预防控制中心记录。 这个参数是必需的。 |
类型: 列或列的组合唯一地标识源数据中的一行。这是用来确定哪些疾病预防控制中心事件适用于目标表中的特定记录。 您可以指定:
参数 这个参数是必需的。 |
类型: 疾控中心的列名称指定逻辑顺序事件源数据。三角洲生活表使用这个序列处理变更的事件到达的顺序。 您可以指定:
参数 这个参数是必需的。 |
类型: 允许摄入更新包含目标列的一个子集。当一个事件匹配现有的行和疾控中心 这个参数是可选的。 默认值是 |
类型: 指定当事件应该被视为一个疾控中心 您可以指定:
这个参数是可选的。 |
类型: 指定当一个中心事件应该被视为一个完整的表 的 您可以指定:
这个参数是可选的。 |
类型: 列的一个子集,包括在目标表中。使用
参数 这个参数是可选的。 默认是在目标表时没有包含所有列 |
类型: 是否保存记录SCD 1型或SCD 2型。 设置为 这一条款是可选的。 缺省值是SCD 1型。 |
类型: 输出列的子集被跟踪目标表的历史。当 参数 这个参数是可选的。 默认是在目标表时没有包含所有列 使用这些参数,必须设置 |
为疾病预防控制中心创建一个目标表输出<一个class="headerlink" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/#create-a-target-table-for-cdc-output" title="">
使用create_streaming_table ()
函数创建的目标表apply_changes ()
输出记录。
请注意
的create_target_table ()
和create_streaming_live_table ()
函数是弃用。砖建议更新现有代码使用create_streaming_table ()
函数。
create_streaming_table(的名字=“<表名称>”,评论=“< >评论”spark_conf={“<键>”:“<价值”,“<键”:“< >价值”},table_properties={“<键>”:“< >价值”,“<键>”:“< >价值”},partition_cols=(“<划分字段>”,“<划分字段>”),路径=“< storage-location-path >”,模式=“模式定义”)
参数 |
---|
类型: 表名。 这个参数是必需的。 |
类型: 一个可选描述表。 |
类型: 一个可选的火花配置列表这个查询的执行。 |
类型: 一个可选列表<一个class="reference internal" href="//www.neidfyre.com/docs.gcp/workflows/delta-live-tables/properties.html">表属性为表。 |
类型: 一个可选的一列或多列列表用于分区表。 |
类型: 一个可选的表数据的存储位置。如果没有设置,系统将默认存储位置。 |
类型: 一个可选的模式定义为表。模式可以被定义为一个SQL DDL字符串,或Python |