指定初始状态mapGroupsWithState
您可以指定一个用户定义的初始状态为结构化流状态处理使用flatMapGroupsWithState
或mapGroupsWithState
。这可以让你避免再处理数据时开始有状态流没有一个有效的检查点。
defmapGroupsWithState(年代:编码器,U:编码器)(timeoutConf:GroupStateTimeout,initialState:KeyValueGroupedDataset(K,年代))(函数:(K,迭代器(V),GroupState(年代])= >U):数据集(U]defflatMapGroupsWithState(年代:编码器,U:编码器)(outputMode:OutputMode,timeoutConf:GroupStateTimeout,initialState:KeyValueGroupedDataset(K,年代))(函数:(K,迭代器(V),GroupState(年代])= >迭代器(U])
指定一个初始状态的示例用例flatMapGroupsWithState
接线员:
瓦尔fruitCountFunc=(关键:字符串,值:迭代器(字符串),状态:GroupState(RunningCount])= >{瓦尔数=状态。getOption。地图(_。数)。getOrElse(0 l)+valList。大小状态。更新(新RunningCount(数))迭代器((关键,数。toString))}瓦尔fruitCountInitialDS:数据集((字符串,RunningCount)]=Seq((“苹果”,新RunningCount(1)),(“橙色”,新RunningCount(2)),(“芒果”,新RunningCount(5)),)。托托()瓦尔fruitCountInitial=initialState。groupByKey(x= >x。_1)。mapValues(_。_2)fruitStream。groupByKey(x= >x)。flatMapGroupsWithState(更新,GroupStateTimeout。NoTimeout,fruitCountInitial)(fruitCountFunc)
指定一个初始状态的示例用例mapGroupsWithState
接线员:
瓦尔fruitCountFunc=(关键:字符串,值:迭代器(字符串),状态:GroupState(RunningCount])= >{瓦尔数=状态。getOption。地图(_。数)。getOrElse(0 l)+valList。大小状态。更新(新RunningCount(数))(关键,数。toString)}瓦尔fruitCountInitialDS:数据集((字符串,RunningCount)]=Seq((“苹果”,新RunningCount(1)),(“橙色”,新RunningCount(2)),(“芒果”,新RunningCount(5)),)。托托()瓦尔fruitCountInitial=initialState。groupByKey(x= >x。_1)。mapValues(_。_2)fruitStream。groupByKey(x= >x)。mapGroupsWithState(GroupStateTimeout。NoTimeout,fruitCountInitial)(fruitCountFunc)