
== StateStoreRestoreExec Unary Physical Operator

StateStoreRestoreExec is a unary physical operator that restores (reads) a streaming state from a state store (for the keys from the <<child, child>> physical operator).

[NOTE]
====
A unary physical operator (UnaryExecNode) is a physical operator with a single <<child, child>> physical operator.

Read up on https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan.html[UnaryExecNode] (and physical operators in general) in https://bit.ly/spark-sql-internals[The Internals of Spark SQL] book.
====

StateStoreRestoreExec is <<creating-instance, created>> exclusively when the StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation for execution (Aggregate logical operators in the logical plan of a streaming query).

StateStoreRestoreExec and StatefulAggregationStrategy
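
To see the operator in action, the following is a minimal sketch of such a streaming aggregation (the rate source, column names and local master are illustrative only, not part of this page); its physical plan should show StateStoreRestoreExec between HashAggregateExec operators.

[source, scala]
----
// A streaming aggregation that StatefulAggregationStrategy plans with
// StateStoreRestoreExec (a sketch; the rate source and names are illustrative).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")
  .appName("StateStoreRestoreExec-demo")
  .getOrCreate()
import spark.implicits._

val counts = spark
  .readStream
  .format("rate")
  .load                               // columns: timestamp, value
  .groupBy($"value" % 10 as "key")    // streaming (stateful) aggregation
  .count

// The physical plan should include StateStoreRestoreExec
// between HashAggregateExec operators.
counts.explain(true)
----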

The optional <<stateInfo, StatefulOperatorStateInfo>> is initially undefined (i.e. when StateStoreRestoreExec is <<creating-instance, created>>). StateStoreRestoreExec is updated to hold the streaming batch-specific execution property when IncrementalExecution prepares a streaming physical plan for execution (and the state preparation rule is executed when StreamExecution MicroBatchExecution.md#runBatch-queryPlanning[plans a streaming query] for a streaming batch).

StateStoreRestoreExec and IncrementalExecution
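
As a rough illustration of that lifecycle, the sketch below starts the query from the previous example and inspects the executed plan of a micro-batch. StreamingQueryWrapper, lastExecution and executedPlan are Spark-internal APIs, and the memory sink, query name and sleep are assumptions for the demo, so treat this as a sketch rather than a supported recipe.

[source, scala]
----
// A sketch only: start the streaming aggregation from the previous example
// and look at StateStoreRestoreExec in the executed (prepared) physical plan.
import org.apache.spark.sql.execution.streaming.{StateStoreRestoreExec, StreamingQueryWrapper}

val sq = counts.writeStream
  .format("memory")
  .queryName("restore_demo")
  .outputMode("complete")
  .start()
Thread.sleep(3000)  // let at least one micro-batch run

val engine = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
val restores = engine.lastExecution.executedPlan.collect {
  case op: StateStoreRestoreExec => op
}
// stateInfo is now defined (IncrementalExecution filled it in during query planning)
restores.foreach(op => println(op.stateInfo))

sq.stop()
----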

When <<doExecute, executed>>, StateStoreRestoreExec executes the <<child, child>> physical operator and creates a StateStoreRDD to map over partitions with a storeUpdateFunction that restores the state for the keys in the input rows, if available.

[[output]] The output schema of StateStoreRestoreExec is exactly the <<child, child>> operator's output schema.

[[outputPartitioning]] The output partitioning of StateStoreRestoreExec is exactly the <<child, child>> operator's output partitioning.

=== [[metrics]] Performance Metrics (SQLMetrics)

[cols="1m,1,3",options="header",width="100%"]
|===
| Key
| Name (in UI)
| Description

| numOutputRows
| number of output rows
| [[numOutputRows]] The number of input rows from the <<child, child>> physical operator (for which StateStoreRestoreExec tried to find the state)

|===

.StateStoreRestoreExec in web UI (Details for Query)
image::images/StateStoreRestoreExec-webui-query-details.png[align="center"]

=== [[creating-instance]] Creating Instance

StateStoreRestoreExec takes the following to be created:

* [[keyExpressions]] Key expressions (Catalyst attributes for the grouping keys)
* [[stateInfo]] Optional StatefulOperatorStateInfo (undefined when created)
* [[stateFormatVersion]] Version of the state format (per the spark.sql.streaming.aggregation.stateFormatVersion configuration property)
* [[child]] Child physical operator (SparkPlan)

=== [[stateManager]] StateStoreRestoreExec and StreamingAggregationStateManager -- stateManager Property

[source, scala]
----
stateManager: StreamingAggregationStateManager
----

stateManager is a StreamingAggregationStateManager that is created together with StateStoreRestoreExec.

The StreamingAggregationStateManager is created for the <<keyExpressions, key expressions>>, the output schema of the <<child, child>> physical operator, and the <<stateFormatVersion, stateFormatVersion>>.

The StreamingAggregationStateManager is used when StateStoreRestoreExec is requested to <<doExecute, execute>> (to extract the grouping key from every input row and to look up the saved state for the key in a state store).
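
As a conceptual sketch (not the actual Spark source), the property boils down to a call like the following, assuming the createStateManager factory on the internal StreamingAggregationStateManager object; buildStateManager is a hypothetical helper.

[source, scala]
----
// A sketch: how the stateManager property maps onto the operator's
// constructor arguments (internal API, subject to change).
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManager

def buildStateManager(
    keyExpressions: Seq[Attribute],   // grouping keys
    child: SparkPlan,                 // child physical operator
    stateFormatVersion: Int): StreamingAggregationStateManager =
  StreamingAggregationStateManager.createStateManager(
    keyExpressions,
    child.output,        // schema of the rows from the child operator
    stateFormatVersion)
----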

=== [[doExecute]] Executing Physical Operator (Generating RDD[InternalRow]) -- doExecute Method

[source, scala]
----
doExecute(): RDD[InternalRow]
----

doExecute is part of the SparkPlan abstraction.

Internally, doExecute executes the <<child, child>> physical operator and creates a StateStoreRDD with a storeUpdateFunction that does the following per partition of the <<child, child>> operator's RDD (sketched in Scala after the notes below):

. Generates an unsafe projection to access the key field (using the <<keyExpressions, key expressions>> and the output schema of the <<child, child>> operator)

. For every input row (as InternalRow):
+
* Extracts the key from the row (using the unsafe projection above)
* Gets the saved state in the StateStore for the key, if available (it might not be available if the key appeared in the input for the first time)
* Increments the <<numOutputRows, numOutputRows>> metric (that in the end is the number of rows from the <<child, child>> operator)
* Generates a collection made up of the current row and, if available, the state for the key

NOTE: The number of rows from StateStoreRestoreExec is the number of rows from the <<child, child>> operator with additional rows for the saved state.

NOTE: There is no way in StateStoreRestoreExec to find out how many rows had state available in a state store. You would have to use the corresponding StateStoreSaveExec operator's StateStoreSaveExec.md#metrics[metrics] (most likely the number of total state rows, but that could depend on the output mode).
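
The per-row logic of the steps above can be sketched as follows; restorePartition is a hypothetical helper and it uses StateStore.get directly rather than the StreamingAggregationStateManager, so it only approximates the real storeUpdateFunction.

[source, scala]
----
// A simplified sketch of the per-partition restore logic described above.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.streaming.state.StateStore

def restorePartition(
    store: StateStore,                 // state store for this partition/operator
    iter: Iterator[InternalRow],       // rows from the child operator's partition
    keyExpressions: Seq[Attribute],
    childOutput: Seq[Attribute]): Iterator[InternalRow] = {
  // 1. Unsafe projection to extract the grouping key from each input row
  val getKey = UnsafeProjection.create(keyExpressions, childOutput)
  iter.flatMap { row =>
    val key: UnsafeRow = getKey(row)
    // 2. Look up the previously saved state for the key
    //    (null, and so None, when the key is seen for the first time)
    val savedState = Option(store.get(key))
    // 3. The numOutputRows metric would be incremented here
    // 4. Emit the current row and, if present, the saved state row
    Iterator(row) ++ savedState.iterator
  }
}
----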

