AdaptiveSparkPlanExec Physical Operator
AdaptiveSparkPlanExec is a leaf physical operator for Adaptive Query Execution.
Creating Instance
AdaptiveSparkPlanExec takes the following to be created:
- Initial physical query plan
- AdaptiveExecutionContext
- Preprocessing physical rules
- isSubquery flag
AdaptiveSparkPlanExec is created when the InsertAdaptiveSparkPlan physical optimization is executed.
Executing Physical Operator
doExecute(): RDD[InternalRow]
doExecute uses getFinalPhysicalPlan to get the final physical query plan and requests it to execute. The resulting RDD[InternalRow] becomes the return value.
doExecute then triggers finalPlanUpdate, unless it has already been triggered.
In the end, doExecute returns the RDD[InternalRow].
doExecute is part of the SparkPlan abstraction.
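The order of steps in doExecute can be illustrated with a small Scala model that does not use Spark. ToyPlan, ToyAdaptivePlan and the events log are hypothetical stand-ins, not Spark classes. The real getFinalPhysicalPlan, which creates stages and re-optimizes, is reduced here to producing a renamed plan. The sketch shows two things: the final plan is executed first, and the lazy finalPlanUpdate body runs only on the first call.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a physical plan: execute() returns fake "rows".
final case class ToyPlan(name: String) {
  def execute(): Seq[String] = Seq(s"$name-row-1", s"$name-row-2")
}

// Hypothetical model of the doExecute flow; not Spark's AdaptiveSparkPlanExec.
final class ToyAdaptivePlan(initial: ToyPlan) {
  val events: ListBuffer[String] = ListBuffer.empty[String]
  private var current: ToyPlan = initial

  // Stand-in for getFinalPhysicalPlan (stage creation and re-planning elided).
  private def getFinalPhysicalPlan(): ToyPlan = {
    events += "getFinalPhysicalPlan"
    current = ToyPlan(s"final-${initial.name}")
    current
  }

  // Like finalPlanUpdate: a lazy val, so its body runs at most once.
  private lazy val finalPlanUpdate: Unit = {
    events += s"Final plan: ${current.name}"
    ()
  }

  def doExecute(): Seq[String] = {
    val rows = getFinalPhysicalPlan().execute() // 1. execute the final plan
    finalPlanUpdate                             // 2. one-time update (no-op on later calls)
    rows                                        // 3. return the result
  }
}
```

Calling doExecute twice on the same ToyAdaptivePlan records getFinalPhysicalPlan twice but the "Final plan" message only once.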
Executing for Collect Operator
executeCollect(): Array[InternalRow]
executeCollect ...FIXME
executeCollect is part of the SparkPlan abstraction.
Executing for Tail Operator
executeTail(
  n: Int): Array[InternalRow]
executeTail ...FIXME
executeTail is part of the SparkPlan abstraction.
Executing for Take Operator
executeTake(
  n: Int): Array[InternalRow]
executeTake ...FIXME
executeTake is part of the SparkPlan abstraction.
Final Physical Query Plan
getFinalPhysicalPlan(): SparkPlan
getFinalPhysicalPlan ...FIXME
getFinalPhysicalPlan is used when AdaptiveSparkPlanExec physical operator is requested to executeCollect, executeTake, executeTail and doExecute.
Current Optimized Physical Query Plan
currentPhysicalPlan: SparkPlan
currentPhysicalPlan is the physical query plan that AdaptiveSparkPlanExec creates as soon as it is created: the initial physical query plan after applying the queryStagePreparationRules.
QueryStage Preparation Rules
queryStagePreparationRules: Seq[Rule[SparkPlan]]
queryStagePreparationRules is a collection with a single physical optimization rule, EnsureRequirements.
queryStagePreparationRules is used when AdaptiveSparkPlanExec operator is created (to initialize the current physical plan) and when it is requested to reOptimize.
Adaptive Optimizations
queryStageOptimizerRules: Seq[Rule[SparkPlan]]
AdaptiveSparkPlanExec uses the following adaptive optimizations (physical optimization rules):
- ReuseAdaptiveSubquery
- CoalesceShufflePartitions
- OptimizeSkewedJoin
- OptimizeLocalShuffleReader
- ApplyColumnarRulesAndInsertTransitions
- CollapseCodegenStages
queryStageOptimizerRules is used when AdaptiveSparkPlanExec is requested to getFinalPhysicalPlan and newQueryStage.
generateTreeString
generateTreeString(
  depth: Int,
  lastChildren: Seq[Boolean],
  append: String => Unit,
  verbose: Boolean,
  prefix: String = "",
  addSuffix: Boolean = false,
  maxFields: Int,
  printNodeId: Boolean): Unit
generateTreeString ...FIXME
generateTreeString is part of the TreeNode abstraction.
createQueryStages
createQueryStages(
  plan: SparkPlan): CreateStageResult
createQueryStages ...FIXME
createQueryStages is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan.
newQueryStage
newQueryStage(
  e: Exchange): QueryStageExec
newQueryStage creates an optimized physical query plan for the child physical plan of the given Exchange (using the queryStageOptimizerRules).
newQueryStage then creates a QueryStageExec physical operator for the given Exchange, with the optimized physical query plan as the Exchange's child:
- For ShuffleExchangeExec, newQueryStage creates a ShuffleQueryStageExec (with the currentStageId counter and the ShuffleExchangeExec with the optimized plan as the child).
- For BroadcastExchangeExec, newQueryStage creates a BroadcastQueryStageExec (with the currentStageId counter and the BroadcastExchangeExec with the optimized plan as the child).
newQueryStage increments the currentStageId counter.
newQueryStage calls setLogicalLinkForNewQueryStage for the QueryStageExec physical operator.
In the end, newQueryStage returns the QueryStageExec physical operator.
newQueryStage is used when AdaptiveSparkPlanExec is requested to createQueryStages.
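The shape of newQueryStage can be sketched with hypothetical, Spark-free types. Plan, Scan, Optimized, ExchangeNode, ShuffleExchangeNode, BroadcastExchangeNode, ShuffleStage, BroadcastStage and StageCreatorSketch are all illustrative names, not Spark classes, and the optimize function stands in for applying the queryStageOptimizerRules. The sketch follows the steps above: optimize the child, wrap a copy of the exchange in a stage that carries the current id, then increment the counter. The setLogicalLinkForNewQueryStage step is left out.

```scala
// Hypothetical, Spark-free stand-ins for plans, exchanges and query stages.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Optimized(child: Plan) extends Plan
sealed trait ExchangeNode extends Plan { def child: Plan }
final case class ShuffleExchangeNode(child: Plan) extends ExchangeNode
final case class BroadcastExchangeNode(child: Plan) extends ExchangeNode

sealed trait QueryStage { def id: Int }
final case class ShuffleStage(id: Int, exchange: ShuffleExchangeNode) extends QueryStage
final case class BroadcastStage(id: Int, exchange: BroadcastExchangeNode) extends QueryStage

final class StageCreatorSketch(optimize: Plan => Plan) {
  private var currentStageId = 0

  def newQueryStage(e: ExchangeNode): QueryStage = {
    // 1. Optimize the exchange's child (queryStageOptimizerRules in the text).
    val optimizedPlan = optimize(e.child)
    // 2. Wrap a copy of the exchange, with the optimized child, in a stage.
    val stage: QueryStage = e match {
      case s: ShuffleExchangeNode   => ShuffleStage(currentStageId, s.copy(child = optimizedPlan))
      case b: BroadcastExchangeNode => BroadcastStage(currentStageId, b.copy(child = optimizedPlan))
    }
    // 3. Bump the counter so the next stage gets a fresh id.
    //    (Linking the stage back to its logical plan is omitted here.)
    currentStageId += 1
    stage
  }
}
```

Because the counter is incremented after every stage, two consecutive calls produce stages with ids 0 and 1, whichever exchange type each call receives.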
reuseQueryStage
reuseQueryStage(
  existing: QueryStageExec,
  exchange: Exchange): QueryStageExec
reuseQueryStage ...FIXME
reuseQueryStage is used when AdaptiveSparkPlanExec physical operator is requested to createQueryStages.
cleanUpAndThrowException
cleanUpAndThrowException(
  errors: Seq[Throwable],
  earlyFailedStage: Option[Int]): Unit
cleanUpAndThrowException ...FIXME
cleanUpAndThrowException is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan (and materialization of new stages fails).
Re-Optimizing Logical Query Plan
reOptimize(
  logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan)
reOptimize gives optimized physical and logical query plans for the given logical query plan.
Internally, reOptimize requests the given logical query plan to invalidateStatsCache and requests the logical adaptive optimizer to generate an optimized logical query plan.
reOptimize requests the query planner (bound to the AdaptiveExecutionContext) to plan the optimized logical query plan and generate a physical query plan.
In the end, reOptimize creates an optimized physical query plan by applying the preprocessing physical rules and the queryStagePreparationRules (using applyPhysicalRules).
reOptimize is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan, to re-plan the query after new query stages have been materialized.
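The reOptimize pipeline can be modeled as a chain of plain functions over string "plans". ToyReOptimizer and its constructor parameters are hypothetical. The `invalidated` list only simulates invalidateStatsCache, and the sketch assumes the preprocessing rules run before the preparation rules. It returns the pair (physical plan, optimized logical plan), matching the order of the documented return type.

```scala
// Hypothetical model of reOptimize: plans are strings and each component
// (logical optimizer, planner, physical rules) is a plain function.
final class ToyReOptimizer(
    logicalOptimizer: String => String,
    planner: String => String,
    preprocessingRules: Seq[String => String],
    preparationRules: Seq[String => String]) {

  // Records plans whose (simulated) statistics cache was invalidated.
  var invalidated: List[String] = Nil

  def reOptimize(logicalPlan: String): (String, String) = {
    invalidated = logicalPlan :: invalidated              // 1. stand-in for invalidateStatsCache
    val optimized = logicalOptimizer(logicalPlan)          // 2. logical adaptive optimizer
    val sparkPlan = planner(optimized)                     // 3. query planner
    val newPlan = (preprocessingRules ++ preparationRules) // 4. physical rules, in order
      .foldLeft(sparkPlan)((p, rule) => rule(p))
    (newPlan, optimized)
  }
}
```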
Logical Adaptive Optimizer
optimizer: RuleExecutor[LogicalPlan]
optimizer is a RuleExecutor that transforms logical query plans.
optimizer has a single rule batch, Demote BroadcastHashJoin, with the DemoteBroadcastHashJoin logical optimization only.
optimizer is used when AdaptiveSparkPlanExec physical operator is requested to re-optimize a logical query plan.
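A rule executor with a single batch can be sketched with a tiny, hypothetical executor. RuleBatch, MiniRuleExecutor and the string "plans" are illustrative only, not Catalyst classes. The demote rule is a placeholder string rewrite and does not reproduce what DemoteBroadcastHashJoin actually checks. Catalyst's RuleExecutor also supports fixed-point batches, but this sketch runs each batch once.

```scala
// A named group of rules (hypothetical, simplified from the batch idea).
final case class RuleBatch[T](name: String, rules: Seq[T => T])

// Applies every batch once, in order; each batch applies its rules in order.
// Fixed-point iteration is intentionally omitted from this sketch.
final class MiniRuleExecutor[T](batches: Seq[RuleBatch[T]]) {
  def execute(plan: T): T =
    batches.foldLeft(plan) { (current, batch) =>
      batch.rules.foldLeft(current)((p, rule) => rule(p))
    }
}
```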
Executing Physical Rules
applyPhysicalRules(
  plan: SparkPlan,
  rules: Seq[Rule[SparkPlan]]): SparkPlan
applyPhysicalRules simply applies (executes) the given rules to the given physical query plan, one after another in order.
applyPhysicalRules is used when:
- AdaptiveSparkPlanExec physical operator is created (and initializes currentPhysicalPlan), and when it is requested to getFinalPhysicalPlan, newQueryStage and reOptimize
- InsertAdaptiveSparkPlan physical optimization is executed
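Applying rules "one after another" amounts to a left fold, where each rule receives the plan produced by the previous rule. PhysicalRulesSketch is a hypothetical helper object. The plan type is kept generic, so the sketch does not depend on Spark's SparkPlan or Rule types.

```scala
object PhysicalRulesSketch {
  // Left fold over the rules: each rule is applied to the output of the
  // previous one, starting from the input plan.
  def applyPhysicalRules[P](plan: P, rules: Seq[P => P]): P =
    rules.foldLeft(plan)((sp, rule) => rule(sp))
}
```

Because each rule sees the previous rule's output, the order of the rule sequence matters. An empty sequence returns the plan unchanged.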
QueryStageCreator Thread Pool
executionContext: ExecutionContext
executionContext is an ExecutionContext that is used when:
- AdaptiveSparkPlanExec operator is requested to getFinalPhysicalPlan (to materialize QueryStageExec operators asynchronously)
- BroadcastQueryStageExec operator is requested to materializeWithTimeout
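A dedicated ExecutionContext for asynchronous stage materialization can be sketched with the Scala standard library alone. Spark builds its own pool with internal helpers. This sketch instead assumes a plain java.util.concurrent cached pool of daemon threads. StageMaterializerSketch and materialize are hypothetical names, and "materializing" a stage only returns a fake row count.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object StageMaterializerSketch {
  // Daemon threads so an unfinished pool never keeps the JVM alive.
  private val threadFactory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, "query-stage-sketch")
    t.setDaemon(true)
    t
  }

  // A dedicated ExecutionContext backed by a cached thread pool.
  implicit val executionContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(threadFactory))

  // Hypothetical stand-in for materializing a query stage; returns a fake row count.
  def materialize(stageId: Int): Future[Int] = Future(stageId * 10)

  // Starts all stages concurrently and waits for every result (in input order).
  def materializeAll(stageIds: Seq[Int]): Seq[Int] =
    Await.result(Future.sequence(stageIds.map(materialize)), 10.seconds)

  def shutdown(): Unit = {
    executionContext.shutdown()
    executionContext.awaitTermination(5, TimeUnit.SECONDS)
    ()
  }
}
```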
finalPlanUpdate Lazy Value
finalPlanUpdate: Unit
finalPlanUpdate is a Scala lazy value, which is computed once when first accessed and cached afterwards.
finalPlanUpdate ...FIXME
In the end, finalPlanUpdate prints out the following message to the logs (using logOnLevel):
Final plan: [currentPhysicalPlan]
finalPlanUpdate is used when AdaptiveSparkPlanExec physical operator is requested to executeCollect, executeTake, executeTail and doExecute.
logOnLevel Internal Method
logOnLevel: ( => String) => Unit
logOnLevel uses the spark.sql.adaptive.logLevel configuration property for the logging level and prints out the given message to the logs.
logOnLevel is used when AdaptiveSparkPlanExec physical operator is requested to getFinalPhysicalPlan and finalPlanUpdate.
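The by-name message parameter means a message is only built when its level is actually logged. AqeLoggerSketch is hypothetical. It records lines in a buffer instead of using a real logger, and its `enabled` set plays the role of the logger's active levels. Falling back to DEBUG for unrecognized level names is an assumption of the sketch. Spark's logOnLevel is a function value of type ( => String) => Unit, but for portability this sketch uses a method with a by-name parameter.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical logger sketch; `enabled` plays the role of the logger's active levels.
final class AqeLoggerSketch(configuredLevel: String, enabled: Set[String]) {
  val lines: ListBuffer[String] = ListBuffer.empty[String]

  // Resolved once, in the spirit of reading spark.sql.adaptive.logLevel.
  // Falling back to DEBUG for unknown names is an assumption of this sketch.
  private val level: String = configuredLevel.toUpperCase match {
    case l @ ("TRACE" | "DEBUG" | "INFO" | "WARN" | "ERROR") => l
    case _                                                  => "DEBUG"
  }

  // By-name `msg`: the message is only built if the resolved level is enabled.
  def logOnLevel(msg: => String): Unit =
    if (enabled(level)) lines += s"[$level] $msg"
}
```

With level DEBUG configured and only INFO and above enabled, an expensive message (such as rendering a whole plan) is never constructed.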
isFinalPlan Internal Flag
isFinalPlan: Boolean = false
isFinalPlan is an internal flag that lets getFinalPhysicalPlan skip its expensive work and return the current optimized physical query plan immediately.
isFinalPlan is disabled by default and turned on at the end of getFinalPhysicalPlan.
isFinalPlan is used when AdaptiveSparkPlanExec physical operator is requested for stringArgs.
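The short-circuit that isFinalPlan enables can be modeled as a flag-guarded, one-time computation. FinalPlanSketch, replan and expensiveRuns are hypothetical names. The lock is an assumption of the sketch, added so that concurrent callers do not run the expensive path twice. stringArgs here only mirrors the idea of exposing the flag in the operator's string form. In practice, explain output of adaptive plans typically shows `isFinalPlan=false` before execution.

```scala
// Hypothetical model of the isFinalPlan short-circuit (not Spark's code).
final class FinalPlanSketch(initialPlan: String, replan: String => String) {
  @volatile private var isFinalPlan: Boolean = false
  @volatile private var currentPhysicalPlan: String = initialPlan
  private val lock = new Object
  // Counts how often the expensive path actually ran.
  @volatile var expensiveRuns: Int = 0

  def getFinalPhysicalPlan(): String = lock.synchronized {
    if (isFinalPlan) {
      currentPhysicalPlan // already final: return immediately
    } else {
      expensiveRuns += 1
      currentPhysicalPlan = replan(currentPhysicalPlan) // stand-in for stage creation and re-planning
      isFinalPlan = true                                // turned on at the very end
      currentPhysicalPlan
    }
  }

  // Exposes the flag in the operator's string form, like stringArgs.
  def stringArgs: String = s"isFinalPlan=$isFinalPlan"
}
```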
replaceWithQueryStagesInLogicalPlan Internal Method
replaceWithQueryStagesInLogicalPlan(
  plan: LogicalPlan,
  stagesToReplace: Seq[QueryStageExec]): LogicalPlan
replaceWithQueryStagesInLogicalPlan ...FIXME
replaceWithQueryStagesInLogicalPlan is used when AdaptiveSparkPlanExec physical operator is requested for a final physical plan.