表流读取和写入
三角洲湖深入结合火花结构化流通过readStream
和writeStream
。三角洲湖克服了许多限制通常与流媒体系统和相关文件,包括:
合并小文件产生的低延迟摄取
维护“只有一次”的处理与不止一个流(或并发批处理作业)
有效地发现哪些文件是新当使用作为一个流的源文件
δ表来源
当你加载一个三角洲表作为流源和流查询中使用它,查询流程表中所有的数据以及任何新到达的数据流后开始。
你可以加载路径和表作为一个流。
火花。readStream。格式(“δ”)。负载(“/ tmp /δ/事件”)进口io。δ。值得一提的。_火花。readStream。δ(“/ tmp /δ/事件”)
或
进口io。δ。值得一提的。_火花。readStream。格式(“δ”)。表(“事件”)
限制输入的速度
以下选项可以控制micro-batches:
maxFilesPerTrigger
:有多少新文件被认为是在每个micro-batch。默认值是1000。maxBytesPerTrigger
在每个micro-batch:多少数据被处理。这个选项设置一个“软马克斯”,这意味着一个批处理过程大约这个过程的数据量,可能超过极限为了使流查询前进情况下的最小输入单位超过这个极限。如果你使用Trigger.Once
为你流,此选项将被忽略。这不是默认设置。
如果你使用maxBytesPerTrigger
结合maxFilesPerTrigger
,micro-batch过程数据,直到maxFilesPerTrigger
或maxBytesPerTrigger
达到极限。
请注意
当源表事务是清理由于logRetentionDuration
配置在处理流滞后,三角洲湖处理数据对应的最新交易历史源表流但不失败。这可能导致数据被删除。
河三角洲湖变化数据捕获(CDC)饲料
三角洲湖改变数据提要三角洲的记录更改表,包括更新和删除。启用时,你可以从改变流数据饲料和编写逻辑处理插入、更新和删除操作到下游的表中。尽管变化数据提要数据输出三角洲表描述略有不同,这提供了一种解决方案宣传下游表的增量变化大奖章架构。
忽略更新和删除
结构化流不处理不是一个附加的输入,将抛出一个异常如果发生任何修改在桌子上被用作源。有两个主要的策略来处理变化,不能自动下游传播:
您可以删除输出从一开始就和检查点和重启流。
你可以设置这两个选择:
ignoreDeletes
:忽略事务删除数据分区的边界。ignoreChanges
:如果文件必须被重写处理文档更新源表中的数据修改操作,比如更新
,合并成
,删除
(分区),或覆盖
。不变行可能仍在释放,因此你的下游消费者应该能够处理重复。删除不是下游传播。ignoreChanges
包容ignoreDeletes
。因此如果你使用ignoreChanges
,你流将不会被删除或更新源表。
例子
例如,假设您有一个表user_events
与日期
,user_email
,行动
分区的列日期
。你流的user_events
你需要删除数据表由于GDPR。
当你在分区边界(即删除在哪里
分区列),文件已经分割的价值所以删除掉这些文件的元数据。因此,如果你只是想一些分区的删除数据,您可以使用:
火花。readStream。格式(“δ”)。选项(“ignoreDeletes”,“真正的”)。负载(“/ tmp /δ/ user_events”)
但是,如果你必须删除数据的基础上user_email
,那么您将需要使用:
火花。readStream。格式(“δ”)。选项(“ignoreChanges”,“真正的”)。负载(“/ tmp /δ/ user_events”)
如果你更新user_email
与更新
声明中包含的文件user_email
在的问题是重写。当你使用ignoreChanges
,新记录与所有其他不变向下游传播记录在同一个文件中。你的逻辑应该能够处理这些输入重复的记录。
指定初始位置
您可以使用以下选项指定的起点三角洲湖流源没有处理整个表。
startingVersion
:从三角洲湖版本。所有表变化从这个版本(包容)将读取流源。你可以获取提交的版本版本
列的描述历史命令的输出。在砖运行时的7.4及以上,只返回最新的变化,指定
最新的
。startingTimestamp
:从时间戳。所有表更改后承诺或时间戳(包容)将读取流源。之一:时间戳字符串。例如,
“2019 - 01 - 01 t00:00:00.000z”
。一个日期字符串。例如,
“2019-01-01”
。
你不能同时设置两个选项;你可以只使用其中的一个。他们只有当开始一个新的流媒体查询生效。如果一个流媒体查询已经开始和检查点的进展记录,这些选项将被忽略。
重要的
虽然您可以启动流源从一个指定的版本或时间戳、流媒体来源的模式总是最新的三角洲表的模式。你必须确保没有不兼容模式改变δ表后指定的版本或时间戳。否则,流源读取数据时可能会返回不正确的结果,错误的模式。
没有数据被删除过程初始快照
请注意
这个特性可以在砖运行时11.1及以上。这个特性是在公共预览。
使用增量表作为流源时,数据的查询过程首先出现在桌子上。三角洲表在这个版本被称为初始快照。默认情况下,三角洲表的数据文件处理基于文件最后修改。然而,最后修改时间并不一定代表记录事件的时间顺序。
在有状态流查询定义水印,处理文件的修改时间会导致记录处理错误的订单。这可能导致晚期事件的记录删除水印。
你可以避免数据下降问题通过支持下列选项:
withEventTimeOrder:初始快照是否应该处理事件的时间顺序。
启用事件时间顺序后,初始快照数据的事件时间范围分为桶。每个微一桶的批处理流程过滤数据的时间范围内。maxFilesPerTrigger和maxBytesPerTrigger配置选项仍适用于控制microbatch大小,但只在一个近似的方法由于处理的本质。
下图显示了这个过程:
明显的这一特性的信息:
数据下降问题只有当初始增量快照的状态在默认顺序流查询处理。
你不能改变
withEventTimeOrder
一旦流查询开始时初始快照还是正在处理。重新启动,withEventTimeOrder
改变,你需要删除检查点。如果您正在运行一个启用了withEventTimeOrder流查询,你不能降级到DBR版本不支持这个功能,直到初始快照处理完成。如果你需要降级,你可以等待初始快照完成,查询或删除检查点和重启。
不支持这个功能在以下常见场景:
事件时间列是一个生成的列有non-projectionδ源和水印之间的转换。
有一个水印,不止一个δ源在流查询。
启用事件时间顺序后,三角洲初始快照处理的性能可能较慢。
每一批微扫描初始快照来过滤数据对应的事件时间范围内。加快过滤操作,建议使用一个δ源列事件时间,以便数据不可以应用(检查数据不与z顺序索引三角洲湖在适当的时候)。另外,表分区沿着事件时间列可以进一步加快处理。您可以检查火花UI,看看有多少δ特定微批处理文件扫描。
三角洲表作为一个水槽
您还可以使用结构化数据写入三角洲表流。事务日志可以让三角洲湖保证只有一次处理,即使还有其他流对表或批量查询并发运行的情况。
请注意
三角洲湖真空
函数删除所有文件不是由三角洲湖但跳过任何目录开始_
。您可以安全地存储检查点与其他数据和元数据表使用一个目录结构如δ< table_name > / _checkpoints
。
指标
请注意
在砖运行时8.1及以上。
你可以找到的字节数和文件数量有待处理流媒体查询流程随着numBytesOutstanding
和numFilesOutstanding
指标。额外的指标包括:
numNewListedFiles
:列出的三角洲湖文件数量来计算这批的积压。backlogEndOffset
:表版本用来计算积压。
如果您正在运行流在一个笔记本,你可以看到这些度量标准下原始数据选项卡中流仪表板查询进展:
{“源”:({“描述”:“DeltaSource(文件/道路/ /源):“,“指标”:{“numBytesOutstanding”:“3456”,“numFilesOutstanding”:“8”},}]}
Append模式
默认情况下,流在附加模式下运行,向表添加新记录。
您可以使用路径的方法:
(事件。writeStream。格式(“δ”)。outputMode(“添加”)。选项(“checkpointLocation”,“/ tmp /δ/ _checkpoints /”)。开始(“/δ/事件”))
事件。writeStream。格式(“δ”)。outputMode(“添加”)。选项(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。开始(“/ tmp /δ/事件”)
或者是toTable
方法在火花3.1和更高版本(砖运行时的8.3及以上),如下所示。(在火花之前版本3.1(砖运行时8.2及以下),使用表
方法。)
(事件。writeStream。格式(“δ”)。outputMode(“添加”)。选项(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。toTable(“事件”))
事件。writeStream。outputMode(“添加”)。选项(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。toTable(“事件”)
完整的模式
您还可以使用结构化流与每一批替换整个表。用例的一个例子是使用聚合计算一个总结:
(火花。readStream。格式(“δ”)。负载(“/ tmp /δ/事件”)。groupBy(“customerId”)。数()。writeStream。格式(“δ”)。outputMode(“完整的”)。选项(“checkpointLocation”,“/ tmp /δ/ eventsByCustomer / _checkpoints /”)。开始(“/ tmp /δ/ eventsByCustomer”))
火花。readStream。格式(“δ”)。负载(“/ tmp /δ/事件”)。groupBy(“customerId”)。数()。writeStream。格式(“δ”)。outputMode(“完整的”)。选项(“checkpointLocation”,“/ tmp /δ/ eventsByCustomer / _checkpoints /”)。开始(“/ tmp /δ/ eventsByCustomer”)
前面的例子不断更新一个表,其中包含客户总数量的事件。
更宽松的延迟需求的应用程序,你可以节省计算资源与一次性触发器。使用这些更新总结聚合表在给定的时间表,只处理新数据到达自上次更新。
幂等表中写道foreachBatch
请注意
在砖运行时8.4及以上。
命令foreachBatch允许您指定一个函数执行后的输出每个micro-batch流查询任意转换。这使得作为补充foreachBatch
函数可以写micro-batch输出到一个或多个目标三角洲表目的地。然而,foreachBatch
不会让那些写幂等作为写尝试缺乏信息是否重新执行批处理。例如,重新运行失败的批处理可能导致重复数据写道。
为了解决这个问题,δ表支持以下DataFrameWriter
的选项,使得幂等写道:
txnAppId
:一个独一无二的字符串,您可以通过在每个DataFrame
写。例如,您可以使用StreamingQuery IDtxnAppId
。txnVersion
:一个单调递增数字作为事务的版本。
δ表使用的组合txnAppId
和txnVersion
识别重复的写和忽略它们。
如果一批写中断故障,运行批处理使用相同的应用程序和批处理ID,这将有助于正确运行时识别重复的写和忽略它们。应用程序ID (txnAppId
)可以是任何用户生成唯一的字符串,不需要流相关ID。
警告
如果你删除流检查点和重启查询新的关卡,你必须提供一个不同的appId
;否则,写的重新启动查询将被忽略,因为它将包含相同的txnAppId
和批处理ID将从0开始。
相同的DataFrameWriter
选项可以用来实现在非幂等写工作。有关详细信息,使幂等写在工作。
例子
app_id=…#一个独一无二的字符串用作应用程序ID。defwriteToDeltaLakeTableIdempotent(batch_df,batch_id):batch_df。写。格式(…)。选项(“txnVersion”,batch_id)。选项(“txnAppId”,app_id)。保存(…)#位置1batch_df。写。格式(…)。选项(“txnVersion”,batch_id)。选项(“txnAppId”,app_id)。保存(…)#位置2
瓦尔appId=…/ /一个独一无二的字符串用作应用程序ID。streamingDF。writeStream。foreachBatch{(batchDF:DataFrame,batchId:长)= >batchDF。写。格式(…)。选项(“txnVersion”,batchId)。选项(“txnAppId”,appId)。保存(…)/ /位置1batchDF。写。格式(…)。选项(“txnVersion”,batchId)。选项(“txnAppId”,appId)。保存(…)/ /位置2}
执行stream-static连接
你可以依赖于事务担保和三角洲湖执行版本控制协议stream-static连接。stream-static加入加入最新的有效版本增量表(静态数据)的数据流使用无状态连接。
当砖过程数据的micro-batch stream-static加入,从静态三角洲的最新有效版本数据表连接的记录出现在当前micro-batch。因为连接是无状态的,你不需要配置水印和可以处理结果与低延迟。静态三角洲表中的数据使用的加入应该不常更改。
streamingDF=火花。readStream。表(“订单”)staticDF=火花。读。表(“顾客”)查询=(streamingDF。加入(staticDF,streamingDF。customer_id= =staticDF。id,“内心”)。writeStream。选项(“checkpointLocation”,checkpoint_path)。表(“orders_with_customer_info”))
插入从流媒体查询使用foreachBatch
您可以使用的组合合并
和foreachBatch
写复杂的从流媒体查询插入到三角洲表。看到使用foreachBatch写任意数据和结构化流水槽。
这种模式有许多应用程序,包括以下几点:
写流聚集在更新模式:这是更有效的比完整的模式。
数据库更改流写入一个增量表:合并查询写更改数据可以用在
foreachBatch
不断应用流变化δ表。数据流写入三角洲与重复数据删除表:纯插入合并查询重复数据删除可以用在
foreachBatch
不断写入数据(副本)三角洲表自动重复数据删除。
请注意
确保你的
合并
声明内foreachBatch
是幂等的重启流查询可以应用相同的一批数据上的操作很多次了。当
合并
被用在foreachBatch
,输入数据流查询(通过报道StreamingQueryProgress
和可见的笔记本率图)可能被报道为多个实际的速率生成的数据来源。这是因为合并
读取输入数据多次导致输入指标成倍增加。如果这是一个瓶颈,您可以缓存批DataFrame之前合并
然后uncache之后合并
。
下面的例子演示了如何使用SQLforeachBatch
完成这项任务:
/ /函数upsert microBatchOutputDF使用合并到三角洲表defupsertToDelta(microBatchOutputDF:DataFrame,batchId:长){/ /设置dataframe视图名称microBatchOutputDF。createOrReplaceTempView(“更新”)/ /使用视图名称申请合并/ /注意:必须使用的SparkSession用于定义dataframe“更新”microBatchOutputDF。sparkSession。sql(”“”合并成骨料t使用更新的年代在年代。关键= t.key当匹配更新设置*当不匹配插入*”“”)}/ /写的输出流聚合查询到三角洲表streamingAggregatesDF。writeStream。格式(“δ”)。foreachBatch(upsertToDelta_)。outputMode(“更新”)。开始()
#使用合并函数向三角洲upsert microBatchOutputDF表defupsertToDelta(microBatchOutputDF,batchId):#设置dataframe视图名称microBatchOutputDF。createOrReplaceTempView(“更新”)#使用视图名称申请合并#注意:必须使用的SparkSession用于定义dataframe“更新”#在砖运行时的10.5和下面,你必须使用以下:# microBatchOutputDF._jdf.sparkSession () . sql (“””microBatchOutputDF。sparkSession。sql(”“”合并成骨料t使用更新的年代在年代。关键= t.key当匹配更新设置*当不匹配插入*”“”)#流聚合查询的输出写入δ表(streamingAggregatesDF。writeStream。格式(“δ”)。foreachBatch(upsertToDelta)。outputMode(“更新”)。开始())
您还可以选择使用三角洲湖api来执行流插入,如以下示例:
进口io。δ。表。*瓦尔deltaTable=DeltaTable。forPath(火花,“/数据/聚合物”)/ /函数upsert microBatchOutputDF使用合并到三角洲表defupsertToDelta(microBatchOutputDF:DataFrame,batchId:长){deltaTable。作为(“t”)。合并(microBatchOutputDF。作为(“s”),”年代。关键= t.key”)。whenMatched()。updateAll()。whenNotMatched()。insertAll()。执行()}/ /写的输出流聚合查询到三角洲表streamingAggregatesDF。writeStream。格式(“δ”)。foreachBatch(upsertToDelta_)。outputMode(“更新”)。开始()
从delta.tables进口*deltaTable=DeltaTable。forPath(火花,“/数据/聚合物”)#使用合并函数向三角洲upsert microBatchOutputDF表defupsertToDelta(microBatchOutputDF,batchId):(deltaTable。别名(“t”)。合并(microBatchOutputDF。别名(“s”),”年代。关键= t.key”)。whenMatchedUpdateAll()。whenNotMatchedInsertAll()。执行())#流聚合查询的输出写入δ表(streamingAggregatesDF。writeStream。格式(“δ”)。foreachBatch(upsertToDelta)。outputMode(“更新”)。开始())