使用foreachBatch编写任意数据汇
结构化流api提供两种方式查询的输出流写入数据源没有现有的流水槽:foreachBatch ()
和foreach ()
。
重用现有的批处理数据源foreachBatch ()
streamingDF.writeStream.foreachBatch (…)
允许您指定一个函数执行的输出数据流的每个micro-batch查询。它接受两个参数:一个DataFrame或数据集的输出数据micro-batch micro-batch的惟一ID。与foreachBatch
,您可以:
重用现有的批处理的数据源
对于许多存储系统来说,可能没有一个流水槽可用,但可能已经存在一个数据作家对于批处理查询。使用foreachBatch ()
,您可以使用批处理数据作家每个micro-batch的输出。下面是一些例子:
许多其他的批处理数据源可以使用从foreachBatch ()
。
写入多个位置
如果你想查询的输出流写入多个位置,那么您可以简单地编写多次输出DataFrame /数据集。然而,每个试图写会导致输出数据重新计算(包括可能的重读输入数据)。为了避免重新计算,你应该缓存输出DataFrame /数据集,写多个位置,然后uncache它。这是一个大纲。
streamingDF。writeStream。foreachBatch{(batchDF:DataFrame,batchId:长)= >batchDF。坚持()batchDF。写。格式(…)。保存(…)/ /位置1batchDF。写。格式(…)。保存(…)/ /位置2batchDF。unpersist()}
请注意
如果你运行多个引发工作batchDF
,输入数据流查询(通过报道StreamingQueryProgress
和可见的笔记本率图)可能被报道为多个实际的速率生成的数据来源。这是由于输入数据可能在多个读多次引发每批工作。
申请额外DataFrame操作
许多DataFrame和数据集操作不支持流媒体DataFrames因为火花不支持生成增量计划在这些情况下。使用foreachBatch ()
你可以把这些操作在每个micro-batch输出。例如,您可以使用foreachBath ()
和SQL合并成
写操作的输出流聚合成三角洲表更新模式。看到更多的细节合并成。
重要的
foreachBatch ()
只提供“至少一次”写担保。然而,您可以使用batchId
提供的函数作为方法删除处理,得到一个仅一次保证的输出。在这两种情况下,你必须思考自己端到端语义。foreachBatch ()
不工作的吗连续处理模式因为它从根本上依赖于micro-batch查询的执行流。如果你写在连续模式下,使用的数据foreach ()
代替。
一个空dataframe可以调用foreachBatch ()
和用户代码需要有弹性,以便正确操作。一个例子所示:
。foreachBatch((outputDf:DataFrame,报价:长)= >{/ /流程有效的数据帧如果(!outputDf。isEmpty){/ /业务逻辑}})。开始()
写任何位置使用foreach ()
如果foreachBatch ()
不是一个选项(例如,您使用的是砖运行时低于4.2的,或不存在相应的批处理数据的作家),然后你可以表达你的定制作家逻辑使用吗foreach ()
。具体来说,你可以表达写作逻辑的数据分为三种方法:open ()
,过程()
,close ()
。
有关示例,请参见写信给亚马逊DynamoDB在Scala和Python使用foreach ()。
使用Scala或Java
在Scala或Java,您扩展类ForeachWriter:
datasetOfString。writeStream。foreach(新ForeachWriter(字符串]{def开放(partitionId:长,版本:长):布尔={/ /打开连接}def过程(记录:字符串)={/ /写字符串连接}def关闭(errorOrNull:Throwable):单位={/ /关闭连接}})。开始()
使用Python
在Python中,您可以调用foreach
在两个方面:在一个函数或一个对象。函数提供了一种简单的方式来表达你的处理逻辑,但不允许您删除处理失败时生成的数据导致一些输入数据的后处理。这种情况下你必须指定一个对象的处理逻辑。
的函数行作为输入。
defprocessRow(行):/ /写行来存储查询=streamingDF。writeStream。foreach(processRow)。开始()
的对象有一个
过程
方法和可选的开放
和关闭
方法:类ForeachWriter:def开放(自我,partition_id,epoch_id):/ /开放连接。这方法是可选在Python。def过程(自我,行):/ /写行来连接。这方法是不可选在Python。def关闭(自我,错误):/ /关闭的连接。这方法是可选在Python。查询=streamingDF。writeStream。foreach(ForeachWriter())。开始()
执行语义
流媒体查询开始时,火花调用函数或对象的方法在以下方式:
这个对象的一个副本负责所有数据查询中生成的一个任务。换句话说,一个实例负责处理一个分区的数据在一个分布式的方式生成。
这个对象必须是可序列化的,因为每个任务将得到一个新的serialized-deserialized提供对象的副本。因此,强烈建议任何写作初始化数据(例如,打开一个连接或启动一个事务)完成后调用
open ()
方法,它意味着任务准备生成数据。的生命周期方法如下:
为每个分区
partition_id
:对于每一批/时代的流数据
epoch_id
:方法
打开(partitionIdepochId)
被称为。如果
打开(…)
返回true,分区和批处理中的每一行/时代,方法过程(行)
被称为。方法
关上(错误)
被称为时遇到以下错误(如果有的话)处理的行。的
close ()
方法(如果它存在的话)如果一个open ()
方法存在并返回成功(不管返回值),除非中间JVM或Python程序崩溃。
请注意
的partitionId
和epochId
在open ()
方法可以用来删除处理失败时生成的数据导致一些输入数据的后处理。这取决于查询的执行模式。如果流查询被执行在micro-batch模式,然后每个分区代表一个独特的元组(partition_idepoch_id)
保证相同的数据。因此,(partition_idepoch_id)
可以用来删除处理和/或以事务的提交数据,实现只有一次担保。然而,如果流查询被执行在连续模式,那么这个保证不持有,因此不应该被用于重复数据删除。