
StreamingAggregationStateManager

StreamingAggregationStateManager is the abstraction of state managers that act as middlemen between state stores and the physical operators used in Streaming Aggregation (e.g. StateStoreSaveExec and StateStoreRestoreExec).

[[contract]]
.StreamingAggregationStateManager Contract
[cols="1m,2",options="header",width="100%"]
|===
| Method
| Description

| commit a| [[commit]]

[source, scala]
----
commit(store: StateStore): Long
----

Commits all updates (changes) to the given StateStore and returns the new version.

Used when StateStoreSaveExec physical operator is executed.

| get a| [[get]]

[source, scala]
----
get(store: StateStore, key: UnsafeRow): UnsafeRow
----

Looks up the value for the given (non-null) key in the given StateStore.

Used exclusively when StateStoreRestoreExec physical operator is executed.

| getKey a| [[getKey]]

[source, scala]
----
getKey(row: UnsafeRow): UnsafeRow
----

Extracts the columns for the key from the input row.

Used when:

| getStateValueSchema a| [[getStateValueSchema]]

[source, scala]
----
getStateValueSchema: StructType
----

Gets the schema of the values stored in a StateStore.

Used when StateStoreRestoreExec and StateStoreSaveExec physical operators are executed.

| iterator a| [[iterator]]

[source, scala]
----
iterator(store: StateStore): Iterator[UnsafeRowPair]
----

Returns all UnsafeRow key-value pairs in the given StateStore.

Used exclusively when StateStoreSaveExec physical operator is executed.

| keys a| [[keys]]

[source, scala]
----
keys(store: StateStore): Iterator[UnsafeRow]
----

Returns all the keys in the given StateStore.

Used exclusively when physical operators with WatermarkSupport are requested to removeKeysOlderThanWatermark (when StateStoreSaveExec physical operator is executed).

| put a| [[put]]

[source, scala]
----
put(store: StateStore, row: UnsafeRow): Unit
----

Stores (puts) the given row in the given StateStore.

Used exclusively when StateStoreSaveExec physical operator is executed.

| remove a| [[remove]]

[source, scala]
----
remove(store: StateStore, key: UnsafeRow): Unit
----

Removes the key-value pair for the given key from the given StateStore.

Used exclusively when StateStoreSaveExec physical operator is executed (directly or indirectly as a WatermarkSupport).

| values a| [[values]]

[source, scala]
----
values(store: StateStore): Iterator[UnsafeRow]
----

Returns all the values in the given StateStore.

Used exclusively when StateStoreSaveExec physical operator is executed.

|===
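To make the shape of the contract more concrete, here is a minimal, self-contained sketch of a state manager sitting between an operator and a store. It does not use Spark's classes: `ToyRow`, `ToyStore` and `ToyStateManager` are hypothetical stand-ins for UnsafeRow, StateStore and a state manager, and the sketch assumes (for simplicity) that the whole input row is stored as the value and that a missing key is reported as `None`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins, NOT Spark classes.
object ToyAggregationState {
  // A row is modelled as a plain map of column name -> value.
  type ToyRow = Map[String, Any]

  // Toy replacement for StateStore: an in-memory map with a version counter.
  final class ToyStore {
    private val data = mutable.LinkedHashMap.empty[ToyRow, ToyRow]
    private var version = 0L
    def get(key: ToyRow): Option[ToyRow] = data.get(key)
    def put(key: ToyRow, value: ToyRow): Unit = data.update(key, value)
    def remove(key: ToyRow): Unit = data.remove(key)
    def iterator: Iterator[(ToyRow, ToyRow)] = data.iterator
    // Bumps and returns the version, mimicking "commit returns the new version".
    def commit(): Long = { version += 1; version }
  }

  // Toy state manager that mirrors the method names of the contract.
  final class ToyStateManager(keyColumns: Seq[String]) {
    // Keeps only the key columns of the input row.
    def getKey(row: ToyRow): ToyRow =
      row.filter { case (name, _) => keyColumns.contains(name) }
    def get(store: ToyStore, key: ToyRow): Option[ToyRow] = store.get(key)
    // Assumption: the whole row is stored as the value under its key.
    def put(store: ToyStore, row: ToyRow): Unit = store.put(getKey(row), row)
    def remove(store: ToyStore, key: ToyRow): Unit = store.remove(key)
    def keys(store: ToyStore): Iterator[ToyRow] = store.iterator.map(_._1)
    def values(store: ToyStore): Iterator[ToyRow] = store.iterator.map(_._2)
    def commit(store: ToyStore): Long = store.commit()
  }
}
```

The point of the sketch is the division of labour: the operator hands over full rows, and the manager alone decides how a row is split into a key and a value before it reaches the store.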

[[supportedVersions]] StreamingAggregationStateManager supports two versions of state managers for streaming aggregations, `1` and `2` (per the spark.sql.streaming.aggregation.stateFormatVersion internal configuration property).

[[implementations]] NOTE: StreamingAggregationStateManagerBaseImpl is the one and only known direct implementation of the StreamingAggregationStateManager contract in Spark Structured Streaming.

NOTE: StreamingAggregationStateManager is a Scala sealed trait, which means that all the direct implementations are in the same compilation unit (a single file).
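As a reminder of what `sealed` buys, here is a small sketch with hypothetical names (`Manager`, `ManagerV1` and `ManagerV2` are illustrative only and are not Spark classes). Since every direct subtype of a sealed trait has to live in the same file, the compiler can check that a pattern match covers all of them.

```scala
// Hypothetical names, used only to illustrate `sealed`; not Spark classes.
sealed trait Manager
final class ManagerV1 extends Manager
final class ManagerV2 extends Manager

object Manager {
  // Manager is sealed, so the compiler knows ManagerV1 and ManagerV2 are the
  // only direct subtypes and can warn when a case is missing from this match.
  def describe(manager: Manager): String = manager match {
    case _: ManagerV1 => "version 1"
    case _: ManagerV2 => "version 2"
  }
}
```

Trying to extend `Manager` from another source file would be rejected at compile time.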

=== [[createStateManager]] Creating StreamingAggregationStateManager Instance -- createStateManager Factory Method

[source, scala]
----
createStateManager(
  keyExpressions: Seq[Attribute],
  inputRowAttributes: Seq[Attribute],
  stateFormatVersion: Int): StreamingAggregationStateManager
----


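createStateManager creates a new StreamingAggregationStateManager for a given stateFormatVersion:

* StreamingAggregationStateManagerImplV1 for stateFormatVersion `1`
* StreamingAggregationStateManagerImplV2 for stateFormatVersion `2`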

createStateManager throws an IllegalArgumentException for any other stateFormatVersion:

Version [stateFormatVersion] is invalid

createStateManager is used when StateStoreRestoreExec and StateStoreSaveExec physical operators are created.
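The version dispatch described above can be sketched as a plain pattern match. This is a simplified illustration, not Spark's source: `ToyManager`, `ToyManagerV1` and `ToyManagerV2` are hypothetical types, the key columns are plain strings rather than `Attribute`s, and the sketch assumes that `1` and `2` are the only supported versions.

```scala
// Hypothetical toy types; the real factory works with Spark's Attribute and
// returns a StreamingAggregationStateManager.
object ToyStateManagerFactory {
  sealed trait ToyManager { def version: Int }
  final case class ToyManagerV1(keyColumns: Seq[String]) extends ToyManager { val version = 1 }
  final case class ToyManagerV2(keyColumns: Seq[String]) extends ToyManager { val version = 2 }

  // Picks an implementation by state format version and rejects anything else
  // with the error message quoted in the text.
  def createStateManager(keyColumns: Seq[String], stateFormatVersion: Int): ToyManager =
    stateFormatVersion match {
      case 1 => ToyManagerV1(keyColumns)
      case 2 => ToyManagerV2(keyColumns)
      case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid")
    }
}
```

Keeping the dispatch in a single factory method means callers never name a concrete implementation, so they only need to pass the configured version through.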


Last update: 2020-11-28