== JoinSelection Execution Planning Strategy
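JoinSelection is an execution planning strategy that SparkPlanner uses to plan a Join logical operator to one of the supported join physical operators (as described by the join selection requirements below).

JoinSelection firstly considers join physical operators per whether join keys are used or not. When join keys are used, JoinSelection considers BroadcastHashJoinExec, ShuffledHashJoinExec or SortMergeJoinExec operators. Without join keys, JoinSelection considers BroadcastNestedLoopJoinExec or CartesianProductExec.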
[[join-selection-requirements]]
.Join Physical Operator Selection Requirements (in the order of preference)
[cols="1,3",options="header",width="100%"]
|===
| Physical Join Operator
| Selection Requirements

| BroadcastHashJoinExec.md[BroadcastHashJoinExec]
a| [[BroadcastHashJoinExec]] There are join keys and one of the following holds:

- Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right join side can be broadcast (per its broadcast hint or size)

- Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left join side can be broadcast (per its broadcast hint or size)

| ShuffledHashJoinExec.md[ShuffledHashJoinExec]
a| [[ShuffledHashJoinExec]] There are join keys and one of the following holds:

- spark.sql.join.preferSortMergeJoin is disabled, the join type is CROSS, INNER, LEFT ANTI, LEFT OUTER, LEFT SEMI or ExistenceJoin (i.e. canBuildRight for the input joinType is positive), canBuildLocalHashMap holds for the right join side and, finally, the right join side is much smaller (muchSmaller) than the left side

- spark.sql.join.preferSortMergeJoin is disabled, the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive), canBuildLocalHashMap holds for the left join side and, finally, the left join side is much smaller (muchSmaller) than the right side

- Left join keys are not SortMergeJoinExec.md#orderable[orderable]

| SortMergeJoinExec.md[SortMergeJoinExec]
| [[SortMergeJoinExec]] Left join keys are SortMergeJoinExec.md#orderable[orderable]

| BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec]
a| [[BroadcastNestedLoopJoinExec]] There are no join keys and one of the following holds:

- Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right join side can be broadcast (per its broadcast hint or size)

- Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left join side can be broadcast (per its broadcast hint or size)

| CartesianProductExec.md[CartesianProductExec]
| [[CartesianProductExec]] There are no join keys and spark-sql-joins.md#join-types[join type] is spark-sql-joins.md#CROSS[CROSS] or spark-sql-joins.md#INNER[INNER]

| BroadcastNestedLoopJoinExec.md[BroadcastNestedLoopJoinExec]
| No other join operator has matched already
|===
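Because the rows are tried in order and the first match wins, the table can be read as a cascade. The following Scala sketch models that order under simplifying assumptions: the hypothetical JoinFacts record carries precomputed flags for each check (leftCanBroadcast, for example, stands for either the broadcast hint or the size-based check), and JoinType, JoinFacts and JoinSelectionSketch are illustrative stand-ins rather than Spark classes.

```scala
// Hypothetical, simplified stand-ins for Spark's join types (not Spark's API)
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType

// Hypothetical record of the facts the cascade inspects, precomputed as flags
final case class JoinFacts(
    hasJoinKeys: Boolean,
    joinType: JoinType,
    leftCanBroadcast: Boolean,      // broadcast hint or size-based check on the left side
    rightCanBroadcast: Boolean,     // broadcast hint or size-based check on the right side
    preferSortMergeJoin: Boolean,   // spark.sql.join.preferSortMergeJoin
    leftFitsShuffledHash: Boolean,  // canBuildLocalHashMap(left) && muchSmaller(left, right)
    rightFitsShuffledHash: Boolean, // canBuildLocalHashMap(right) && muchSmaller(right, left)
    leftKeysOrderable: Boolean)

object JoinSelectionSketch {
  def canBuildRight(t: JoinType): Boolean = t match {
    case Cross | Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
    case _                                                             => false
  }

  def canBuildLeft(t: JoinType): Boolean = t match {
    case Cross | Inner | RightOuter => true
    case _                          => false
  }

  // Returns the name of the first physical operator whose requirements hold
  def select(f: JoinFacts): String = {
    val broadcastable =
      (canBuildRight(f.joinType) && f.rightCanBroadcast) ||
        (canBuildLeft(f.joinType) && f.leftCanBroadcast)
    val shuffledHash =
      (!f.preferSortMergeJoin && canBuildRight(f.joinType) && f.rightFitsShuffledHash) ||
        (!f.preferSortMergeJoin && canBuildLeft(f.joinType) && f.leftFitsShuffledHash) ||
        !f.leftKeysOrderable

    if (f.hasJoinKeys) {
      if (broadcastable) "BroadcastHashJoinExec"
      else if (shuffledHash) "ShuffledHashJoinExec"
      else "SortMergeJoinExec" // reached only when the left join keys are orderable
    } else {
      if (broadcastable) "BroadcastNestedLoopJoinExec"
      else if (f.joinType == Cross || f.joinType == Inner) "CartesianProductExec"
      else "BroadcastNestedLoopJoinExec" // fallback when nothing else matched
    }
  }
}
```

In this model, a RIGHT OUTER equi-join with a small right side still ends up as SortMergeJoinExec (with spark.sql.join.preferSortMergeJoin enabled and orderable keys), since canBuildRight rules out the right side as the build side.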
NOTE: JoinSelection uses the spark-sql-ExtractEquiJoinKeys.md[ExtractEquiJoinKeys] Scala extractor to destructure a Join logical operator.
=== [[muchSmaller]] Is One Logical Plan At Least 3 Times Smaller Than Another? -- muchSmaller Internal Condition

[source, scala]
muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean
muchSmaller condition holds when plan a is at least 3 times smaller than plan b, i.e. the size of a multiplied by 3 does not exceed the size of b.

Internally, muchSmaller spark-sql-LogicalPlan.md#stats[calculates the estimated statistics for both input logical plans] and compares their physical size in bytes (sizeInBytes).
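A minimal sketch of the condition, assuming it compares a.sizeInBytes * 3 with b.sizeInBytes. PlanStats is a hypothetical stand-in for a plan's estimated statistics, and Long replaces the BigInt that Spark uses for sizeInBytes.

```scala
// Hypothetical stand-in for a logical plan's estimated statistics (not Spark's API)
final case class PlanStats(sizeInBytes: Long)

object MuchSmallerSketch {
  // a is "much smaller" than b when three times a's size still fits within b's size
  def muchSmaller(a: PlanStats, b: PlanStats): Boolean =
    a.sizeInBytes * 3 <= b.sizeInBytes
}
```

The boundary is inclusive in this sketch: a 100-byte plan counts as much smaller than a 300-byte one, while a 101-byte plan does not.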
NOTE: muchSmaller is used when JoinSelection checks the join selection requirements for the ShuffledHashJoinExec physical operator.
=== [[canBuildLocalHashMap]] canBuildLocalHashMap Internal Condition

[source, scala]
canBuildLocalHashMap(plan: LogicalPlan): Boolean
canBuildLocalHashMap condition holds for a logical plan whose single partition is small enough to build a hash table from, i.e. when the total size of the plan is below spark.sql.autoBroadcastJoinThreshold multiplied by spark.sql.shuffle.partitions.

Internally, canBuildLocalHashMap spark-sql-LogicalPlan.md#stats[calculates the estimated statistics for the input logical plan] and takes the size in bytes (sizeInBytes).
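The threshold arithmetic can be sketched as follows, assuming a strict comparison sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions. LocalHashMapSketch and its parameters are hypothetical; the defaults are those of spark.sql.autoBroadcastJoinThreshold (10 MB) and spark.sql.shuffle.partitions (200).

```scala
object LocalHashMapSketch {
  // Defaults of spark.sql.autoBroadcastJoinThreshold (10 MB) and spark.sql.shuffle.partitions
  val DefaultAutoBroadcastJoinThreshold: Long = 10L * 1024 * 1024
  val DefaultShufflePartitions: Int = 200

  // The plan qualifies when its total size is below threshold * partitions,
  // i.e. an average shuffle partition stays below the broadcast threshold
  def canBuildLocalHashMap(
      sizeInBytes: Long,
      autoBroadcastJoinThreshold: Long = DefaultAutoBroadcastJoinThreshold,
      numShufflePartitions: Int = DefaultShufflePartitions): Boolean =
    sizeInBytes < autoBroadcastJoinThreshold * numShufflePartitions
}
```

With the defaults, the limit works out to 10485760 * 200 = 2097152000 bytes (about 2 GB). Dividing both sides by the number of shuffle partitions shows the intent: an average partition of the plan has to stay below the broadcast threshold.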
NOTE: canBuildLocalHashMap is used when JoinSelection checks the join selection requirements for the ShuffledHashJoinExec physical operator.
=== [[canBroadcast]] Can Logical Plan Be Broadcast? -- canBroadcast Internal Condition

[source, scala]
canBroadcast(plan: LogicalPlan): Boolean
canBroadcast is positive (i.e. true) when the total size statistic (sizeInBytes) from the Statistics of the input logical plan is non-negative and at most the value of the spark.sql.autoBroadcastJoinThreshold configuration property.
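A minimal sketch, assuming the check is sizeInBytes >= 0 && sizeInBytes <= autoBroadcastJoinThreshold. CanBroadcastSketch is hypothetical and takes the size and threshold as explicit parameters instead of reading a logical plan's statistics and the session configuration.

```scala
object CanBroadcastSketch {
  // 10 MB, the default of spark.sql.autoBroadcastJoinThreshold
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // A plan can be broadcast when its estimated size is non-negative and within the threshold
  def canBroadcast(sizeInBytes: Long, autoBroadcastJoinThreshold: Long = DefaultThreshold): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= autoBroadcastJoinThreshold
}
```

Setting spark.sql.autoBroadcastJoinThreshold to -1 disables size-based broadcasting in this model, since no non-negative size can be at most -1.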
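canBroadcast is used when JoinSelection is requested to check whether the join type and total size statistic of join sides allow for a broadcast join (canBroadcastBySizes) and to select the build side per join type and total size statistic of join sides (broadcastSideBySizes).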
=== [[canBroadcastByHints]] canBroadcastByHints Internal Method

[source, scala]
canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean
canBroadcastByHints is positive (i.e. true) when either condition holds:

. Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left operator's broadcast hint flag is on
. Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right operator's broadcast hint flag is on

Otherwise, canBroadcastByHints is negative (i.e. false).
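A minimal sketch of canBroadcastByHints, assuming a plan's broadcast hint is a plain boolean flag. JoinType, PlanStats and ByHintsSketch are hypothetical, simplified stand-ins for Spark's join types and plan statistics.

```scala
// Hypothetical, simplified stand-ins for Spark's join types (not Spark's API)
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType

// Hypothetical stand-in for a plan's statistics: estimated size plus the broadcast hint flag
final case class PlanStats(sizeInBytes: Long, broadcastHint: Boolean)

object ByHintsSketch {
  def canBuildLeft(t: JoinType): Boolean = t match {
    case Cross | Inner | RightOuter => true
    case _                          => false
  }

  def canBuildRight(t: JoinType): Boolean = t match {
    case Cross | Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
    case _                                                             => false
  }

  // Positive when a side the join type allows as build side carries a broadcast hint;
  // the estimated size is not consulted at all
  def canBroadcastByHints(joinType: JoinType, left: PlanStats, right: PlanStats): Boolean = {
    val buildLeft = canBuildLeft(joinType) && left.broadcastHint
    val buildRight = canBuildRight(joinType) && right.broadcastHint
    buildLeft || buildRight
  }
}
```

Note that sizeInBytes plays no role here: a hinted, very large right side still makes an INNER join eligible, while a hint on the left side of a LEFT OUTER join is ignored because canBuildLeft rules that side out.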
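NOTE: canBroadcastByHints is used when JoinSelection is applied to a logical plan (and considers the BroadcastHashJoinExec or BroadcastNestedLoopJoinExec physical operators).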
=== [[broadcastSideByHints]] Selecting Build Side Per Join Type and Broadcast Hints -- broadcastSideByHints Internal Method

[source, scala]
broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide
broadcastSideByHints computes buildLeft and buildRight flags:

- buildLeft flag is positive (i.e. true) when the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left operator's broadcast hint flag is positive

- buildRight flag is positive (i.e. true) when the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right operator's broadcast hint flag is positive
In the end, broadcastSideByHints chooses the join side to broadcast using broadcastSide with the buildLeft and buildRight flags.
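The following sketch combines the two flags with broadcastSide, assuming that when both sides qualify the smaller one wins and ties go to the right side. JoinType, BuildSide, PlanStats and SideByHintsSketch are hypothetical stand-ins for Spark's classes.

```scala
// Hypothetical, simplified stand-ins for Spark's join types and build sides (not Spark's API)
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType

sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

final case class PlanStats(sizeInBytes: Long, broadcastHint: Boolean)

object SideByHintsSketch {
  def canBuildLeft(t: JoinType): Boolean = t match {
    case Cross | Inner | RightOuter => true
    case _                          => false
  }

  def canBuildRight(t: JoinType): Boolean = t match {
    case Cross | Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
    case _                                                             => false
  }

  // Inside this method the Boolean parameters shadow the methods of the same name
  def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: PlanStats, right: PlanStats): BuildSide = {
    def smallerSide: BuildSide = if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft
    if (canBuildLeft && canBuildRight) smallerSide
    else if (canBuildRight) BuildRight
    else if (canBuildLeft) BuildLeft
    else smallerSide
  }

  def broadcastSideByHints(joinType: JoinType, left: PlanStats, right: PlanStats): BuildSide = {
    val buildLeft = canBuildLeft(joinType) && left.broadcastHint
    val buildRight = canBuildRight(joinType) && right.broadcastHint
    broadcastSide(buildLeft, buildRight, left, right)
  }
}
```

With both sides hinted in an INNER join, the smaller side is broadcast; in a LEFT OUTER join only the right side can be chosen, whatever its size.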
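NOTE: broadcastSideByHints is used when JoinSelection is applied to a logical plan (and considers the BroadcastHashJoinExec or BroadcastNestedLoopJoinExec physical operators).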
=== [[broadcastSide]] Choosing Join Side to Broadcast -- broadcastSide Internal Method

[source, scala]
broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: LogicalPlan, right: LogicalPlan): BuildSide
broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size when canBuildLeft and canBuildRight are both positive (i.e. true). When both sides have the same size, BuildRight is chosen.

broadcastSide gives BuildRight when only canBuildRight is positive.

broadcastSide gives BuildLeft when only canBuildLeft is positive.

When neither canBuildLeft nor canBuildRight is positive, broadcastSide gives the smaller side (BuildRight or BuildLeft) per total size (as in the first case when canBuildLeft and canBuildRight are both positive).
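A compact Scala sketch of the four cases, assuming ties on size go to BuildRight (the right side is chosen when its size does not exceed the left side's). Sizes are passed as plain Long values, and BuildSide and BroadcastSideSketch are hypothetical stand-ins for Spark's classes.

```scala
// Hypothetical stand-ins for the build side of a join (not Spark's API)
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

object BroadcastSideSketch {
  def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, leftSize: Long, rightSize: Long): BuildSide = {
    // Ties go to the right side
    def smallerSide: BuildSide = if (rightSize <= leftSize) BuildRight else BuildLeft
    (canBuildLeft, canBuildRight) match {
      case (true, true)   => smallerSide
      case (false, true)  => BuildRight
      case (true, false)  => BuildLeft
      case (false, false) => smallerSide // both flags negative: fall back to size
    }
  }
}
```

Matching on the pair of flags makes it explicit that the first and last cases share the same size-based rule.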
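NOTE: broadcastSide is used when JoinSelection is requested to select the build side per join type and broadcast hints (broadcastSideByHints) or per join type and total size statistic of join sides (broadcastSideBySizes), and when JoinSelection is applied to a logical plan and falls back to the BroadcastNestedLoopJoinExec physical operator (no other join operator has matched).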
=== [[canBuildLeft]] Checking If Join Type Allows For Left Join Side As Build Side -- canBuildLeft Internal Condition

[source, scala]
canBuildLeft(joinType: JoinType): Boolean
canBuildLeft is positive (i.e. true) for spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] and spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] join types. Otherwise, canBuildLeft is negative (i.e. false).
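A minimal sketch of canBuildLeft over hypothetical, simplified stand-ins for Spark's join types (CanBuildLeftSketch is illustrative only).

```scala
// Hypothetical, simplified stand-ins for Spark's join types (not Spark's API)
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType

object CanBuildLeftSketch {
  // The left side may be the build side only for join types listed here
  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case Cross | Inner | RightOuter => true
    case _                          => false
  }
}
```

The pattern follows from how hash-based joins work: the streamed side is scanned row by row, so a join type that must produce a result for every row of a side (unmatched left rows in LEFT OUTER, or every left row's match status in LEFT SEMI, LEFT ANTI and ExistenceJoin) keeps that side streamed.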
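NOTE: canBuildLeft is used when JoinSelection checks whether a join can be broadcast (canBroadcastByHints and canBroadcastBySizes), selects the build side (broadcastSideByHints and broadcastSideBySizes), and considers the ShuffledHashJoinExec physical operator.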
=== [[canBuildRight]] Checking If Join Type Allows For Right Join Side As Build Side -- canBuildRight Internal Condition

[source, scala]
canBuildRight(joinType: JoinType): Boolean
canBuildRight is positive (i.e. true) for spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] and spark-sql-joins.md#ExistenceJoin[ExistenceJoin] join types. Otherwise, canBuildRight is negative (i.e. false).
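A sketch that pairs canBuildRight with canBuildLeft to list the join types for which neither side can be the build side. The JoinType objects and CanBuildRightSketch are hypothetical, simplified stand-ins for Spark's classes.

```scala
// Hypothetical, simplified stand-ins for Spark's join types (not Spark's API)
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType

object CanBuildRightSketch {
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case Cross | Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
    case _                                                             => false
  }

  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case Cross | Inner | RightOuter => true
    case _                          => false
  }

  val allJoinTypes: Seq[JoinType] =
    Seq(Cross, Inner, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti, ExistenceJoin)

  // Join types for which neither side is allowed as the build side
  def neitherSide: Seq[JoinType] =
    allJoinTypes.filterNot(t => canBuildLeft(t) || canBuildRight(t))
}
```

Only FULL OUTER is left without a build side in this model, which is why a FULL OUTER equi-join never meets the BroadcastHashJoinExec requirements in the table above.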
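NOTE: canBuildRight is used when JoinSelection checks whether a join can be broadcast (canBroadcastByHints and canBroadcastBySizes), selects the build side (broadcastSideByHints and broadcastSideBySizes), and considers the ShuffledHashJoinExec physical operator.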
=== [[canBroadcastBySizes]] Checking If Join Type and Total Size Statistic of Join Sides Allow for Broadcast Join -- canBroadcastBySizes Internal Method

[source, scala]
canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean
canBroadcastBySizes is positive (i.e. true) when either condition holds:

. Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left operator can be broadcast (canBroadcast)
. Join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right operator can be broadcast (canBroadcast)

Otherwise, canBroadcastBySizes is negative (i.e. false).
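A minimal sketch of canBroadcastBySizes, assuming canBroadcast checks a non-negative size against spark.sql.autoBroadcastJoinThreshold. The threshold is passed as a hypothetical explicit parameter, and JoinType, PlanStats and BySizesSketch are simplified stand-ins for Spark's classes.

```scala
// Hypothetical, simplified stand-ins for Spark's join types (not Spark's API)
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType

// Hypothetical stand-in for a plan's statistics (Long instead of Spark's BigInt)
final case class PlanStats(sizeInBytes: Long)

object BySizesSketch {
  def canBuildLeft(t: JoinType): Boolean = t match {
    case Cross | Inner | RightOuter => true
    case _                          => false
  }

  def canBuildRight(t: JoinType): Boolean = t match {
    case Cross | Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
    case _                                                             => false
  }

  // threshold plays the role of spark.sql.autoBroadcastJoinThreshold
  def canBroadcast(plan: PlanStats, threshold: Long): Boolean =
    plan.sizeInBytes >= 0 && plan.sizeInBytes <= threshold

  def canBroadcastBySizes(joinType: JoinType, left: PlanStats, right: PlanStats, threshold: Long): Boolean = {
    val buildLeft = canBuildLeft(joinType) && canBroadcast(left, threshold)
    val buildRight = canBuildRight(joinType) && canBroadcast(right, threshold)
    buildLeft || buildRight
  }
}
```

The structure mirrors canBroadcastByHints; only the per-side predicate changes from the hint flag to the size check.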
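NOTE: canBroadcastBySizes is used when JoinSelection is applied to a logical plan (and considers the BroadcastHashJoinExec or BroadcastNestedLoopJoinExec physical operators).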
=== [[broadcastSideBySizes]] Selecting Build Side Per Join Type and Total Size Statistic of Join Sides -- broadcastSideBySizes Internal Method

[source, scala]
broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): BuildSide
broadcastSideBySizes computes buildLeft and buildRight flags:

- buildLeft flag is positive (i.e. true) when the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER] or spark-sql-joins.md#RIGHT_OUTER[RIGHT OUTER] (i.e. canBuildLeft for the input joinType is positive) and the left operator can be broadcast (canBroadcast)

- buildRight flag is positive (i.e. true) when the join type is spark-sql-joins.md#CROSS[CROSS], spark-sql-joins.md#INNER[INNER], spark-sql-joins.md#LEFT_ANTI[LEFT ANTI], spark-sql-joins.md#LEFT_OUTER[LEFT OUTER], spark-sql-joins.md#LEFT_SEMI[LEFT SEMI] or spark-sql-joins.md#ExistenceJoin[ExistenceJoin] (i.e. canBuildRight for the input joinType is positive) and the right operator can be broadcast (canBroadcast)
In the end, broadcastSideBySizes chooses the join side to broadcast using broadcastSide with the buildLeft and buildRight flags.
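A sketch of broadcastSideBySizes, assuming the same size check as canBroadcast and a broadcastSide that prefers the smaller side when both qualify (ties going to the right side). The threshold is a hypothetical explicit parameter, and all types are simplified stand-ins for Spark's classes.

```scala
// Hypothetical, simplified stand-ins for Spark's join types and build sides (not Spark's API)
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType

sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

final case class PlanStats(sizeInBytes: Long)

object SideBySizesSketch {
  def canBuildLeft(t: JoinType): Boolean = t match {
    case Cross | Inner | RightOuter => true
    case _                          => false
  }

  def canBuildRight(t: JoinType): Boolean = t match {
    case Cross | Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
    case _                                                             => false
  }

  def canBroadcast(plan: PlanStats, threshold: Long): Boolean =
    plan.sizeInBytes >= 0 && plan.sizeInBytes <= threshold

  // Inside this method the Boolean parameters shadow the methods of the same name
  def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: PlanStats, right: PlanStats): BuildSide = {
    def smallerSide: BuildSide = if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft
    if (canBuildLeft && canBuildRight) smallerSide
    else if (canBuildRight) BuildRight
    else if (canBuildLeft) BuildLeft
    else smallerSide
  }

  def broadcastSideBySizes(joinType: JoinType, left: PlanStats, right: PlanStats, threshold: Long): BuildSide = {
    val buildLeft = canBuildLeft(joinType) && canBroadcast(left, threshold)
    val buildRight = canBuildRight(joinType) && canBroadcast(right, threshold)
    broadcastSide(buildLeft, buildRight, left, right)
  }
}
```

For an INNER join with both sides under the threshold, the smaller side is chosen; when only one side fits (or the join type allows only one side), that side is chosen regardless of the other.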
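NOTE: broadcastSideBySizes is used when JoinSelection is applied to a logical plan (and considers the BroadcastHashJoinExec or BroadcastNestedLoopJoinExec physical operators).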
=== [[apply]] Applying JoinSelection Strategy to Logical Plan (Executing JoinSelection) -- apply Method

[source, scala]
apply(plan: LogicalPlan): Seq[SparkPlan]
apply is part of the GenericStrategy abstraction.

apply uses the spark-sql-ExtractEquiJoinKeys.md[ExtractEquiJoinKeys] Scala extractor to destructure the input logical plan.
==== [[apply-BroadcastHashJoinExec]] Considering BroadcastHashJoinExec Physical Operator
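apply gives a BroadcastHashJoinExec.md#creating-instance[BroadcastHashJoinExec] physical operator if the plan should be broadcast per join type and broadcast hints used (canBroadcastByHints, for the join type and the left or right side of the join). apply then selects the build side per join type and broadcast hints (broadcastSideByHints).

Otherwise, apply gives a BroadcastHashJoinExec.md#creating-instance[BroadcastHashJoinExec] physical operator if the plan should be broadcast per join type and total size statistic of join sides (canBroadcastBySizes, for the join type and the left or right side of the join). apply then selects the build side per join type and total size statistic of join sides (broadcastSideBySizes).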
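The order of the two BroadcastHashJoinExec cases can be sketched as below, assuming broadcast hints are checked before sizes and both paths share broadcastSide (ties going to the right side). BroadcastHashJoinSketch, PlanStats and the other types are hypothetical stand-ins, and the result is only the chosen build side wrapped in an Option rather than a physical operator.

```scala
// Hypothetical, simplified stand-ins for Spark's join types and build sides (not Spark's API)
sealed trait JoinType
case object Cross extends JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType

sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

final case class PlanStats(sizeInBytes: Long, broadcastHint: Boolean = false)

object BroadcastHashJoinSketch {
  def canBuildLeft(t: JoinType): Boolean = t match {
    case Cross | Inner | RightOuter => true
    case _                          => false
  }

  def canBuildRight(t: JoinType): Boolean = t match {
    case Cross | Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
    case _                                                             => false
  }

  def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: PlanStats, right: PlanStats): BuildSide = {
    def smallerSide: BuildSide = if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft
    if (canBuildLeft && canBuildRight) smallerSide
    else if (canBuildRight) BuildRight
    else if (canBuildLeft) BuildLeft
    else smallerSide
  }

  // Build-side flags for a per-side predicate (hint flag or size check)
  private def flags(joinType: JoinType, left: PlanStats, right: PlanStats)(ok: PlanStats => Boolean): (Boolean, Boolean) =
    (canBuildLeft(joinType) && ok(left), canBuildRight(joinType) && ok(right))

  // Some(build side) when BroadcastHashJoinExec would be chosen, None otherwise
  def planBroadcastHashJoin(joinType: JoinType, left: PlanStats, right: PlanStats, threshold: Long): Option[BuildSide] = {
    val (hintLeft, hintRight) = flags(joinType, left, right)(_.broadcastHint)
    val (sizeLeft, sizeRight) =
      flags(joinType, left, right)(p => p.sizeInBytes >= 0 && p.sizeInBytes <= threshold)
    if (hintLeft || hintRight) Some(broadcastSide(hintLeft, hintRight, left, right))      // hints first
    else if (sizeLeft || sizeRight) Some(broadcastSide(sizeLeft, sizeRight, left, right)) // then sizes
    else None
  }
}
```

In this model, a broadcast hint on a large left side wins over a small, unhinted right side that would otherwise qualify by size.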
==== [[apply-ShuffledHashJoinExec]] Considering ShuffledHashJoinExec Physical Operator
apply gives...FIXME

==== [[apply-SortMergeJoinExec]] Considering SortMergeJoinExec Physical Operator

apply gives...FIXME

==== [[apply-BroadcastNestedLoopJoinExec]] Considering BroadcastNestedLoopJoinExec Physical Operator

apply gives...FIXME

==== [[apply-CartesianProductExec]] Considering CartesianProductExec Physical Operator

apply gives...FIXME