
OptimizeSkewedJoin Physical Optimization

OptimizeSkewedJoin is a physical query plan optimization in Adaptive Query Execution that makes the data distribution across the partitions of a join more even.

OptimizeSkewedJoin is a CustomShuffleReaderRule.

OptimizeSkewedJoin is also called skew join optimization.

OptimizeSkewedJoin is a Catalyst rule for transforming physical plans (Rule[SparkPlan]).

Supported Join Types

OptimizeSkewedJoin supports the following join types:

  • Inner
  • Cross
  • LeftSemi
  • LeftAnti
  • LeftOuter
  • RightOuter
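The list above works as a simple membership check. The following is a minimal sketch of that check. JoinType and its case objects are hypothetical stand-ins, not Catalyst's own join type classes. FullOuter is included only to show a join type that is not on the list.

```scala
object SupportedJoinTypesSketch {
  // Hypothetical stand-ins for Catalyst's join types (not the real Spark classes)
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType // not in the supported list; for contrast only

  // The join types listed as supported by OptimizeSkewedJoin
  val supportedJoinTypes: Set[JoinType] =
    Set(Inner, Cross, LeftSemi, LeftAnti, LeftOuter, RightOuter)

  def isSupported(joinType: JoinType): Boolean =
    supportedJoinTypes.contains(joinType)
}
```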

Configuration Properties

OptimizeSkewedJoin uses the following configuration properties:

Creating Instance

OptimizeSkewedJoin takes the following to be created:

OptimizeSkewedJoin is created when the AdaptiveSparkPlanExec physical operator is requested for the adaptive optimizations.

Executing Rule

apply(
  plan: SparkPlan): SparkPlan

apply uses the spark.sql.adaptive.skewJoin.enabled configuration property to determine whether to apply any optimizations at all. When the property is disabled, apply returns the query plan unchanged.

apply collects the ShuffleQueryStageExec physical operators of the query plan.

Note

apply does nothing and returns the query plan untouched when the plan contains a number of ShuffleQueryStageExec physical operators other than 2.
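The two guards described above can be sketched over a toy plan tree. The first guard is the skew-join flag, and the second requires exactly two ShuffleQueryStageExec operators. Plan, ShuffleStage, Node, applyRule and the optimize callback are hypothetical stand-ins, not Spark classes. The part of apply still marked FIXME is not modelled.

```scala
object ApplyGuardSketch {
  // Hypothetical toy plan tree standing in for SparkPlan
  sealed trait Plan { def children: Seq[Plan] }
  // Stand-in for a ShuffleQueryStageExec (a leaf in this model)
  final case class ShuffleStage(id: Int) extends Plan {
    def children: Seq[Plan] = Nil
  }
  // Any other operator with child plans
  final case class Node(name: String, children: Seq[Plan]) extends Plan

  // Collect all shuffle stages in the tree
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case stage: ShuffleStage => Seq(stage)
    case other               => other.children.flatMap(collectShuffleStages)
  }

  // Leave the plan untouched unless the rule is enabled and there are exactly two stages
  def applyRule(plan: Plan, skewJoinEnabled: Boolean)(optimize: Plan => Plan): Plan =
    if (!skewJoinEnabled) plan
    else if (collectShuffleStages(plan).length != 2) plan
    else optimize(plan)
}
```

Passing optimize as a function keeps the guard logic separate from the actual rewrite, which here plays the role of optimizeSkewJoin.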

apply...FIXME

apply is part of the Rule abstraction.

Optimizing Skewed Joins

optimizeSkewJoin(
  plan: SparkPlan): SparkPlan

optimizeSkewJoin transforms SortMergeJoinExec physical operators (with one of the supported join types) whose left and right children are SortExec operators over ShuffleQueryStageExec operators.

optimizeSkewJoin handles SortMergeJoinExec operators whose left and right sides have the same number of partitions.

optimizeSkewJoin computes the median partition size of the left and right sides.

optimizeSkewJoin prints out the following DEBUG message to the logs:

Optimizing skewed join.
Left side partitions size info:
[info]
Right side partitions size info:
[info]

optimizeSkewJoin...FIXME

optimizeSkewJoin prints out the following DEBUG message to the logs:

number of skewed partitions: left [numPartitions], right [numPartitions]

In the end, if the number of skewed partitions on either side is greater than 0, optimizeSkewJoin creates CustomShuffleReaderExec physical operators for the left and right children of the SortMergeJoinExec operator and turns on the isSkewJoin flag of the SortMergeJoinExec operator. Otherwise, optimizeSkewJoin leaves the SortMergeJoinExec operator untouched.
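The final decision can be sketched as counting skewed partitions per side. It rewrites only when either count is positive, and it skips joins whose sides have different partition counts. Decision, decide and the per-side predicates are hypothetical names, and plain Seq[Long] sizes stand in for the real shuffle statistics. Each side gets its own predicate because each side has its own median size. The debug text mirrors the DEBUG message shown above.

```scala
object SkewDecisionSketch {
  // Outcome of the skew check for one sort-merge join (hypothetical type)
  final case class Decision(numSkewedLeft: Int, numSkewedRight: Int) {
    // Rewrite only when at least one side has a skewed partition
    def rewrite: Boolean = numSkewedLeft > 0 || numSkewedRight > 0

    // Same shape as the documented DEBUG message
    def debugMessage: String =
      s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight"
  }

  // None when the two sides have different partition counts (join not handled)
  def decide(
      leftSizes: Seq[Long],
      rightSizes: Seq[Long],
      isSkewedLeft: Long => Boolean,
      isSkewedRight: Long => Boolean): Option[Decision] =
    if (leftSizes.length != rightSizes.length) None
    else Some(Decision(leftSizes.count(isSkewedLeft), rightSizes.count(isSkewedRight)))
}
```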

isSkewed Predicate

isSkewed(
  size: Long,
  medianSize: Long): Boolean

isSkewed is true when the given size is greater than all of the following:
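The sketch below assumes two thresholds. The first is the median size multiplied by spark.sql.adaptive.skewJoin.skewedPartitionFactor, and the second is the spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes configuration property. These are the two skew thresholds described in the Spark SQL configuration reference. SkewConf is a hypothetical holder for those two values. The numbers in the test are illustrative and are not necessarily the defaults of your Spark version.

```scala
object IsSkewedSketch {
  // Hypothetical holder for the (assumed) two skew settings:
  // spark.sql.adaptive.skewJoin.skewedPartitionFactor and
  // spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
  final case class SkewConf(skewedPartitionFactor: Int, skewedPartitionThresholdInBytes: Long)

  // True only when size exceeds both the scaled median and the absolute threshold
  def isSkewed(size: Long, medianSize: Long, conf: SkewConf): Boolean =
    size > medianSize * conf.skewedPartitionFactor &&
      size > conf.skewedPartitionThresholdInBytes
}
```

Requiring both conditions means a partition that is large relative to a tiny median, but still small in absolute terms, is not treated as skewed.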

Target Partition Size

targetSize(
  sizes: Seq[Long],
  medianSize: Long): Long

targetSize determines the target partition size (used to optimize a skewed join) as the greatest of the following values:

targetSize throws an AssertionError when all partitions are skewed (no non-skewed partitions).
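A minimal sketch of targetSize follows. It assumes the candidates are an advisory partition size (presumably spark.sql.adaptive.advisoryPartitionSizeInBytes) and the average size of the non-skewed partitions. The advisorySize and isSkewed parameters are hypothetical inputs, passed in explicitly so that the sketch is self-contained. The assert reproduces the AssertionError described above when no partition is left after filtering out the skewed ones.

```scala
object TargetSizeSketch {
  // advisorySize: assumed advisory partition size in bytes
  // isSkewed: a (size, medianSize) => Boolean skew predicate supplied by the caller
  def targetSize(
      sizes: Seq[Long],
      medianSize: Long,
      advisorySize: Long,
      isSkewed: (Long, Long) => Boolean): Long = {
    val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
    // Throws AssertionError when every partition is skewed
    assert(nonSkewSizes.nonEmpty)
    // The larger of the advisory size and the average non-skewed partition size
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }
}
```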

Median Partition Size

medianSize(
  sizes: Seq[Long]): Long

medianSize...FIXME
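The following is one plausible median computation for the signature above. It is a sketch, not necessarily what Spark does. With an even number of partitions, it averages the two middle sizes using integer division. The result is floored at 1 byte, which is an assumption here that keeps a zero median from making the skew threshold collapse to 0.

```scala
object MedianSizeSketch {
  // Median of partition sizes in bytes (floored at 1; an assumption)
  def medianSize(sizes: Seq[Long]): Long = {
    require(sizes.nonEmpty, "at least one partition size is required")
    val sorted = sizes.toIndexedSeq.sorted
    val n = sorted.length
    val median =
      if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2 // even: average of the middle pair
      else sorted(n / 2)                                      // odd: the middle element
    math.max(median, 1L)
  }
}
```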

Logging

Enable the ALL logging level for the org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin=ALL

Refer to Logging.


Last update: 2021-05-17