
SubqueryExec Unary Physical Operator

SubqueryExec is a unary physical operator that...FIXME

SubqueryExec uses relationFuture, which is initialized lazily and executed only once, when SubqueryExec is first requested to prepare for execution (doPrepare). That simply triggers execution of the child operator asynchronously (i.e. on a separate thread). SubqueryExec is requested to collect the internal rows (executeCollect) soon after, which makes it wait indefinitely for the child operator to finish.

CAUTION: FIXME When is doPrepare executed?

SubqueryExec is created when the PlanSubqueries physical optimization is executed (and transforms ScalarSubquery expressions in a physical plan).

[source, scala]

val q = sql("select (select max(id) from t1) tt from t1")
scala> q.explain
== Physical Plan ==
*Project [Subquery subquery32 AS tt#33L]
:  +- Subquery subquery32
:     +- *HashAggregate(keys=[], functions=[max(id#20L)])
:        +- Exchange SinglePartition
:           +- *HashAggregate(keys=[], functions=[partial_max(id#20L)])
:              +- *FileScan parquet default.t1[id#20L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- *FileScan parquet default.t1[] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

[[metrics]]
.SubqueryExec's Performance Metrics
[cols="1,2,2",options="header",width="100%"]
|===
| Key
| Name (in web UI)
| Description

| [[collectTime]] collectTime
| time to collect (ms)
|

| [[dataSize]] dataSize
| data size (bytes)
|
|===

.SubqueryExec in web UI (Details for Query)
image::images/spark-sql-SubqueryExec-webui-details-for-query.png[align="center"]

NOTE: The SubqueryExec physical operator is almost an exact copy of the BroadcastExchangeExec physical operator.

=== [[doPrepare]] Executing Child Operator Asynchronously -- doPrepare Method

[source, scala]

doPrepare(): Unit

NOTE: doPrepare is part of the SparkPlan Contract to prepare a physical operator for execution.

doPrepare simply triggers initialization of the internal lazily-once-initialized relationFuture asynchronous computation.
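
The "initialize lazily, start once" behaviour can be sketched without Spark. The following is a minimal illustration only: `LazyOnceOperator`, its `starts` counter and the `compute` parameter are hypothetical names, and the class is not Spark's SubqueryExec. It assumes nothing beyond a Scala `lazy val` holding a `scala.concurrent.Future`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for a physical operator (not Spark's SubqueryExec).
class LazyOnceOperator(compute: () => Array[Int])(implicit ec: ExecutionContext) {
  // Counts how many times the asynchronous computation has been started.
  val starts = new AtomicInteger(0)

  // A lazy val is initialized on first access only. Creating the Future
  // submits `compute` to the execution context at that moment.
  private lazy val relationFuture: Future[Array[Int]] = {
    starts.incrementAndGet()
    Future(compute())
  }

  // Touching the lazy val is all that is needed to kick off the computation.
  def doPrepare(): Unit = {
    relationFuture
    ()
  }
}

object LazyOnceDemo {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val op = new LazyOnceOperator(() => Array(1, 2, 3))
    op.doPrepare()
    op.doPrepare() // the second call does not start a second computation
    println(op.starts.get) // prints 1
  }
}
```

Because the counter is incremented inside the lazy initializer, it reflects how often the future was created, not how often doPrepare was called.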

=== [[relationFuture]] relationFuture Internal Lazily-Once-Initialized Property

[source, scala]

relationFuture: Future[Array[InternalRow]]

When "materialized" (aka executed), relationFuture spawns a new thread of execution that requests SQLExecution to execute an action (with the current execution id) on the subquery's child operator.

NOTE: relationFuture uses Scala's scala.concurrent.Future, which starts running on a thread of its execution context as soon as it is instantiated.

The action requests the child physical operator to executeCollect, tracks the execution, and records the collectTime and dataSize SQL metrics.

In the end, relationFuture posts metric updates and returns the internal rows.
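
The bookkeeping done by the action (time the collect, sum up the size of the collected rows, return the rows) can be sketched in plain Scala. This is an illustration under stated assumptions: the `metrics` map and the `timedCollect` helper are hypothetical, rows are modelled as byte arrays, and Spark itself records these values in SQL metrics rather than in a map.

```scala
import scala.collection.mutable

object TimedCollectSketch {
  // Hypothetical metric store keyed by metric name (an assumption for this sketch).
  val metrics: mutable.Map[String, Long] =
    mutable.Map("collectTime" -> 0L, "dataSize" -> 0L)

  // Runs `collect`, then records the elapsed time in milliseconds and the
  // total size of the collected rows in bytes.
  def timedCollect(collect: () => Array[Array[Byte]]): Array[Array[Byte]] = {
    val before = System.nanoTime()
    val rows = collect()
    val after = System.nanoTime()
    metrics("collectTime") += (after - before) / 1000000
    metrics("dataSize") += rows.map(_.length.toLong).sum
    rows
  }

  def main(args: Array[String]): Unit = {
    val rows = timedCollect(() => Array(Array[Byte](1, 2, 3), Array[Byte](4, 5)))
    println(s"rows=${rows.length} dataSize=${metrics("dataSize")}")
  }
}
```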

[[executionContext]] NOTE: relationFuture is executed on a separate thread from a custom scala.concurrent.ExecutionContext (built from a cached java.util.concurrent.ThreadPoolExecutor with the prefix subquery and up to 16 threads).
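
An execution context of that shape can be approximated with the JDK alone. The sketch below is an assumption about how such a pool might be assembled, not the code Spark uses: `namedDaemonFactory`, `cachedPool` and the 60-second keep-alive are illustrative choices, while the `subquery` prefix and the limit of 16 threads come from the note above.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration

object SubqueryPoolSketch {
  // Creates daemon threads named "<prefix>-<n>" (hypothetical helper).
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }

  // A pool that grows up to `maxThreads` and lets idle threads time out.
  // The keep-alive default is an assumption made for this sketch.
  def cachedPool(prefix: String, maxThreads: Int, keepAliveSeconds: Long = 60L): ThreadPoolExecutor = {
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](), namedDaemonFactory(prefix))
    pool.allowCoreThreadTimeOut(true)
    pool
  }

  val executionContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(cachedPool("subquery", 16))

  def main(args: Array[String]): Unit = {
    // The future body reports the name of the pool thread it runs on.
    val name = Await.result(
      Future(Thread.currentThread().getName)(executionContext), Duration.Inf)
    println(name)
  }
}
```

Passing the execution context explicitly to `Future(...)(executionContext)` keeps the computation off the global pool, which is the point of having a dedicated one.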

NOTE: relationFuture is used when SubqueryExec is requested to prepare for execution (doPrepare, which triggers execution of the child operator) and to collect the internal rows (executeCollect, which waits indefinitely until the child operator has finished).

=== [[creating-instance]] Creating SubqueryExec Instance

SubqueryExec takes the following when created:

  • [[name]] Name of the subquery
  • [[child]] Child physical plan

=== [[executeCollect]] Collecting Internal Rows of Executing SubqueryExec Operator -- executeCollect Method

[source, scala]

executeCollect(): Array[InternalRow]

NOTE: executeCollect is part of the SparkPlan Contract to execute a physical operator and collect the results as a collection of internal rows.

executeCollect waits until relationFuture gives a result (as an Array[InternalRow]).
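
Waiting without a timeout can be illustrated with `scala.concurrent.Await` and `Duration.Inf`. This is a sketch under assumptions: `collectWhenReady` is a hypothetical helper, the rows are plain `Long` values, and the future is completed by hand from another thread so the blocking is observable.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration

object AwaitCollectSketch {
  // Blocks the calling thread with no timeout until the future completes,
  // then returns its value (or rethrows its failure).
  def collectWhenReady[T](relation: Future[Array[T]]): Array[T] =
    Await.result(relation, Duration.Inf)

  def main(args: Array[String]): Unit = {
    val promise = Promise[Array[Long]]()
    // Complete the promise from another thread after a short delay.
    val worker = new Thread(() => {
      Thread.sleep(100)
      promise.success(Array(1L, 2L, 3L))
      ()
    })
    worker.start()
    println(collectWhenReady(promise.future).mkString(",")) // prints 1,2,3
  }
}
```

With `Duration.Inf` the caller never gives up, so a computation that never finishes blocks the caller forever.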

Last update: 2020-11-25