StateStoreSaveExec Unary Physical Operator

StateStoreSaveExec is a unary physical operator that saves a streaming state to a state store with support for streaming watermark.

Note

A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.

Read up on UnaryExecNode (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan.html) and physical operators in general in The Internals of Spark SQL book (https://bit.ly/spark-sql-internals).

StateStoreSaveExec is created when the StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation for execution (Aggregate logical operators in the logical plan of a streaming query).
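For example (a sketch, assuming a spark-shell session so that spark and implicits are in scope; the rate source and the name counts are illustrative), a streaming aggregation shows up as a StateStoreSaveExec in the physical plan:

```scala
// A streaming aggregation that StatefulAggregationStrategy plans
// with StateStoreSaveExec (and StateStoreRestoreExec)
val counts = spark
  .readStream
  .format("rate")   // emits (timestamp, value) rows
  .load
  .groupBy($"value" % 10 as "bucket")
  .count

// The physical plan should include StateStoreSaveExec
counts.explain
```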

StateStoreSaveExec and StatefulAggregationStrategy

The optional properties, i.e. the state info, the output mode, and the event-time watermark, are initially undefined when StateStoreSaveExec is created. StateStoreSaveExec is updated to hold execution-specific configuration when IncrementalExecution is requested to prepare the logical plan (of a streaming query) for execution (when the state preparation rule is executed).

StateStoreSaveExec and IncrementalExecution

Note

Unlike the StateStoreRestoreExec operator, StateStoreSaveExec takes the output mode and the event-time watermark when created.

When executed, StateStoreSaveExec creates a StateStoreRDD to map over partitions with a storeUpdateFunction that manages the StateStore.

StateStoreSaveExec creates StateStoreRDD

StateStoreSaveExec and StateStoreRDD (after streamingBatch.toRdd.count)

Note

The number of partitions of StateStoreRDD (and hence the number of Spark tasks) is what was defined for the child physical plan.

There will be as many StateStores as there are partitions in StateStoreRDD.

NOTE: StateStoreSaveExec behaves differently per output mode.

When executed, StateStoreSaveExec executes the child physical operator and creates a StateStoreRDD (with a storeUpdateFunction specific to the output mode).

[[output]] The output schema of StateStoreSaveExec is exactly the child physical operator's output schema.

[[outputPartitioning]] The output partitioning of StateStoreSaveExec is exactly the child physical operator's output partitioning.

[[stateManager]] StateStoreSaveExec uses a StreamingAggregationStateManager (that is created for the keyExpressions, the output of the child physical operator and the stateFormatVersion).

[[logging]] Tip

Enable ALL logging level for the org.apache.spark.sql.execution.streaming.StateStoreSaveExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

    log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=ALL

Refer to Logging.

Performance Metrics (SQLMetrics)

StateStoreSaveExec uses the same performance metrics as the other stateful physical operators that write to a state store.

StateStoreSaveExec in web UI (Details for Query)

The following shows how the performance metrics are computed (and so their exact meaning).

total time to update rows ([[allUpdatesTimeMs]] allUpdatesTimeMs)

Time taken to read the input rows and store them in a state store (possibly filtering out expired rows per the watermarkPredicateForData predicate).

The number of rows stored is the number of updated state rows metric.

total time to remove rows ([[allRemovalsTimeMs]] allRemovalsTimeMs)

Time taken to remove state entries from a state store (e.g. the keys that are older than the watermark).

time to commit changes ([[commitTimeMs]] commitTimeMs)

Time taken for the StreamingAggregationStateManager to commit changes to a state store.

number of output rows ([[numOutputRows]] numOutputRows)

  • For Append output mode, the metric does not seem to be used

  • For Complete output mode, the number of rows in a StateStore (i.e. all the values in a StateStore, which should be equivalent to the number of total state rows metric)

  • For Update output mode, the number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate), which is equivalent to the number of updated state rows metric

number of total state rows ([[numTotalStateRows]] numTotalStateRows)

Number of entries in a state store at the very end of executing the StateStoreSaveExec operator.

Corresponds to the numRowsTotal attribute in stateOperators in StreamingQueryProgress (and is available as sq.lastProgress.stateOperators for an operator).

number of updated state rows ([[numUpdatedStateRows]] numUpdatedStateRows)

Number of the entries that were stored as updates in a state store in a trigger and for the keys in the result rows of the upstream physical operator.

  • For Append output mode, the number of input rows that have not expired yet (per the required watermarkPredicateForData predicate) and that the StreamingAggregationStateManager was requested to store in a state store (the time taken is the total time to update rows metric)

  • For Complete output mode, the number of input rows (which should be exactly the number of output rows from the child physical operator)

  • For Update output mode, the number of rows that the StreamingAggregationStateManager was requested to store in a state store (that did not expire per the optional watermarkPredicateForData predicate), which is equivalent to the number of output rows metric

Corresponds to the numRowsUpdated attribute in stateOperators in StreamingQueryProgress (and is available as sq.lastProgress.stateOperators for an operator).

memory used by state ([[stateMemory]] stateMemory)

Estimated memory used by a StateStore after StateStoreSaveExec finished execution (per the StateStoreMetrics of the StateStore).

Creating Instance

StateStoreSaveExec takes the following to be created:

  • Key expressions (the grouping keys as Catalyst attributes)

  • Execution-specific state info (optional; initially undefined)

  • Output mode (optional; initially undefined)

  • Event-time watermark (optional; initially undefined)

  • Version of the state format

  • Child physical operator
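The constructor can be sketched as follows (an approximation based on the Spark source; the exact parameter list and defaults may differ across Spark versions):

```scala
case class StateStoreSaveExec(
    keyExpressions: Seq[Attribute],                       // grouping keys
    stateInfo: Option[StatefulOperatorStateInfo] = None,  // undefined until state preparation
    outputMode: Option[OutputMode] = None,                // undefined until state preparation
    eventTimeWatermark: Option[Long] = None,              // undefined until state preparation
    stateFormatVersion: Int,
    child: SparkPlan)
  extends UnaryExecNode with StateStoreWriter with WatermarkSupport
```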

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction (Spark SQL).

Internally, doExecute initializes metrics.

NOTE: doExecute requires that the optional state info is defined at this point (that should have happened when IncrementalExecution prepared a streaming aggregation for execution).

doExecute executes the child physical operator and creates a StateStoreRDD with a storeUpdateFunction that:

  1. Generates an unsafe projection to access the key field (using the keyExpressions and the output schema of the child physical operator).

  2. Branches off per output mode: Append, Complete and Update.

doExecute throws an UnsupportedOperationException when executed with an invalid output mode:

Invalid output mode: [outputMode]

[[doExecute-Append]] Append Output Mode

NOTE: Append is the default output mode when not specified explicitly.

NOTE: Append output mode requires that a streaming query defines event-time watermark (e.g. using withWatermark operator) on the event-time column that is used in aggregation (directly or using window standard function).
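A minimal Append-mode query with an event-time watermark (a sketch, assuming a spark-shell session; the rate source, column names and intervals are illustrative):

```scala
import org.apache.spark.sql.functions.window

// Append output mode with aggregation requires an event-time watermark
val windowed = spark
  .readStream
  .format("rate")
  .load
  .withWatermark("timestamp", "10 seconds")     // event-time watermark
  .groupBy(window($"timestamp", "5 seconds"))   // window standard function
  .count

val sq = windowed
  .writeStream
  .format("console")
  .outputMode("append")                         // Append output mode
  .start
```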

For Append output mode, doExecute does the following:

  1. Filters out expired rows from the output of the child physical operator (per the watermark)

  2. Stores the remaining rows in the state store and increments the number of updated state rows metric

  3. Gets the rows in the state store that are older than the watermark (and so can safely be emitted)

  4. Creates an iterator that removes those rows from the state store when requested the next row and in the end commits the state updates

TIP: Refer to the demos for an example of StateStoreSaveExec with Append output mode.

CAUTION: FIXME When is "Filtering state store on:" printed out?


In more detail, the storeUpdateFunction for Append output mode:

  1. Uses the watermarkPredicateForData predicate to exclude matching (expired) rows and (like in Complete output mode) stores all the remaining rows in the StateStore.

  2. While storing the rows, increments the numUpdatedStateRows metric (for every row) and records the total time in the allUpdatesTimeMs metric.

  3. Takes all the rows from the StateStore and returns a NextIterator that:

     • In getNext, finds the first row that matches the watermarkPredicateForKeys predicate, removes it from the StateStore, and returns it back. If no row was found, getNext also marks the iterator as finished.

     • In close, records the time to iterate over all the rows in the allRemovalsTimeMs metric, commits the updates to the StateStore followed by recording the time in the commitTimeMs metric and recording the StateStore metrics.
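Schematically (a plain-Scala sketch with a mutable Map standing in for the StateStore and a numeric key standing in for the watermark predicates; not the actual implementation):

```scala
import scala.collection.mutable

// key -> aggregate row; a LinkedHashMap stands in for the StateStore
val store = mutable.LinkedHashMap.empty[Long, String]

def appendMode(input: Seq[(Long, String)], watermark: Long): Seq[(Long, String)] = {
  // 1-2. Filter out expired rows (watermarkPredicateForData stand-in)
  //      and store the remaining rows (numUpdatedStateRows per row)
  input.filter { case (k, _) => k >= watermark }
       .foreach { case (k, v) => store(k) = v }
  // 3. Emit (and remove) the rows whose keys are now older than the watermark
  val finalized = store.toSeq.filter { case (k, _) => k < watermark }
  finalized.foreach { case (k, _) => store.remove(k) }
  finalized  // ...and then commit the state updates
}
```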

Complete Output Mode

For Complete output mode, doExecute does the following:

  1. Takes all UnsafeRow rows (from the parent iterator)

  2. Stores the rows by key in the state store eagerly (i.e. all rows that are available in the parent iterator before proceeding)

  3. Commits the state updates

  4. In the end, reads the key-row pairs from the state store and passes the rows along (i.e. to the following physical operator)

The number of keys stored in the state store is recorded in the numUpdatedStateRows metric.

NOTE: In Complete output mode, the numOutputRows metric is exactly the numTotalStateRows metric.

TIP: Refer to the demos for an example of StateStoreSaveExec with Complete output mode.


In more detail, the storeUpdateFunction for Complete output mode:

  1. Stores all rows (as UnsafeRow) in the StateStore.

  2. While storing the rows, increments the numUpdatedStateRows metric (for every row) and records the total time in the allUpdatesTimeMs metric.

  3. Records 0 in the allRemovalsTimeMs metric.

  4. Commits the state updates to the StateStore and records the time in the commitTimeMs metric.

  5. Records the StateStore metrics.

  6. In the end, takes all the rows stored in the StateStore and increments the numOutputRows metric.
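A minimal Complete-mode query (a sketch, assuming a spark-shell session; the rate source and the name counts are illustrative):

```scala
// Complete output mode: every trigger emits all aggregate rows from the state store
val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy($"value" % 10 as "bucket")
  .count

val sq = counts
  .writeStream
  .format("console")
  .outputMode("complete")   // numOutputRows equals numTotalStateRows per trigger
  .start
```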

Update Output Mode

For Update output mode, doExecute returns an iterator that filters out late aggregate rows (per the watermark, if defined) and stores the "young" rows in the state store (one by one, i.e. as every next row is requested).

When no more rows are available, the iterator removes the late rows from the state store (all at once) and commits the state updates.

TIP: Refer to the demos for an example of StateStoreSaveExec with Update output mode.


doExecute returns an Iterator of rows that uses the watermarkPredicateForData predicate to filter out late rows.

In hasNext, when rows are no longer available:

  1. Records the total time to iterate over all the rows in the allUpdatesTimeMs metric.

  2. Removes the keys that are older than the watermark (removeKeysOlderThanWatermark) and records the time in the allRemovalsTimeMs metric.

  3. Commits the updates to the StateStore and records the time in the commitTimeMs metric.

  4. Records the StateStore metrics.

In next, stores a row in the StateStore and increments the numOutputRows and numUpdatedStateRows metrics.
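Schematically (a plain-Scala sketch with a mutable Map standing in for the StateStore and a numeric key standing in for the watermark predicate; not the actual implementation):

```scala
import scala.collection.mutable

val store = mutable.Map.empty[Long, String]

// Update mode: filter late rows, store each remaining row as it is pulled,
// and clean up expired keys once the input is exhausted
def updateMode(input: Iterator[(Long, String)], watermark: Long): Iterator[(Long, String)] = {
  val young = input.filter { case (k, _) => k >= watermark }  // watermarkPredicateForData
  new Iterator[(Long, String)] {
    def hasNext: Boolean = {
      val more = young.hasNext
      if (!more) {
        // removeKeysOlderThanWatermark, then commit and record metrics
        store.keys.filter(_ < watermark).toSeq.foreach(store.remove)
      }
      more
    }
    def next(): (Long, String) = {
      val row = young.next()
      store(row._1) = row._2   // numOutputRows and numUpdatedStateRows += 1
      row
    }
  }
}
```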

[[shouldRunAnotherBatch]] Checking Out Whether Last Batch Execution Requires Another Non-Data Batch or Not (shouldRunAnotherBatch Method)

shouldRunAnotherBatch(
  newMetadata: OffsetSeqMetadata): Boolean

shouldRunAnotherBatch is positive (true) when all of the following are met:

  • The output mode is either Append or Update

  • The event-time watermark is defined

  • The event-time watermark of the given OffsetSeqMetadata is above (greater than) the operator's event-time watermark

Otherwise, shouldRunAnotherBatch is negative (false).

shouldRunAnotherBatch is part of the StateStoreWriter abstraction.
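The conditions can be sketched as follows (a paraphrase of the Spark source, not a verbatim copy; Append and Update denote the internal output modes):

```scala
override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
  (outputMode.contains(Append) || outputMode.contains(Update)) &&  // watermark-driven modes
    eventTimeWatermark.isDefined &&                                // watermark defined
    newMetadata.batchWatermarkMs > eventTimeWatermark.get          // watermark advanced
}
```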