Specify an initial state for mapGroupsWithState

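You can specify a user-defined initial state for Structured Streaming stateful processing with `flatMapGroupsWithState` or `mapGroupsWithState`. This lets you avoid reprocessing data when you start a stateful stream without a valid checkpoint.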
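To see what seeding the state buys you, here is a minimal plain-Scala model (no Spark involved) of a per-key running count applied batch by batch. The object name `InitialStateModel`, the helper `runBatches`, and the batch data are hypothetical; the update rule is the same one the examples in this section use: the new count is the previous count (or 0) plus the number of new values for that key.

```scala
// Plain-Scala model of per-key stateful counting across micro-batches.
// Hypothetical names for illustration only; this is not the Spark API.
object InitialStateModel {
  type Batch = Seq[String]

  // Applies each micro-batch in order, starting from `initial`.
  // For each key in a batch: newCount = previous count (or 0) + number of values in the batch.
  def runBatches(initial: Map[String, Long], batches: Seq[Batch]): Map[String, Long] =
    batches.foldLeft(initial) { (state, batch) =>
      batch.groupBy(identity).foldLeft(state) { case (acc, (key, values)) =>
        acc.updated(key, acc.getOrElse(key, 0L) + values.size)
      }
    }

  def main(args: Array[String]): Unit = {
    val batches: Seq[Batch] = Seq(Seq("apple", "apple", "mango"), Seq("orange"))

    // Counts carried over from earlier processing (hypothetical values).
    val seeded = Map("apple" -> 1L, "orange" -> 2L, "mango" -> 5L)

    // Without an initial state, every key starts from zero.
    println(runBatches(Map.empty, batches))
    // With an initial state, new data is added on top of the seeded counts.
    println(runBatches(seeded, batches))
  }
}
```

Starting from `Map.empty` gives apple → 2, mango → 1, orange → 1, which reflects only the new batches; starting from the seeded map gives apple → 3, orange → 3, mango → 6, as if the earlier history had been replayed.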

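```scala
// Overloads on KeyValueGroupedDataset[K, V] that accept an initial state
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
```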
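The two signatures differ mainly in what the user function returns: `mapGroupsWithState` expects exactly one output value per key, while `flatMapGroupsWithState` returns an `Iterator[U]`, so a key can produce zero or more rows (it also takes an `OutputMode`). The sketch below mimics that contract for a single batch in plain Scala. `LocalState`, `GroupStateSketch`, `mapGroupsLocal`, and `flatMapGroupsLocal` are hypothetical stand-ins, not Spark classes; for simplicity the sketch only calls `func` for keys that appear in the batch and leaves out timeouts and output modes.

```scala
import scala.collection.mutable

// Hypothetical minimal per-key state holder, loosely modeled on GroupState. Not the Spark API.
final class LocalState[S](private var value: Option[S]) {
  def getOption: Option[S] = value
  def update(newValue: S): Unit = { value = Some(newValue) }
}

object GroupStateSketch {
  // Groups `data` by key and calls `func` once per key present in the batch.
  // Each key's state is read from `store`, which plays the role of the initial state
  // on the first call and holds the updated state afterwards.
  def flatMapGroupsLocal[K, V, S, U](data: Seq[V], keyOf: V => K, store: mutable.Map[K, S])(
      func: (K, Iterator[V], LocalState[S]) => Iterator[U]): Seq[U] =
    data.groupBy(keyOf).toSeq.flatMap { case (key, values) =>
      val state = new LocalState[S](store.get(key))
      val out = func(key, values.iterator, state).toList
      state.getOption.foreach(s => store(key) = s) // persist the updated state
      out
    }

  // mapGroups variant: exactly one output value per key.
  def mapGroupsLocal[K, V, S, U](data: Seq[V], keyOf: V => K, store: mutable.Map[K, S])(
      func: (K, Iterator[V], LocalState[S]) => U): Seq[U] =
    flatMapGroupsLocal[K, V, S, U](data, keyOf, store)((k, vs, st) => Iterator.single(func(k, vs, st)))

  def main(args: Array[String]): Unit = {
    // Seeded state, analogous to the initial state in the examples below (values assumed).
    val store = mutable.Map[String, Long]("apple" -> 1L, "orange" -> 2L, "mango" -> 5L)

    val countFunc = (key: String, values: Iterator[String], state: LocalState[Long]) => {
      val count = state.getOption.getOrElse(0L) + values.size
      state.update(count)
      (key, count.toString)
    }

    val out = mapGroupsLocal(Seq("apple", "apple", "banana"), (x: String) => x, store)(countFunc)
    println(out.sortBy(_._1))
  }
}
```

Expressing the map variant through the flat variant (wrapping the single result in `Iterator.single`) is a design choice of this sketch; it simply highlights that the only difference in the function contract is the number of rows emitted per key.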

Example use case that specifies an initial state for the `flatMapGroupsWithState` operator:

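```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

// Assumed state type: a running count per key.
case class RunningCount(count: Long)

// Adds the number of new values for a key to its previous count (0 if there is none).
val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + values.size
  state.update(RunningCount(count))
  Iterator((key, count.toString))
}

// Initial state: the count each key starts from.
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", RunningCount(1)),
  ("orange", RunningCount(2)),
  ("mango", RunningCount(5))
).toDS()

// Key the initial state the same way as the stream.
val fruitCountInitial = fruitCountInitialDS.groupByKey(x => x._1).mapValues(_._2)

// fruitStream is assumed to be a streaming Dataset[String] of fruit names.
fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
```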

Example use case that specifies an initial state for the `mapGroupsWithState` operator:

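```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._

// Assumed state type: a running count per key.
case class RunningCount(count: Long)

// Adds the number of new values for a key to its previous count and emits one row per key.
val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + values.size
  state.update(RunningCount(count))
  (key, count.toString)
}

// Initial state: the count each key starts from.
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", RunningCount(1)),
  ("orange", RunningCount(2)),
  ("mango", RunningCount(5))
).toDS()

// Key the initial state the same way as the stream.
val fruitCountInitial = fruitCountInitialDS.groupByKey(x => x._1).mapValues(_._2)

// fruitStream is assumed to be a streaming Dataset[String] of fruit names.
fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
```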