AdaptiveSparkPlanExec Physical Operator

AdaptiveSparkPlanExec is a leaf physical operator for Adaptive Query Execution.

Creating Instance

AdaptiveSparkPlanExec takes the following to be created:

AdaptiveSparkPlanExec is created when InsertAdaptiveSparkPlan physical optimization is executed.

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute requests getFinalPhysicalPlan for the final physical query plan and requests that plan to execute (which generates an RDD[InternalRow] that will be the return value).

doExecute triggers finalPlanUpdate (unless done already).

doExecute returns the RDD[InternalRow].

doExecute is part of the SparkPlan abstraction.

Executing for Collect Operator

executeCollect(): Array[InternalRow]

executeCollect...FIXME

executeCollect is part of the SparkPlan abstraction.

Executing for Tail Operator

executeTail(
  n: Int): Array[InternalRow]

executeTail...FIXME

executeTail is part of the SparkPlan abstraction.

Executing for Take Operator

executeTake(
  n: Int): Array[InternalRow]

executeTake...FIXME

executeTake is part of the SparkPlan abstraction.

Final Physical Query Plan

getFinalPhysicalPlan(): SparkPlan

getFinalPhysicalPlan...FIXME

getFinalPhysicalPlan is used when AdaptiveSparkPlanExec physical operator is requested to executeCollect, executeTake, executeTail and doExecute.

Current Optimized Physical Query Plan

currentPhysicalPlan: SparkPlan

currentPhysicalPlan is a physical query plan that AdaptiveSparkPlanExec creates as soon as the operator itself is created.

currentPhysicalPlan is the initial physical query plan after applying the queryStagePreparationRules.

QueryStage Preparation Rules

queryStagePreparationRules: Seq[Rule[SparkPlan]]

queryStagePreparationRules is a single-rule collection of EnsureRequirements physical optimization.

queryStagePreparationRules is used when AdaptiveSparkPlanExec operator is requested for the current physical plan and reOptimize.

Adaptive Optimizations

queryStageOptimizerRules: Seq[Rule[SparkPlan]]

AdaptiveSparkPlanExec uses the following adaptive optimizations (physical optimization rules):

queryStageOptimizerRules is used when AdaptiveSparkPlanExec is requested to getFinalPhysicalPlan and newQueryStage.

generateTreeString

generateTreeString(
  depth: Int,
  lastChildren: Seq[Boolean],
  append: String => Unit,
  verbose: Boolean,
  prefix: String = "",
  addSuffix: Boolean = false,
  maxFields: Int,
  printNodeId: Boolean): Unit

generateTreeString...FIXME

generateTreeString is part of the TreeNode abstraction.

createQueryStages

createQueryStages(
  plan: SparkPlan): CreateStageResult

createQueryStages...FIXME

createQueryStages is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan.

newQueryStage

newQueryStage(
  e: Exchange): QueryStageExec

newQueryStage creates an optimized physical query plan for the child physical plan of the given Exchange (using the queryStageOptimizerRules).

newQueryStage creates a QueryStageExec physical operator for the given Exchange with the child physical plan as the optimized physical query plan:

newQueryStage increments the currentStageId counter.

newQueryStage calls setLogicalLinkForNewQueryStage for the QueryStageExec physical operator.

In the end, newQueryStage returns the QueryStageExec physical operator.

newQueryStage is used when AdaptiveSparkPlanExec is requested to createQueryStages.
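The steps of newQueryStage can be sketched with a small stand-alone model. This is not Spark's code: PlanNode, ExchangeNode, StageNode and StageFactory are hypothetical, simplified types, the order of "create the stage, then increment the counter" is an assumption, and the setLogicalLinkForNewQueryStage step is left out.

```scala
// Hypothetical, simplified stand-ins; Spark's real classes carry far more state
final case class PlanNode(description: String)
final case class ExchangeNode(child: PlanNode)
final case class StageNode(id: Int, exchange: ExchangeNode)

class StageFactory(optimizerRules: Seq[PlanNode => PlanNode]) {
  private var currentStageId = 0

  def newQueryStage(e: ExchangeNode): StageNode = {
    // 1. Optimize the child plan of the exchange with the stage optimizer rules
    val optimizedChild = optimizerRules.foldLeft(e.child)((plan, rule) => rule(plan))
    // 2. Wrap the exchange (now over the optimized child) in a query stage
    val stage = StageNode(currentStageId, e.copy(child = optimizedChild))
    // 3. Increment the counter so the next stage gets a fresh id (assumed ordering)
    currentStageId += 1
    stage
  }
}

val markOptimized: PlanNode => PlanNode = p => PlanNode(s"Optimized(${p.description})")
val factory = new StageFactory(Seq(markOptimized))
val first = factory.newQueryStage(ExchangeNode(PlanNode("Scan a")))
val second = factory.newQueryStage(ExchangeNode(PlanNode("Scan b")))
println(first)
println(second)
```

Every call hands out a new stage id, and the stage always wraps the optimized child rather than the original one.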

reuseQueryStage

reuseQueryStage(
  existing: QueryStageExec,
  exchange: Exchange): QueryStageExec

reuseQueryStage...FIXME

reuseQueryStage is used when AdaptiveSparkPlanExec physical operator is requested to createQueryStages.

cleanUpAndThrowException

cleanUpAndThrowException(
  errors: Seq[Throwable],
  earlyFailedStage: Option[Int]): Unit

cleanUpAndThrowException...FIXME

cleanUpAndThrowException is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (and materialization of new stages fails).

Re-Optimizing Logical Query Plan

reOptimize(
  logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan)

reOptimize returns a pair of the optimized physical and logical query plans for the given logical query plan.

Internally, reOptimize requests the given logical query plan to invalidateStatsCache and requests the local logical optimizer to generate an optimized logical query plan.

reOptimize requests the query planner (bound to the AdaptiveExecutionContext) to plan the optimized logical query plan (and generate a physical query plan).

reOptimize creates an optimized physical query plan using preprocessing and preparation rules.

reOptimize is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (after new query stages have been materialized).
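The flow of reOptimize described above can be read as a four-step pipeline. The sketch below only illustrates the order of the steps: LogicalModel and PhysicalModel are hypothetical stand-ins, and the bodies of the helper functions are invented for the example rather than taken from Spark.

```scala
// Hypothetical stand-ins for logical and physical query plans
final case class LogicalModel(ops: List[String], statsCached: Boolean)
final case class PhysicalModel(ops: List[String])

// Assumed helper steps: the names mirror the prose, the bodies are illustrative only
def invalidateStatsCache(plan: LogicalModel): LogicalModel = plan.copy(statsCached = false)
def optimizeLogical(plan: LogicalModel): LogicalModel =
  plan.copy(ops = plan.ops.filterNot(_ == "NoOp"))
def planPhysical(plan: LogicalModel): PhysicalModel =
  PhysicalModel(plan.ops.map(op => s"${op}Exec"))
def applyRules(plan: PhysicalModel, rules: Seq[PhysicalModel => PhysicalModel]): PhysicalModel =
  rules.foldLeft(plan)((p, rule) => rule(p))

def reOptimize(
    logicalPlan: LogicalModel,
    rules: Seq[PhysicalModel => PhysicalModel]): (PhysicalModel, LogicalModel) = {
  val fresh = invalidateStatsCache(logicalPlan) // drop stale statistics first
  val optimized = optimizeLogical(fresh)        // logical re-optimization
  val planned = planPhysical(optimized)         // logical plan to physical plan
  val prepared = applyRules(planned, rules)     // preprocessing and preparation rules
  (prepared, optimized)
}

val addExchange: PhysicalModel => PhysicalModel = p => PhysicalModel("Exchange" :: p.ops)
val (physical, logical) =
  reOptimize(LogicalModel(List("Project", "NoOp", "Scan"), statsCached = true), Seq(addExchange))
println(physical)
println(logical)
```

Returning both plans lets the caller keep the physical plan and the logical plan it was derived from in sync.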

Logical Adaptive Optimizer

optimizer: RuleExecutor[LogicalPlan]

optimizer is a RuleExecutor to transform logical query plans.

optimizer has a single rule batch, Demote BroadcastHashJoin, that contains the DemoteBroadcastHashJoin logical optimization only.

optimizer is used when AdaptiveSparkPlanExec physical operator is requested to re-optimize a logical query plan.

Executing Physical Rules

applyPhysicalRules(
  plan: SparkPlan,
  rules: Seq[Rule[SparkPlan]]): SparkPlan

applyPhysicalRules simply applies (executes) the given rules to the given physical query plan.
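One way to read "simply applies" is a left fold over the rules, where the output of each rule becomes the input of the next. A minimal sketch, assuming a plan can be modelled as a plain string and a rule as a named function (PlanRule and applyRulesInOrder are hypothetical names, not Spark's API):

```scala
// Hypothetical stand-in: a rule is a named transformation of a plan (here a String)
final case class PlanRule(name: String, transform: String => String)

// Apply the rules left to right, feeding each rule's output to the next one
def applyRulesInOrder(plan: String, rules: Seq[PlanRule]): String =
  rules.foldLeft(plan) { (current, rule) => rule.transform(current) }

val demoRules = Seq(
  PlanRule("AddExchange", p => s"Exchange($p)"),
  PlanRule("WrapStage", p => s"Stage($p)"))

val rewritten = applyRulesInOrder("Scan", demoRules)
println(rewritten) // prints "Stage(Exchange(Scan))"
```

The order of the rules matters: the last rule in the sequence produces the outermost change.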

applyPhysicalRules is used when:

QueryStageCreator Thread Pool

executionContext: ExecutionContext

executionContext is an ExecutionContext that is used when:

finalPlanUpdate Lazy Value

finalPlanUpdate: Unit
lazy value

finalPlanUpdate is a Scala lazy value which is computed once when accessed and cached afterwards.
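The "computed once, cached afterwards" behaviour of a Unit-typed lazy value can be seen in isolation. The following is a minimal sketch: LazyUpdateDemo and its members are hypothetical, and only the Scala lazy val semantics are the point.

```scala
class LazyUpdateDemo(plan: String) {
  var updates = 0 // counts how many times the lazy body actually runs

  // A lazy val of type Unit: the body is a side effect that runs on first access only
  lazy val finalPlanUpdate: Unit = {
    updates += 1
    println(s"Final plan: $plan")
  }

  // Accessing the lazy val triggers the body the first time and is a no-op afterwards
  def execute(): Unit = finalPlanUpdate
}

val demo = new LazyUpdateDemo("Scan")
demo.execute()
demo.execute()
println(demo.updates) // prints 1
```

This is why several entry points can all touch finalPlanUpdate without the side effect being repeated.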

finalPlanUpdate...FIXME

In the end, finalPlanUpdate prints out the following message to the logs:

Final plan: [currentPhysicalPlan]

finalPlanUpdate is used when AdaptiveSparkPlanExec physical operator is requested to executeCollect, executeTake, executeTail and doExecute.

logOnLevel Internal Method

logOnLevel: ( => String) => Unit

logOnLevel uses spark.sql.adaptive.logLevel configuration property for the logging level and prints out the given message to the logs.
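The type ( => String) => Unit is a function that takes its message by name, so the message is only built if the chosen level actually logs it. A minimal sketch of the idea, assuming a level name passed in as a plain string; logInfo, logDebug and logOnLevelFor are hypothetical stand-ins for a logger, with DEBUG treated as disabled:

```scala
import scala.collection.mutable.ArrayBuffer

val logged = ArrayBuffer.empty[String]

// Stand-ins for per-level logger methods; each takes the message by name
def logInfo(msg: => String): Unit = { logged += s"INFO: $msg"; () }
def logDebug(msg: => String): Unit = () // disabled in this sketch: msg is never evaluated

// Pick the logging function once, based on the configured level name
def logOnLevelFor(level: String): (=> String) => Unit = level match {
  case "INFO" => logInfo(_)
  case _      => logDebug(_)
}

val logOnLevel = logOnLevelFor("INFO")
logOnLevel("Final plan: Scan")
println(logged.mkString("\n"))
```

Selecting the function up front avoids re-checking the configuration on every log call.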

logOnLevel is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan and finalPlanUpdate.

isFinalPlan Internal Flag

isFinalPlan: Boolean = false

isFinalPlan is an internal flag that lets AdaptiveSparkPlanExec skip the expensive getFinalPhysicalPlan (and return the current optimized physical query plan immediately).

isFinalPlan is disabled by default and turned on at the end of getFinalPhysicalPlan.
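The role of the flag can be illustrated with a small model. This is a sketch under assumptions, not Spark's implementation: AdaptivePlanModel is hypothetical, the plan is a plain string, and the "expensive" work is reduced to wrapping that string.

```scala
class AdaptivePlanModel(initialPlan: String) {
  private var currentPhysicalPlan = initialPlan
  private var isFinalPlan = false // disabled by default
  var optimizationRuns = 0        // exposed for the demo only

  def getFinalPhysicalPlan(): String = {
    if (isFinalPlan) {
      // Fast path: the plan is already final, return it immediately
      currentPhysicalPlan
    } else {
      // Slow path: stands in for stage creation and re-optimization
      optimizationRuns += 1
      currentPhysicalPlan = s"Final($currentPhysicalPlan)"
      isFinalPlan = true // turned on at the very end
      currentPhysicalPlan
    }
  }
}

val model = new AdaptivePlanModel("Scan")
val firstCall = model.getFinalPhysicalPlan()
val secondCall = model.getFinalPhysicalPlan()
println(s"$firstCall, $secondCall, runs = ${model.optimizationRuns}")
```

Both calls return the same plan, but only the first one pays for the optimization.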

isFinalPlan is used when AdaptiveSparkPlanExec physical operator is requested for stringArgs.

replaceWithQueryStagesInLogicalPlan Internal Method

replaceWithQueryStagesInLogicalPlan(
  plan: LogicalPlan,
  stagesToReplace: Seq[QueryStageExec]): LogicalPlan

replaceWithQueryStagesInLogicalPlan...FIXME

replaceWithQueryStagesInLogicalPlan is used when AdaptiveSparkPlanExec physical operator is requested for a final physical plan.


Last update: 2021-02-14