转换数据与达美住表
本文描述了如何使用达美住表声明转换数据并通过查询逻辑指定如何处理记录。它还包含一些常见转换模式的例子可以是有用的,以构建出三角洲住表管道。
您可以定义一个数据集对任何查询,返回一个DataFrame。可以使用Apache火花内置操作、udf定制逻辑,MLflowδ住表中管道模型转换。一旦数据被吸收到三角洲住表管道,您可以定义新的数据集对上游资源创建新的流表、物化视图和视图。
何时使用视图、物化视图和流表
确保你的管道是有效的和可维护的,选择最好的数据集类型实现管道时查询。
考虑使用一个视图:
你有一个大型或复杂的查询您想要闯入更易管理查询。
你想验证使用中间结果的期望。
你想减少存储和计算成本,不需要查询结果的实体化。因为表是物化,他们需要额外的计算和存储资源。
考虑使用物化视图时:
多个下游消费表的查询。因为视图计算需求,视图重新计算每次查询视图。
其他管道,工作,或消耗表的查询。因为不物化视图,您只能使用它们在同一管道。
你想在开发过程中查看查询的结果。因为表是物化的,可以查看和查询外的管道,在开发过程中使用表格可以帮助验证计算的正确性。验证后,不需要具体化的查询转换成视图。
考虑使用一个流表时:
查询定义数据源不断或者逐步增长。
查询结果应逐步计算的。
需要高吞吐量和低延迟的管道。
请注意
流表总是定义针对流媒体资源。您还可以使用流媒体资源应用变化成
从疾病预防控制中心应用更新提要。看到改变数据获取与三角洲生活表。
结合流表和物化视图在一个单一的管道
流表继承的处理保证配置Apache火花结构化流和处理查询扩展的数据来源,在新行总是插入源表而不是修改。
请注意
虽然默认情况下,流表需要扩展的数据源,当流源是另一个流表需要更新或者删除,您可以覆盖这个行为的skipChangeCommits国旗。
常见的流模式包括摄入源数据来创建初始数据集在一个管道。这些初始数据集通常被称为青铜表和经常执行简单的转换。
相比之下,最后一个表在一个管道,通常被称为黄金表,通常需要复杂的聚合或阅读来源的目标应用变化成
操作。因为这些操作本质上创建更新而不是附加,他们不支持输入流表。这些转换更适合物化视图。
通过混合流表和物化视图到一个管道,可以简化你的管道,避免昂贵的re-ingestion或加工的原始数据,并有完整的SQL来计算复杂的力量聚合在一个有效地编码和过滤数据集。下面的例子说明了这种类型的混合处理:
请注意
这些例子从云存储使用自动加载程序加载文件。加载文件自动加载程序在统一目录启用管道,必须使用外部位置。了解更BOB低频彩多关于使用统一目录与达美住表,看看使用统一的目录与三角洲住表管道。
@dlt。表defstreaming_bronze():返回(#由于这是一个流源,这个表是增量。火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“json”)。负载(“gs: / /道路/ /生/数据”))@dlt。表defstreaming_silver():#因为我们读青铜表流,这银表也#增量更新。返回dlt。read_stream(“streaming_bronze”)。在哪里(…)@dlt。表deflive_gold():#这个表将被重新计算完全阅读整个银表#时更新。返回dlt。读(“streaming_silver”)。groupBy(“user_id”)。数()
创建或刷新流媒体表streaming_bronze作为选择*从cloud_files(“gs: / /道路/ /生/数据”,“json”)创建或刷新流媒体表streaming_silver作为选择*从流(生活。streaming_bronze)在哪里…创建或刷新生活表live_gold作为选择数(*)从生活。streaming_silver集团通过user_id
BOB低频彩了解更多关于使用自动加载程序有效地读取JSON文件从谷歌云存储进行增量处理。
Stream-static连接
Stream-static连接是一个好的选择当denormalizing扩展数据的连续流主要是静态维度表。
每个管道更新,新记录从流与最新的静态表的快照。如果记录被添加或更新后的静态表对应的数据流表处理,合成记录并非重新计算,除非完全执行刷新。
在管道配置为触发执行静态表返回结果的时间更新开始。在管道配置连续执行,每次表流程更新,最新版本的静态表查询。
下面是一个示例stream-static加入:
@dlt。表defcustomer_sales():返回dlt。read_stream(“销售”)。加入(读(“顾客”),(“customer_id”),“左”)
创建或刷新流媒体表customer_sales作为选择*从流(生活。销售)内心的加入左生活。客户使用(customer_id)
计算聚合效率
您可以使用流表增量计算简单的分配总量统计,min,马克斯,或总和,代数总量平均和标准偏差。砖建议增量的聚合查询与数量有限的组织,例如,一个查询的集团通过国家
条款。只有新的输入数据读取与每个更新。
在三角洲住表管道使用MLflow模型
您可以使用MLflow-trained三角洲住表中管道模型。在砖MLflow模型被视为转换,这意味着他们行动火花DataFrame作为火花DataFrame输入,并返回结果。因为δ生活对DataFrames表定义数据集,您可以将Apache火花工作负载,利用MLflow三角洲住表只有几行代码。更多关于MLflow,看到MLflow指南。
如果你已经有一个Python笔记本调用一个MLflow模型,可以适应这段代码三角洲生活表使用@dlt.table
装饰和确保函数定义返回转换结果。默认不安装MLflow三角洲生活表,所以确保你%皮普安装mlflow
和导入mlflow
和dlt
你的笔记本的顶部。介绍三角洲住表的语法,看与Python教程:声明一个数据管道三角洲生活表。
在三角洲住表使用MLflow模型,完成以下步骤:
获得的运行ID和模型名称MLflow模型。运行ID和模型名称是用于构造MLflow的URI模式。
使用URI来定义一个火花UDF加载MLflow模型。
在你的表定义调用UDF使用MLflow模型。
下面的例子显示了这一模式的基本语法:
%皮普安装mlflow进口dlt进口mlflowrun_id=“< mlflow-run-id >”model_name=“< the-model-name-in-run >”model_uri=f”:/{run_id}/{model_name}”loaded_model_udf=mlflow。pyfunc。spark_udf(火花,model_uri=model_uri)@dlt。表defmodel_predictions():返回dlt。读(<输入- - - - - -数据>)。withColumn(“预测”,loaded_model_udf(<模型- - - - - -特性>))
作为一个完整的例子,下面的代码定义了一个火花UDF命名loaded_model_udf
装载一个MLflow模型对贷款风险数据训练。预测的数据列用于制造作为一个参数传递给UDF。表loan_risk_predictions
计算预测为每一行loan_risk_input_data
。
%皮普安装mlflow进口dlt进口mlflow从pyspark.sql.functions进口结构体run_id=“mlflow_run_id”model_name=“the_model_name_in_run”model_uri=f”:/{run_id}/{model_name}”loaded_model_udf=mlflow。pyfunc。spark_udf(火花,model_uri=model_uri)分类=(“术语”,“home_ownership”,“目的”,“addr_state”,“verification_status”,“application_type”]数字组成=(“loan_amnt”,“emp_length”,“annual_inc”,“唯一”,“delinq_2yrs”,“revol_util”,“total_acc”,“credit_length_in_years”]特性=分类+数字组成@dlt。表(评论=“贷款风险GBT毫升预测”,table_properties={“质量”:“黄金”})defloan_risk_predictions():返回dlt。读(“loan_risk_input_data”)。withColumn(“预测”,loaded_model_udf(结构体(特性)))
保留手动删除或更新
三角洲生活表允许您手动删除或更新一个表的记录和下游验算表进行刷新操作。
默认情况下,三角洲生活表验算表结果基于输入数据每次更新一条管道,所以你必须确保删除记录不是从源数据加载。设置pipelines.reset.allowed
表属性假
防止刷新表但并不妨碍增量写入表或防止新流入的数据表。
下图展示了一个示例使用两个流表:
raw_user_table
接受原始用户数据从源。bmi_table
使用体重和身高的增量计算体重指数得分raw_user_table
。
你想要手动删除或更新用户的记录raw_user_table
和验算bmi_table
。
下面的代码演示了设置pipelines.reset.allowed
表属性假
禁用全部刷新raw_user_table
这有意保留随时间的变化,但下游表重新计算管道更新时,运行:
创建或刷新流媒体表raw_user_tableTBLPROPERTIES(管道。重置。允许=假)作为选择*从cloud_files(“/ databricks-datasets / iot-stream /关键因素”,“csv”);创建或刷新流媒体表bmi_table作为选择用户标识,(重量/2。2)/战俘(高度*0。0254年,2)作为身体质量指数从流(生活。raw_user_table);