
Demo: Adaptive Query Execution

This demo shows Adaptive Query Execution in action.

Before you begin

Enable DEBUG (or ALL) logging level for the org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan logger to see what happens under the covers.
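
In spark-shell this can also be done programmatically. A minimal sketch, assuming the log4j 1.x backend that ships with Spark 3.1 (the equivalent conf/log4j.properties entry is log4j.logger.org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan=ALL):

import org.apache.log4j.{Level, Logger}
// Raise the InsertAdaptiveSparkPlan logger to ALL so the DEBUG messages
// in the Query Execution step below are logged.
Logger.getLogger("org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan").setLevel(Level.ALL)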

Enable AQE

import org.apache.spark.sql.internal.SQLConf
val conf = SQLConf.get
assert(conf.adaptiveExecutionEnabled == false, "Adaptive Query Execution is disabled by default")

Enable Adaptive Query Execution using the spark.sql.adaptive.enabled configuration property (or its type-safe counterpart SQLConf.ADAPTIVE_EXECUTION_ENABLED).

conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
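
The same property can also be set on the active SparkSession. A minimal sketch, assuming a spark-shell session bound to the spark value:

// Equivalent to the SQLConf-based call above.
spark.conf.set("spark.sql.adaptive.enabled", true)
assert(conf.adaptiveExecutionEnabled, "Adaptive Query Execution should now be enabled")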

Query

val q = spark.range(6).repartition(2)
q.explain(extended = true)
== Parsed Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(2), REPARTITION_WITH_NUM, [id=#6]
   +- Range (0, 6, step=1, splits=16)

Query Execution

q.tail(1)
21/04/28 15:06:25 DEBUG InsertAdaptiveSparkPlan: Adaptive execution enabled for plan: CollectTail 1
+- Exchange RoundRobinPartitioning(2), REPARTITION_WITH_NUM, [id=#68]
   +- Range (0, 6, step=1, splits=16)
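
To confirm that the query went through adaptive execution, you can inspect the executed physical plan. A minimal sketch; with AQE on, its root operator is AdaptiveSparkPlanExec:

import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
// The root of the executed physical plan is the adaptive operator
// that re-optimizes the query between shuffle stages.
val adaptivePlan = q.queryExecution.executedPlan
assert(adaptivePlan.isInstanceOf[AdaptiveSparkPlanExec])
println(adaptivePlan.numberedTreeString)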

Explain Query

q.explain(extended = true)
== Parsed Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Analyzed Logical Plan ==
id: bigint
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Optimized Logical Plan ==
Repartition 2, true
+- Range (0, 6, step=1, splits=Some(16))

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ShuffleQueryStage 0
   +- Exchange RoundRobinPartitioning(2), REPARTITION_WITH_NUM, [id=#105]
      +- *(1) Range (0, 6, step=1, splits=16)
+- == Initial Plan ==
   Exchange RoundRobinPartitioning(2), REPARTITION_WITH_NUM, [id=#6]
   +- Range (0, 6, step=1, splits=16)

Note that isFinalPlan is now true: executing the query (q.tail(1)) let Adaptive Query Execution replace the initial physical plan with the final, re-optimized plan, and explain now reports both.

That's it. Congratulations!

