InsertAdaptiveSparkPlan Physical Optimization

InsertAdaptiveSparkPlan is a physical query plan optimization (Rule[SparkPlan]) that re-optimizes a physical query plan in the middle of query execution, based on accurate runtime statistics.

Important

InsertAdaptiveSparkPlan is disabled by default. It takes effect only when the spark.sql.adaptive.enabled configuration property is enabled.
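As a minimal sketch of turning the property on, the following assumes a local SparkSession. The object name EnableAqe, the application name and the local master URL are illustrative choices, not something this page prescribes.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative entry point (the name EnableAqe is an assumption of this sketch).
object EnableAqe {
  def main(args: Array[String]): Unit = {
    // Enable Adaptive Query Execution when the session is built...
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aqe-demo")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    // ...or toggle it at runtime for subsequent queries.
    spark.conf.set("spark.sql.adaptive.enabled", "true")

    // Print the effective value of the property.
    println(spark.conf.get("spark.sql.adaptive.enabled"))

    spark.stop()
  }
}
```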

Creating Instance

InsertAdaptiveSparkPlan takes an AdaptiveExecutionContext to be created.

InsertAdaptiveSparkPlan is created when QueryExecution is requested for physical preparations rules.

Executing Rule

apply(
  plan: SparkPlan): SparkPlan

apply simply calls applyInternal with the given SparkPlan and isSubquery flag disabled (false).

apply is part of the Rule abstraction.

applyInternal

applyInternal(
  plan: SparkPlan,
  isSubquery: Boolean): SparkPlan

applyInternal does nothing (and simply returns the given physical plan "untouched") when spark.sql.adaptive.enabled is disabled.

applyInternal skips ExecutedCommandExec leaf operators (and simply returns the given physical plan "untouched").

For DataWritingCommandExec unary operators, applyInternal leaves the command itself in place and applies the rule to its child.

For V2CommandExec operators, applyInternal likewise applies the rule to every child.

For all other operators for which shouldApplyAQE predicate holds, applyInternal branches off based on whether the physical plan supports Adaptive Query Execution or not.
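The dispatch described above can be sketched with a toy model in plain Scala. Every type below (Plan, ExecutedCommand, DataWritingCommand, V2Command, Scan, Shuffle, Adaptive) is a simplified stand-in invented for this illustration, not a Spark class, and shouldApply is a placeholder for the real shouldApplyAQE predicate.

```scala
// Simplified stand-ins for physical operators (NOT the real Spark classes).
sealed trait Plan
case class ExecutedCommand(name: String) extends Plan
case class DataWritingCommand(child: Plan) extends Plan
case class V2Command(children: Seq[Plan]) extends Plan
case class Scan(table: String) extends Plan
case class Shuffle(child: Plan) extends Plan
// Stand-in for the adaptive wrapper operator.
case class Adaptive(inner: Plan) extends Plan

object DispatchSketch {
  // Placeholder predicate: in this toy model only a Shuffle qualifies.
  def shouldApply(plan: Plan): Boolean = plan match {
    case Shuffle(_) => true
    case _          => false
  }

  def applyInternal(plan: Plan, adaptiveEnabled: Boolean): Plan = plan match {
    // Adaptive execution disabled: return the plan untouched.
    case _ if !adaptiveEnabled => plan
    // Commands that are executed eagerly are skipped.
    case _: ExecutedCommand => plan
    // Writing commands stay in place; only the child is processed.
    case DataWritingCommand(child) =>
      DataWritingCommand(applyInternal(child, adaptiveEnabled))
    // Same idea for commands with several children.
    case V2Command(children) =>
      V2Command(children.map(applyInternal(_, adaptiveEnabled)))
    // Everything else is wrapped only when the predicate holds.
    case _ if shouldApply(plan) => Adaptive(plan)
    case _ => plan
  }
}
```

In this model a write command keeps its position at the top of the plan, and only the query below it becomes adaptive.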

Supported Query Plans

applyInternal collects subquery expressions in the query plan and creates a new PlanAdaptiveSubqueries optimization with them. applyInternal then executes the optimization rule on the plan. In the end, applyInternal creates an AdaptiveSparkPlanExec physical operator with the new optimized physical query plan.

applyInternal prints out the following DEBUG message to the logs:

Adaptive execution enabled for plan: [plan]

In case of SubqueryAdaptiveNotSupportedException, applyInternal returns the given physical plan unchanged and prints out the following WARN message to the logs:

spark.sql.adaptive.enabled is enabled but is not supported for sub-query: [subquery].

Unsupported Query Plans

applyInternal returns the given physical plan unchanged and prints out the following WARN message to the logs:

spark.sql.adaptive.enabled is enabled but is not supported for query: [plan].
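The two branches, including the fallback on a subquery failure, can be modelled roughly as follows. ToyPlan, SubqueryNotSupported, AdaptivePlan, Unchanged and BranchSketch are hypothetical names made up for this sketch. Only the two warning texts come from this page.

```scala
// Hypothetical exception standing in for SubqueryAdaptiveNotSupportedException.
final class SubqueryNotSupported(val subquery: String) extends Exception(subquery)

// Toy plan: supportsAdaptive models the supportAdaptive predicate,
// badSubquery names a subquery that cannot be planned adaptively (if any).
case class ToyPlan(
  name: String,
  supportsAdaptive: Boolean,
  badSubquery: Option[String] = None)

sealed trait Result
case class AdaptivePlan(inner: ToyPlan) extends Result
case class Unchanged(plan: ToyPlan, warning: String) extends Result

object BranchSketch {
  // Stand-in for planning the subqueries of a plan; fails for a bad subquery.
  private def planSubqueries(plan: ToyPlan): ToyPlan =
    plan.badSubquery match {
      case Some(sq) => throw new SubqueryNotSupported(sq)
      case None     => plan
    }

  def wrap(plan: ToyPlan): Result =
    if (plan.supportsAdaptive) {
      // Supported: plan the subqueries, then wrap the result.
      try AdaptivePlan(planSubqueries(plan))
      catch {
        // Fall back to the original plan and report the offending subquery.
        case e: SubqueryNotSupported =>
          Unchanged(plan,
            s"spark.sql.adaptive.enabled is enabled but is not supported for sub-query: ${e.subquery}.")
      }
    } else {
      // Unsupported: keep the original plan and warn.
      Unchanged(plan,
        s"spark.sql.adaptive.enabled is enabled but is not supported for query: ${plan.name}.")
    }
}
```

Either failure mode degrades to the non-adaptive plan rather than failing the query, which is the behaviour the WARN messages describe.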

Usage

applyInternal is used when InsertAdaptiveSparkPlan physical optimization is executed (with the isSubquery flag disabled) and when it is requested to compileSubquery (with the isSubquery flag enabled).

Collecting Subquery Expressions

buildSubqueryMap(
  plan: SparkPlan): Map[Long, SubqueryExec]

buildSubqueryMap finds ScalarSubquery and ListQuery (in InSubquery) expressions (unique by expression ID to reuse the execution plan from another sub-query) in the given physical query plan.

For every ScalarSubquery and ListQuery expression, buildSubqueryMap calls compileSubquery, then verifyAdaptivePlan, and finally registers a new SubqueryExec operator under the expression ID.
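The deduplication by expression ID can be illustrated with a small stand-alone model. SubqueryExpr and CompiledSubquery are hypothetical types for this sketch, the compile step is reduced to constructing a value, and the "subquery#<id>" naming is only illustrative.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: a subquery expression and its compiled plan.
case class SubqueryExpr(exprId: Long, sql: String)
case class CompiledSubquery(name: String, sql: String)

object SubqueryMapSketch {
  def buildSubqueryMap(exprs: Seq[SubqueryExpr]): Map[Long, CompiledSubquery] = {
    val subqueryMap = mutable.HashMap.empty[Long, CompiledSubquery]
    exprs.foreach { e =>
      // Compile each expression ID only once, so that a second occurrence
      // of the same subquery reuses the plan already registered.
      if (!subqueryMap.contains(e.exprId)) {
        subqueryMap(e.exprId) = CompiledSubquery(s"subquery#${e.exprId}", e.sql)
      }
    }
    subqueryMap.toMap
  }
}
```

With three expressions of which two share an ID, the resulting map holds two entries.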

buildSubqueryMap is used when InsertAdaptiveSparkPlan physical optimization is executed.

compileSubquery

compileSubquery(
  plan: LogicalPlan): SparkPlan

compileSubquery...FIXME

compileSubquery is used when InsertAdaptiveSparkPlan physical optimization is requested to buildSubqueryMap.

verifyAdaptivePlan

verifyAdaptivePlan(
  plan: SparkPlan,
  logicalPlan: LogicalPlan): Unit

verifyAdaptivePlan...FIXME

verifyAdaptivePlan is used when InsertAdaptiveSparkPlan physical optimization is requested to buildSubqueryMap.

shouldApplyAQE Predicate

shouldApplyAQE(
  plan: SparkPlan,
  isSubquery: Boolean): Boolean

shouldApplyAQE is true when at least one of the following conditions holds:

  1. spark.sql.adaptive.forceApply configuration property is enabled
  2. The given isSubquery flag is enabled
  3. The given physical plan contains an operator that:

    1. Is an Exchange
    2. Has a requiredChildDistribution other than UnspecifiedDistribution
    3. Contains a SubqueryExpression

shouldApplyAQE is used when InsertAdaptiveSparkPlan physical optimization is executed.
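A toy version of the predicate, under the assumption that the operator-level checks are applied to every operator in the plan tree. Op and its boolean fields are invented for this sketch and stand for the three operator-level conditions.

```scala
// Toy operator tree; the flags stand for the operator-level conditions.
case class Op(
  name: String,
  isExchange: Boolean = false,
  // Demands something other than UnspecifiedDistribution from a child.
  requiresDistribution: Boolean = false,
  hasSubqueryExpression: Boolean = false,
  children: Seq[Op] = Nil) {

  // True when this operator or any descendant satisfies p.
  def exists(p: Op => Boolean): Boolean =
    p(this) || children.exists(_.exists(p))
}

object ShouldApplySketch {
  def shouldApplyAQE(plan: Op, isSubquery: Boolean, forceApply: Boolean): Boolean =
    forceApply || isSubquery ||
      plan.exists(op =>
        op.isExchange || op.requiresDistribution || op.hasSubqueryExpression)
}
```

The effect is that a plan with no exchange, no distribution requirement and no subquery is left alone unless the force flag or the isSubquery flag says otherwise.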

supportAdaptive Predicate

supportAdaptive(
  plan: SparkPlan): Boolean

supportAdaptive is true when the given physical operator and all of its children each have a logical operator linked, that logical operator is not streaming, and the operator has no DynamicPruningSubquery expressions.

supportAdaptive is used when InsertAdaptiveSparkPlan physical optimization is executed.
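The recursive shape of the check can be sketched as follows. Node and its flags are hypothetical simplifications: hasLogicalLink models "has a logical operator linked", and the other two flags model the streaming and DynamicPruningSubquery conditions.

```scala
// Toy physical operator with the properties the check looks at.
case class Node(
  name: String,
  hasLogicalLink: Boolean = true,
  isStreaming: Boolean = false,
  hasDynamicPruningSubquery: Boolean = false,
  children: Seq[Node] = Nil)

object SupportAdaptiveSketch {
  // An operator qualifies only if it passes all checks itself
  // and every child qualifies too.
  def supportAdaptive(plan: Node): Boolean =
    plan.hasLogicalLink &&
      !plan.isStreaming &&
      !plan.hasDynamicPruningSubquery &&
      plan.children.forall(supportAdaptive)
}
```

A single streaming or dynamically pruned operator anywhere in the tree is enough to make the whole plan unsupported in this model.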


Last update: 2020-11-07