第一个结构化流负载运行
本文提供了代码示例所需的基本概念和解释数据砖上运行第一个结构化流查询。您可以使用结构化流进行实时和增量处理工作负载。
结构化流是几种技术之一,功率流表在三角洲住表。砖建议使用三角洲生活表所有新ETL、摄取和结构化流工作负载。看到δ生活是什么表?。
请注意
而三角洲生活表提供了一个稍微修改语法声明流表、配置流读取和转换的一般语法适用于所有用例流砖。三角洲生活表也简化了流媒体管理状态信息,元数据,和大量的配置。
读取数据流
您可以使用结构化的不断摄取来自支持数据源的数据流。一些最常见的数据源中使用砖结构的流负载包括以下:
在云对象存储数据文件
消息总线和队列
三角洲湖
砖建议使用自动加载程序从云对象存储流媒体摄入。自动加载程序支持大多数文件格式支持结构化流。看到自动加载器是什么?。
每个数据源提供了许多选项来指定如何加载批次的数据。在读者的配置,主要选项可能需要设置分为以下几类:
选项指定数据源或格式(例如,文件类型、分隔符和模式)。
选项配置访问源系统(例如,端口设置和凭证)。
选项指定从哪里开始在流(例如,卡夫卡补偿或阅读所有现有文件)。
选项控制多少数据在每一批处理(例如,马克斯补偿、文件或字节每批)。
使用自动加载器读取流数据对象存储
下面的例子演示了与自动加载程序加载JSON数据,它使用cloudFiles
表示格式和选项。的schemaLocation
选项允许模式推理和演化。粘贴以下代码在砖的笔记本电池和运行单元创建一个流DataFrame命名raw_df
:
file_path=“/ databricks-datasets / structured-streaming /事件”checkpoint_path=“/ tmp / ss-tutorial / _checkpoint”raw_df=(火花。readStream。格式(“cloudFiles”)。选项(“cloudFiles.format”,“json”)。选项(“cloudFiles.schemaLocation”,checkpoint_path)。负载(file_path))
像其他砖上读取操作,配置一个流读取并不实际负荷数据。你必须触发一个动作开始前的数据流。
请注意
调用显示()
在流媒体DataFrame开始流工作。对于大多数结构化流用例,触发的动作应该写入数据流水槽。看到为生产准备的结构化流代码。
执行流转换
结构化流支持大多数转换可用在砖和火花SQL。你甚至可以负载MLflow模型作为udf和做出流预测转换。
以下代码示例完成一个简单的转换来丰富摄取JSON数据附加信息使用火花SQL函数:
从pyspark.sql.functions进口上校,current_timestamptransformed_df=(raw_df。选择(“*”,上校(“_metadata.file_path”)。别名(“source_file”),current_timestamp()。别名(“processing_time”)))
由此产生的transformed_df
包含查询指令加载和转换每个记录到达数据源。
请注意
结构化流将数据源视为无限或无限的数据集。因此,一些结构化流转换不支持工作负载,因为他们需要排序无限条目的数量。
大多数聚合和许多连接需要管理状态与水印信息,窗户,和输出模式。看到水印应用于控制数据处理的阈值。
写数据接收器
一个数据接收器流写操作的目标。常用沉在砖流负载包括以下:
三角洲湖
消息总线和队列
键-值数据库
与数据源,大多数数据汇提供许多选项来控制数据写入到目标系统。在作家的配置,主要选项可能需要设置分为以下几类:
默认输出模式(附加)。
(要求每一个检查点位置作家)。
触发间隔;看到配置结构化流触发间隔。
选项指定数据接收器或格式(例如,文件类型、分隔符和模式)。
选项配置访问目标系统(例如,端口设置和凭证)。
执行增量批写三角洲湖
下面的例子写到三角洲湖使用一个指定的文件路径和检查站。
重要的
总是确保你指定一个独一无二的检查点位置为每个流作家配置。检查站提供独特的身份为你流,跟踪所有记录和状态信息与流查询处理。
的availableNow
设置触发指示结构化流处理所有先前未处理记录从源数据集,然后关闭,所以您可以安全地执行下面的代码,而不用担心离开流运行:
target_path=“/ tmp / ss-tutorial /”checkpoint_path=“/ tmp / ss-tutorial / _checkpoint”transformed_df。writeStream。触发(availableNow=真正的)。选项(“checkpointLocation”,checkpoint_path)。选项(“路径”,target_path)。开始()
在这个例子中,没有新记录到我们的数据来源,所以重复执行这段代码不摄取新记录。
警告
结构化流执行可以阻止汽车终止关闭计算资源。为了避免意想不到的成本,一定要终止流查询。
为生产准备的结构化流代码
砖建议使用三角洲生活表对于大多数结构化流工作负载。下面的建议提供了一个起点准备结构化流生产工作负载:
删除不必要的代码从返回结果的笔记本,等
显示
和数
。不要运行结构化流互动集群工作负载;总是安排流工作。
帮助流自动恢复工作,与无限重试配置工作。
不使用自动伸缩与结构化流工作负载。
更多的建议,看看生产注意事项结构化流。
读取数据从三角洲湖、转换和写入三角洲湖
三角洲湖拥有广泛的支持与结构化流作为一个源和一个水槽。看到表流读取和写入。
下面的例子展示了示例语法增量加载所有新记录从三角洲表,与另一个三角洲表的快照加入他们,和写一个增量表:
(火花。readStream。表(“< table-name1 >”)。加入(火花。读。表(“< table-name2 >”),在=“< id >”,如何=“左”)。writeStream。触发(availableNow=真正的)。选项(“checkpointLocation”,“< checkpoint-path >”)。toTable(“< table-name3 >”))
你必须有适当的权限配置为读取源表和写入目标表和指定检查点位置。填写所有参数用尖括号(< >
)使用相关的数据源和下沉值。
请注意
三角洲生活表提供了一个完全声明性语法创建三角洲湖管道和自动管理属性触发器和检查点。看到δ生活是什么表?。
读取数据从卡夫卡、转换和写信给卡夫卡
Apache卡夫卡和其他消息传递总线提供了一些最低的延迟可用于大型数据集。您可以使用砖应用转换的数据从卡夫卡摄取,然后回到卡夫卡写入数据。
请注意
写数据到云对象存储增加了额外的延迟开销。如果您希望存储数据从一个消息总线三角洲湖但要求流负载最低的延迟,砖建议配置单独的流媒体工作向lakehouse摄取数据和应用实时转换为下游消息总线下沉。
下面的代码示例演示了一个简单的模式从卡夫卡通过加入丰富数据增量表中的数据,然后写回到卡夫卡:
(火花。readStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,“<服务器:ip >”)。选项(“订阅”,“<主题>”)。选项(“startingOffsets”,“最新”)。负载()。加入(火花。读。表(“<表名称>”),在=“< id >”,如何=“左”)。writeStream。格式(“卡夫卡”)。选项(“kafka.bootstrap.servers”,“<服务器:ip >”)。选项(“主题”,“<主题>”)。选项(“checkpointLocation”,“< checkpoint-path >”)。开始())
你必须有适当的权限配置为访问卡夫卡服务。填写所有参数用尖括号(< >
)使用相关的数据源和下沉值。看到与Apache卡夫卡和砖流处理。