异步定点结构流

注解

databricks运行时10.3

状态异常检查站可减少端对端延时, 不牺牲任何容错保证,

结构流默认使用同步检查站微批量保证批量中所有状态更新在启动下批量前备份云存储如果静态流查询失败,除上小批量外所有微插件都受检查站检查重新开机时,只需重运行最后批量 。快速恢复同步检查站 代价是提高单批微量延时

流态检验模式

异步检查站试图异步执行检查站作业,这样微批执行不必等待检查站完工换句话说,下次微批量计算完成后立即启动内部计算元数据(并保存检查站位置)跟踪州检是否完成微批处理查询重开时,多件微批量可能需重新执行-最后一个微批量计算不完全,以及前一个微批次前状态检查站不完全并获得与同步检查站相同的容错保证(即完全从保证到一流水槽)。

识别结构流工作从异步检查站获益

下流作业特征可受益于异步状态检查站

  • 作业有一个或多个状态运算flatMapGroupsWithState,mapGroupsWithState流水相接

  • 状态检查站延时率是批量执行延时率的主要推理信息可见StreamingQueryProgress事件传导Spark驱动器日志中也发现这些事件例流查询进度和如何发现状态检查站对批量执行总体延时的影响

    • {{识别码:"2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",运行标识:"e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",.,批处理:0,句号:{{.,"triggerExecution":547730,.},"stateOperators":[{{.,填充时间Ms:318626,"numShufflePartitions":64码,.}}
    • 状态检查站延迟分析以上查询进度事件

      • 批量持续时间durationMs.triggerDuration约547秒

      • 状态存储连接延迟度stateOperations[0].commitTimeMs约3 186秒委托延时跨任务汇总状态存储本案有64项任务stateOperators[0].numShufflePartitions)

      • 包含状态运算符的每项任务平均50秒(3,186/64)用于检查站额外延时促成批量持续假设所有64项任务同时运行,检查站步骤贡献约批量持续量的9%(50秒/547秒)。最大并发任务小于64时百分比会更高

启动异步状态检查站

集流作业后方配置Async检查站需要状态存储实现支持aync当前唯一RocksDB基础状态存储实现支持它

点火.conf.高山市"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",真实性)点火.conf.高山市"spark.sql.streaming.stateStore.providerClass","com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

异步检查站限制和需求

注解

计算自标量限制结构流工作量集群缩放Databricks建议使用Delta直播表增强自动缩放看吧增强自动缩放.

  • 异步检查站中出现故障时,一个或多个商店失效查询同步检查站作为任务执行Spark重试多次机制不存在异步状态检查站使用数据布列克作业重试,这些故障可自动重试

  • 异步检查站最有效使用时,国库位置不因微批执行而改变集群重定位加上异步状态检查站可能效果不好,因为状态存储实例可能随着节点作为集群重定位事件的一部分被增删而重新分布

  • 异步状态检查站仅在 RocksDB状态存储提供程序实现中支持默认模拟状态存储实现不支持