FlatMapGroupsWithStateExec Unary Physical Operator
FlatMapGroupsWithStateExec is a unary physical operator that represents FlatMapGroupsWithState logical operator at execution time.
A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.
FlatMapGroupsWithStateExec is an ObjectProducerExec physical operator and so produces a single output object.
FlatMapGroupsWithStateExec is given an OutputMode when created, but it does not seem to be used at all. Check out the question "What's the purpose of OutputMode in flatMapGroupsWithState? How/where is it used?" on StackOverflow.
FlatMapGroupsWithStateExec takes the following to be created:
- User-defined state function that is applied to every group (of type (Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any])
- Deserializer expression for keys
- Deserializer expression for values
- Grouping attributes (as used for grouping in KeyValueGroupedDataset)
- Data attributes
- Output object attribute (that is the reference to the single object field this operator outputs)
- Optional StatefulOperatorStateInfo
- State encoder
- State format version
- Optional Batch Processing Time
- Optional Event-Time Watermark
- Child physical operator
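To make the shape of the user-defined state function more concrete, here is a minimal, Spark-free sketch in Scala. SimpleGroupState, StateFunctionSketch, countFunc and applyToGroups are hypothetical names: SimpleGroupState only mimics a few GroupState-like methods, and the mutable map merely stands in for a state store. It illustrates per-group stateful processing under these assumptions and is not how FlatMapGroupsWithStateExec itself is implemented.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a GroupState-like handle (NOT Spark's GroupState API)
final class SimpleGroupState[S](initial: Option[S]) {
  private var value: Option[S] = initial
  def getOption: Option[S] = value
  def update(newValue: S): Unit = value = Some(newValue)
  def remove(): Unit = value = None
}

object StateFunctionSketch {
  // Same shape as the operator's function: (key, values of the group, group state) => output objects
  type StateFunc = (Any, Iterator[Any], SimpleGroupState[Any]) => Iterator[Any]

  // Example function: keeps a running count per key and emits (key, newCount)
  val countFunc: StateFunc = (key, values, state) => {
    val previous = state.getOption.map(_.asInstanceOf[Int]).getOrElse(0)
    val updated = previous + values.size
    state.update(updated)
    Iterator((key, updated))
  }

  // Applies the function to every group, loading and saving per-key state in the map
  def applyToGroups(
      rows: Seq[(String, Int)],
      store: mutable.Map[Any, Any],
      func: StateFunc): List[Any] =
    rows.groupBy(_._1).toList.sortBy(_._1).flatMap { case (key, group) =>
      val state = new SimpleGroupState[Any](store.get(key))
      val out = func(key, group.iterator.map(_._2), state).toList
      // Persist the (possibly updated or removed) state for the next call
      state.getOption match {
        case Some(s) => store(key) = s
        case None    => store.remove(key)
      }
      out
    }
}
```

Calling applyToGroups twice with the same map carries the running counts across calls, loosely like a state store carries state across micro-batches.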
Executing Physical Operator
doExecute first initializes the metrics (which happens on the driver).
doExecute then requests the child physical operator to execute (and generate an RDD[InternalRow]) and, for every partition, does the following:
- (only when the GroupStateTimeout is EventTimeTimeout) Filters out late data based on the event-time watermark, i.e. rows from a given Iterator[InternalRow] that are older than the event-time watermark are excluded from the steps that follow
- Requests the InputProcessor to create an iterator of new data processed from the (possibly filtered) iterator
- Requests the InputProcessor to create an iterator of timed-out state data
- Creates an iterator by concatenating the above iterators (with the new data processed first)
- In the end, creates a CompletionIterator that executes a completion function (completionFunction) after it has successfully iterated through all the elements (i.e. when a client has consumed all the rows). The completion function requests the given StateStore to commit changes and then sets the store-specific metrics
doExecute is part of Spark SQL's physical operator abstraction.
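The per-partition steps can be approximated with plain Scala iterators. This is a simplified model under several assumptions: Row, DoExecuteSketch and its CompletionIterator are hypothetical stand-ins (Spark has its own CompletionIterator utility, which this does not reproduce), the in-memory map plays the role of the state store with every key mapped to a timeout timestamp, and "committing" is a caller-supplied callback.

```scala
import scala.collection.mutable

// Hypothetical input row: a grouping key and an event time (NOT Spark's InternalRow)
final case class Row(key: String, eventTimeMs: Long)

object DoExecuteSketch {
  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  // Runs onComplete exactly once, right after the underlying iterator is exhausted
  final class CompletionIterator[A](underlying: Iterator[A], onComplete: () => Unit) extends Iterator[A] {
    private var completed = false
    def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !completed) { completed = true; onComplete() }
      more
    }
    def next(): A = underlying.next()
  }

  def processPartition(
      rows: Iterator[Row],
      timeout: Timeout,
      watermarkMs: Option[Long],
      store: mutable.Map[String, Long], // hypothetical: key -> timeout timestamp
      commit: () => Unit): Iterator[String] = {
    // 1. Drop late rows, only for EventTimeTimeout with a defined watermark
    val filtered = (timeout, watermarkMs) match {
      case (EventTimeTimeout, Some(wm)) => rows.filter(_.eventTimeMs > wm)
      case _                            => rows
    }
    // 2. New data: one output per key, updating that key's state
    val newData: Iterator[String] =
      filtered.toList.groupBy(_.key).toList.sortBy(_._1).iterator.map { case (k, rs) =>
        store(k) = rs.map(_.eventTimeMs).max
        s"$k:${rs.size}"
      }
    // 3. Timed-out state: keys whose timeout timestamp is below the watermark
    def timedOut: Iterator[String] = (timeout, watermarkMs) match {
      case (EventTimeTimeout, Some(wm)) =>
        val expired = store.collect { case (k, ts) if ts < wm => k }.toList.sorted
        expired.foreach(k => store.remove(k))
        expired.iterator.map(k => s"$k:timed-out")
      case _ => Iterator.empty
    }
    // 4. New data first, then timed-out state; 5. commit once everything is consumed
    new CompletionIterator(newData ++ timedOut, commit)
  }
}
```

Iterator's ++ takes its right-hand side by name, so the timed-out state is only computed after all new data has been consumed, which mirrors the "new data processed first" ordering.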
FlatMapGroupsWithStateExec uses the performance metrics of StateStoreWriter.
FlatMapGroupsWithStateExec is a stateful physical operator that can write to a state store (and MicroBatchExecution asks it whether to run another batch or not based on the GroupStateTimeout).
FlatMapGroupsWithStateExec uses the GroupStateTimeout (and possibly the updated metadata) when asked whether to run another batch or not, i.e. when MicroBatchExecution is requested to construct the next streaming micro-batch while running the activated streaming query.
Streaming Event-Time Watermark Support
FlatMapGroupsWithStateExec is a physical operator that supports streaming event-time watermark.
FlatMapGroupsWithStateExec is given the optional event time watermark when created.
FlatMapGroupsWithStateStrategy converts the FlatMapGroupsWithState unary logical operator to a FlatMapGroupsWithStateExec physical operator with undefined StatefulOperatorStateInfo, batchTimestampMs, and eventTimeWatermark.
The event-time watermark is only set (to the current event-time watermark of the given OffsetSeqMetadata), along with the StatefulOperatorStateInfo and the batchTimestampMs, when the IncrementalExecution query execution pipeline is requested to apply the state preparation rule (as part of the preparations rules).
The preparations rules are executed (applied to a physical query plan) at the executedPlan phase of the Structured Query Execution Pipeline to generate an optimized physical query plan ready for execution.
IncrementalExecution is used as the lastExecution of the available streaming query execution engines. It is created in the queryPlanning phase (of the MicroBatchExecution and ContinuousExecution execution engines) based on the current OffsetSeqMetadata.
The optional event-time watermark can only be defined when the state preparation rule is executed, which happens at the executedPlan phase of the Structured Query Execution Pipeline, itself part of the queryPlanning phase.
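The following toy model shows the two-step pattern: planning leaves the stateful fields undefined, and a later preparation step copies the node with values taken from the batch's metadata. ExecNode, BatchMetadata and StatePreparationSketch are hypothetical; the field names batchWatermarkMs and batchTimestampMs are meant to mirror OffsetSeqMetadata, and operatorId merely stands in for StatefulOperatorStateInfo.

```scala
// Hypothetical stand-in for the per-batch offset metadata
final case class BatchMetadata(batchWatermarkMs: Long, batchTimestampMs: Long)

// Hypothetical stand-in for the physical operator's stateful fields
final case class ExecNode(
    operatorId: Option[Long], // models Option[StatefulOperatorStateInfo]
    batchTimestampMs: Option[Long],
    eventTimeWatermark: Option[Long])

object StatePreparationSketch {
  // What the planning strategy produces: nothing defined yet
  def plan(): ExecNode = ExecNode(None, None, None)

  // What a state preparation step does: return a copy filled in for this micro-batch
  def prepare(node: ExecNode, operatorId: Long, meta: BatchMetadata): ExecNode =
    node.copy(
      operatorId = Some(operatorId),
      batchTimestampMs = Some(meta.batchTimestampMs),
      eventTimeWatermark = Some(meta.batchWatermarkMs))
}
```

Using an immutable case class with copy reflects that physical plans are immutable trees: the rule yields a new operator rather than mutating the planned one.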
The state format version is controlled by spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion internal configuration property.
StateManager is used exclusively when the FlatMapGroupsWithStateExec physical operator is executed.
keyExpressions simply returns the grouping attributes.
keyExpressions is part of the WatermarkSupport abstraction.
Checking Out Whether Last Batch Execution Requires Another Non-Data Batch or Not
shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
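shouldRunAnotherBatch uses the GroupStateTimeout as follows:
- ProcessingTimeTimeout: always true (timeouts can expire even with no new data)
- EventTimeTimeout: true only when the event-time watermark is defined and the batchWatermarkMs of the given OffsetSeqMetadata is greater than it (i.e. the watermark has advanced)
- NoTimeout: false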
shouldRunAnotherBatch is part of the StateStoreWriter abstraction.
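A plain Scala sketch of the decision, assuming GroupStateTimeout has three kinds (NoTimeout, ProcessingTimeTimeout and EventTimeTimeout). TimeoutConf, its case objects and AnotherBatchSketch are hypothetical stand-ins, and currentWatermarkMs models the operator's optional eventTimeWatermark.

```scala
// Hypothetical stand-ins for GroupStateTimeout kinds
sealed trait TimeoutConf
case object NoTimeoutConf extends TimeoutConf
case object ProcessingTimeTimeoutConf extends TimeoutConf
case object EventTimeTimeoutConf extends TimeoutConf

object AnotherBatchSketch {
  def shouldRunAnotherBatch(
      timeout: TimeoutConf,
      currentWatermarkMs: Option[Long], // the operator's eventTimeWatermark
      newBatchWatermarkMs: Long): Boolean = timeout match {
    // Processing-time timeouts can expire at any time, so always run another batch
    case ProcessingTimeTimeoutConf => true
    // Event-time timeouts can only expire once the watermark has moved forward
    case EventTimeTimeoutConf => currentWatermarkMs.exists(wm => newBatchWatermarkMs > wm)
    // No timeouts: nothing can expire without new data
    case NoTimeoutConf => false
  }
}
```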
The internal StateManager is used when:
- FlatMapGroupsWithStateExec is created (and creates the internal StateManager)
- InputProcessor is requested to processTimedOutState
- InputProcessor is requested to callFunctionAndUpdateState
Enable ALL logging level for the org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec logger to see what happens inside, by adding the corresponding logger entry to your logging configuration.
Refer to Logging.