
WholeStageCodegenExec Unary Physical Operator

WholeStageCodegenExec is a unary physical operator that is one of the two physical operators that lay the foundation for the Whole-Stage Java Code Generation for a Codegened Execution Pipeline of a structured query.

Note

InputAdapter is the other physical operator for Codegened Execution Pipeline of a structured query.

WholeStageCodegenExec itself supports Java code generation. When executed, it triggers code generation for the entire child physical plan subtree of a structured query.

val q = spark.range(10).where('id === 4)
scala> q.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#3L = 4)
+- *(1) Range (0, 10, step=1, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...

Tip

Consider using the Debugging Query Execution facility to deep dive into the whole-stage code generation.

val q = spark.range(10).where('id === 4)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#0L = 4)
+- *(1) Range (0, 10, step=1, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...


Tip

Use the following to enable comments in generated code.

org.apache.spark.SparkEnv.get.conf.set("spark.sql.codegen.comments", "true")

val q = spark.range(10).where('id === 4)
import org.apache.spark.sql.execution.debug._
scala> q.debugCodegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*(1) Filter (id#6L = 4)
+- *(1) Range (0, 10, step=1, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for stage (id=1)
 * *(1) Filter (id#6L = 4)
 * +- *(1) Range (0, 10, step=1, splits=8)
 */
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
...


WholeStageCodegenExec is created when:

  • CollapseCodegenStages physical optimization is executed (with spark.sql.codegen.wholeStage configuration property enabled)

  • FileSourceScanExec leaf physical operator is <> (with the <> flag enabled)

  • InMemoryTableScanExec leaf physical operator is <> (with the <> flag enabled)

  • DataSourceV2ScanExec leaf physical operator is <> (with the <> flag enabled)

When created, WholeStageCodegenExec takes a single child physical operator (a physical subquery tree) and a codegen stage ID.

NOTE: WholeStageCodegenExec requires its single child physical operator to support Java code generation.

// RangeExec physical operator does support codegen
import org.apache.spark.sql.execution.RangeExec
import org.apache.spark.sql.catalyst.plans.logical.Range
val rangeExec = RangeExec(Range(start = 0, end = 1, step = 1, numSlices = 1))

import org.apache.spark.sql.execution.WholeStageCodegenExec
val rdd = WholeStageCodegenExec(rangeExec)(codegenStageId = 0).execute()

In the text representation of a physical plan tree, WholeStageCodegenExec marks the child physical operator with a * (star) prefix followed by the codegen stage ID in round brackets.

scala> println(plan.numberedTreeString)
00 *(1) Project [id#117L]
01 +- *(1) BroadcastHashJoin [id#117L], [cast(id#115 as bigint)], Inner, BuildRight
02    :- *(1) Range (0, 1, step=1, splits=8)
03    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
04       +- Generate explode(ids#112), false, [id#115]
05          +- LocalTableScan [ids#112]
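The following self-contained Scala model illustrates the star-and-stage-ID convention. It is not Spark code. It uses a hypothetical `Op` tree where each node records only its name and whether it supports codegen. It approximates CollapseCodegenStages by giving each maximal connected group of codegen-capable operators one stage ID, and it mimics the `:-` / `+-` markers of the plan above.

```scala
// Hypothetical model of the star-prefix convention; not Spark's API.
object StarPrefixSketch {
  final case class Op(name: String, codegen: Boolean, children: List[Op] = Nil)

  // Renders a plan tree, prefixing codegen-capable operators with *(stageId).
  // A codegen operator whose parent is not part of a stage starts a new stage.
  def render(root: Op): List[String] = {
    var lastStageId = 0

    def loop(op: Op, parentStage: Option[Int], indent: String, marker: String): List[String] = {
      val stage =
        if (!op.codegen) None
        else parentStage.orElse { lastStageId += 1; Some(lastStageId) }
      val star = stage.map(id => s"*($id) ").getOrElse("")
      val line = indent + marker + star + op.name
      // Lines below a last child ("+- ") get spaces; others keep the ":" rail.
      val childIndent =
        if (marker.isEmpty) indent
        else indent + (if (marker == "+- ") "   " else ":  ")
      val childLines = op.children.zipWithIndex.flatMap { case (child, i) =>
        val childMarker = if (i == op.children.size - 1) "+- " else ":- "
        loop(child, stage, childIndent, childMarker)
      }
      line :: childLines
    }

    loop(root, None, "", "")
  }
}
```

Build a tree shaped like the plan above, marking BroadcastExchange, Generate and LocalTableScan as non-codegen. Rendering it reproduces the layout without the expression details. A codegen operator below a non-codegen one gets the next stage ID.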

NOTE: WholeStageCodegenExec is created by the CollapseCodegenStages physical optimization. As a result, it only shows up in the executedPlan phase of a query execution. You can spot it by the * (star) prefix in the plan output.

val q = spark.range(9)

// we need executedPlan with WholeStageCodegenExec physical operator "injected"
val plan = q.queryExecution.executedPlan

// Note the star prefix of Range that marks WholeStageCodegenExec
// As a matter of fact, there are two physical operators in play here
// i.e. WholeStageCodegenExec with Range as the child
scala> println(plan.numberedTreeString)
00 *Range (0, 9, step=1, splits=8)

// Let's unwrap Range physical operator
// and access the parent WholeStageCodegenExec
import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.asInstanceOf[WholeStageCodegenExec]

// Trigger code generation of the entire query plan tree
val (ctx, code) = wsce.doCodeGen()

// CodeFormatter can pretty-print the code
import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
scala> println(CodeFormatter.format(code))
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
 * Range (0, 9, step=1, splits=8)
 */
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
...


When executed, WholeStageCodegenExec gives the pipelineTime performance metric.

TIP: Use the explain operator to see the physical plan of a query and find out whether WholeStageCodegen is in use.

val q = spark.range(10).where('id === 4)
// Note the stars in the output that are for codegened operators
scala> q.explain
== Physical Plan ==
*Filter (id#0L = 4)
+- *Range (0, 10, step=1, splits=8)


NOTE: Physical plans that support code generation extend CodegenSupport.

Tip

Enable DEBUG logging level for the org.apache.spark.sql.execution.WholeStageCodegenExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.WholeStageCodegenExec=DEBUG

Refer to Logging.

Executing Physical Operator and Generating RDD[InternalRow] (doExecute Method)

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.

doExecute first generates the Java source code for the child physical plan subtree (doCodeGen). It then uses CodeGenerator to compile that code.

If compilation succeeds, doExecute branches off based on the number of input RDDs.

NOTE: doExecute only supports up to two input RDDs.
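The branching on the number of input RDDs can be pictured with plain Scala collections. The sketch below is a model, not Spark code. A hypothetical `Partitioned[A]` (a list of partitions) stands in for an RDD:

- A single input is processed partition by partition, in the spirit of `mapPartitionsWithIndex`.
- Two inputs are zipped partition-wise, in the spirit of `zipPartitions`.
- Anything more is rejected.

```scala
// Hypothetical model: Partitioned[A] stands in for an RDD[A].
object InputRddsSketch {
  type Partitioned[A] = List[List[A]]

  // Runs `pipeline` once per partition index, passing one iterator per input.
  def runPipeline[A, B](inputs: List[Partitioned[A]])(
      pipeline: (Int, List[Iterator[A]]) => Iterator[B]): Partitioned[B] =
    inputs match {
      case List(only) =>
        // One input: process each partition on its own.
        only.zipWithIndex.map { case (part, idx) => pipeline(idx, List(part.iterator)).toList }
      case List(left, right) =>
        // Two inputs: pair up partitions with the same index.
        require(left.size == right.size, "zipped inputs must have the same number of partitions")
        left.zip(right).zipWithIndex.map { case ((l, r), idx) =>
          pipeline(idx, List(l.iterator, r.iterator)).toList
        }
      case _ =>
        throw new IllegalArgumentException(s"Up to two inputs are supported, got ${inputs.size}")
    }
}
```

The two-input case requires equally partitioned inputs, since partitions are paired by index.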

CAUTION: FIXME Finish the "success" path

If the bytecode size of the largest generated method (maxCodeSize) is greater than spark.sql.codegen.hugeMethodLimit, doExecute prints out the following INFO message:

Found too long generated codes and JIT optimization might not work: the bytecode size ([maxCodeSize]) is above the limit [spark.sql.codegen.hugeMethodLimit], and the whole-stage codegen was disabled for this plan (id=[codegenStageId]). To avoid this, you can raise the limit `spark.sql.codegen.hugeMethodLimit`:
[treeString]

doExecute then requests the child physical operator to execute, which triggers physical query planning and generates an RDD[InternalRow]. doExecute returns that RDD.

NOTE: doExecute does not request the child physical operator to execute when the child is a FileSourceScanExec leaf physical operator with the <> flag enabled. FileSourceScanExec itself uses WholeStageCodegenExec when <>.

If compilation fails and the spark.sql.codegen.fallback configuration property is enabled, doExecute does the following:

1. Prints out the following WARN message to the logs.
2. Requests the child physical operator to execute.
3. Returns the result.

Whole-stage codegen disabled for plan (id=[codegenStageId]):
 [treeString]
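The two fallback paths described above can be summarised as a small decision function:

- compilation fails while spark.sql.codegen.fallback is enabled;
- the generated code has a method above spark.sql.codegen.hugeMethodLimit.

This is a hypothetical model with logging omitted:

- `compile` stands in for compiling the generated source and returning the largest method's bytecode size.
- `skipFallbackForFileScan` stands in for the FileSourceScanExec case from the note.
- The limit values in the usage are arbitrary.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical model of doExecute's fallback decisions; not Spark's API.
object DoExecuteFallbackSketch {
  sealed trait Plan
  case object UseGeneratedCode extends Plan
  case object ExecuteChildDirectly extends Plan

  def decide(
      compile: () => Int,
      codegenFallback: Boolean,
      hugeMethodLimit: Int,
      skipFallbackForFileScan: Boolean): Plan =
    Try(compile()) match {
      // Compilation failed and fallback is enabled: run the child as-is (WARN path).
      case Failure(_) if codegenFallback => ExecuteChildDirectly
      // Compilation failed and fallback is disabled: the error propagates.
      case Failure(e) => throw e
      // A method is too large (INFO path): run the child, except for the file scan case.
      case Success(maxCodeSize) if maxCodeSize > hugeMethodLimit && !skipFallbackForFileScan =>
        ExecuteChildDirectly
      case Success(_) => UseGeneratedCode
    }
}
```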

Generating Java Source Code for Child Physical Plan Subtree (doCodeGen Method)

doCodeGen(): (CodegenContext, CodeAndComment)

doCodeGen creates a new CodegenContext. It then requests the single child physical operator to generate the Java source code for the produce code path, passing the new CodegenContext and the WholeStageCodegenExec physical operator itself as the parent.

doCodeGen adds the generated code as a new function named processNext.

doCodeGen generates the class name (generatedClassName).
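The produce/consume handshake can be shown in a few lines of plain Scala. This is a deliberately simplified model, not Spark's CodegenSupport:

- Hypothetical `RangeOp` and `FilterOp` pass a consume callback down the tree instead of a parent operator.
- `processNext` plays the part of WholeStageCodegenExec, the outermost consumer that emits `append(...)`.

The result is a single fused loop rather than one iterator per operator.

```scala
// Hypothetical, heavily simplified model of produce/consume code generation.
object ProduceConsumeSketch {
  // A consumer turns the name of the Java variable holding the current value
  // into the Java code that processes that value.
  type Consume = String => String

  sealed trait CodegenOp { def produce(consume: Consume): String }

  // Stand-in for a Range operator: produces a loop and hands each value to its parent.
  final case class RangeOp(start: Long, end: Long) extends CodegenOp {
    def produce(consume: Consume): String = {
      val body = consume("id")
      s"for (long id = ${start}L; id < ${end}L; id++) {\n$body\n}"
    }
  }

  // Stand-in for a Filter operator: asks its child to produce, wrapping the parent's code in an if.
  final case class FilterOp(predicate: String => String, child: CodegenOp) extends CodegenOp {
    def produce(consume: Consume): String =
      child.produce(v => s"if (${predicate(v)}) {\n${consume(v)}\n}")
  }

  // Stand-in for WholeStageCodegenExec: the outermost consumer appends the value.
  def processNext(child: CodegenOp): String =
    child.produce(v => s"append($v);")
}
```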

doCodeGen generates the final Java source code of the following format:

public Object generate(Object[] references) {
  return new [className](references);
}

/**
 * Codegend pipeline for stage (id=[codegenStageId])
 * [treeString]
 */
final class [className] extends BufferedRowIterator {

  private Object[] references;
  private scala.collection.Iterator[] inputs;
  // ctx.declareMutableStates()

  public [className](Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator[] inputs) {
    partitionIndex = index;
    this.inputs = inputs;
    // ctx.initMutableStates()
    // ctx.initPartition()
  }

  // ctx.emitExtraCode()

  // ctx.declareAddedFunctions()
}
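The template above is essentially string assembly around a few placeholders. The following is a minimal sketch of that assembly. The parameters are hypothetical stand-ins for the class name, the stage ID, the tree string, and the member code the ctx.* calls would contribute (including processNext). The mutable-state parts of the template are left out.

```scala
// Hypothetical sketch of the final source assembly; not Spark's implementation.
object GeneratedSourceSketch {
  def generatedSource(
      className: String,
      codegenStageId: Int,
      treeString: String,
      memberCode: String): String = {
    // Each line of the plan's tree string becomes a line of the class comment.
    val treeComment = treeString.trim.split("\n").map(line => s" * $line").toList
    val header = List(
      "public Object generate(Object[] references) {",
      s"  return new $className(references);",
      "}",
      "",
      "/**",
      s" * Codegend pipeline for stage (id=$codegenStageId)")
    val body = List(
      " */",
      s"final class $className extends org.apache.spark.sql.execution.BufferedRowIterator {",
      "  private Object[] references;",
      "  private scala.collection.Iterator[] inputs;",
      "",
      s"  public $className(Object[] references) {",
      "    this.references = references;",
      "  }",
      "",
      "  public void init(int index, scala.collection.Iterator[] inputs) {",
      "    partitionIndex = index;",
      "    this.inputs = inputs;",
      "  }",
      "",
      memberCode,
      "}")
    (header ++ treeComment ++ body).mkString("\n")
  }
}
```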


doCodeGen requires that the single child physical operator supports Java code generation.

doCodeGen cleans up the generated code using CodeFormatter's stripExtraNewLines and stripOverlappingComments.

doCodeGen prints out the following DEBUG message to the logs:

DEBUG WholeStageCodegenExec:
[cleanedSource]

In the end, doCodeGen returns the CodegenContext and the Java source code (as a CodeAndComment).

doCodeGen is used when:

  • WholeStageCodegenExec is executed

  • Debugging Query Execution is requested to <>

Generating Java Source Code for Consume Path

doConsume(
  ctx: CodegenContext,
  input: Seq[ExprCode],
  row: ExprCode): String

doConsume is part of the CodegenSupport abstraction.

Danger

Review Me

doConsume generates Java source code that:

  1. Takes (from the input row) the code to evaluate a Catalyst expression on an input InternalRow
  2. Takes (from the input row) the term for the value of the result of the evaluation
     a. Adds .copy() to the term if <> is turned on
  3. Wraps the term inside an append() code block
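The three steps can be mirrored with plain strings. This is a hypothetical sketch:

- `rowCode` and `rowValue` stand in for the input row's code and value.
- `copyResult` stands in for the flag the list leaves unnamed.

It reproduces the output of the Demo at the end of this page.

```scala
// Hypothetical sketch of the doConsume string template.
object DoConsumeSketch {
  def doConsume(rowCode: String, rowValue: String, copyResult: Boolean): String = {
    // Step 2a: optionally copy the row before it is buffered.
    val doCopy = if (copyResult) ".copy()" else ""
    // Steps 1 and 3: evaluation code first, then append(...) with the term.
    s"$rowCode\nappend($rowValue$doCopy);"
  }
}
```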

Generating Class Name

generatedClassName(): String

generatedClassName gives a class name per spark.sql.codegen.useIdInClassName configuration property:

  • GeneratedIteratorForCodegenStage followed by the codegen stage ID when enabled (true)

  • GeneratedIterator when disabled (false)

generatedClassName is used when WholeStageCodegenExec unary physical operator is requested to generate the Java source code for the child physical plan subtree.
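The naming rule fits in a single expression. In this sketch, `useIdInClassName` is a hypothetical parameter mirroring the spark.sql.codegen.useIdInClassName property. The two results match the class names in the generated code samples earlier on this page (GeneratedIteratorForCodegenStage1 and GeneratedIterator).

```scala
// Hypothetical sketch of the class naming rule.
object GeneratedClassNameSketch {
  def generatedClassName(useIdInClassName: Boolean, codegenStageId: Int): String =
    if (useIdInClassName) s"GeneratedIteratorForCodegenStage$codegenStageId"
    else "GeneratedIterator"
}
```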

isTooManyFields Object Method

isTooManyFields(conf: SQLConf, dataType: DataType): Boolean

isTooManyFields...FIXME

NOTE: isTooManyFields is used when...FIXME
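The description above is still a FIXME. Judging by the name and the (SQLConf, DataType) signature, a plausible reading is a check of whether a (possibly nested) type has more leaf fields than a configured maximum, such as spark.sql.codegen.maxFields. Treat that as an assumption. The sketch uses a hypothetical mini type model (`Atomic`, `Struct`, `MapT`, `ArrayT`) instead of Spark's DataType hierarchy.

```scala
// Hypothetical model: assumes isTooManyFields counts nested leaf fields.
object TooManyFieldsSketch {
  sealed trait DType
  case object Atomic extends DType
  final case class Struct(fields: List[DType]) extends DType
  final case class MapT(key: DType, value: DType) extends DType
  final case class ArrayT(element: DType) extends DType

  // Counts leaf fields, descending into structs, map keys/values and array elements.
  def numOfNestedFields(dt: DType): Int = dt match {
    case Struct(fields) => fields.map(numOfNestedFields).sum
    case MapT(k, v)     => numOfNestedFields(k) + numOfNestedFields(v)
    case ArrayT(e)      => numOfNestedFields(e)
    case Atomic         => 1
  }

  def isTooManyFields(maxFields: Int, dt: DType): Boolean =
    numOfNestedFields(dt) > maxFields
}
```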

Performance Metrics

| Key | Name (in web UI) | Description |
|-----|------------------|-------------|
| pipelineTime | (empty) | Time of how long the whole-stage codegend pipeline has been running (i.e. the elapsed time since the underlying BufferedRowIterator had been created and the internal rows were all consumed). |

WholeStageCodegenExec in web UI (Details for Query)
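The pipelineTime definition (time from creating the iterator until all rows were consumed) can be modelled with an iterator wrapper. This is a hypothetical sketch, not Spark's BufferedRowIterator:

- `TimedIterator` and `record` are illustration-only names.
- The elapsed milliseconds are recorded once, the first time hasNext reports exhaustion.

```scala
// Hypothetical model of the pipelineTime measurement.
object PipelineTimeSketch {
  final class TimedIterator[A](underlying: Iterator[A], record: Long => Unit) extends Iterator[A] {
    // The clock starts when the wrapper (the "pipeline") is created.
    private val startNs = System.nanoTime()
    private var recorded = false

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      // All rows consumed: record the elapsed time exactly once.
      if (!more && !recorded) {
        recorded = true
        record((System.nanoTime() - startNs) / 1000000L)
      }
      more
    }

    override def next(): A = underlying.next()
  }
}
```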

Demo

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext()

import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
val exprCode = ExprCode(code = "my_code", isNull = "false", value = "my_value")

// wsce defined above, i.e. the WholeStageCodegenExec unwrapped from the executedPlan
val consumeCode = wsce.doConsume(ctx, input = Seq(), row = exprCode)
scala> println(consumeCode)
my_code
append(my_value);