优化状态结构化流查询
管理有状态的中间状态信息结构化流查询可以帮助防止意想不到的延迟和生产问题。
使用多个状态运营商结构化流
在砖运行时的13.1及以上,砖提供先进的支持结构化流状态运营商工作负载。现在可以把多个状态操作串在一起,这意味着你可以养活一个操作的输出作为另一个窗口的聚合状态连接等操作。
您可以使用以下的示例将演示几个模式。
重要的
存在以下局限性在处理多个状态运营商:
FlatMapGroupWithState
不支持。只支持附加的输出模式。
链接时间窗聚合
单词=…#流DataFrame模式{时间戳:时间戳,词:字符串}#组数据窗口和词,计算每组的数量windowedCounts=单词。groupBy(窗口(单词。时间戳,“十分钟”,“5分钟”),单词。词)。数()#组由另一个窗口,窗口的数据的话,计算每组的数量anotherWindowedCounts=windowedCounts。groupBy(窗口(window_time(windowedCounts。窗口),“1小时”),windowedCounts。词)。数()
进口火花。值得一提的。_瓦尔单词=…/ /流DataFrame模式{时间戳:时间戳,词:字符串}/ /组数据窗口和词,计算每组的数量瓦尔windowedCounts=单词。groupBy(窗口(美元“时间戳”,“十分钟”,“5分钟”),美元“单词”)。数()/ /组由另一个窗口,窗口的数据的话,计算每组的数量瓦尔anotherWindowedCounts=windowedCounts。groupBy(窗口(美元“窗口”,“1小时”),美元“单词”)。数()
时间窗口聚集在两个不同的流紧随其后加入stream-stream窗口
clicksWindow=clicksWithWatermark。groupBy(clicksWithWatermark。clickAdId,窗口(clicksWithWatermark。clickTime,“1小时”))。数()impressionsWindow=impressionsWithWatermark。groupBy(impressionsWithWatermark。impressionAdId,窗口(impressionsWithWatermark。impressionTime,“1小时”))。数()clicksWindow。加入(impressionsWindow,“窗口”,“内心”)
瓦尔clicksWindow=clicksWithWatermark。groupBy(窗口(“clickTime”,“1小时”))。数()瓦尔impressionsWindow=impressionsWithWatermark。groupBy(窗口(“impressionTime”,“1小时”))。数()clicksWindow。加入(impressionsWindow,“窗口”,“内心”)
Stream-stream时间间隔加入时间窗聚合紧随其后
加入=impressionsWithWatermark。加入(clicksWithWatermark,expr(”“”clickAdId = impressionAdId和clickTime > = impressionTime和clickTime < = impressionTime +间隔1小时”“”),“leftOuter”#可以“内心”、“leftOuter”、“rightOuter”,“fullOuter”、“leftSemi”)加入。groupBy(加入。clickAdId,窗口(加入。clickTime,“1小时”))。数()
瓦尔加入=impressionsWithWatermark。加入(clicksWithWatermark,expr(”“”clickAdId = impressionAdId和clickTime > = impressionTime和clickTime < = impressionTime +间隔1小时”“”),joinType=“leftOuter”/ /可以“内心”、“leftOuter”、“rightOuter”,“fullOuter”、“leftSemi”)加入。groupBy(美元“clickAdId”,窗口(美元“clickTime”,“1小时”))。数()
阻止减缓垃圾收集(GC)暂停状态流
如果你有状态操作流查询(如流聚合)和你想维护数以百万计的键的状态,那么你可能会面临相关问题大型JVM的垃圾收集(GC)暂停。这将导致高micro-batch处理时间的变化。这是因为你的JVM的内存维护默认状态数据。有大量的状态对象会压迫你的JVM内存,导致高GC暂停。
在这种情况下,您可以选择使用一个更优化的基于状态管理解决方案RocksDB。这个解决方案可以在砖运行时。JVM内存而不是保持状态,这个解决方案使用RocksDB本机内存中有效地管理国家和当地的SSD(例如类型与当地SSD)。此外,任何更改到这个状态是由结构化流到检查点位置自动保存你已经提供,因此提供完整的容错担保(默认状态管理)一样。为指令配置RocksDB国家商店,看到的配置RocksDB状态存储在砖。
推荐配置有状态结构化流砖
砖的建议:
使用compute-optimized实例作为工人。例如,谷歌云n1-highcpu-32实例。
调整分区的数量设置为1 - 2倍数量的集群中的核心。
设置
spark.sql.streaming.noDataMicroBatches.enabled
配置假
SparkSession。这可以防止流micro-batch引擎处理micro-batches不包含数据。还请注意,设置此配置假
可能导致有状态操作,利用水印或处理时间超时没有数据输出到新数据到达,而不是立即。
关于性能优势,RocksDB-based状态管理可以保持100倍比默认状态键。例如,在一个火花集群与谷歌云n1-highcpu-32实例作为工人,默认状态管理可以保持1 - 2百万状态键/执行器之后,JVM GC开始显著地影响性能。相比之下,RocksDB-based状态管理可以很容易地保持1亿状态键/ GC执行人没有任何问题。
请注意
状态管理方案无法改变之间查询重启。也就是说,如果一个查询已经开始使用默认管理,那么它不能改变没有从头开始查询新的检查点位置。