异步状态检查点结构化流
请注意
在砖运行时10.3及以上。
状态流查询瓶颈状态更新,使异步状态检查点可以降低端到端延迟不牺牲任何容错担保,但与未成年人的成本更高的启动延迟。
结构化流默认使用同步检查点。每个micro-batch确保批处理的所有状态更新备份在云存储(称为“检查点位置”)开始前下一批。如果有状态流查询失败,所有micro-batches除了最后micro-batch检查点。重新启动,只有最后一批需要重新运行。快速恢复与同步检查点付出更高的成本为每个micro-batch延迟。
异步状态检查点试图执行异步执行检查点,以便micro-batch不必等待检查点完成。换句话说,接下来micro-batch就可以开始计算之前的micro-batch已经完成。然而,内部抵消元数据(也保存在检查点位置)跟踪状态是否为micro-batch检查点已经完成。对查询重启,多个micro-batch可能需要重新执行,最后micro-batch的计算是不完整的,以及一个micro-batch检查点之前的状态是不完整的。得到相同的容错担保(即只有一次担保与幂等沉)的同步检查点。
确定结构化流负载受益于异步检查点
以下是流媒体工作特点,可能受益于异步状态检查点。
工作有一个或多个有状态操作(如聚合,
flatMapGroupsWithState
,mapGroupsWithState
stream-stream连接)检查点状态延迟的主要贡献者之一总体执行批处理延迟。这些信息可以在找到StreamingQueryProgress事件。这些事件被发现在log4j日志引发司机。下面是一个示例流查询的进步和如何找到状态检查点影响整个批处理执行延迟。
-
{“id”:“2 e3495a2-de2c-4a6a-9a8e-f6d4c4796f19”,“runId”:“e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe”,“…”,“batchId”:0,“durationMs”:{“…”,“triggerExecution”:547730年,“…”},“stateOperators”:({“…”,“commitTimeMs”:3186626,“numShufflePartitions”:64年,“…”})}
检查点状态延迟分析以上查询事件进展
批处理时间(
durationMs.triggerDuration
)是547秒左右。状态存储提交延迟(
stateOperations [0] .commitTimeMs
)是3186秒左右。提交延迟聚合在任务包含存储状态。在这种情况下,有64个这样的任务(stateOperators [0] .numShufflePartitions
)。每个任务包含国家运营商平均需要50秒(3186/64)检查站。这是一个额外的延迟,导致了批处理时间。假设所有64个任务并行运行,检查点一步贡献了约9%(50秒/ 547秒)的批处理时间。最大并发任务时更高比例小于64。
-
启用异步状态检查点
设置以下配置流媒体的工作。异步检查点需要一个状态存储实现支持异步提交。目前只有基于RocksDB状态存储实现支持它。
火花。相依。集(“spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled”,“真正的”)火花。相依。集(“spark.sql.streaming.stateStore.providerClass”,“com.databricks.sql.streaming.state.RocksDBStateStoreProvider”)
限制和要求异步检查点
请注意
计算伸缩扩展限制了集群大小结构化流工作负载。砖建议使用三角洲表与增强的自动定量直播工作负载。看到增强的自动定量是多少?。
任何故障在异步检查站在任何一个或多个存储查询失败。在同步检查点模式下,执行检查点的一部分任务,引发多次重试任务之前失败的查询。这种机制不存在与异步状态检查点。然而,使用砖工作重试,这种失败可以自动重试。
异步检查点时效果最好状态存储位置不改变micro-batch之间执行。集群调整,结合异步状态检查点,可能不适合,因为状态存储实例可能会re-distributed随着节点的增加或删除集群调整事件的一部分。
异步状态只支持检查点RocksDB状态存储提供程序实现。默认的内存状态存储实现不支持它。