
InsertAdaptiveSparkPlan Physical Optimization

InsertAdaptiveSparkPlan is a physical query plan optimization (Rule[SparkPlan]) that re-optimizes a physical query plan in the middle of query execution, based on accurate runtime statistics.

Important

InsertAdaptiveSparkPlan is controlled by the spark.sql.adaptive.enabled configuration property and is disabled by default (it returns a query plan unchanged unless the property is enabled).
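AQE (and hence this rule) can be switched on at runtime through the regular configuration API. A usage sketch (note that spark.sql.adaptive.forceApply is an internal configuration property, intended mainly for testing):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Enable Adaptive Query Execution (and hence InsertAdaptiveSparkPlan)
spark.conf.set("spark.sql.adaptive.enabled", true)

// Optionally force AQE even for plans with no exchanges or subqueries
// (internal configuration property)
spark.conf.set("spark.sql.adaptive.forceApply", true)
```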

Adaptive Requirements

shouldApplyAQE(
  plan: SparkPlan,
  isSubquery: Boolean): Boolean

shouldApplyAQE is true when one of the following conditions holds:

  1. spark.sql.adaptive.forceApply configuration property is enabled
  2. The given isSubquery flag is enabled (a shortcut to continue since the input plan is from a sub-query and it was already decided to apply AQE for the main query)
  3. The given physical query plan contains one of the following:
    1. An Exchange physical operator
    2. An operator with a required child distribution (requiredChildDistribution) other than UnspecifiedDistribution (so the query may need to add exchanges)
    3. A SubqueryExpression
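The predicate can be sketched with stand-in types. The Op node and its boolean flags below are illustrative assumptions, not Spark's SparkPlan API:

```scala
// Stand-in plan node (an assumption for illustration, not Spark's SparkPlan)
case class Op(
    isExchange: Boolean = false,           // an Exchange operator
    requiresDistribution: Boolean = false, // required child distribution other than UnspecifiedDistribution
    hasSubqueryExpression: Boolean = false, // contains a SubqueryExpression
    children: Seq[Op] = Nil)

// Does any node in the plan tree satisfy the predicate?
def exists(plan: Op)(p: Op => Boolean): Boolean =
  p(plan) || plan.children.exists(exists(_)(p))

def shouldApplyAQE(plan: Op, isSubquery: Boolean, forceApply: Boolean): Boolean =
  forceApply || // spark.sql.adaptive.forceApply
  isSubquery || // AQE was already decided for the main query
  exists(plan)(op =>
    op.isExchange || op.requiresDistribution || op.hasSubqueryExpression)
```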

Creating Instance

InsertAdaptiveSparkPlan takes the following to be created:

  * AdaptiveExecutionContext

InsertAdaptiveSparkPlan is created when QueryExecution is requested for physical preparations rules.

Executing Rule

apply(
  plan: SparkPlan): SparkPlan

apply is part of the Rule abstraction.

apply applyInternal with the given SparkPlan and isSubquery flag disabled (false).

applyInternal

applyInternal(
  plan: SparkPlan,
  isSubquery: Boolean): SparkPlan

applyInternal returns the given physical plan "untouched" when spark.sql.adaptive.enabled is disabled.

applyInternal skips ExecutedCommandExec leaf operators (and simply returns the given physical plan "untouched").

For DataWritingCommandExec unary operators, applyInternal handles the child recursively.

For V2CommandExec operators, applyInternal handles the children recursively.

For all other operators for which shouldApplyAQE predicate holds, applyInternal branches off based on whether the physical plan supports Adaptive Query Execution or not.
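The dispatch can be paraphrased as the following pattern match (a simplified sketch, not the exact Spark source; the ellipsis elides the pre-processing described in the next section):

```scala
// Paraphrased control flow of applyInternal (illustrative sketch only)
def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
  case _ if !conf.adaptiveExecutionEnabled => plan // AQE disabled: plan "untouched"
  case _: ExecutedCommandExec => plan              // leaf command: skipped
  case c: DataWritingCommandExec =>                // recurse into the child
    c.copy(child = apply(c.child))
  case c: V2CommandExec =>                         // recurse into the children
    c.withNewChildren(c.children.map(apply))
  case _ if shouldApplyAQE(plan, isSubquery) =>
    if (supportAdaptive(plan)) {
      // pre-process subqueries and wrap the plan in AdaptiveSparkPlanExec
      ...
    } else {
      // print out a WARN message and return the plan unchanged
      plan
    }
  case _ => plan
}
```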

Supported Query Plans

applyInternal creates a new PlanAdaptiveSubqueries optimization (with the subquery expressions collected by buildSubqueryMap) and applies it to the plan.

applyInternal prints out the following DEBUG message to the logs:

Adaptive execution enabled for plan: [plan]

In the end, applyInternal creates an AdaptiveSparkPlanExec physical operator with the new pre-processed physical query plan.

In case of SubqueryAdaptiveNotSupportedException, applyInternal prints out the following WARN message to the logs and returns the given physical plan:

spark.sql.adaptive.enabled is enabled but is not supported for sub-query: [subquery].

Unsupported Query Plans

applyInternal simply prints out the following WARN message to the logs and returns the given physical plan:

spark.sql.adaptive.enabled is enabled but is not supported for query: [plan].

Usage

applyInternal is used when InsertAdaptiveSparkPlan is requested to execute (with the isSubquery flag disabled) and to compileSubquery (with the isSubquery flag enabled).

Collecting Subquery Expressions

buildSubqueryMap(
  plan: SparkPlan): Map[Long, SubqueryExec]

buildSubqueryMap finds ScalarSubquery and ListQuery (in InSubquery) expressions (unique by expression ID to reuse the execution plan from another sub-query) in the given physical query plan.

For every ScalarSubquery and ListQuery expression, buildSubqueryMap compileSubquery, verifyAdaptivePlan and registers a new SubqueryExec operator.
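The de-duplication by expression ID can be sketched with stand-in types (the Subquery class and string-based plans below are hypothetical, not Spark's API):

```scala
// Toy subquery expression keyed by its expression ID (stand-in type)
case class Subquery(exprId: Long, plan: String)

// Plans are registered once per expression ID, so a sub-query that appears
// several times reuses the execution plan compiled for the first occurrence
def buildSubqueryMap(found: Seq[Subquery]): Map[Long, String] =
  found.foldLeft(Map.empty[Long, String]) { (subqueries, s) =>
    if (subqueries.contains(s.exprId)) subqueries // reuse the compiled plan
    else subqueries + (s.exprId -> s"SubqueryExec(${s.plan})") // compile, verify, register
  }
```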

compileSubquery

compileSubquery(
  plan: LogicalPlan): SparkPlan

compileSubquery requests the session-bound SparkPlanner (from the AdaptiveExecutionContext) to plan the given LogicalPlan (that produces a SparkPlan).

In the end, compileSubquery applyInternal with isSubquery flag turned on.

Enforcing AdaptiveSparkPlanExec

verifyAdaptivePlan(
  plan: SparkPlan,
  logicalPlan: LogicalPlan): Unit

verifyAdaptivePlan throws a SubqueryAdaptiveNotSupportedException when the given SparkPlan is not an AdaptiveSparkPlanExec physical operator.

supportAdaptive Predicate

supportAdaptive(
  plan: SparkPlan): Boolean

supportAdaptive is true when every physical operator in the given plan (including its children) has a logical operator linked that is not streaming and uses no DynamicPruningSubquery expressions.
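A sketch with a stand-in node type (the Node class and its fields are assumptions for illustration, not Spark's SparkPlan API):

```scala
// Stand-in physical node with the three properties the predicate inspects
case class Node(
    hasLogicalLink: Boolean = true,
    isStreaming: Boolean = false,
    hasDynamicPruningSubquery: Boolean = false,
    children: Seq[Node] = Nil)

// Every node in the tree must pass all three checks
def supportAdaptive(plan: Node): Boolean =
  plan.hasLogicalLink &&
    !plan.isStreaming &&
    !plan.hasDynamicPruningSubquery &&
    plan.children.forall(supportAdaptive)
```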

supportAdaptive is used when InsertAdaptiveSparkPlan physical optimization is executed.

Logging

Enable ALL logging level for org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan=ALL

Refer to Logging.


Last update: 2021-04-28