
StreamingSymmetricHashJoinExec Binary Physical Operator

StreamingSymmetricHashJoinExec is a binary physical operator for stream-stream equi-join at execution time.

Note

A binary physical operator (BinaryExecNode) is a physical operator with left and right child physical operators.

Learn more about BinaryExecNode (and physical operators in general) in The Internals of Spark SQL online book.

StreamingSymmetricHashJoinExec supports Inner, LeftOuter, and RightOuter join types (with the left and right keys using exactly the same data types).

StreamingSymmetricHashJoinExec is created exclusively when the StreamingJoinStrategy execution planning strategy is requested to plan a logical query plan with a Join logical operator of two streaming queries with equality predicates (EqualTo and EqualNullSafe).

StreamingSymmetricHashJoinExec is given execution-specific configuration (i.e. StatefulOperatorStateInfo, event-time watermark, and JoinStateWatermarkPredicates) when IncrementalExecution is requested to plan a streaming query for execution (and uses the state preparation rule).

StreamingSymmetricHashJoinExec uses two OneSideHashJoiners (for the left and right sides of the join) to manage join state when processing partitions of the left and right sides of a stream-stream join.

StreamingSymmetricHashJoinExec is a stateful physical operator that writes to a state store.

Creating Instance

StreamingSymmetricHashJoinExec takes the following to be created:

StreamingSymmetricHashJoinExec initializes the internal properties.

Output Schema (output Method)

output: Seq[Attribute]

NOTE: output is part of the QueryPlan Contract to describe the attributes of (the schema of) the output.

output schema depends on the join type:

  • For Cross and Inner (InnerLike) joins, it is the output schema of the left and right operators

  • For LeftOuter joins, it is the output schema of the left operator followed by the attributes of the right operator with nullability flag enabled (true)

  • For RightOuter joins, it is the attributes of the left operator with nullability flag enabled (true) followed by the output schema of the right operator

output throws an IllegalArgumentException for other join types:

[className] should not take [joinType] as the JoinType
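The output schema rules above can be modeled in plain Scala, without a Spark dependency. This is a minimal sketch only: Attr, JoinType and its case objects are hypothetical stand-ins for Spark's Attribute and JoinType, FullOuter merely plays the role of "any other join type", and the error message only mimics the format shown above.

```scala
// Plain-Scala model of StreamingSymmetricHashJoinExec.output (hypothetical types, no Spark).
object OutputSchemaSketch {
  final case class Attr(name: String, nullable: Boolean) {
    def withNullability(n: Boolean): Attr = copy(nullable = n)
  }

  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType // stands in for an unsupported join type

  def output(left: Seq[Attr], right: Seq[Attr], joinType: JoinType): Seq[Attr] =
    joinType match {
      // InnerLike joins: every output row has values from both sides
      case Inner | Cross => left ++ right
      // LeftOuter: unmatched left rows are padded with nulls for the right attributes
      case LeftOuter => left ++ right.map(_.withNullability(true))
      // RightOuter: unmatched right rows are padded with nulls for the left attributes
      case RightOuter => left.map(_.withNullability(true)) ++ right
      case other =>
        throw new IllegalArgumentException(
          s"StreamingSymmetricHashJoinExec should not take $other as the JoinType")
    }
}
```

Marking the padded side nullable is what lets downstream operators know those columns can be null even though they are not nullable in the child's schema.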

Output Partitioning (outputPartitioning Method)

outputPartitioning: Partitioning

NOTE: outputPartitioning is part of the SparkPlan Contract to specify how data should be partitioned across different nodes in the cluster.

outputPartitioning depends on the join type:

  • For Cross and Inner (InnerLike) joins, it is a PartitioningCollection of the output partitioning of the left and right operators

  • For LeftOuter joins, it is a PartitioningCollection of the output partitioning of the left operator

  • For RightOuter joins, it is a PartitioningCollection of the output partitioning of the right operator

outputPartitioning throws an IllegalArgumentException for other join types:

[className] should not take [joinType] as the JoinType
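The outputPartitioning rules can be sketched the same way. HashPartitioning and PartitioningCollection below are simplified, hypothetical case classes that only borrow the names of Spark's classes; the sketch shows which side's partitioning survives for each join type.

```scala
// Plain-Scala model of StreamingSymmetricHashJoinExec.outputPartitioning (hypothetical types).
object OutputPartitioningSketch {
  sealed trait Partitioning
  final case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
  final case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType // stands in for an unsupported join type

  def outputPartitioning(left: Partitioning, right: Partitioning, joinType: JoinType): Partitioning =
    joinType match {
      // Inner join rows satisfy both the left and the right key partitioning
      case Inner => PartitioningCollection(Seq(left, right))
      // Unmatched rows carry nulls for the right keys, so only the left partitioning describes all rows
      case LeftOuter => PartitioningCollection(Seq(left))
      // Symmetrically, only the right partitioning describes all rows of a RightOuter join
      case RightOuter => PartitioningCollection(Seq(right))
      case other =>
        throw new IllegalArgumentException(
          s"StreamingSymmetricHashJoinExec should not take $other as the JoinType")
    }
}
```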

Event-Time Watermark (eventTimeWatermark Internal Property)

eventTimeWatermark: Option[Long]

When created, StreamingSymmetricHashJoinExec can be given the event-time watermark of the current streaming micro-batch.

eventTimeWatermark is an optional property that is defined only after IncrementalExecution applies the state preparation rule to the physical query plan of a streaming query. The rule prepares (optimizes) the physical plan once for ContinuousExecution and every trigger for MicroBatchExecution, in the queryPlanning phase.

eventTimeWatermark is used when:

Watermark Predicates for State Removal

stateWatermarkPredicates: JoinStateWatermarkPredicates

When created, StreamingSymmetricHashJoinExec is given a JoinStateWatermarkPredicates for the left and right join sides (using the StreamingSymmetricHashJoinHelper utility).

stateWatermarkPredicates contains the left and right predicates only after IncrementalExecution applies the state preparation rule to the physical query plan of a streaming query (once for ContinuousExecution and every trigger for MicroBatchExecution, in the queryPlanning phase).
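The effect of a per-side state watermark predicate on join state can be sketched as follows. Spark distinguishes key-based and value-based predicates (JoinStateKeyWatermarkPredicate and JoinStateValueWatermarkPredicate); this sketch assumes both boil down to "event time at or below the state watermark" on a plain Long. StateRow, KeyPredicate, ValuePredicate, isBelowWatermark and evictOldRows are hypothetical names.

```scala
// Hypothetical model of how a state watermark predicate selects join state rows for removal.
object StateWatermarkSketch {
  sealed trait StatePredicate { def stateWatermarkMs: Long }
  // Assumption: evicts a row when the event time in its join key is at or below the watermark
  final case class KeyPredicate(stateWatermarkMs: Long) extends StatePredicate
  // Assumption: evicts a row when the event time in its value is at or below the watermark
  final case class ValuePredicate(stateWatermarkMs: Long) extends StatePredicate

  // A state row whose join key carries an event time (keyTimeMs) and whose value carries its own (valueTimeMs)
  final case class StateRow(keyTimeMs: Long, valueTimeMs: Long)

  def isBelowWatermark(row: StateRow, predicate: Option[StatePredicate]): Boolean =
    predicate match {
      case Some(KeyPredicate(wm))   => row.keyTimeMs <= wm
      case Some(ValuePredicate(wm)) => row.valueTimeMs <= wm
      case None                     => false // no predicate for this side: its state is never removed
    }

  def evictOldRows(state: Seq[StateRow], predicate: Option[StatePredicate]): Seq[StateRow] =
    state.filterNot(isBelowWatermark(_, predicate))
}
```

The None case is why a stream-stream join without watermarks (or without a time constraint) keeps its state forever.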

stateWatermarkPredicates is used when StreamingSymmetricHashJoinExec is requested for the following:

Required Child Distribution (requiredChildDistribution Method)

requiredChildDistribution: Seq[Distribution]

Note

requiredChildDistribution is part of the SparkPlan Contract for the required partition requirements (aka required child distribution) of the input data, i.e. how the output of the children physical operators is split across partitions before this operator can be executed.

Read up on SparkPlan Contract (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan.html) in The Internals of Spark SQL (https://bit.ly/spark-sql-internals) online book.

requiredChildDistribution returns two HashClusteredDistributions for the left and right keys with the required number of partitions based on the StatefulOperatorStateInfo.

Note

requiredChildDistribution is used exclusively when the EnsureRequirements physical query plan optimization is executed (and enforces partition requirements).

Read up on EnsureRequirements Physical Query Optimization (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-EnsureRequirements.html) in The Internals of Spark SQL (https://bit.ly/spark-sql-internals) online book.

Note

At execution, HashClusteredDistribution becomes HashPartitioning, which distributes rows across partitions (generates partition IDs of rows) based on the Murmur3Hash of the join expressions (separately for the left and right keys) modulo the required number of partitions. Since both sides use the same hash function, key data types and number of partitions, rows with equal join keys end up with the same partition ID on both sides.

Read up on HashClusteredDistribution (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Distribution-HashClusteredDistribution.html) in The Internals of Spark SQL (https://bit.ly/spark-sql-internals) online book.
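The "hash of the join keys modulo the number of partitions" idea can be sketched with the Scala standard library. This is an illustration under an explicit assumption: it uses scala.util.hashing.MurmurHash3, not Spark's Murmur3Hash expression (which hashes typed column values with its own seed), so the partition IDs will not match Spark's. What carries over is the property that matters for the join: equal keys on both sides land in the same partition.

```scala
import scala.util.hashing.MurmurHash3

// Illustrative hash partitioning of join keys (stdlib MurmurHash3, NOT Spark's Murmur3Hash).
object HashPartitioningSketch {
  // Non-negative modulo, so negative hash values still map into [0, n)
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) r + n else r
  }

  // Partition ID of a row given the values of its join key expressions
  def partitionId(joinKey: Seq[Any], numPartitions: Int): Int =
    pmod(MurmurHash3.seqHash(joinKey), numPartitions)
}
```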

Performance Metrics (SQLMetrics)

StreamingSymmetricHashJoinExec uses the same performance metrics as other stateful physical operators that write to a state store.

StreamingSymmetricHashJoinExec in web UI (Details for Query)

The following list shows how the performance metrics are computed (and so their exact meaning), as Name (in web UI): Description.

  • total time to update rows (allUpdatesTimeMs): Processing time of all rows

  • total time to remove rows (allRemovalsTimeMs): Time to remove old state rows (per the join state watermark predicates)

  • time to commit changes (commitTimeMs): Time for the left and right OneSideHashJoiners to commit state changes

  • number of output rows (numOutputRows): Total number of output rows

  • number of total state rows (numTotalStateRows): Sum of the state rows in the KeyWithIndexToValueStore of the left and right sides

  • number of updated state rows (numUpdatedStateRows): Number of updated state rows of the left and right OneSideHashJoiners

  • memory used by state (stateMemory): Memory used by the KeyToNumValuesStore and KeyWithIndexToValueStore of the left and right sides

Checking Whether Last Batch Execution Requires Another Non-Data Batch (shouldRunAnotherBatch Method)

shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean


shouldRunAnotherBatch is positive (true) when all of the following are positive:

  • Either the left or the right join state watermark predicate is defined (in the stateWatermarkPredicates)

  • The eventTimeWatermark threshold (of the StreamingSymmetricHashJoinExec operator) is defined and the event-time watermark of the given OffsetSeqMetadata is greater than it (i.e. the watermark has moved forward)

shouldRunAnotherBatch is negative (false) otherwise.

shouldRunAnotherBatch is part of the StateStoreWriter abstraction.
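The two conditions above can be sketched as a pure function. The parameter list is a hypothetical simplification: the per-side predicates are reduced to Option[String] placeholders, and the OffsetSeqMetadata is reduced to its batch watermark as a Long.

```scala
// Simplified model of shouldRunAnotherBatch (hypothetical parameters, no Spark types).
object ShouldRunAnotherBatchSketch {
  def shouldRunAnotherBatch(
      leftStatePredicate: Option[String],
      rightStatePredicate: Option[String],
      eventTimeWatermark: Option[Long],
      newBatchWatermarkMs: Long): Boolean = {
    // Old state can only be evicted if at least one side has a state watermark predicate...
    val hasStatePredicate = leftStatePredicate.isDefined || rightStatePredicate.isDefined
    // ...and the new watermark is strictly greater than the one this operator ran with
    val watermarkAdvanced = eventTimeWatermark.exists(newBatchWatermarkMs > _)
    hasStatePredicate && watermarkAdvanced
  }
}
```

In other words, a no-data batch only pays off when it can actually evict state (and, for outer joins, emit the null-padded rows of evicted entries).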

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction (Spark SQL).

doExecute first requests the StreamingQueryManager for the StateStoreCoordinatorRef to the StateStoreCoordinator RPC endpoint (for the driver).

doExecute then uses the SymmetricHashJoinStateManager utility to get the names of the state stores for the left and right sides of the streaming join.

In the end, doExecute requests the left and right child physical operators to execute (generate an RDD) and then zips their partitions (stateStoreAwareZipPartitions) with processPartitions (and with the StateStoreCoordinatorRef and the state stores).
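Partition-wise zipping can be sketched with plain collections, where a dataset is a Seq of partitions and each partition is a Seq of rows. This is a hypothetical model: it does not capture the state-store-aware part (Spark additionally uses the StateStoreCoordinatorRef to prefer the executors that host each partition's state stores). The equal-partition-count check is the guarantee that requiredChildDistribution provides.

```scala
// Hypothetical model of zipping the left and right partitions with a per-partition function.
object ZipPartitionsSketch {
  def zipPartitions[L, R, O](left: Seq[Seq[L]], right: Seq[Seq[R]])(
      f: (Iterator[L], Iterator[R]) => Iterator[O]): Seq[Seq[O]] = {
    require(left.size == right.size, "both sides must have the same number of partitions")
    // Partition i of the left side is always processed together with partition i of the right side
    left.zip(right).map { case (l, r) => f(l.iterator, r.iterator).toList }
  }
}
```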

Processing Partitions of Left and Right Sides of Stream-Stream Join (processPartitions Internal Method)

processPartitions(leftInputIter: Iterator[InternalRow], rightInputIter: Iterator[InternalRow]): Iterator[InternalRow]


processPartitions records the current time (as updateStartTimeNs for the total time to update rows performance metric in onOutputCompletion).

processPartitions creates a new predicate (postJoinFilter) based on the bothSides predicate of the join condition if defined, or a true literal otherwise.

processPartitions creates a OneSideHashJoiner for the LeftSide with the properties of the left-hand join side (leftSideJoiner).

processPartitions creates a OneSideHashJoiner for the RightSide with the properties of the right-hand join side (rightSideJoiner).

processPartitions requests the left-hand side OneSideHashJoiner to storeAndJoinWithOtherSide with the right-hand side one (which creates the leftOutputIter row iterator), and the right-hand side OneSideHashJoiner to do the same with the left-hand side one (which creates the rightOutputIter row iterator).

processPartitions uses innerOutputCompletionTimeNs as the time marker of when the inner join output completes (used later in onOutputCompletion).

processPartitions creates a CompletionIterator with the left and right output iterators (with the rows of leftOutputIter first, followed by rightOutputIter). When no rows are left to process, the CompletionIterator records the completion time (innerOutputCompletionTimeNs).
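Why the left side goes first can be shown with a small in-memory symmetric hash join. This is a hypothetical sketch, not Spark's OneSideHashJoiner: keys are plain Ints, state is a mutable map per side, and it leaves out the watermark checks and join conditions that the real joiner applies. New left rows join with the right state before this batch's right rows are stored; new right rows then join with a left state that already includes this batch's left rows, so every matching pair is emitted exactly once.

```scala
import scala.collection.mutable

// Hypothetical in-memory symmetric hash join (no watermarks, no extra join conditions).
object SymmetricHashJoinSketch {
  // Stand-in for one side's state: join key -> every row seen so far on that side
  final class SideState[V] {
    val rows: mutable.Map[Int, mutable.ArrayBuffer[V]] = mutable.Map.empty

    // For every input row: join it with the rows currently stored on the other side,
    // then store it so that later rows on the other side can join with it.
    def storeAndJoinWithOtherSide[W](input: Iterator[(Int, V)], other: SideState[W]): Iterator[(Int, V, W)] =
      input.flatMap { case (key, value) =>
        val matches = other.rows.get(key).toList.flatten.map(w => (key, value, w))
        rows.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value
        matches
      }
  }

  def processBatch[L, R](
      leftState: SideState[L],
      rightState: SideState[R],
      leftInput: Iterator[(Int, L)],
      rightInput: Iterator[(Int, R)]): List[(Int, L, R)] = {
    // New left rows meet the right state, which does not yet hold this batch's right rows
    val leftOutput = leftState.storeAndJoinWithOtherSide(leftInput, rightState)
    // New right rows meet the left state, which by then also holds this batch's left rows
    val rightOutput = rightState
      .storeAndJoinWithOtherSide(rightInput, leftState)
      .map { case (k, r, l) => (k, l, r) }
    // Both iterators are lazy; concatenation consumes leftOutput fully before rightOutput starts
    (leftOutput ++ rightOutput).toList
  }
}
```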

processPartitions creates a join-specific output Iterator[InternalRow] of the output rows based on the join type (of the StreamingSymmetricHashJoinExec):

  • For Inner joins, processPartitions simply uses the inner output iterator (innerOutputIter)

  • For LeftOuter joins, processPartitions...

  • For RightOuter joins, processPartitions...

  • For other joins, processPartitions simply throws an IllegalArgumentException.

processPartitions creates an UnsafeProjection for the output schema (from the output of the left and right child operators), and an output iterator (outputIterWithMetrics) that counts all the rows of the join-specific output iterator (as the number of output rows metric) and generates their output projections.

In the end, processPartitions returns a CompletionIterator with the outputIterWithMetrics and the onOutputCompletion completion function.

NOTE: processPartitions is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to execute (doExecute).
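The "count every row, then run a callback once the output is exhausted" pattern can be sketched as below. Spark's own CompletionIterator is internal to Spark, so the class here is a hypothetical re-implementation of the idea, and withRowCount is a hypothetical helper standing in for outputIterWithMetrics.

```scala
// Hypothetical re-implementation of the completion-callback and row-counting pattern.
object CompletionIteratorSketch {
  // Calls `completion` exactly once, the first time the wrapped iterator reports it is exhausted
  final class CompletionIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
    private var completed = false

    override def next(): A = sub.next()

    override def hasNext: Boolean = {
      val more = sub.hasNext
      if (!more && !completed) {
        completed = true
        completion()
      }
      more
    }
  }

  // Maps every row through a projection while notifying a counter (think: number of output rows)
  def withRowCount[A, B](rows: Iterator[A], project: A => B, onRow: () => Unit): Iterator[B] =
    rows.map { row =>
      onRow()
      project(row)
    }
}
```

Because the callback fires only when the consumer drains the iterator, the metrics (and the state commit) happen after the last output row has been produced, not when the iterator is created.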

Calculating Performance Metrics in Output Completion Callback (onOutputCompletion Internal Method)

onOutputCompletion: Unit

onOutputCompletion calculates the total time to update rows performance metric (that is the time since processPartitions recorded updateStartTimeNs).

onOutputCompletion adds the time since the inner join output completed (the innerOutputCompletionTimeNs time marker) to the total time to remove rows performance metric.

onOutputCompletion records the time to remove old state (per the join state watermark predicates for the left and the right streaming queries) and adds it to the total time to remove rows performance metric.

NOTE: onOutputCompletion triggers the old state removal eagerly by iterating over the state rows to be deleted.

onOutputCompletion records the time for the left and right OneSideHashJoiners to commit any state changes, which becomes the time to commit changes performance metric.

onOutputCompletion calculates the number of updated state rows performance metric (as the number of updated state rows of the left and right streaming queries).

onOutputCompletion calculates the number of total state rows performance metric (as the sum of the number of keys in the KeyWithIndexToValueStore of the left and right streaming queries).

onOutputCompletion calculates the memory used by state performance metric (as the sum of the memory used by the KeyToNumValuesStore and KeyWithIndexToValueStore of the left and right streams).

In the end, onOutputCompletion calculates the <>.
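The aggregation of the per-side state metrics into operator-level metrics can be sketched as a sum over both sides. SideMetrics, OperatorMetrics, combine and timeTakenMs are hypothetical names for this illustration; the per-side numbers stand in for what each OneSideHashJoiner reports after committing its state.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical model of how onOutputCompletion combines left and right state metrics.
object JoinMetricsSketch {
  final case class SideMetrics(numUpdatedStateRows: Long, numKeys: Long, memoryUsedBytes: Long)

  final case class OperatorMetrics(numUpdatedStateRows: Long, numTotalStateRows: Long, stateMemory: Long)

  // Each operator-level metric is the sum of the left and right sides' values
  def combine(left: SideMetrics, right: SideMetrics): OperatorMetrics =
    OperatorMetrics(
      numUpdatedStateRows = left.numUpdatedStateRows + right.numUpdatedStateRows,
      numTotalStateRows = left.numKeys + right.numKeys,
      stateMemory = left.memoryUsedBytes + right.memoryUsedBytes)

  // Times a block in milliseconds, in the spirit of how removal and commit times are measured
  def timeTakenMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
  }
}
```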

Internal Properties

  • hadoopConfBcast: Hadoop Configuration broadcast (to the Spark cluster). Used exclusively to create the SymmetricHashJoinStateManager (joinStateManager) of a OneSideHashJoiner

  • joinStateManager: SymmetricHashJoinStateManager. Used when OneSideHashJoiner is requested to storeAndJoinWithOtherSide, removeOldState, commitStateAndGetMetrics, and for the values for a given key

  • nullLeft: GenericInternalRow of the size of the output schema of the left child operator

  • nullRight: GenericInternalRow of the size of the output schema of the right child operator

  • storeConf: StateStoreConf. Used exclusively to create the SymmetricHashJoinStateManager (joinStateManager) of a OneSideHashJoiner


Last update: 2020-11-28