
AdaptiveSparkPlanExec Leaf Physical Operator

AdaptiveSparkPlanExec is a leaf physical operator for Adaptive Query Execution.

Creating Instance

AdaptiveSparkPlanExec takes the following to be created:

AdaptiveSparkPlanExec is created when:

Executing Physical Operator

doExecute(): RDD[InternalRow]

doExecute requests getFinalPhysicalPlan for the final physical plan and requests it to execute, which generates the RDD[InternalRow] to be returned.

doExecute triggers finalPlanUpdate (unless done already).

doExecute returns the RDD[InternalRow].

doExecute is part of the SparkPlan abstraction.
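The doExecute steps can be sketched with toy stand-ins. ToyPlan and ToyAdaptivePlan are hypothetical names (not Spark's API), rows are plain Strings instead of an RDD[InternalRow], and the toy getFinalPhysicalPlan only caches a derived plan. The sketch illustrates the order of the steps and that a lazy finalPlanUpdate body runs at most once.

```scala
// Hypothetical stand-in for a physical plan; not Spark's SparkPlan.
final case class ToyPlan(name: String) {
  // Stand-in for execute(): "rows" are plain strings instead of an RDD[InternalRow].
  def execute(): Seq[String] = Seq(s"row from $name")
}

// Hypothetical model of the doExecute steps described above.
final class ToyAdaptivePlan(initial: ToyPlan) {
  var finalPlanUpdates = 0
  private var finalPlan: Option[ToyPlan] = None

  // Stand-in for getFinalPhysicalPlan: derives the final plan on first call, then reuses it.
  def getFinalPhysicalPlan(): ToyPlan = finalPlan.getOrElse {
    val plan = ToyPlan(initial.name + "-final")
    finalPlan = Some(plan)
    plan
  }

  // Stand-in for finalPlanUpdate: a lazy value, so its body runs at most once.
  private lazy val finalPlanUpdate: Unit = finalPlanUpdates += 1

  def doExecute(): Seq[String] = {
    val rows = getFinalPhysicalPlan().execute() // 1. execute the final physical plan
    finalPlanUpdate                              // 2. trigger the one-time final plan update
    rows                                         // 3. return the result
  }
}
```

Calling doExecute twice returns the same rows, while the finalPlanUpdate counter stays at 1.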

Executing for Collect Operator

executeCollect(): Array[InternalRow]

executeCollect...FIXME

executeCollect is part of the SparkPlan abstraction.

Executing for Tail Operator

executeTail(
  n: Int): Array[InternalRow]

executeTail...FIXME

executeTail is part of the SparkPlan abstraction.

Executing for Take Operator

executeTake(
  n: Int): Array[InternalRow]

executeTake...FIXME

executeTake is part of the SparkPlan abstraction.

Final Physical Query Plan

getFinalPhysicalPlan(): SparkPlan

Note

getFinalPhysicalPlan uses the isFinalPlan internal flag to short-circuit (skip) the whole computation and return the current optimized physical query plan immediately.

Step 1. createQueryStages

getFinalPhysicalPlan executes createQueryStages with the currentPhysicalPlan.

Step 2. Until allChildStagesMaterialized

getFinalPhysicalPlan executes the following until allChildStagesMaterialized.

Step 2.1 New QueryStageExecs

getFinalPhysicalPlan does the following when there are new stages to be processed:

  • FIXME

Step 2.2 StageMaterializationEvents

getFinalPhysicalPlan executes the following until allChildStagesMaterialized:

  • FIXME

Step 2.3 Errors

In case of errors, getFinalPhysicalPlan executes cleanUpAndThrowException.

Step 2.4 replaceWithQueryStagesInLogicalPlan

getFinalPhysicalPlan executes replaceWithQueryStagesInLogicalPlan with the currentLogicalPlan and the stagesToReplace.

Step 2.5 reOptimize

getFinalPhysicalPlan executes reOptimize on the new logical plan.

Step 2.6 Evaluating Cost

getFinalPhysicalPlan requests the SimpleCostEvaluator to evaluateCost of both the currentPhysicalPlan and the newPhysicalPlan.

Step 2.7 Adopting New Physical Plan

getFinalPhysicalPlan adopts the new physical plan if its cost is lower than the cost of the currentPhysicalPlan, or if the costs are equal but the physical plans differ (the new plan is then likely better).
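The adoption rule can be sketched as a predicate. This assumes costs are plain Long values (in Spark they come from the SimpleCostEvaluator), and PlanAdoption.shouldAdopt is a hypothetical helper name.

```scala
object PlanAdoption {
  // Hypothetical helper mirroring the rule above: adopt the new plan when it is
  // strictly cheaper, or equally cheap but different from the current plan.
  def shouldAdopt[P](currentPlan: P, currentCost: Long, newPlan: P, newCost: Long): Boolean =
    newCost < currentCost || (newCost == currentCost && newPlan != currentPlan)
}
```

Comparing costs first and plan equality second means an equally cheap but reshaped plan is still picked up, while an identical plan never causes a pointless swap.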

getFinalPhysicalPlan prints out the following message to the logs (using the logOnLevel):

Plan changed from [currentPhysicalPlan] to [newPhysicalPlan]

getFinalPhysicalPlan executes cleanUpTempTags on the newPhysicalPlan.

getFinalPhysicalPlan saves the newPhysicalPlan as the currentPhysicalPlan (and the newLogicalPlan as the currentLogicalPlan).

getFinalPhysicalPlan resets the stagesToReplace.

Step 2.8 createQueryStages

getFinalPhysicalPlan executes createQueryStages for the currentPhysicalPlan (that may have changed).

Step 3. applyPhysicalRules

getFinalPhysicalPlan executes applyPhysicalRules on the final plan (with the finalStageOptimizerRules, the planChangeLogger and the AQE Final Query Stage Optimization batch name).

getFinalPhysicalPlan turns the isFinalPlan internal flag on.
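The overall shape of getFinalPhysicalPlan can be modeled with a toy class. Everything here is hypothetical and heavily simplified: Plan and ToyAqe are not Spark types, a fixed number of pending stages stands in for createQueryStages and the stage materialization events, reOptimize maps a physical plan directly to a new one (skipping the logical plan), and errors are ignored. It only shows the loop, the cost-based adoption, the final rules and the isFinalPlan short-circuit.

```scala
// Hypothetical toy plan: a description plus a precomputed cost.
final case class Plan(desc: String, cost: Long)

// Hypothetical, heavily simplified model of getFinalPhysicalPlan.
final class ToyAqe(
    initialPlan: Plan,
    stagesToMaterialize: Int,
    reOptimize: Plan => Plan,
    finalStageRules: Seq[Plan => Plan]) {

  var isFinalPlan = false
  var currentPhysicalPlan: Plan = initialPlan
  private var pendingStages = stagesToMaterialize

  def getFinalPhysicalPlan(): Plan = synchronized {
    if (!isFinalPlan) {                   // short-circuit once the plan is final
      while (pendingStages > 0) {         // until all child stages are materialized
        pendingStages -= 1                // pretend one query stage finished
        val newPlan = reOptimize(currentPhysicalPlan)
        val cheaper = newPlan.cost < currentPhysicalPlan.cost
        val sameCostButDifferent =
          newPlan.cost == currentPhysicalPlan.cost && newPlan != currentPhysicalPlan
        if (cheaper || sameCostButDifferent) currentPhysicalPlan = newPlan
      }
      // Final step: apply the final-stage rules, then mark the plan as final.
      currentPhysicalPlan = finalStageRules.foldLeft(currentPhysicalPlan)((p, rule) => rule(p))
      isFinalPlan = true
    }
    currentPhysicalPlan
  }
}
```

A second call returns the cached plan without re-optimizing, which is the purpose of the isFinalPlan flag.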

Usage

getFinalPhysicalPlan is used when:

finalStageOptimizerRules

finalStageOptimizerRules: Seq[Rule[SparkPlan]]

finalStageOptimizerRules...FIXME

createQueryStages

createQueryStages(
  plan: SparkPlan): CreateStageResult

createQueryStages...FIXME

Creating QueryStageExec for Exchange

newQueryStage(
  e: Exchange): QueryStageExec

newQueryStage creates a new QueryStageExec physical operator based on the type of the given Exchange physical operator.

Exchange                 QueryStageExec
---------------------    -----------------------
ShuffleExchangeLike      ShuffleQueryStageExec
BroadcastExchangeLike    BroadcastQueryStageExec

newQueryStage first creates an optimized physical query plan for the child physical plan of the given Exchange, using the adaptive optimizations, the PlanChangeLogger and the AQE Query Stage Optimization batch name.

newQueryStage then executes applyPhysicalRules on the Exchange itself (with the optimized child), using the post-stage-creation optimizations, the PlanChangeLogger and the AQE Post Stage Creation batch name.

newQueryStage creates a new QueryStageExec physical operator for the optimized Exchange operator (using the currentStageId for the ID).

newQueryStage increments the currentStageId counter.

newQueryStage associates the new query stage operator with the Exchange physical operator.

In the end, newQueryStage returns the QueryStageExec physical operator.
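A sketch of newQueryStage's dispatch on the Exchange type and its stage ID counter, using hypothetical toy types in place of Exchange, ShuffleQueryStageExec and BroadcastQueryStageExec. Plans are Strings, optimizeChild stands in for the adaptive optimizations, and the post-stage-creation rules and the association with the Exchange are left out.

```scala
// Hypothetical stand-ins for Spark's Exchange and QueryStageExec hierarchies.
sealed trait ToyExchange { def child: String }
final case class ToyShuffleExchange(child: String) extends ToyExchange
final case class ToyBroadcastExchange(child: String) extends ToyExchange

sealed trait ToyQueryStage { def id: Int }
final case class ToyShuffleQueryStage(id: Int, plan: ToyExchange) extends ToyQueryStage
final case class ToyBroadcastQueryStage(id: Int, plan: ToyExchange) extends ToyQueryStage

final class ToyStageFactory(optimizeChild: String => String) {
  private var currentStageId = 0

  def newQueryStage(e: ToyExchange): ToyQueryStage = {
    // Optimize the exchange's child first.
    val optimizedChild = optimizeChild(e.child)
    // Pick the query stage type based on the exchange type.
    val stage = e match {
      case _: ToyShuffleExchange =>
        ToyShuffleQueryStage(currentStageId, ToyShuffleExchange(optimizedChild))
      case _: ToyBroadcastExchange =>
        ToyBroadcastQueryStage(currentStageId, ToyBroadcastExchange(optimizedChild))
    }
    currentStageId += 1 // the next stage gets the next ID
    stage
  }
}
```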

Optimized Physical Query Plan

AdaptiveSparkPlanExec uses the currentPhysicalPlan internal registry for the optimized physical query plan (available through the executedPlan method).

When the AdaptiveSparkPlanExec operator is created, currentPhysicalPlan is the initialPlan.

currentPhysicalPlan may change in getFinalPhysicalPlan until the isFinalPlan internal flag is on.

QueryStage Preparation Rules

queryStagePreparationRules: Seq[Rule[SparkPlan]]

queryStagePreparationRules is a single-rule collection with the EnsureRequirements physical optimization.

queryStagePreparationRules is used when the AdaptiveSparkPlanExec operator is requested for the initial physical plan (initialPlan) and to reOptimize.

Adaptive Optimizations

queryStageOptimizerRules: Seq[Rule[SparkPlan]]

queryStageOptimizerRules is the following adaptive optimizations (physical optimization rules):

queryStageOptimizerRules is used when:

Post-Stage-Creation Adaptive Optimizations

postStageCreationRules: Seq[Rule[SparkPlan]]

postStageCreationRules is the following adaptive optimizations (physical optimization rules):

postStageCreationRules is used when:

generateTreeString

generateTreeString(
  depth: Int,
  lastChildren: Seq[Boolean],
  append: String => Unit,
  verbose: Boolean,
  prefix: String = "",
  addSuffix: Boolean = false,
  maxFields: Int,
  printNodeId: Boolean): Unit

generateTreeString...FIXME

generateTreeString is part of the TreeNode abstraction.

reuseQueryStage

reuseQueryStage(
  existing: QueryStageExec,
  exchange: Exchange): QueryStageExec

reuseQueryStage...FIXME

reuseQueryStage is used when AdaptiveSparkPlanExec physical operator is requested to createQueryStages.

cleanUpAndThrowException

cleanUpAndThrowException(
  errors: Seq[Throwable],
  earlyFailedStage: Option[Int]): Unit

cleanUpAndThrowException...FIXME

cleanUpAndThrowException is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (and materialization of new stages fails).

Re-Optimizing Logical Query Plan

reOptimize(
  logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan)

reOptimize gives optimized physical and logical query plans for the given logical query plan.

Internally, reOptimize requests the given logical query plan to invalidateStatsCache and requests the local logical optimizer to generate an optimized logical query plan.

reOptimize requests the query planner (bound to the AdaptiveExecutionContext) to plan the optimized logical query plan (and generate a physical query plan).

reOptimize creates an optimized physical query plan using preprocessing and preparation rules.

reOptimize is used when the AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (to re-optimize the logical plan once new query stages have been materialized).
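reOptimize is essentially a pipeline: optimize the logical plan, plan it, then apply the physical preparation rules. A sketch with hypothetical Logical and Physical types and rules as plain functions; ToyReOptimizer is not a Spark class, and invalidateStatsCache has no counterpart here.

```scala
// Hypothetical types standing in for LogicalPlan and SparkPlan.
final case class Logical(desc: String)
final case class Physical(desc: String)

final class ToyReOptimizer(
    logicalRules: Seq[Logical => Logical],
    planner: Logical => Physical,
    physicalRules: Seq[Physical => Physical]) {

  // Returns the optimized physical plan together with the optimized logical plan.
  def reOptimize(plan: Logical): (Physical, Logical) = {
    val optimized = logicalRules.foldLeft(plan)((p, rule) => rule(p))      // logical optimizer
    val planned = planner(optimized)                                       // query planner
    val prepared = physicalRules.foldLeft(planned)((p, rule) => rule(p))   // preparation rules
    (prepared, optimized)
  }
}
```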

Adaptive Logical Optimizer

optimizer: AQEOptimizer

AdaptiveSparkPlanExec creates an AQEOptimizer when created (to re-optimize logical query plans).

QueryStageCreator Thread Pool

executionContext: ExecutionContext

executionContext is an ExecutionContext that is used when:

finalPlanUpdate Lazy Value

lazy val finalPlanUpdate: Unit

finalPlanUpdate is a Scala lazy value which is computed once when accessed and cached afterwards.

finalPlanUpdate...FIXME

In the end, finalPlanUpdate prints out the following message to the logs:

Final plan: [currentPhysicalPlan]

finalPlanUpdate is used when AdaptiveSparkPlanExec physical operator is requested to executeCollect, executeTake, executeTail and doExecute.

isFinalPlan Internal Flag

isFinalPlan: Boolean = false

isFinalPlan is an internal flag that lets getFinalPhysicalPlan skip the expensive computation (and return the current optimized physical query plan immediately).

isFinalPlan is disabled by default and turned on at the end of getFinalPhysicalPlan.

isFinalPlan is also used when:

  • AdaptiveSparkPlanExec is requested for stringArgs

Initial Physical Plan and AQE Preparations

initialPlan: SparkPlan

AdaptiveSparkPlanExec defines an initialPlan internal registry for a physical query plan when created.

initialPlan is the physical query plan produced by executing the queryStagePreparationRules on the inputPlan (with the planChangeLogger and the AQE Preparations batch name).

initialPlan is the physical query plan that currentPhysicalPlan starts from (before any re-optimization).

initialPlan is also used when:

  • AdaptiveSparkPlanExec is requested to generateTreeString

replaceWithQueryStagesInLogicalPlan

replaceWithQueryStagesInLogicalPlan(
  plan: LogicalPlan,
  stagesToReplace: Seq[QueryStageExec]): LogicalPlan

replaceWithQueryStagesInLogicalPlan...FIXME

replaceWithQueryStagesInLogicalPlan is used when AdaptiveSparkPlanExec physical operator is requested for a final physical plan.

Executing Physical Rules

applyPhysicalRules(
  plan: SparkPlan,
  rules: Seq[Rule[SparkPlan]],
  loggerAndBatchName: Option[(PlanChangeLogger[SparkPlan], String)] = None): SparkPlan

By default (with no loggerAndBatchName given) applyPhysicalRules applies (executes) the given rules to the given physical query plan.

With loggerAndBatchName specified, applyPhysicalRules executes the rules and, for every rule, requests the PlanChangeLogger to logRule. In the end, applyPhysicalRules requests the PlanChangeLogger to logBatch.
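A sketch of applyPhysicalRules under simplifying assumptions: plans are Strings, and ToyRule and ToyPlanChangeLogger are hypothetical stand-ins for Rule[SparkPlan] and PlanChangeLogger. The toy logger records lines in a buffer instead of writing to the logs and skips rules that leave the plan unchanged.

```scala
// Hypothetical minimal logger standing in for PlanChangeLogger.
final class ToyPlanChangeLogger {
  val lines = scala.collection.mutable.ArrayBuffer.empty[String]
  def logRule(ruleName: String, oldPlan: String, newPlan: String): Unit =
    if (oldPlan != newPlan) lines += s"rule $ruleName: $oldPlan -> $newPlan"
  def logBatch(batchName: String, oldPlan: String, newPlan: String): Unit =
    lines += s"batch $batchName: $oldPlan -> $newPlan"
}

// Hypothetical named rewrite rule, loosely modeled on Rule[SparkPlan].
final case class ToyRule(name: String, transform: String => String)

object ToyRules {
  def applyPhysicalRules(
      plan: String,
      rules: Seq[ToyRule],
      loggerAndBatchName: Option[(ToyPlanChangeLogger, String)] = None): String =
    loggerAndBatchName match {
      case None =>
        // No logger: just fold the rules over the plan.
        rules.foldLeft(plan)((p, rule) => rule.transform(p))
      case Some((logger, batchName)) =>
        // With a logger: log every rule application, then the whole batch.
        val result = rules.foldLeft(plan) { (p, rule) =>
          val next = rule.transform(p)
          logger.logRule(rule.name, p, next)
          next
        }
        logger.logBatch(batchName, plan, result)
        result
    }
}
```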

applyPhysicalRules is used when:

Logging

Enable ALL logging level for org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec=ALL

Refer to Logging.

PlanChangeLogger

AdaptiveSparkPlanExec uses a PlanChangeLogger for the following:

logOnLevel

logOnLevel: (=> String) => Unit

logOnLevel uses the internal spark.sql.adaptive.logLevel configuration property for the logging level and prints out the given message to the logs (at the log level).
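A sketch of the logOnLevel idea: the logging function is chosen once from a configured level name and then called with by-name messages. ToyLogOnLevel and its in-memory buffer are hypothetical, and falling back to DEBUG for unknown level names is an assumption of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

object ToyLogOnLevel {
  // In-memory (level, message) pairs standing in for a real logging backend.
  val logged: ArrayBuffer[(String, String)] = ArrayBuffer.empty

  // The message is by-name, so it is only built when actually logged.
  private def log(level: String)(msg: => String): Unit = logged += ((level, msg))

  // Picks the logging function once, from a configured level name (case-insensitive).
  def logOnLevel(configuredLevel: String): (=> String) => Unit =
    configuredLevel.toUpperCase match {
      case level @ ("TRACE" | "DEBUG" | "INFO" | "WARN" | "ERROR") => log(level)(_)
      case _ => log("DEBUG")(_)
    }
}
```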

logOnLevel is used when:
