如何重新启动结构流查询

学习从上写偏移重开结构流查询

写由亚当巴夫拉卡

2022年5月18日

假设

流水流运行窗口聚合查询 从ApacheKafka读并写文件附加模式化您想升级应用并重新启动查询,偏差等于最后写偏差放弃所有未写入水槽的状态信息,从最早偏移开始处理并相应修改检查站目录

升级应用码后使用现有检查站时,前应用版老式状态和对象会重新使用,结果产生意外输出,如从老式源码读取或用老式应用码处理

求解

Apachespark检查站执行对象和二进制对象保持状态因此无法修改检查站目录以输入记录复制更新偏移器并存储到文件或数据库中初始化下次重开时读取并使用同值读Stream.确定删除检查站目录

当前偏移使用异步APIs

%scala    spark.streams.addListener(new StreamingQueryListener() {      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {          println("Query started:" + queryStarted.id)      }      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {          println("Query terminated" + queryTerminated.id)      }      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {       println("Query made progress")          println("Starting offset:" + queryProgress.progress.sources(0).startOffset)          println("Ending offset:" + queryProgress.progress.sources(0).endOffset)          //Logic to save these offsets      }  })

可使用读Stream上表进程最近写下减法

scala选项(“启动非控件”、“”{'articleA'{#023#1}#1}#B{#0#2

输入流记录模式如下:

root++++++++++++++++++++++++++++++++++++++

并实现逻辑保存并更新数据库并下次重开读取

文章有帮助吗