StateStoreRestoreExec Unary Physical Operator

StateStoreRestoreExec is a unary physical operator that restores (reads) a streaming state from a state store (for the keys from the <<child, child>> physical operator).
[NOTE]
====
A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.

Read up on https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan.html[UnaryExecNode] (and physical operators in general) in https://bit.ly/spark-sql-internals[The Internals of Spark SQL] book.
====
StateStoreRestoreExec is created exclusively when the StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation for execution (Aggregate logical operators in the logical plan of a streaming query).
The optional <<stateInfo, StatefulOperatorStateInfo>> is initially undefined (when StateStoreRestoreExec is created). StateStoreRestoreExec is updated to hold the streaming batch-specific execution property when IncrementalExecution prepares a streaming physical plan for execution (and the state preparation rule is executed when StreamExecution MicroBatchExecution.md#runBatch-queryPlanning[plans a streaming query] for a streaming batch).
When executed, StateStoreRestoreExec executes the <<child, child>> physical operator and creates a StateStoreRDD with a storeUpdateFunction that restores the state for the keys in the input rows if available.
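The restore described above boils down to a keyed lookup that may come back empty. A minimal, Spark-free sketch (with String keys and Long state values as simplified stand-ins for Spark's UnsafeRow-based keys and aggregation buffers):

```scala
// Sketch only: for each grouping key in a batch, look up previously
// saved state; the lookup is empty for keys seen for the first time.
object RestoreLookup {
  def restoredState(
      store: Map[String, Long],
      keys: Seq[String]): Seq[(String, Option[Long])] =
    keys.map(k => k -> store.get(k))
}
```

A key with no saved state is simply a key the streaming query has not aggregated before, which is why the restore is best-effort per key.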
[[output]] The output schema of StateStoreRestoreExec is exactly the <<child, child>> operator's output schema.
[[outputPartitioning]] The output partitioning of StateStoreRestoreExec is exactly the <<child, child>> operator's output partitioning.
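The two properties above make the operator a pass-through as far as schema and partitioning are concerned. A hypothetical sketch (Schema and an Int partition count are made-up simplifications of Spark's attribute sequences and Partitioning):

```scala
// Sketch: StateStoreRestoreExec reports its child's output schema and
// partitioning unchanged; it only injects restored state rows.
object PassThroughSketch {
  final case class Schema(fieldNames: Seq[String])

  trait PhysicalOp {
    def output: Schema
    def outputPartitioning: Int // stand-in: just a number of partitions
  }

  final case class Restore(child: PhysicalOp) extends PhysicalOp {
    def output: Schema = child.output                      // exactly the child's schema
    def outputPartitioning: Int = child.outputPartitioning // exactly the child's partitioning
  }
}
```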
=== [[metrics]] Performance Metrics (SQLMetrics)

[cols="1m,1,3",options="header",width="100%"]
|===
| Key | Name (in UI) | Description

| numOutputRows
| number of output rows
| [[numOutputRows]] The number of input rows from the <<child, child>> physical operator (for which StateStoreRestoreExec tried to find the state)
|===

.StateStoreRestoreExec in web UI (Details for Query)
image::images/StateStoreRestoreExec-webui-query-details.png[align="center"]
Creating Instance

StateStoreRestoreExec takes the following to be created:

- [[keyExpressions]] Key expressions (Catalyst attributes for the grouping keys)
- [[stateInfo]] Optional StatefulOperatorStateInfo (default: None)
- [[stateFormatVersion]] Version of the state format (based on the spark.sql.streaming.aggregation.stateFormatVersion configuration property)
- [[child]] Child physical operator (SparkPlan)
=== [[stateManager]] StateStoreRestoreExec and StreamingAggregationStateManager -- stateManager Property

[source, scala]
----
stateManager: StreamingAggregationStateManager
----

stateManager is a StreamingAggregationStateManager that is created together with StateStoreRestoreExec.

The StreamingAggregationStateManager is created for the <<keyExpressions, key expressions>>, the output schema of the <<child, child>> physical operator, and the <<stateFormatVersion, version of the state format>>.

The StreamingAggregationStateManager is used when StateStoreRestoreExec is requested to execute.
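The state manager's job on the restore path is to hide how keys and state rows are laid out: derive the grouping key from an input row, then look the key up in the store. A hedged sketch of that read side only (ReadStateManager and its row type are made up for illustration, not Spark's API):

```scala
// Sketch of the read side of a streaming aggregation state manager:
// extract the grouping key from a row, then fetch any saved state.
object StateManagerSketch {
  final case class Row(fields: Vector[Any]) // stand-in for UnsafeRow

  // keyIndices: positions of the grouping columns in the row,
  // playing the role of the key expressions.
  final class ReadStateManager(keyIndices: Seq[Int]) {
    def getKey(row: Row): Vector[Any] =
      keyIndices.map(row.fields).toVector

    def get(store: Map[Vector[Any], Row], key: Vector[Any]): Option[Row] =
      store.get(key)
  }
}
```

Keeping key extraction and lookup behind one object is what lets the state format version change (for example, whether key columns are duplicated in the value row) without touching the operator itself.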
Executing Physical Operator (Generating RDD[InternalRow])

[source, scala]
----
doExecute(): RDD[InternalRow]
----

doExecute is part of the SparkPlan abstraction.

Internally, doExecute executes the <<child, child>> physical operator and creates a StateStoreRDD with a storeUpdateFunction that does the following per partition of the <<child, child>> operator:
- Generates an unsafe projection to access the key field (using the <<keyExpressions, key expressions>> and the output schema of the <<child, child>> operator)
- For every input row (as InternalRow):
    - Extracts the key from the row (using the unsafe projection above)
    - Gets the saved state in the StateStore for the key if available (it might not be available if the key appeared in the input for the first time)
    - Increments the <<numOutputRows, numOutputRows>> metric (that in the end is the number of rows from the <<child, child>> operator)
    - Generates a collection made up of the current row and possibly the state for the key if available
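The per-partition steps above can be sketched in plain Scala, under heavy simplification (no StateStore, no SQLMetric; the names below are made up for illustration):

```scala
// Sketch of the per-partition storeUpdateFunction logic: key lookup,
// metric increment, and emitting any restored state alongside each row.
object StoreUpdateFunctionSketch {
  final case class Row(key: String, value: Long)

  // Mutable counter standing in for the numOutputRows SQLMetric.
  final class Metric { var value: Long = 0L }

  def restorePartition(
      store: Map[String, Row],
      partition: Iterator[Row],
      numOutputRows: Metric): Iterator[Row] =
    partition.flatMap { row =>
      val key = row.key               // extract the key (the "unsafe projection")
      val saved = store.get(key)      // saved state for the key, if any
      numOutputRows.value += 1        // one increment per input row
      saved.iterator ++ Iterator(row) // restored state (if any) plus the current row
    }
}
```

The function stays an Iterator-to-Iterator transformation, so no partition is ever materialized in memory just to restore state.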
NOTE: The number of rows from StateStoreRestoreExec is the number of rows from the <<child, child>> operator.
NOTE: There is no way in StateStoreRestoreExec to find out how many rows had associated state available in a state store. You would have to use the corresponding StateStoreSaveExec operator's StateStoreSaveExec.md#metrics[metrics] (most likely number of total state rows, but that could depend on the output mode).