StreamingSymmetricHashJoinExec Binary Physical Operator
StreamingSymmetricHashJoinExec is a binary physical operator for stream-stream equi-joins at execution time.
Note
A binary physical operator (BinaryExecNode) is a physical operator with left and right child physical operators.
Learn more about BinaryExecNode (and physical operators in general) in The Internals of Spark SQL online book.
[[supported-join-types]][[joinType]] StreamingSymmetricHashJoinExec supports Inner, LeftOuter, and RightOuter join types (with the <
StreamingSymmetricHashJoinExec is <Join logical operator of two streaming queries with equality predicates (EqualTo and EqualNullSafe).
StreamingSymmetricHashJoinExec is given execution-specific configuration (i.e. <IncrementalExecution is requested to plan a streaming query for execution (and uses the state preparation rule).
StreamingSymmetricHashJoinExec uses two OneSideHashJoiners (for the <
StreamingSymmetricHashJoinExec is a stateful physical operator that writes to a state store.
Creating Instance
StreamingSymmetricHashJoinExec takes the following to be created:
- [[leftKeys]] Left keys (Catalyst expressions of the keys on the left side)
- [[rightKeys]] Right keys (Catalyst expressions of the keys on the right side)
- Join type
- [[condition]] Join condition (JoinConditionSplitPredicates)
- [[stateInfo]] StatefulOperatorStateInfo
- Event-Time Watermark
- Watermark Predicates for State Removal
- [[left]] Physical operator on the left side (SparkPlan)
- [[right]] Physical operator on the right side (SparkPlan)
StreamingSymmetricHashJoinExec initializes the <
=== [[output]] Output Schema -- output Method
[source, scala]
output: Seq[Attribute]
NOTE: output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.
output schema depends on the <
- For Cross and Inner (InnerLike) joins, it is the output schema of the left and right operators
- For LeftOuter joins, it is the output schema of the left operator with the attributes of the right operator with nullability flag enabled (true)
- For RightOuter joins, it is the output schema of the right operator with the attributes of the left operator with nullability flag enabled (true)
output throws an IllegalArgumentException for other join types:
[className] should not take [joinType] as the JoinType
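The join-type rules for the output schema can be sketched in plain Scala. This is a simplified model, not Spark's code: Attr and JoinKind are hypothetical stand-ins for Catalyst's Attribute and JoinType, and the left-then-right attribute order is an assumption.

```scala
// Hypothetical stand-ins for Catalyst's Attribute and JoinType.
final case class Attr(name: String, nullable: Boolean) {
  def withNullability(n: Boolean): Attr = copy(nullable = n)
}

sealed trait JoinKind
case object InnerJoin extends JoinKind
case object CrossJoin extends JoinKind
case object LeftOuterJoin extends JoinKind
case object RightOuterJoin extends JoinKind
case object FullOuterJoin extends JoinKind

// Output attributes per join type (assumed order: left attributes first).
def joinOutput(kind: JoinKind, left: Seq[Attr], right: Seq[Attr]): Seq[Attr] =
  kind match {
    case InnerJoin | CrossJoin => left ++ right
    // The side that may have no match becomes nullable.
    case LeftOuterJoin => left ++ right.map(_.withNullability(true))
    case RightOuterJoin => left.map(_.withNullability(true)) ++ right
    case other =>
      throw new IllegalArgumentException(
        s"StreamingSymmetricHashJoinExec should not take $other as the JoinType")
  }
```

The nullable side is the one that may be missing from an output row, which is why an outer join relaxes nullability only for the opposite side.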
=== [[outputPartitioning]] Output Partitioning -- outputPartitioning Method
[source, scala]
outputPartitioning: Partitioning
NOTE: outputPartitioning is part of the SparkPlan Contract to specify how data should be partitioned across different nodes in the cluster.
outputPartitioning depends on the <
- For Cross and Inner (InnerLike) joins, it is a PartitioningCollection of the output partitioning of the left and right operators
- For LeftOuter joins, it is a PartitioningCollection of the output partitioning of the left operator
- For RightOuter joins, it is a PartitioningCollection of the output partitioning of the right operator
outputPartitioning throws an IllegalArgumentException for other join types:
[className] should not take [joinType] as the JoinType
=== [[eventTimeWatermark]] Event-Time Watermark -- eventTimeWatermark Internal Property
[source, scala]
eventTimeWatermark: Option[Long]
When created, StreamingSymmetricHashJoinExec can be given the event-time watermark of the current streaming micro-batch.
eventTimeWatermark is an optional property that is specified only after IncrementalExecution was requested to apply the state preparation rule to a physical query plan of a streaming query (to optimize (prepare) the physical plan of the streaming query once for ContinuousExecution and every trigger for MicroBatchExecution in the queryPlanning phase).
eventTimeWatermark is used when:
- StreamingSymmetricHashJoinExec is requested to check whether the last batch execution requires another non-data batch
- OneSideHashJoiner is requested to storeAndJoinWithOtherSide
Watermark Predicates for State Removal
stateWatermarkPredicates: JoinStateWatermarkPredicates
When <StreamingSymmetricHashJoinExec is given a <
stateWatermarkPredicates contains the left and right predicates only when IncrementalExecution is requested to apply the state preparation rule to a physical query plan of a streaming query (to optimize (prepare) the physical plan of the streaming query once for ContinuousExecution and every trigger for MicroBatchExecution in the queryPlanning phase).
stateWatermarkPredicates is used when StreamingSymmetricHashJoinExec is requested for the following:
- Processing partitions of the left and right sides of the stream-stream join (and creating OneSideHashJoiners)
- Checking whether the last batch execution requires another non-data batch
=== [[requiredChildDistribution]] Required Partition Requirements -- requiredChildDistribution Method
[source, scala]
requiredChildDistribution: Seq[Distribution]
[NOTE]
requiredChildDistribution is part of the SparkPlan Contract for the required partition requirements (aka required child distribution) of the input data, i.e. how the output of the children physical operators is split across partitions before this operator can be executed.
Read up on https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan.html[SparkPlan Contract] in https://bit.ly/spark-sql-internals[The Internals of Spark SQL] online book.
requiredChildDistribution returns two HashClusteredDistributions for the <
[NOTE]
requiredChildDistribution is used exclusively when EnsureRequirements physical query plan optimization is executed (and enforces partition requirements).
Read up on https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-EnsureRequirements.html[EnsureRequirements Physical Query Optimization] in https://bit.ly/spark-sql-internals[The Internals of Spark SQL] online book.
[NOTE]
HashClusteredDistribution becomes HashPartitioning at execution that distributes rows across partitions (generates partition IDs of rows) based on Murmur3Hash of the join expressions (separately for the <
Read up on https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Distribution-HashClusteredDistribution.html[HashClusteredDistribution] in https://bit.ly/spark-sql-internals[The Internals of Spark SQL] online book.
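The idea behind hash-based distribution can be illustrated with a minimal sketch. It uses scala.util.hashing.MurmurHash3 from the Scala standard library as a stand-in for Spark's own Murmur3Hash expression, so the actual partition IDs will differ from Spark's; partitionId and pmod are hypothetical helper names.

```scala
import scala.util.hashing.MurmurHash3

// Non-negative modulo, so negative hashes still map to a valid partition ID.
def pmod(a: Int, n: Int): Int = {
  val r = a % n
  if (r < 0) r + n else r
}

// Hypothetical helper: partition ID of a row computed from its join-key values.
// Stand-in hashing only; Spark hashes the evaluated join expressions itself.
def partitionId(joinKeys: Seq[Any], numPartitions: Int): Int =
  pmod(MurmurHash3.seqHash(joinKeys), numPartitions)
```

Because both sides are hashed on their join keys with the same number of partitions, rows with equal keys end up in the same partition ID on the left and on the right, which is what lets each task join its pair of partitions locally.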
=== [[metrics]] Performance Metrics (SQLMetrics)
StreamingSymmetricHashJoinExec uses the same performance metrics as other stateful physical operators that write to a state store.
The following table shows how the performance metrics are computed (and so their exact meaning).
[cols="30,70",options="header",width="100%"]
|===
| Name (in web UI) | Description
| total time to update rows a| [[allUpdatesTimeMs]] Processing time of all rows
| total time to remove rows a| [[allRemovalsTimeMs]]
| time to commit changes a| [[commitTimeMs]]
| number of output rows a| [[numOutputRows]] Total number of output rows
| number of total state rows a| [[numTotalStateRows]]
| number of updated state rows a| [[numUpdatedStateRows]] Number of updated state rows of the left and right OneSideHashJoiners
| memory used by state a| [[stateMemory]]
|===
=== [[shouldRunAnotherBatch]] Checking Out Whether Last Batch Execution Requires Another Non-Data Batch or Not -- shouldRunAnotherBatch Method
[source, scala]
shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
shouldRunAnotherBatch is positive (true) when all of the following are positive:
- Either the left or right join state watermark predicates are defined (in the stateWatermarkPredicates)
- Event-time watermark threshold (of the StreamingSymmetricHashJoinExec operator) is defined and the current event-time watermark threshold of the given OffsetSeqMetadata is above (greater than) it, i.e. moved above
shouldRunAnotherBatch is negative (false) otherwise.
shouldRunAnotherBatch is part of the StateStoreWriter abstraction.
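The two conditions above can be expressed as a small plain-Scala model. JoinWatermarkState and its fields are hypothetical names used only for illustration; the real operator works with JoinStateWatermarkPredicates and OffsetSeqMetadata.

```scala
// Hypothetical, simplified stand-in for the operator's watermark-related state.
final case class JoinWatermarkState(
    leftPredicateDefined: Boolean,
    rightPredicateDefined: Boolean,
    eventTimeWatermarkMs: Option[Long]) {

  // True only if there is state that could be cleaned up AND the watermark moved forward.
  def shouldRunAnotherBatch(newWatermarkMs: Long): Boolean = {
    val hasStatePredicate = leftPredicateDefined || rightPredicateDefined
    val watermarkAdvanced = eventTimeWatermarkMs.exists(newWatermarkMs > _)
    hasStatePredicate && watermarkAdvanced
  }
}
```

An undefined watermark (None) never triggers another batch, since there is nothing to compare the new watermark against.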
Executing Physical Operator
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction (Spark SQL).
doExecute first requests the StreamingQueryManager for the StateStoreCoordinatorRef to the StateStoreCoordinator RPC endpoint (for the driver).
doExecute then uses SymmetricHashJoinStateManager utility to get the names of the state stores for the left and right sides of the streaming join.
In the end, doExecute requests the <StateStoreCoordinatorRef and the state stores).
=== [[processPartitions]] Processing Partitions of Left and Right Sides of Stream-Stream Join -- processPartitions Internal Method
[source, scala]
processPartitions(
  leftInputIter: Iterator[InternalRow],
  rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]
[[processPartitions-updateStartTimeNs]] processPartitions records the current time (as updateStartTimeNs for the <
[[processPartitions-postJoinFilter]] processPartitions creates a new predicate (postJoinFilter) based on the bothSides of the <true literal.
[[processPartitions-leftSideJoiner]] processPartitions creates a OneSideHashJoiner for the LeftSide and all other properties for the left-hand join side (leftSideJoiner).
[[processPartitions-rightSideJoiner]] processPartitions creates a OneSideHashJoiner for the RightSide and all other properties for the right-hand join side (rightSideJoiner).
[[processPartitions-leftOutputIter]][[processPartitions-rightOutputIter]] processPartitions requests the OneSideHashJoiner for the left-hand join side to storeAndJoinWithOtherSide with the right-hand side one (that creates a leftOutputIter row iterator) and the OneSideHashJoiner for the right-hand join side to do the same with the left-hand side one (and creates a rightOutputIter row iterator).
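The symmetric pattern described above (each side stores its own rows and joins them with the other side's buffered rows) can be modelled in plain Scala. This is an illustrative sketch under simplifying assumptions: SideJoiner and symmetricJoin are hypothetical names, state is an in-memory map rather than a state store, and no join condition, watermark or outer-join handling is included.

```scala
import scala.collection.mutable

// Hypothetical model of one join side: buffered rows grouped by join key.
final class SideJoiner[K, V] {
  private val state = mutable.Map.empty[K, Vector[V]]

  def valuesFor(key: K): Vector[V] = state.getOrElse(key, Vector.empty)

  // Lazily joins each input row with the other side's state, then stores the row.
  def storeAndJoinWithOtherSide[O, Out](
      input: Iterator[(K, V)],
      other: SideJoiner[K, O])(combine: (V, O) => Out): Iterator[Out] =
    input.flatMap { case (key, value) =>
      val matches = other.valuesFor(key).map(combine(value, _))
      state(key) = valuesFor(key) :+ value
      matches
    }
}

// Left rows are processed first, then right rows (which see the freshly stored left rows).
def symmetricJoin[K, L, R](
    left: SideJoiner[K, L],
    right: SideJoiner[K, R],
    leftInput: Iterator[(K, L)],
    rightInput: Iterator[(K, R)]): Iterator[(L, R)] = {
  val leftOut = left.storeAndJoinWithOtherSide(leftInput, right)((l, r) => (l, r))
  val rightOut = right.storeAndJoinWithOtherSide(rightInput, left)((r, l) => (l, r))
  leftOut ++ rightOut
}
```

In this model a matching pair is emitted exactly once: by whichever row is processed second, since only then is the other row already in state. Rows buffered in one call remain available to later calls, which mirrors how state carries over between micro-batches.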
[[processPartitions-innerOutputCompletionTimeNs]] processPartitions records the current time (as innerOutputCompletionTimeNs for the <
[[processPartitions-innerOutputIter]] processPartitions creates a CompletionIterator with the left and right output iterators (with the rows of the leftOutputIter first followed by rightOutputIter). When no rows are left to process, the CompletionIterator records the completion time.
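A completion iterator of this kind can be sketched as follows. OnCompletionIterator is a hypothetical stand-in written for illustration (it is not Spark's CompletionIterator class), and the sample rows are made up.

```scala
// Minimal stand-in for a completion iterator: wraps an iterator and
// runs a callback exactly once, when the underlying iterator is exhausted.
final class OnCompletionIterator[A](underlying: Iterator[A], onCompletion: () => Unit)
    extends Iterator[A] {
  private var completed = false

  def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onCompletion()
    }
    more
  }

  def next(): A = underlying.next()
}

// Completion time is unset until all rows have been consumed.
var innerOutputCompletionTimeNs: Option[Long] = None
val leftOutputIter = Iterator("l1", "l2")
val rightOutputIter = Iterator("r1")

// Rows of the left iterator first, followed by the rows of the right one.
val innerOutputIter = new OnCompletionIterator(
  leftOutputIter ++ rightOutputIter,
  () => { innerOutputCompletionTimeNs = Some(System.nanoTime()) })
```

Since the iterators are lazy, the timestamp is recorded only when a consumer actually drains the last row, not when the iterator is created.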
[[processPartitions-outputIter]] processPartitions creates a join-specific output Iterator[InternalRow] of the output rows based on the <StreamingSymmetricHashJoinExec):
- For Inner joins, processPartitions simply uses the <>
- For LeftOuter joins, processPartitions ...
- For RightOuter joins, processPartitions ...
- For other joins, processPartitions simply throws an IllegalArgumentException.
[[processPartitions-outputIterWithMetrics]] processPartitions creates an UnsafeProjection for the <
In the end, processPartitions returns a CompletionIterator with the <
NOTE: processPartitions is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to <
==== [[processPartitions-onOutputCompletion]][[onOutputCompletion]] Calculating Performance Metrics (Output Completion Callback) -- onOutputCompletion Internal Method
[source, scala]
onOutputCompletion: Unit
onOutputCompletion calculates the <
onOutputCompletion adds the time for the inner join to complete (since <
onOutputCompletion records the time to remove old state (per the join state watermark predicate for the <
NOTE: onOutputCompletion triggers the old state removal eagerly by iterating over the state rows to be deleted.
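Why the removal has to be triggered by iterating can be shown with a small model. It assumes that old-state removal is exposed as a lazy iterator that deletes entries as it is consumed; the state map, removeOldState signature and timestamps below are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical state: join key -> event time (ms) of the buffered row.
val state = mutable.Map("a" -> 1000L, "b" -> 5000L, "c" -> 2000L)

// Lazily removes entries older than the watermark; nothing is deleted until iterated.
def removeOldState(watermarkMs: Long): Iterator[String] =
  state.keys.toList.iterator
    .filter(key => state(key) < watermarkMs)
    .map { key => state.remove(key); key }

val cleanupIter = removeOldState(watermarkMs = 3000L)
val sizeBeforeDraining = state.size

// Draining the iterator is what actually performs the removal.
while (cleanupIter.hasNext) cleanupIter.next()
```

Creating the iterator alone leaves the state untouched; only the draining loop deletes the expired entries, which is why the removal time can be measured around that loop.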
onOutputCompletion records the time for the <OneSideHashJoiners to commit any state changes that becomes the <
onOutputCompletion calculates the <
onOutputCompletion calculates the <
onOutputCompletion calculates the <
In the end, onOutputCompletion calculates the <
Internal Properties
[cols="30m,70",options="header",width="100%"]
|===
| Name | Description
| hadoopConfBcast a| [[hadoopConfBcast]] Hadoop Configuration broadcast (to the Spark cluster)
Used exclusively to <
| joinStateManager a| [[joinStateManager]] SymmetricHashJoinStateManager
Used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide, removeOldState, commitStateAndGetMetrics, and for the values for a given key
| nullLeft a| [[nullLeft]] GenericInternalRow of the size of the output schema of the <
| nullRight a| [[nullRight]] GenericInternalRow of the size of the output schema of the <
| storeConf a| [[storeConf]] StateStoreConf
Used exclusively to <
|===
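The role of the all-null rows (nullLeft and nullRight) can be illustrated with a plain-Scala sketch. It assumes they are used to pad unmatched rows in outer joins; nullRow is a hypothetical helper and Vector[Any] stands in for Spark's row types.

```scala
// Hypothetical stand-in for an all-null row of a given width.
def nullRow(size: Int): Vector[Any] = Vector.fill[Any](size)(null)

val leftRow: Vector[Any] = Vector(1, "click")
val rightOutputSize = 2

// An unmatched left row in a LeftOuter join is padded with nulls on the right.
val leftOuterRow = leftRow ++ nullRow(rightOutputSize)
```

Building the null row once and reusing it avoids allocating a new padding row for every unmatched input row.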