Skip to content

Arbitrary Stateful Streaming Aggregation

Arbitrary Stateful Streaming Aggregation is a streaming aggregation query that uses the following KeyValueGroupedDataset operators:

KeyValueGroupedDataset represents a grouped dataset as a result of Dataset.groupByKey operator.

mapGroupsWithState and flatMapGroupsWithState operators use GroupState as group streaming aggregation state that is created separately for every aggregation key with an aggregation state value (of a user-defined type).

mapGroupsWithState and flatMapGroupsWithState operators use GroupStateTimeout as an aggregation state timeout that defines when a GroupState is considered timed-out (expired).

Demos

Use the following demos and complete applications to learn more:

Performance Metrics

Arbitrary Stateful Streaming Aggregation uses performance metrics (of the StateStoreWriter through FlatMapGroupsWithStateExec physical operator).

Internals

One of the most important internal execution components of Arbitrary Stateful Streaming Aggregation is FlatMapGroupsWithStateExec physical operator.

When executed, FlatMapGroupsWithStateExec first validates a selected GroupStateTimeout:

Note

FIXME When are the above requirements met?

FlatMapGroupsWithStateExec physical operator then mapPartitionsWithStateStore with a custom storeUpdateFunction of the following signature:

(StateStore, Iterator[T]) => Iterator[U]

While generating the recipe, FlatMapGroupsWithStateExec uses StateStoreOps extension method object to register a listener that is executed on a task completion. The listener makes sure that a given StateStore has all state changes either committed or aborted.

In the end, FlatMapGroupsWithStateExec creates a new StateStoreRDD and adds it to the RDD lineage.

StateStoreRDD is used to properly distribute tasks across executors (per preferred locations) with help of StateStoreCoordinator (that runs on the driver).

StateStoreRDD uses StateStore helper to look up a StateStore by StateStoreProviderId and store version.

FlatMapGroupsWithStateExec physical operator uses state managers that are different than state managers for Streaming Aggregation. StateStore abstraction is the same as in Streaming Aggregation.

One of the important execution steps is when InputProcessor (of FlatMapGroupsWithStateExec physical operator) is requested to callFunctionAndUpdateState. That executes the user-defined state function on a per-group state key object, value objects, and a GroupStateImpl.