水印应用于控制数据处理的阈值
本文介绍了数字水印的基本概念,并提供建议使用共同的水印有状态流操作。你必须申请水印有状态的流操作,以避免无限扩大的数据量保持状态,从而引入内存问题和增加在长期运行流操作期间处理延迟。
水印是什么?
结构化流使用水印来控制阈值多长时间继续处理更新对于一个给定的实体状态。国有实体的常见例子包括:
聚合在一个时间窗口。
独特的键联接两个流。
当你声明一个水印,你指定一个时间戳字段和流DataFrame水印阈值。随着新数据到来,国家经理跟踪最近的时间戳在指定的领域和流程中的所有记录迟到阈值。
下面的示例10分钟的水印阈值适用于窗口数:
从pyspark.sql.functions进口窗口(df。withWatermark(“event_time”,“十分钟”)。groupBy(窗口(“event_time”,“5分钟”),“id”)。数())
在这个例子中:
的
event_time
列是用来定义一个10分钟的水印和5分钟暴跌窗口。收集计数为每个
id
观察每个重叠5分钟窗口。为每个计数状态信息维护到窗口10分钟以上最新的观察
event_time
。
重要的
水印阈值保证记录内到达指定的阈值处理根据定义的语义查询。后面到达记录到达外指定的阈值可能仍然使用查询处理指标,但这是没有保证的。
水印为窗口的聚合和输出模式
下表的细节处理查询与聚合时间戳水印定义:
输出模式 |
行为 |
---|---|
附加 |
行写入目标表一旦水印阈值已经过去了。所有的写操作都被推迟基于迟到阈值。旧的聚合状态阈值已通过删除一次。 |
更新 |
行写入目标表的计算结果,可以更新和覆盖新数据到来。旧的聚合状态阈值已通过删除一次。 |
完整的 |
聚合状态并不下降。目标表与每个触发器重写。 |
stream-stream加入水印和输出
多个流只支持append模式之间的连接,和匹配的记录都写在每一批他们发现。内部连接,砖建议设置水印阈值在每个流数据来源。这允许状态信息被丢弃老的记录。没有水印,结构化流试图加入每个键加入每个触发器的两边。
结构化流有特殊的语义支持外部连接。水印是强制性的外部连接,因为它表明当一个关键必须用null值后无可匹敌的。注意,尽管外部连接可以用于记录记录不匹配在数据处理过程中,因为只加入写入表附加操作,这没有记录丢失的数据,直到迟到阈值后已经过去了。
控制后期数据阈值与多个水印政策结构化流
当使用多个结构化流输入,您可以设置多个水印来控制公差为后面到达数据阈值。配置水印允许您控制状态信息和影响延迟。
流媒体查询可以有多个输入流联合或连接在一起。每一个输入流可以有不同的阈值的数据需要容忍有状态操作。指定这些阈值使用withWatermarks (“eventTime”,延迟)
对每一个输入流。下面是一个示例查询stream-stream连接。
瓦尔inputStream1=…/ /延迟1小时瓦尔inputStream2=…/ /延迟2个小时inputStream1。withWatermark(“eventTime1”,“1小时”)。加入(inputStream2。withWatermark(“eventTime2”,“两小时”),joinCondition)
在运行查询,结构化流单独跟踪最大事件时间出现在每一个输入流,计算水印根据相应的延迟,并选择一个全球水印用于有状态操作。默认情况下,选择最小作为全球水印,因为它确保没有数据意外下降太晚如果别人背后的溪流瀑布之一(例如,一个流停止接收数据由于上游失败)。换句话说,全球水印在最慢的速度流和安全地移动查询输出相应延迟。
如果你想要更快的结果,您可以设置多个水印策略选择最大值随着全球水印通过设置SQL配置spark.sql.streaming.multipleWatermarkPolicy
来马克斯
(默认是最小值
)。这让全球水印以最快的速度流。然而,这种配置下降最慢的数据流。正因为如此,砖建议你谨慎使用此配置。
减少重复在水印
在砖运行时的13.1及以上,你可以删除处理记录在一个水印阈值使用一个惟一的标识符。
结构化流提供只有一次处理担保,但不会自动删除处理记录数据来源。您可以使用dropDuplicatesWithinWatermark
删除处理记录在任何指定的字段,允许您删除重复从流,即使某些领域有所不同(例如事件时间和到达时间)。
重复的记录,在到达指定的水印是保证。这个保证是严格的只有一个方向,和重复的记录之外,到达指定的阈值也可能下降。你必须设置水印的延迟阈值超过最大时间戳差异删除所有重复重复的事件。
您必须指定一个水印使用dropDuplicatesWithinWatermark
方法,如以下示例:
streamingDf=火花。readStream。…#删除处理使用guid列与水印基于eventTime列(streamingDf。withWatermark(“eventTime”,“10个小时”)。dropDuplicatesWithinWatermark(“guid”))
瓦尔streamingDf=火花。readStream。…/ /列:guid, eventTime,…/ /删除处理使用guid列与水印基于eventTime列streamingDf。withWatermark(“eventTime”,“10个小时”)。dropDuplicatesWithinWatermark(“guid”)