
== ShuffledHashJoinExec Binary Physical Operator for Shuffled Hash Join

ShuffledHashJoinExec is a SparkPlan.md#BinaryExecNode[binary physical operator] to <<doExecute, execute>> a shuffled hash join.

ShuffledHashJoinExec performs a hash join of two child relations by first shuffling the data using the join keys.

ShuffledHashJoinExec is <<creating-instance, created>> to represent a Join.md[Join] logical operator when JoinSelection execution planning strategy is executed and the spark.sql.join.preferSortMergeJoin configuration property is off.

[NOTE]
====
JoinSelection execution planning strategy (and so Spark Planner) prefers sort merge join over shuffled hash join, based on the spark.sql.join.preferSortMergeJoin configuration property.

In other words, you will hardly see shuffled hash joins in your structured queries unless you turn spark.sql.join.preferSortMergeJoin off.
====

Besides the spark.sql.join.preferSortMergeJoin configuration property being off, one of the following requirements has to hold (see the sketch after this list):

* (for a right build side) canBuildRight for the join type, canBuildLocalHashMap for the right join side, and the right join side is much smaller (at least 3 times) than the left side
* (for a left build side) canBuildLeft for the join type, canBuildLocalHashMap for the left join side, and the left join side is much smaller (at least 3 times) than the right side

Alternatively, ShuffledHashJoinExec is also selected when the left join keys are not orderable (and so sort merge join cannot be used).
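
The size-based requirements boil down to two checks in JoinSelection. The following is a simplified sketch based on the Spark 2.x sources (not the verbatim implementation):

[source, scala]
----
// A simplified sketch of JoinSelection's size checks (Spark 2.x)

// canBuildLocalHashMap: a single partition of the plan should be small enough
// to build a local hash map, hence autoBroadcastJoinThreshold * numShufflePartitions
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

// muchSmaller: the build side has to be at least 3 times smaller than the other side
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
----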

[TIP]
====
Enable DEBUG logging level for the org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys logger to see the join condition and the left and right join keys.
====

[source, scala]
----
// Use ShuffledHashJoinExec's selection requirements
// 1. Disable auto broadcasting
// JoinSelection (canBuildLocalHashMap specifically) requires that
// plan.stats.sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions
// That gives that autoBroadcastJoinThreshold has to be at least 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

scala> println(spark.sessionState.conf.numShufflePartitions)
200

// 2. Disable preference on SortMergeJoin
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

val dataset = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "ShuffledHashJoinExec")
).toDF("id", "token")
// Self LEFT SEMI join
val q = dataset.join(dataset, Seq("id"), "leftsemi")

val sizeInBytes = q.queryExecution.optimizedPlan.stats.sizeInBytes
scala> println(sizeInBytes)
72

// 3. canBuildRight is on for a LEFT SEMI join
// (so the right join side becomes the build side)

// The right join side has to be at least three times smaller than the left side
// Even though it's a self LEFT SEMI join there are two different join sides
// How is that possible?

// BINGO! ShuffledHashJoin is here!

// Enable DEBUG logging level
import org.apache.log4j.{Level, Logger}
val logger = "org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys"
Logger.getLogger(logger).setLevel(Level.DEBUG)

// ShuffledHashJoin with BuildRight
scala> q.explain
== Physical Plan ==
ShuffledHashJoin [id#37], [id#41], LeftSemi, BuildRight
:- Exchange hashpartitioning(id#37, 200)
:  +- LocalTableScan [id#37, token#38]
+- Exchange hashpartitioning(id#41, 200)
   +- LocalTableScan [id#41]

scala> println(q.queryExecution.executedPlan.numberedTreeString)
00 ShuffledHashJoin [id#37], [id#41], LeftSemi, BuildRight
01 :- Exchange hashpartitioning(id#37, 200)
02 :  +- LocalTableScan [id#37, token#38]
03 +- Exchange hashpartitioning(id#41, 200)
04    +- LocalTableScan [id#41]
----


[[metrics]]
.ShuffledHashJoinExec's Performance Metrics
[cols="1,2,2",options="header",width="100%"]
|===
| Key
| Name (in web UI)
| Description

| [[avgHashProbe]] avgHashProbe
| avg hash probe
|

| [[buildDataSize]] buildDataSize
| data size of build side
|

| [[buildTime]] buildTime
| time to build hash map
|

| [[numOutputRows]] numOutputRows
| number of output rows
|
|===
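
For illustration, you can also read the metrics programmatically once a query has run. A sketch that reuses the q query from the demo above:

[source, scala]
----
// Run the query so the SQL metrics get populated
q.foreach(_ => ())

// Find the ShuffledHashJoinExec in the executed plan and read its metrics
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
val shjMetrics = q.queryExecution.executedPlan.collectFirst {
  case j: ShuffledHashJoinExec => j.metrics
}.get
println(shjMetrics("numOutputRows").value)
----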

.ShuffledHashJoinExec in web UI (Details for Query)
image::images/spark-sql-ShuffledHashJoinExec-webui-query-details.png[align="center"]

[[requiredChildDistribution]]
.ShuffledHashJoinExec's Required Child Output Distributions
[cols="1,1",options="header",width="100%"]
|===
| Left Child
| Right Child

| HashClusteredDistribution (per <<leftKeys, left join key expressions>>)
| HashClusteredDistribution (per <<rightKeys, right join key expressions>>)
|===
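
That requirement is what makes the planner insert the two Exchange hashpartitioning operators seen in the physical plan above. A sketch of what the override looks like (based on the Spark 2.x sources):

[source, scala]
----
// Each child has to be hash-partitioned by its join keys;
// EnsureRequirements satisfies this with Exchange operators
override def requiredChildDistribution: Seq[Distribution] =
  HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
----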

=== [[doExecute]] Executing Physical Operator (Generating RDD[InternalRow]) -- doExecute Method

[source, scala]
----
doExecute(): RDD[InternalRow]
----

doExecute is part of the SparkPlan abstraction.

doExecute requests the streamedPlan physical operator to SparkPlan.md#execute[execute] (and generate an RDD[InternalRow]).

doExecute requests the buildPlan physical operator to SparkPlan.md#execute[execute] (and generate an RDD[InternalRow]).

doExecute then zips the streamedPlan operator's RDD[InternalRow] partition-wise with the buildPlan operator's RDD[InternalRow] (using RDD.zipPartitions with the preservesPartitioning flag disabled).

[NOTE]
====
doExecute generates a ZippedPartitionsRDD2 that you can see in the RDD lineage.

[source, scala]
----
scala> println(q.queryExecution.toRdd.toDebugString)
(200) ZippedPartitionsRDD2[8] at toRdd at <console>:26 []
  |  ShuffledRowRDD[3] at toRdd at <console>:26 []
  +-(3) MapPartitionsRDD[2] at toRdd at <console>:26 []
     |  MapPartitionsRDD[1] at toRdd at <console>:26 []
     |  ParallelCollectionRDD[0] at toRdd at <console>:26 []
  |  ShuffledRowRDD[7] at toRdd at <console>:26 []
  +-(3) MapPartitionsRDD[6] at toRdd at <console>:26 []
     |  MapPartitionsRDD[5] at toRdd at <console>:26 []
     |  ParallelCollectionRDD[4] at toRdd at <console>:26 []
----
====

doExecute uses RDD.zipPartitions with a function that takes two iterators of rows, one per zipped partition of the streamedPlan and buildPlan RDDs.

For every pair of partitions, the function <<buildHashedRelation, builds a HashedRelation>> from the rows of the buildPlan partition and then joins (using HashJoin's join) the streamedPlan partition iterator with the HashedRelation, updating the <<numOutputRows, numOutputRows>> and <<avgHashProbe, avgHashProbe>> SQL metrics.
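
Put together, doExecute is essentially the following (a simplified sketch of the Spark 2.x sources):

[source, scala]
----
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  val avgHashProbe = longMetric("avgHashProbe")
  streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
    // Build a hash map from the build-side partition...
    val hashed = buildHashedRelation(buildIter)
    // ...and stream the other side through it (HashJoin's join)
    join(streamIter, hashed, numOutputRows, avgHashProbe)
  }
}
----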

=== [[buildHashedRelation]] Building HashedRelation for Internal Rows -- buildHashedRelation Internal Method

[source, scala]
----
buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation
----

buildHashedRelation creates a HashedRelation (for the input iter iterator of InternalRows, buildKeys and the current TaskMemoryManager).

NOTE: buildHashedRelation uses TaskContext.get() to access the current TaskContext that in turn is used to access the TaskMemoryManager.

buildHashedRelation records the time to create the HashedRelation as the <<buildTime, buildTime>> metric.

buildHashedRelation requests the HashedRelation for its estimatedSize that is recorded as the <<buildDataSize, buildDataSize>> metric.

NOTE: buildHashedRelation is used exclusively when ShuffledHashJoinExec is requested to <<doExecute, execute>> (when the streamedPlan and buildPlan physical operators are executed and their RDDs zipped partition-wise using the RDD.zipPartitions method).
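
The method is short. A simplified sketch based on the Spark 2.x sources:

[source, scala]
----
private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
  val buildDataSize = longMetric("buildDataSize")
  val buildTime = longMetric("buildTime")
  val start = System.nanoTime()
  val context = TaskContext.get()
  val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
  buildTime += (System.nanoTime() - start) / 1000000   // milliseconds
  buildDataSize += relation.estimatedSize
  // The relation is usually used until the end of the task
  context.addTaskCompletionListener(_ => relation.close())
  relation
}
----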

=== [[creating-instance]] Creating ShuffledHashJoinExec Instance

ShuffledHashJoinExec takes the following when created:

* [[leftKeys]] Left join key expressions/Expression.md[expressions]
* [[rightKeys]] Right join key expressions/Expression.md[expressions]
* [[joinType]] spark-sql-joins.md#join-types[Join type]
* [[buildSide]] BuildSide
* [[condition]] Optional join condition expressions/Expression.md[expression]
* [[left]] Left SparkPlan.md[physical operator]
* [[right]] Right SparkPlan.md[physical operator]
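
For illustration, you can grab a created instance from a planned query and inspect these arguments (a sketch that reuses the q query from the demo above):

[source, scala]
----
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
// Find the operator that JoinSelection created for the demo query
val shj = q.queryExecution.executedPlan.collectFirst {
  case j: ShuffledHashJoinExec => j
}.get
println(shj.joinType)   // LeftSemi
println(shj.buildSide)  // BuildRight
println(shj.condition)  // None
----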
