JoinSelection Execution Planning Strategy

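JoinSelection is an execution planning strategy that SparkPlanner uses to plan a Join logical operator (as described by the selection requirements in the table below).

JoinSelection first considers join physical operators based on whether join keys are used. When join keys are used, JoinSelection considers BroadcastHashJoinExec, ShuffledHashJoinExec or SortMergeJoinExec operators. Without join keys, JoinSelection considers BroadcastNestedLoopJoinExec or CartesianProductExec.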

[[join-selection-requirements]]
.Join Physical Operator Selection Requirements (in the order of preference)
[cols="1,3",options="header",width="100%"]
|===
| Physical Join Operator
| Selection Requirements

| BroadcastHashJoinExec.md[BroadcastHashJoinExec] a| [[BroadcastHashJoinExec]] There are join keys and one of the following holds:

  • Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right join side can be broadcast

  • Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left join side can be broadcast

| ShuffledHashJoinExec.md[ShuffledHashJoinExec] a| [[ShuffledHashJoinExec]] There are join keys and one of the following holds:

  • spark.sql.join.preferSortMergeJoin is disabled, the join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive), canBuildLocalHashMap holds for the right join side, and the right join side is much smaller than the left side (muchSmaller)

  • spark.sql.join.preferSortMergeJoin is disabled, the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive), canBuildLocalHashMap holds for the left join side, and the left join side is much smaller than the right side (muchSmaller)

  • Left join keys are not SortMergeJoinExec.md#orderable[orderable]

| SortMergeJoinExec.md[SortMergeJoinExec] | [[SortMergeJoinExec]] Left join keys are SortMergeJoinExec.md#orderable[orderable]

| BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec] a| [[BroadcastNestedLoopJoinExec]] There are no join keys and one of the following holds:

  • Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right join side can be broadcast

  • Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left join side can be broadcast

| CartesianProductExec.md[CartesianProductExec] | [[CartesianProductExec]] There are no join keys and spark-sql-joins.md#join-types[join type] is spark-sql-joins.md#CROSS[CROSS] or spark-sql-joins.md#INNER[INNER]

| BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec]
| No other join operator has matched already
|===
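The order of preference in the table can be sketched as a chain of checks. This is a minimal illustration and not Spark's code: the `JoinFacts` case class and `selectOperator` are hypothetical names, and each requirement is collapsed into a precomputed boolean flag.

```scala
object JoinSelectionOrderSketch {
  // Hypothetical summary of the facts the table's requirements depend on.
  final case class JoinFacts(
      hasJoinKeys: Boolean,
      broadcastable: Boolean,        // an allowed build side can be broadcast
      shuffledHashEligible: Boolean, // one of the first two ShuffledHashJoinExec bullets holds
      leftKeysOrderable: Boolean,
      innerLike: Boolean)            // join type is CROSS or INNER

  // Walks the table top to bottom and returns the first operator that matches.
  def selectOperator(f: JoinFacts): String =
    if (f.hasJoinKeys && f.broadcastable) "BroadcastHashJoinExec"
    else if (f.hasJoinKeys && (f.shuffledHashEligible || !f.leftKeysOrderable)) "ShuffledHashJoinExec"
    else if (f.hasJoinKeys && f.leftKeysOrderable) "SortMergeJoinExec"
    else if (f.broadcastable) "BroadcastNestedLoopJoinExec"
    else if (f.innerLike) "CartesianProductExec"
    else "BroadcastNestedLoopJoinExec" // fallback: no other operator matched

  def main(args: Array[String]): Unit = {
    // Equi-join, nothing broadcastable, orderable keys
    println(selectOperator(JoinFacts(true, false, false, true, false)))
    // No join keys, inner join, nothing broadcastable
    println(selectOperator(JoinFacts(false, false, false, true, true)))
  }
}
```

With the two sample inputs the sketch prints `SortMergeJoinExec` and then `CartesianProductExec`.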

NOTE: JoinSelection uses spark-sql-ExtractEquiJoinKeys.md[ExtractEquiJoinKeys] Scala extractor to destructure a Join logical operator.

=== [[muchSmaller]] Is Left-Side Plan At Least 3 Times Smaller Than Right-Side Plan? -- muchSmaller Internal Condition

[source, scala]

muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean

muchSmaller condition holds when plan a is at least 3 times smaller than plan b.

Internally, muchSmaller spark-sql-LogicalPlan.md#stats[calculates the estimated statistics for the input logical plans] and compares their physical size in bytes (sizeInBytes).
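The size comparison can be sketched as follows. `Stats` is a hypothetical stand-in for the estimated statistics of a logical plan, so the sketch runs without Spark on the classpath.

```scala
object MuchSmallerSketch {
  // Hypothetical stand-in for the estimated statistics of a logical plan.
  final case class Stats(sizeInBytes: BigInt)

  // Holds when plan `a` is at least 3 times smaller than plan `b`.
  def muchSmaller(a: Stats, b: Stats): Boolean =
    a.sizeInBytes * 3 <= b.sizeInBytes

  def main(args: Array[String]): Unit = {
    val customers = Stats(sizeInBytes = BigInt(100))
    val orders = Stats(sizeInBytes = BigInt(300))
    println(muchSmaller(customers, orders)) // true, since 100 * 3 <= 300
    println(muchSmaller(orders, customers)) // false
  }
}
```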

NOTE: muchSmaller is used when JoinSelection checks the selection requirements for the ShuffledHashJoinExec physical operator.

=== [[canBuildLocalHashMap]] canBuildLocalHashMap Internal Condition

[source, scala]

canBuildLocalHashMap(plan: LogicalPlan): Boolean

canBuildLocalHashMap condition holds for a logical plan whose single partition is small enough to build a hash table, i.e. the estimated size of the plan in bytes (sizeInBytes) is less than spark.sql.autoBroadcastJoinThreshold multiplied by spark.sql.shuffle.partitions.

Internally, canBuildLocalHashMap spark-sql-LogicalPlan.md#stats[calculates the estimated statistics for the input logical plan] and takes the size in bytes (sizeInBytes).

NOTE: canBuildLocalHashMap is used when JoinSelection checks the selection requirements for the ShuffledHashJoinExec physical operator.
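A minimal sketch of the check, assuming the comparison is a strict "less than". `Conf` is a hypothetical holder for the two configuration properties; the values in `main` are the documented defaults (10 MB and 200 partitions).

```scala
object CanBuildLocalHashMapSketch {
  // Hypothetical holder for the two configuration properties involved.
  final case class Conf(autoBroadcastJoinThreshold: Long, numShufflePartitions: Int)

  // The plan qualifies when its estimated size is below threshold * partitions.
  def canBuildLocalHashMap(sizeInBytes: BigInt, conf: Conf): Boolean =
    sizeInBytes < BigInt(conf.autoBroadcastJoinThreshold) * BigInt(conf.numShufflePartitions)

  def main(args: Array[String]): Unit = {
    val defaults = Conf(autoBroadcastJoinThreshold = 10L * 1024 * 1024, numShufflePartitions = 200)
    // Limit is 10485760 * 200 = 2097152000 bytes
    println(canBuildLocalHashMap(BigInt(1000000000L), defaults)) // true
    println(canBuildLocalHashMap(BigInt(3000000000L), defaults)) // false
  }
}
```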

=== [[canBroadcast]] Can Logical Plan Be Broadcast? -- canBroadcast Internal Condition

[source, scala]

canBroadcast(plan: LogicalPlan): Boolean

canBroadcast is enabled (true) when the size of the output of the input logical plan (aka sizeInBytes) is less than spark.sql.autoBroadcastJoinThreshold configuration property.

canBroadcast uses the total size statistic from Statistics of a logical operator.

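canBroadcast is used when JoinSelection checks whether the join type and total size statistic of join sides allow for a broadcast join (canBroadcastBySizes) and when it selects the build side per join type and total size statistic of join sides (broadcastSideBySizes).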
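The threshold check can be sketched as below. Two details are assumptions rather than statements from this page: the size is required to be non-negative, and a size exactly equal to the threshold is treated as broadcastable. Verify both against the Spark version in use.

```scala
object CanBroadcastSketch {
  // Assumption: non-negative size and an inclusive upper bound.
  def canBroadcast(sizeInBytes: BigInt, autoBroadcastJoinThreshold: Long): Boolean =
    sizeInBytes >= BigInt(0) && sizeInBytes <= BigInt(autoBroadcastJoinThreshold)

  def main(args: Array[String]): Unit = {
    val tenMb = 10L * 1024 * 1024
    println(canBroadcast(BigInt(1024), tenMb))             // true
    println(canBroadcast(BigInt(50L * 1024 * 1024), tenMb)) // false
    // A threshold of -1 disables size-based broadcast: no non-negative size fits.
    println(canBroadcast(BigInt(1024), -1L))               // false
  }
}
```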

=== [[canBroadcastByHints]] canBroadcastByHints Internal Method

[source, scala]

canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean

canBroadcastByHints is positive (i.e. true) when either condition holds:

. Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and left operator's broadcast hint flag is on

. Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and right operator's broadcast hint flag is on

Otherwise, canBroadcastByHints is negative (i.e. false).
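The two conditions combine as a simple disjunction. In this sketch the join-type checks and the hint flags are passed in as precomputed booleans (a simplification; the real method takes a join type and two logical plans).

```scala
object CanBroadcastByHintsSketch {
  // canBuildLeft / canBuildRight: whether the join type allows that build side.
  // leftHint / rightHint: whether that side carries a broadcast hint.
  def canBroadcastByHints(
      canBuildLeft: Boolean,
      canBuildRight: Boolean,
      leftHint: Boolean,
      rightHint: Boolean): Boolean = {
    val buildLeft = canBuildLeft && leftHint
    val buildRight = canBuildRight && rightHint
    buildLeft || buildRight
  }

  def main(args: Array[String]): Unit = {
    // LEFT OUTER join (only the right side may be built) with a hint on the left side
    println(canBroadcastByHints(canBuildLeft = false, canBuildRight = true,
      leftHint = true, rightHint = false)) // false
    // Same join type with the hint on the right side
    println(canBroadcastByHints(canBuildLeft = false, canBuildRight = true,
      leftHint = false, rightHint = true)) // true
  }
}
```

The first call shows why a broadcast hint on the left side of a LEFT OUTER join has no effect on this check.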

NOTE: canBroadcastByHints is used when JoinSelection is executed (apply) and considers a BroadcastHashJoinExec.md[BroadcastHashJoinExec] or a BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec] physical operator.

=== [[broadcastSideByHints]] Selecting Build Side Per Join Type and Broadcast Hints -- broadcastSideByHints Internal Method

[source, scala]

broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide

broadcastSideByHints computes buildLeft and buildRight flags:

  • buildLeft flag is positive (i.e. true) when the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left operator's broadcast hint flag is positive

  • buildRight flag is positive (i.e. true) when the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right operator's broadcast hint flag is positive

In the end, broadcastSideByHints uses broadcastSide (with the buildLeft and buildRight flags) to choose the join side to broadcast.

NOTE: broadcastSideByHints is used when JoinSelection is executed (apply) and considers a BroadcastHashJoinExec.md[BroadcastHashJoinExec] or a BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec] physical operator.

=== [[broadcastSide]] Choosing Join Side to Broadcast -- broadcastSide Internal Method

[source, scala]

broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: LogicalPlan, right: LogicalPlan): BuildSide


broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size when canBuildLeft and canBuildRight are both positive (i.e. true).

Otherwise, broadcastSide gives BuildRight when canBuildRight is positive.

Otherwise, broadcastSide gives BuildLeft when canBuildLeft is positive.

When none of the above conditions is met (i.e. both flags are negative), broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size (as in the first case, when canBuildLeft and canBuildRight are both positive).
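The four cases above can be sketched as one conditional chain. The `BuildSide` hierarchy is redefined locally so the example is self-contained, sizes are passed directly instead of logical plans, and breaking a size tie in favour of BuildRight is an assumption.

```scala
object BroadcastSideSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  def broadcastSide(
      canBuildLeft: Boolean,
      canBuildRight: Boolean,
      leftSize: BigInt,
      rightSize: BigInt): BuildSide = {
    // Assumption: equal sizes resolve to BuildRight.
    def smallerSide: BuildSide =
      if (rightSize <= leftSize) BuildRight else BuildLeft

    if (canBuildRight && canBuildLeft) smallerSide // both allowed: pick the smaller
    else if (canBuildRight) BuildRight
    else if (canBuildLeft) BuildLeft
    else smallerSide                               // neither flag set: pick the smaller
  }

  def main(args: Array[String]): Unit = {
    println(broadcastSide(true, true, BigInt(100), BigInt(50)))   // BuildRight
    println(broadcastSide(true, false, BigInt(100), BigInt(50)))  // BuildLeft
    println(broadcastSide(false, false, BigInt(50), BigInt(100))) // BuildLeft
  }
}
```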

NOTE: broadcastSide is used by broadcastSideByHints and broadcastSideBySizes, and when JoinSelection is executed (apply) and considers a BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec] physical operator.

=== [[canBuildLeft]] Checking If Join Type Allows For Left Join Side As Build Side -- canBuildLeft Internal Condition

[source, scala]

canBuildLeft(joinType: JoinType): Boolean

canBuildLeft is positive (i.e. true) for spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] and spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] join types. Otherwise, canBuildLeft is negative (i.e. false).

NOTE: canBuildLeft is used by canBroadcastByHints, broadcastSideByHints, canBroadcastBySizes and broadcastSideBySizes, and when JoinSelection is executed (apply) and selects a ShuffledHashJoinExec physical operator.

=== [[canBuildRight]] Checking If Join Type Allows For Right Join Side As Build Side -- canBuildRight Internal Condition

[source, scala]

canBuildRight(joinType: JoinType): Boolean

canBuildRight is positive (i.e. true) if the input join type is one of the following:

  • spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin]

Otherwise, canBuildRight is negative (i.e. false).
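Both join-type checks are pattern matches over the join type. The sketch below uses simplified stand-ins for Spark's join types (assumption: the real ExistenceJoin carries an attribute, which is dropped here) and adds FullOuter as an example of a type for which both checks are negative.

```scala
object CanBuildSideSketch {
  // Simplified stand-ins for Spark's join types.
  sealed trait JoinType
  case object Cross extends JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object ExistenceJoin extends JoinType

  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case Cross | Inner | RightOuter => true
    case _ => false
  }

  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case Cross | Inner | LeftAnti | LeftOuter | LeftSemi | ExistenceJoin => true
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    val all = Seq(Cross, Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti, ExistenceJoin)
    all.foreach { jt =>
      println(s"$jt: canBuildLeft=${canBuildLeft(jt)}, canBuildRight=${canBuildRight(jt)}")
    }
  }
}
```

Only CROSS and INNER allow either side as the build side, which is why broadcastSide needs a size-based tie-break.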

NOTE: canBuildRight is used by canBroadcastByHints, broadcastSideByHints, canBroadcastBySizes and broadcastSideBySizes, and when JoinSelection is executed (apply) and selects a ShuffledHashJoinExec physical operator.

=== [[canBroadcastBySizes]] Checking If Join Type and Total Size Statistic of Join Sides Allow for Broadcast Join -- canBroadcastBySizes Internal Method

[source, scala]

canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean

canBroadcastBySizes is positive (i.e. true) when either condition holds:

. Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left operator can be broadcast per its total size statistic (canBroadcast)

. Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right operator can be broadcast per its total size statistic (canBroadcast)

Otherwise, canBroadcastBySizes is negative (i.e. false).

NOTE: canBroadcastBySizes is used when JoinSelection is executed (apply) and considers a BroadcastHashJoinExec.md[BroadcastHashJoinExec] or a BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec] physical operator.

=== [[broadcastSideBySizes]] Selecting Build Side Per Join Type and Total Size Statistic of Join Sides -- broadcastSideBySizes Internal Method

[source, scala]

broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide

broadcastSideBySizes computes buildLeft and buildRight flags:

  • buildLeft flag is positive (i.e. true) when the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left operator can be broadcast per its total size statistic (canBroadcast)

  • buildRight flag is positive (i.e. true) when the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right operator can be broadcast per its total size statistic (canBroadcast)

In the end, broadcastSideBySizes uses broadcastSide (with the buildLeft and buildRight flags) to choose the join side to broadcast.
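How the flags feed into the final choice can be sketched end to end. Everything here is a simplified stand-in: join-type checks arrive as precomputed booleans, plans are reduced to their sizes, and the inclusive threshold comparison and the tie-break towards BuildRight are assumptions.

```scala
object BroadcastSideBySizesSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Assumption: non-negative size and an inclusive upper bound.
  def canBroadcast(sizeInBytes: BigInt, threshold: Long): Boolean =
    sizeInBytes >= BigInt(0) && sizeInBytes <= BigInt(threshold)

  def broadcastSide(buildLeft: Boolean, buildRight: Boolean,
      leftSize: BigInt, rightSize: BigInt): BuildSide = {
    def smallerSide: BuildSide = if (rightSize <= leftSize) BuildRight else BuildLeft
    if (buildRight && buildLeft) smallerSide
    else if (buildRight) BuildRight
    else if (buildLeft) BuildLeft
    else smallerSide
  }

  // Computes the two flags from join-type checks and sizes, then delegates.
  def broadcastSideBySizes(canBuildLeft: Boolean, canBuildRight: Boolean,
      leftSize: BigInt, rightSize: BigInt, threshold: Long): BuildSide = {
    val buildLeft = canBuildLeft && canBroadcast(leftSize, threshold)
    val buildRight = canBuildRight && canBroadcast(rightSize, threshold)
    broadcastSide(buildLeft, buildRight, leftSize, rightSize)
  }

  def main(args: Array[String]): Unit = {
    val tenMb = 10L * 1024 * 1024
    val oneMb = BigInt(1024 * 1024)
    // INNER join, both sides under the threshold: the smaller (right) side wins
    println(broadcastSideBySizes(true, true, oneMb * 5, oneMb, tenMb)) // BuildRight
    // RIGHT OUTER join: only the left side may be built
    println(broadcastSideBySizes(true, false, oneMb * 5, oneMb, tenMb)) // BuildLeft
  }
}
```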

NOTE: broadcastSideBySizes is used when JoinSelection is executed (apply) and considers a BroadcastHashJoinExec.md[BroadcastHashJoinExec] or a BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec] physical operator.

=== [[apply]] Applying JoinSelection Strategy to Logical Plan (Executing JoinSelection) -- apply Method

[source, scala]

apply(plan: LogicalPlan): Seq[SparkPlan]

apply is part of GenericStrategy abstraction.

apply uses spark-sql-ExtractEquiJoinKeys.md[ExtractEquiJoinKeys] Scala extractor to destructure the input logical plan.

==== [[apply-BroadcastHashJoinExec]] Considering BroadcastHashJoinExec Physical Operator

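apply gives a BroadcastHashJoinExec.md#creating-instance[BroadcastHashJoinExec] physical operator if the plan should be broadcast per join type and broadcast hints (canBroadcastByHints, for the join type and left or right side of the join). apply selects the build side per join type and broadcast hints (broadcastSideByHints).

Otherwise, apply gives a BroadcastHashJoinExec.md#creating-instance[BroadcastHashJoinExec] physical operator if the plan should be broadcast per join type and total size statistic of join sides (canBroadcastBySizes, for the join type and left or right side of the join). apply selects the build side per join type and total size statistic of join sides (broadcastSideBySizes).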

==== [[apply-ShuffledHashJoinExec]] Considering ShuffledHashJoinExec Physical Operator

apply gives...FIXME

==== [[apply-SortMergeJoinExec]] Considering SortMergeJoinExec Physical Operator

apply gives...FIXME

==== [[apply-BroadcastNestedLoopJoinExec]] Considering BroadcastNestedLoopJoinExec Physical Operator

apply gives...FIXME

==== [[apply-CartesianProductExec]] Considering CartesianProductExec Physical Operator

apply gives...FIXME


Last update: 2020-11-07