使用foreachBatch使用结构化流写入任意数据接收器
结构化流api提供了两种方法来将流查询的输出写入没有流接收器的数据源:foreachBatch ()
而且foreach ()
.
重用现有批处理数据源foreachBatch ()
streamingDF.writeStream.foreachBatch(…)
允许您指定对流查询的每个微批的输出数据执行的函数。它接受两个参数:具有微批输出数据的DataFrame或Dataset和微批的唯一ID。与foreachBatch
,你可以:
重用现有批处理数据源
对于许多存储系统,可能还没有可用的流接收器,但可能已经存在用于批处理查询的数据写入器。使用foreachBatch ()
,您可以在每个微批的输出上使用批数据写入器。下面是一些例子:
许多其他的批处理数据源可以从foreachBatch ()
.
写入到多个位置
如果你想将一个流查询的输出写入多个位置,那么你可以简单地多次写入输出DataFrame/Dataset。但是,每次写入尝试都可能导致重新计算输出数据(包括可能重新读取输入数据)。为了避免重新计算,您应该缓存输出DataFrame/Dataset,将其写入多个位置,然后取消缓存。这是一个大纲。
streamingDF.writeStream.foreachBatch{(batchDF:DataFrame,batchId:长)= >batchDF.坚持()batchDF.写.格式(…)。保存(…)// location 1batchDF.写.格式(…)。保存(…)// location 2batchDF.unpersist()}
请注意
上运行多个Spark作业batchDF
,流查询的输入数据速率(通过StreamingQueryProgress
在笔记本速率图中可见)可以报告为数据在源处生成的实际速率的倍数。这是因为在每个批处理的多个Spark作业中,可能会多次读取输入数据。
应用附加的数据帧操作
流式数据帧不支持许多DataFrame和Dataset操作,因为Spark不支持在这些情况下生成增量计划。使用foreachBatch ()
您可以对每个微批输出应用其中的一些操作。例如,你可以使用foreachBath ()
和SQL合并成
操作,以更新模式将流聚合的输出写入Delta表。详情请参阅合并成.
重要的
foreachBatch ()
只提供至少一次写入保证。但是,您可以使用batchId
提供给函数作为重复删除输出并获得恰好一次保证的方法。在任何一种情况下,您都必须自己对端到端语义进行推理。foreachBatch ()
不工作与连续处理模式因为它基本上依赖于流查询的微批处理执行。如果在连续模式下写入数据,请使用foreach ()
代替。
可以调用空数据框架foreachBatch ()
用户代码需要具有弹性,以允许正确的操作。下面是一个例子:
.foreachBatch((outputDf:DataFrame,报价:长)= >{//只处理有效的数据帧如果(!outputDf.isEmpty){//业务逻辑}}).开始()
写入到使用的任何位置foreach ()
如果foreachBatch ()
不是一个选项(例如,您正在使用低于4.2的Databricks运行时,或对应的批数据写入器不存在),那么您可以使用foreach ()
.具体来说,可以将数据写入逻辑分为三种方法:open ()
,过程()
,close ()
.
有关示例,请参见在Scala和Python中使用foreach()写入Amazon DynamoDB.
使用Scala或Java
在Scala或Java中,扩展类ForeachWriter:
datasetOfString.writeStream.foreach(新ForeachWriter[字符串]{def开放(partitionId:长,版本:长):布尔={//打开连接}def过程(记录:字符串)={//写入字符串到连接}def关闭(errorOrNull:Throwable):单位={//关闭连接}}).开始()
使用Python
在Python中,可以调用foreach
有两种方式:在函数中或在对象中。该函数提供了一种表达处理逻辑的简单方法,但当失败导致某些输入数据的重新处理时,它不允许您重复删除生成的数据。对于这种情况,必须在对象中指定处理逻辑。
的函数接受一行作为输入。
defprocessRow(行)://写行来存储查询=streamingDF.writeStream.foreach(processRow).开始()
的对象有一个
过程
方法和可选开放
而且关闭
方法:类ForeachWriter:def开放(自我,partition_id,epoch_id)://开放连接.这方法是可选在Python.def过程(自我,行)://写行来连接.这方法是不可选在Python.def关闭(自我,错误)://关闭的连接.这方法是可选在Python.查询=streamingDF.writeStream.foreach(ForeachWriter()).开始()
执行语义
当流查询启动时,Spark以以下方式调用函数或对象的方法:
此对象的单个副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责处理以分布式方式生成的数据的一个分区。
此对象必须是可序列化的,因为每个任务都将获得所提供对象的一个新的序列化-反序列化副本。因此,强烈建议在调用后进行写入数据的初始化(例如,打开连接或启动事务)
open ()
方法,该方法表示任务已准备好生成数据。方法的生命周期如下:
对于每个分区
partition_id
:对于流数据的每批/历元
epoch_id
:方法
打开(partitionIdepochId)
被称为。如果
打开(…)
对于分区和批处理/epoch方法中的每一行,返回true过程(行)
被称为。方法
关上(错误)
在处理行时看到错误(如果有)时调用。的
close ()
方法(如果它存在)open ()
方法存在并成功返回(与返回值无关),除非JVM或Python进程在中途崩溃。
请注意
的partitionId
而且epochId
在open ()
方法可用于在失败导致重新处理某些输入数据时重复删除生成的数据。这取决于查询的执行模式。如果流查询是在微批处理模式下执行的,那么每个分区都由唯一的元组表示(partition_idepoch_id)
保证有相同的数据。因此,(partition_idepoch_id)
可用于重复数据删除和/或事务性提交数据,并实现精确一次的保证。但是,如果流查询是在连续模式下执行的,那么这种保证不成立,因此不应该用于重复数据删除。