Arbitrary Stateful Streaming Aggregation
KeyValueGroupedDataset represents a grouped dataset, the result of the Dataset.groupByKey operator.
The mapGroupsWithState and flatMapGroupsWithState operators use GroupState as the group streaming aggregation state, which is created separately for every aggregation key and holds an aggregation state value of a user-defined type.
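To make the per-key state idea concrete, here is a minimal plain-Scala model (no Spark dependency) of what a flatMapGroupsWithState-style operator does across micro-batches. ToyGroupState, runMicroBatch and runningCount are hypothetical names for illustration; only the method names exists, getOption, update and remove mirror Spark's GroupState. The real operator is distributed, timeout-aware and fault-tolerant, none of which this sketch attempts.

```scala
import scala.collection.mutable

object PerKeyStateModel {
  // Hypothetical stand-in for Spark's GroupState[S]: one instance per grouping key.
  final class ToyGroupState[S](initial: Option[S]) {
    private var value: Option[S] = initial
    def exists: Boolean = value.isDefined
    def getOption: Option[S] = value
    def update(newValue: S): Unit = { value = Some(newValue) }
    def remove(): Unit = { value = None }
  }

  // One micro-batch: group records by key (the role of groupByKey), call the user
  // function once per key with its values and state, then keep the resulting state
  // in `store` so that the next micro-batch sees it.
  def runMicroBatch[K, V, S, U](batch: Seq[(K, V)], store: mutable.Map[K, S])(
      func: (K, Iterator[V], ToyGroupState[S]) => Iterator[U]): Seq[U] = {
    val out = mutable.ArrayBuffer.empty[U]
    batch.groupBy(_._1).foreach { case (key, records) =>
      val state = new ToyGroupState[S](store.get(key))
      out ++= func(key, records.iterator.map(_._2), state)
      state.getOption match {
        case Some(s) => store(key) = s
        case None    => store.remove(key)
      }
    }
    out.toSeq
  }

  // Example user function: a running count of records per key across micro-batches.
  val runningCount: (String, Iterator[Int], ToyGroupState[Long]) => Iterator[(String, Long)] =
    (key, values, state) => {
      val newCount = state.getOption.getOrElse(0L) + values.size
      state.update(newCount)
      Iterator.single(key -> newCount)
    }
}
```

Running two micro-batches against the same `store` shows the state for key "a" carrying over from the first batch into the second.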
Use the following demos and complete applications to learn more:
One of the most important internal execution components of Arbitrary Stateful Streaming Aggregation is the FlatMapGroupsWithStateExec physical operator.
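When executed, FlatMapGroupsWithStateExec first validates the selected GroupStateTimeout: for ProcessingTimeTimeout, a batch timestamp has to be defined; for EventTimeTimeout, an event-time watermark has to be defined and the input schema has to contain the watermark attribute.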
FIXME When are the above requirements met?
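The timeout validation can be pictured as a fail-fast check. This is a hedged plain-Scala sketch, not Spark's code: the timeout types are local stand-ins named after Spark's GroupStateTimeout variants, and the parameters batchTimestampMs, eventTimeWatermark and hasWatermarkAttribute are assumptions that model the inputs the check depends on (processing-time timeouts need a batch timestamp; event-time timeouts need a watermark value and a watermark attribute in the input schema).

```scala
object TimeoutValidationSketch {
  // Local stand-ins for Spark's GroupStateTimeout variants (not the real classes).
  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object ProcessingTimeTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  // Throws IllegalArgumentException (via `require`) when the selected timeout
  // cannot be honoured by the current query configuration.
  def validate(
      timeout: Timeout,
      batchTimestampMs: Option[Long],
      eventTimeWatermark: Option[Long],
      hasWatermarkAttribute: Boolean): Unit = timeout match {
    case ProcessingTimeTimeout =>
      require(batchTimestampMs.nonEmpty, "processing-time timeout needs a batch timestamp")
    case EventTimeTimeout =>
      require(eventTimeWatermark.nonEmpty, "event-time timeout needs a watermark value")
      require(hasWatermarkAttribute, "event-time timeout needs a watermark attribute in the input")
    case NoTimeout => ()
  }
}
```

Checking up front keeps a misconfigured query from failing later inside an executor task, which is harder to diagnose.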
FlatMapGroupsWithStateExec physical operator then uses mapPartitionsWithStateStore with a custom storeUpdateFunction of the following signature:
(StateStore, Iterator[T]) => Iterator[U]
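A function of this shape receives one partition's StateStore together with that partition's input rows and returns output rows lazily. The sketch below is a hypothetical, heavily simplified model: ToyStateStore stands in for Spark's StateStore (its get, put, commit and hasCommitted methods are named after the real ones but use plain keys and values instead of UnsafeRow), and countingUpdate is an illustrative storeUpdateFunction that keeps a running count per key. Committing only once the output iterator is exhausted is an assumption of the sketch, chosen to show why the function returns an Iterator rather than a materialised collection.

```scala
import scala.collection.mutable

object StoreUpdateFunctionSketch {
  // Hypothetical in-memory stand-in for the StateStore of a single partition.
  final class ToyStateStore[K, V](initial: Map[K, V]) {
    private val data = mutable.Map.empty[K, V] ++= initial
    private var committed = false
    def get(key: K): Option[V] = data.get(key)
    def put(key: K, value: V): Unit = { data(key) = value }
    def commit(): Unit = { committed = true }
    def hasCommitted: Boolean = committed
    def snapshot: Map[K, V] = data.toMap
  }

  // Matches the (StateStore, Iterator[T]) => Iterator[U] shape with
  // T = String (a key per input row) and U = (String, Long) (key and new count).
  val countingUpdate: (ToyStateStore[String, Long], Iterator[String]) => Iterator[(String, Long)] =
    (store, rows) => {
      val updated = rows.map { key =>
        val count = store.get(key).getOrElse(0L) + 1
        store.put(key, count)
        key -> count
      }
      // Commit exactly once, after the caller has consumed every output row.
      new Iterator[(String, Long)] {
        def hasNext: Boolean = {
          val more = updated.hasNext
          if (!more && !store.hasCommitted) store.commit()
          more
        }
        def next(): (String, Long) = updated.next()
      }
    }
}
```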
While generating the recipe, FlatMapGroupsWithStateExec uses the StateStoreOps extension methods to register a listener that is executed on task completion. The listener makes sure that all state changes in the given StateStore are either committed or aborted.
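The listener's role can be modelled in a few lines. In this hedged sketch, ToyTask and its addCompletionListener method are hypothetical stand-ins for a task context with completion listeners, ToyStateStore is a simplified store, and withAbortOnCompletion is an illustrative wrapper: before running the update function it registers a listener that aborts the store if nothing committed it, so a failed or incomplete task does not leave uncommitted changes behind.

```scala
import scala.collection.mutable
import scala.util.Try

object CompletionListenerSketch {
  // Hypothetical store that only tracks whether it was committed or aborted.
  final class ToyStateStore {
    private var committedFlag = false
    private var abortedFlag = false
    def commit(): Unit = { committedFlag = true }
    def abort(): Unit = { abortedFlag = true }
    def hasCommitted: Boolean = committedFlag
    def aborted: Boolean = abortedFlag
  }

  // Hypothetical task: runs its completion listeners whether the body succeeds or throws.
  final class ToyTask {
    private val listeners = mutable.ArrayBuffer.empty[() => Unit]
    def addCompletionListener(f: () => Unit): Unit = { listeners += f }
    def run[A](body: => A): Try[A] = {
      val result = Try(body)
      listeners.foreach(listener => listener())
      result
    }
  }

  // Wraps a storeUpdateFunction so that, on task completion, an uncommitted store is aborted.
  def withAbortOnCompletion[T, U](task: ToyTask)(
      f: (ToyStateStore, Iterator[T]) => Iterator[U]): (ToyStateStore, Iterator[T]) => Iterator[U] =
    (store, iter) => {
      task.addCompletionListener(() => if (!store.hasCommitted) store.abort())
      f(store, iter)
    }
}
```

On the happy path the update function commits and the listener does nothing; when the function throws, the listener still runs and aborts the store.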
In the end, FlatMapGroupsWithStateExec creates a new StateStoreRDD and adds it to the RDD lineage.
One of the important execution steps is when the InputProcessor (of the FlatMapGroupsWithStateExec physical operator) is requested to callFunctionAndUpdateState. It executes the user-defined state function with a grouping key object, the value objects of that group, and a GroupStateImpl.
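A simplified model of this step follows. It is a hedged sketch rather than Spark's implementation: ToyGroupStateImpl and this callFunctionAndUpdateState are hypothetical plain-Scala stand-ins with no row encoding, timeouts or metrics. The one design point it illustrates is deferring the state write: the updated or removed state is written back to the store only once the user function's output iterator has been fully consumed, so a lazily evaluated user function has finished changing the state by then.

```scala
import scala.collection.mutable

object CallFunctionSketch {
  // Hypothetical stand-in for GroupStateImpl: tracks whether the user function
  // updated or removed the state of the current key.
  final class ToyGroupStateImpl[S](initial: Option[S]) {
    private var value: Option[S] = initial
    private var updatedFlag = false
    private var removedFlag = false
    def getOption: Option[S] = value
    def update(s: S): Unit = { value = Some(s); updatedFlag = true; removedFlag = false }
    def remove(): Unit = { value = None; removedFlag = true; updatedFlag = false }
    def isUpdated: Boolean = updatedFlag
    def isRemoved: Boolean = removedFlag
  }

  // Calls the user function for one key and returns its output lazily; the state
  // change is written to `store` only after the returned iterator is exhausted.
  def callFunctionAndUpdateState[K, V, S, U](
      store: mutable.Map[K, S],
      key: K,
      values: Iterator[V],
      func: (K, Iterator[V], ToyGroupStateImpl[S]) => Iterator[U]): Iterator[U] = {
    val groupState = new ToyGroupStateImpl[S](store.get(key))
    val output = func(key, values, groupState)
    var written = false
    def writeState(): Unit =
      if (!written) {
        written = true
        if (groupState.isRemoved) store.remove(key)
        else if (groupState.isUpdated) groupState.getOption.foreach(s => store(key) = s)
      }
    new Iterator[U] {
      def hasNext: Boolean = {
        val more = output.hasNext
        if (!more) writeState()
        more
      }
      def next(): U = output.next()
    }
  }
}
```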