以编程方式创建多个表
您可以使用Python与达美住表以编程方式创建多个表来减少代码冗余。
你可能会包含多个流的管道或数据集定义,只有少量的参数不同。这种冗余导致管道容易出错且难以维护。例如,下图显示的图形管道使用消防部门数据集来发现社区最快的响应时间为不同类别的紧急呼叫。在这个例子中,并行流只相差几个参数。
三角洲生活与Python示例表元编程
请注意
这个例子中读取包含在示例数据砖的数据集。因为砖与管道发布数据集不支持统一目录,这个例子只能与一个蜂巢metastore管道配置发布。然而,这种模式也适用于统一目录启用管道,但你必须读取数据外部位置。了解更BOB低频彩多关于使用统一目录与达美住表,看看使用统一的目录与三角洲住表管道。
您可以使用元编程模式来减少生成的开销和维护冗余流的定义。元编程在三角洲住表是使用Python内部函数完成的。因为这些功能是懒洋洋地评估,您可以使用它们来创建流,除了输入参数是相同的。每次调用可以包括一组不同的参数,控制每个表应该如何生成,如以下示例所示。
重要的
因为Python函数与达美住表调用decorator懒洋洋地,当创建数据集在一个循环中你必须调用另一个函数来创建数据集,以确保使用正确的参数值。未能创建数据集在一个单独的函数结果在多个表,使用参数的最终执行循环。
下面的示例调用create_table ()
函数在一个循环来创建表t1
和t2
:
defcreate_table(的名字):@dlt。表(的名字=的名字)deft():返回火花。读。表(的名字)表=(“t1”,“t2”]为t在表:create_table(t)
进口dlt从pyspark.sql.functions进口*@dlt。表(的名字=“raw_fire_department”,评论=“原始表消防部门反应”)@dlt。expect_or_drop(“valid_received”,“收到NOT NULL”)@dlt。expect_or_drop(“valid_response”,“回应不是零”)@dlt。expect_or_drop(“valid_neighborhood”,“社区! =‘没有’”)defget_raw_fire_department():返回(火花。读。格式(“csv”)。选项(“头”,“真正的”)。选项(“多行”,“真正的”)。负载(' / databricks-datasets / timeseries /火灾/ Fire_Department_Calls_for_Service.csv ')。withColumnRenamed(“调用类型”,“call_type”)。withColumnRenamed(“收到DtTm”,“收到”)。withColumnRenamed(“响应DtTm”,“回应”)。withColumnRenamed(“Neighborhooods分析边界”,“社区”)。选择(“call_type”,“收到”,“回应”,“社区”))all_tables=[]defgenerate_tables(call_table,response_table,过滤器):@dlt。表(的名字=call_table,评论=“顶级表调用类型”)defcreate_call_table():返回(火花。sql(”“”选择unix_timestamp (, ' M / d / yyyy h: M:年代”)作为ts_received,unix_timestamp(回答说,' M / d / yyyy h: M:年代”)作为ts_responded,社区从LIVE.raw_fire_department在call_type = '{过滤器}””“”。格式(过滤器=过滤器)))@dlt。表(的名字=response_table,评论=“十大社区最快响应时间”)defcreate_response_table():返回(火花。sql(”“”选择社区,AVG response_time ((ts_received - ts_responded))从生活。{call_table}组1ORDER BY response_time限制10”“”。格式(call_table=call_table)))all_tables。附加(response_table)generate_tables(“alarms_table”,“alarms_response”,“警报”)generate_tables(“fire_table”,“fire_response”,“结构火”)generate_tables(“medical_table”,“medical_response”,“医疗事故”)@dlt。表(的名字=“best_neighborhoods”,评论=”邻居出现在最好的响应时间列表最“)def总结():target_tables=(dlt。读(t)为t在all_tables]联合=functools。减少(λx,y:x。联盟(y),target_tables)返回(联合。groupBy(上校(“社区”))。gg(数(“*”)。别名(“分数”))。orderBy(desc(“分数”)))