
InMemoryTableScanExec Leaf Physical Operator

InMemoryTableScanExec is a leaf physical operator that represents an InMemoryRelation logical operator at execution time.

InMemoryTableScanExec is <<creating-instance, created>> exclusively when the InMemoryScans execution planning strategy is executed and finds an InMemoryRelation.md[InMemoryRelation] logical operator in a logical query plan.

[[creating-instance]] InMemoryTableScanExec takes the following to be created:

* [[attributes]] Output attribute expressions (Seq[Attribute])
* [[predicates]] Predicate expressions (Seq[Expression])
* [[relation]] InMemoryRelation.md[InMemoryRelation] logical operator

InMemoryTableScanExec is a ColumnarBatchScan that <<supportsBatch, supports batch decoding>>.

InMemoryTableScanExec supports <<partitionFilters, partition batch pruning>> (only when the spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is enabled).

// Sample DataFrames
val tokens = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "InMemoryTableScanExec")
).toDF("id", "token")
val ids = spark.range(10)

// Cache DataFrames
tokens.cache
ids.cache

val q = tokens.join(ids, Seq("id"), "outer")
scala> q.explain
== Physical Plan ==
*Project [coalesce(cast(id#5 as bigint), id#10L) AS id#33L, token#6]
+- SortMergeJoin [cast(id#5 as bigint)], [id#10L], FullOuter
   :- *Sort [cast(id#5 as bigint) ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cast(id#5 as bigint), 200)
   :     +- InMemoryTableScan [id#5, token#6]
   :           +- InMemoryRelation [id#5, token#6], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- LocalTableScan [id#5, token#6]
   +- *Sort [id#10L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#10L, 200)
         +- InMemoryTableScan [id#10L]
               +- InMemoryRelation [id#10L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Range (0, 10, step=1, splits=8)

// Demo: supportCodegen is equivalent to supportsBatch
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get
assert(inmemoryScan.supportCodegen == inmemoryScan.supportsBatch)

[[supportCodegen]] InMemoryTableScanExec supports Java code generation only if the <<supportsBatch, supportsBatch>> flag is enabled.

[[inputRDDs]] InMemoryTableScanExec gives the single <<inputRDD, inputRDD>> as the only RDD of internal rows (when WholeStageCodegenExec physical operator is WholeStageCodegenExec.md#doExecute[executed]).

[[enableAccumulatorsForTest]] [[spark.sql.inMemoryTableScanStatistics.enable]] InMemoryTableScanExec uses the spark.sql.inMemoryTableScanStatistics.enable flag (default: false) to enable accumulators (which seem to be used exclusively for testing purposes).
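
A hedged spark-shell sketch of flipping the flag (assuming the internal property can be set at runtime in the current session):

[source, scala]

// Hedged demo: enable the test-only flag before caching and scanning
spark.conf.set("spark.sql.inMemoryTableScanStatistics.enable", true)
val ids = spark.range(10).cache
ids.count // executes an InMemoryTableScanExec which now registers its (test-only) accumulators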

[[internal-registries]]
.InMemoryTableScanExec's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| [[columnarBatchSchema]] columnarBatchSchema | Schema of a columnar batch

Used exclusively when InMemoryTableScanExec is requested to <<createAndDecompressColumn, createAndDecompressColumn>>.

| [[stats]] stats | PartitionStatistics of the <<relation, InMemoryRelation>>

Used when InMemoryTableScanExec is requested for <<partitionFilters, partitionFilters>>, <<filteredCachedBatches, filteredCachedBatches>> and <<statsFor, statsFor>>.

|===

=== [[vectorTypes]] vectorTypes Method

[source, scala]

vectorTypes: Option[Seq[String]]

NOTE: vectorTypes is part of the spark-sql-ColumnarBatchScan.md#vectorTypes[ColumnarBatchScan Contract] to specify the class names of the concrete ColumnVectors used in columnar scans.

vectorTypes uses spark.sql.columnVector.offheap.enabled internal configuration property to select the name of the concrete column vector (OnHeapColumnVector or OffHeapColumnVector).

vectorTypes gives as many column vector class names as there are attribute expressions in the output schema.
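
As a hedged spark-shell illustration (the exact class names are an assumption based on the default on-heap setting):

[source, scala]

// Demo: vectorTypes of an InMemoryTableScanExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get
// With spark.sql.columnVector.offheap.enabled=false (the default), expect one
// OnHeapColumnVector class name per output attribute
println(inmemoryScan.vectorTypes)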

=== [[supportsBatch]] supportsBatch Flag

[source, scala]

supportsBatch: Boolean

supportsBatch is part of the ColumnarBatchScan abstraction.

supportsBatch is enabled when all of the following holds:

  1. spark.sql.inMemoryColumnarStorage.enableVectorizedReader configuration property is enabled

  2. The output schema of the InMemoryRelation uses primitive data types only: BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType

  3. The number of nested fields in the output schema of the InMemoryRelation is at most spark.sql.codegen.maxFields internal configuration property
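
A hedged spark-shell illustration of the first condition (assuming the configuration property can be toggled at runtime before the physical plan is built):

[source, scala]

// Demo: supportsBatch follows spark.sql.inMemoryColumnarStorage.enableVectorizedReader
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", false)
val q = spark.range(4).cache
val scan = q.queryExecution.executedPlan.collectFirst { case e: InMemoryTableScanExec => e }.get
assert(scan.supportsBatch == false)
// restore the default
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", true)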

=== [[partitionFilters]] partitionFilters Property

[source, scala]

partitionFilters: Seq[Expression]

NOTE: partitionFilters is a Scala lazy value which is computed once when accessed and cached afterwards.

partitionFilters converts the <<predicates, predicate>> expressions into partition-statistics-based filter expressions using <<buildFilter, buildFilter>>, keeping only the predicates that buildFilter can handle and that can be bound to the schema of the <<stats, partition statistics>>.

NOTE: partitionFilters is used when InMemoryTableScanExec is requested to <<filteredCachedBatches, filter cached column buffers>>.

=== [[filteredCachedBatches]] Applying Partition Batch Pruning to Cached Column Buffers (Creating MapPartitionsRDD of Filtered CachedBatches) -- filteredCachedBatches Internal Method

[source, scala]

filteredCachedBatches(): RDD[CachedBatch]

filteredCachedBatches requests the <<stats, PartitionStatistics>> for the output schema and the <<relation, InMemoryRelation>> for cached column buffers (as a RDD[CachedBatch]).

filteredCachedBatches takes the cached column buffers (as a RDD[CachedBatch]) and transforms the RDD per partition with index (i.e. RDD.mapPartitionsWithIndexInternal) as follows:

  1. Creates a partition filter as a new GenPredicate for the <<partitionFilters, partitionFilters>> expressions (concatenated together using And binary operator and the schema)

  2. Requests the generated partition filter Predicate to initialize

  3. Uses spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property to enable partition batch pruning and filtering out (skipping) CachedBatches in a partition based on column stats and the generated partition filter Predicate

NOTE: When spark.sql.inMemoryColumnarStorage.partitionPruning internal configuration property is disabled, filteredCachedBatches does nothing and simply passes all CachedBatch elements along.

filteredCachedBatches is used when InMemoryTableScanExec is requested for the inputRDD internal property.
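
As a rough, hedged spark-shell illustration (assuming the internal property is readable at runtime):

[source, scala]

// Demo: partition batch pruning is governed by an internal configuration property
println(spark.conf.get("spark.sql.inMemoryColumnarStorage.partitionPruning")) // expect: true
val ids = spark.range(0, 100000).cache
ids.count // materializes the cached column buffers (and their per-batch statistics)
// Batches whose [lowerBound, upperBound] statistics cannot match the predicate can be skipped
ids.where("id = 99999").count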

=== [[statsFor]] statsFor Internal Method

[source, scala]

statsFor(a: Attribute): ColumnStatisticsSchema

statsFor gives the column statistics schema of the given attribute from the <<stats, PartitionStatistics>>.

NOTE: statsFor is used when InMemoryTableScanExec is requested to <<buildFilter, build stats-based filter expressions>>.

=== [[createAndDecompressColumn]] createAndDecompressColumn Internal Method

[source, scala]

createAndDecompressColumn(
  cachedColumnarBatch: CachedBatch): ColumnarBatch

createAndDecompressColumn takes the number of rows in the input CachedBatch.

createAndDecompressColumn requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors (with the number of rows and columnarBatchSchema) per the spark.sql.columnVector.offheap.enabled internal configuration flag.

createAndDecompressColumn creates a ColumnarBatch for the allocated column vectors (as an array of ColumnVector).

createAndDecompressColumn sets the number of rows in the columnar batch.

For every <<attributes, attribute>> (column), createAndDecompressColumn requests ColumnAccessor to decompress the column.

createAndDecompressColumn registers a callback to be executed on a task completion that will close the ColumnarBatch.

In the end, createAndDecompressColumn returns the ColumnarBatch.

NOTE: createAndDecompressColumn is used exclusively when InMemoryTableScanExec is requested for the <<inputRDD, inputRDD>>.
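
A hedged way to observe the effect in spark-shell: with the <<supportsBatch, supportsBatch>> flag on, the elements of the <<inputRDD, inputRDD>> are in fact ColumnarBatches produced by createAndDecompressColumn (despite the RDD[InternalRow] type):

[source, scala]

// Hedged demo: peek at the runtime class of the inputRDD elements
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val q = spark.range(4).cache
val scan = q.queryExecution.executedPlan.collectFirst { case e: InMemoryTableScanExec => e }.get
assert(scan.supportsBatch)
// Map to the class name on the executors to avoid shipping a ColumnarBatch to the driver
scan.inputRDDs.head.map(_.getClass.getName).first
// Expected (assumption): org.apache.spark.sql.vectorized.ColumnarBatch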

=== [[inputRDD]] Creating Input RDD of Internal Rows -- inputRDD Internal Property

[source, scala]

inputRDD: RDD[InternalRow]

NOTE: inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.

inputRDD firstly <<filteredCachedBatches, applies partition batch pruning to the cached column buffers>> (and creates a filtered RDD[CachedBatch]).

With the <<supportsBatch, supportsBatch>> flag on, inputRDD finishes with a new MapPartitionsRDD (using RDD.map) by applying <<createAndDecompressColumn, createAndDecompressColumn>> to all cached columnar batches.

The following demo shows the <<supportsBatch, supportsBatch>> flag enabled (a corresponding demo with the flag disabled follows below).

[source, scala]

// Demo: A MapPartitionsRDD in the RDD lineage
val q = spark.range(4).cache
val plan = q.queryExecution.executedPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

// supportsBatch flag is on since the schema is a single column of longs
assert(inmemoryScan.supportsBatch)

val rdd = inmemoryScan.inputRDDs.head
scala> rdd.toDebugString
res2: String =
(8) MapPartitionsRDD[5] at inputRDDs at <console>:27 []
 |  MapPartitionsRDD[4] at inputRDDs at <console>:27 []
 |  *(1) Range (0, 4, step=1, splits=8) MapPartitionsRDD[3] at cache at <console>:23 []
 |  MapPartitionsRDD[2] at cache at <console>:23 []
 |  MapPartitionsRDD[1] at cache at <console>:23 []
 |  ParallelCollectionRDD[0] at cache at <console>:23 []


With the <<supportsBatch, supportsBatch>> flag off, inputRDD firstly <<filteredCachedBatches, applies partition batch pruning to the cached column buffers>> (and creates a filtered RDD[CachedBatch]).

NOTE: Indeed, inputRDD <<filteredCachedBatches, applies partition batch pruning to the cached column buffers>> (and creates a filtered RDD[CachedBatch]) twice, which seems unnecessary.

In the end, inputRDD creates a new MapPartitionsRDD with a ColumnarIterator applied to all cached columnar batches. The ColumnarIterator is created as follows:

. For every CachedBatch in the partition iterator, adds the total number of rows in the batch to the <<numOutputRows, numOutputRows>> SQL metric

. Requests GenerateColumnAccessor to generate the Java code for a ColumnarIterator to perform expression evaluation for the given <<attributes, output attributes>>

. Requests ColumnarIterator to initialize

[source, scala]

// Demo: A MapPartitionsRDD in the RDD lineage (supportsBatch flag off)
import java.sql.Date
import java.time.LocalDate
val q = Seq(Date.valueOf(LocalDate.now)).toDF("date").cache
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val inmemoryScan = plan.collectFirst { case exec: InMemoryTableScanExec => exec }.get

// supportsBatch flag is off since the schema uses java.sql.Date
assert(inmemoryScan.supportsBatch == false)

val rdd = inmemoryScan.inputRDDs.head
scala> rdd.toDebugString
res2: String =
(1) MapPartitionsRDD[12] at inputRDDs at <console>:28 []
 |  MapPartitionsRDD[11] at inputRDDs at <console>:28 []
 |  LocalTableScan [date#15] MapPartitionsRDD[9] at cache at <console>:25 []
 |  MapPartitionsRDD[8] at cache at <console>:25 []
 |  ParallelCollectionRDD[7] at cache at <console>:25 []


NOTE: inputRDD is used when InMemoryTableScanExec is requested for the <<inputRDDs, input RDDs>> and to <<doExecute, execute>>.

=== [[doExecute]] Executing Physical Operator (Generating RDD[InternalRow]) -- doExecute Method

[source, scala]

doExecute(): RDD[InternalRow]

doExecute branches off per the <<supportsBatch, supportsBatch>> flag.

With <> flag on, doExecute creates a WholeStageCodegenExec.md#creating-instance[WholeStageCodegenExec] (with the InMemoryTableScanExec physical operator as the WholeStageCodegenExec.md#child[child] and WholeStageCodegenExec.md#codegenStageId[codegenStageId] as 0) and requests it to SparkPlan.md#execute[execute].

Otherwise, when the <<supportsBatch, supportsBatch>> flag is off, doExecute simply gives the <<inputRDD, inputRDD>>.

doExecute is part of the SparkPlan abstraction.
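
For example (hedged; exact plan text and numbering differ across Spark versions), the star prefix in the explain output below indicates that the scan got wrapped in a WholeStageCodegenExec:

[source, scala]

// Demo: a LongType-only cached schema turns supportsBatch on, so the scan is codegen'd
val q = spark.range(4).cache
q.explain
// Expected output, roughly:
// == Physical Plan ==
// *(1) InMemoryTableScan [id#0L]
//       +- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
//             +- *(1) Range (0, 4, step=1, splits=8)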

=== [[buildFilter]] buildFilter Property

[source, scala]

buildFilter: PartialFunction[Expression, Expression]

NOTE: buildFilter is a Scala lazy value which is computed once when accessed and cached afterwards.

buildFilter is a Scala https://www.scala-lang.org/api/2.11.11/#scala.PartialFunction[PartialFunction] that accepts an expressions/Expression.md[Expression] and produces an expressions/Expression.md[Expression], i.e. PartialFunction[Expression, Expression].

[[buildFilter-expressions]] buildFilter is defined for the following input expressions:

* And
* Or
* EqualTo
* EqualNullSafe
* LessThan
* LessThanOrEqual
* GreaterThan
* GreaterThanOrEqual
* IsNull
* IsNotNull
* In with a non-empty spark-sql-Expression-In.md#list[list] of spark-sql-Expression-Literal.md[Literal] expressions

For In with a non-empty list of Literal expressions, buildFilter creates an And expression with the lower and upper bounds of the <<statsFor, partition statistics>> and the Literal, for every Literal expression in the list. In the end, buildFilter joins the And expressions with Or expressions.

NOTE: buildFilter is used exclusively when InMemoryTableScanExec is requested for the <<partitionFilters, partitionFilters>>.
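
A hedged spark-shell example with an EqualTo predicate that buildFilter handles; the predicate pushed down to InMemoryTableScan shows up in the plan, and internally it is turned into a stats-based filter over the lower and upper bounds of the cached batches:

[source, scala]

// Demo: an EqualTo predicate over a cached Dataset
val ids = spark.range(10).cache
ids.where("id = 5").explain
// Expected output, roughly:
// == Physical Plan ==
// *(1) Filter (id#0L = 5)
// +- *(1) InMemoryTableScan [id#0L], [(id#0L = 5)]
//       +- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
//             +- *(1) Range (0, 10, step=1, splits=8)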

=== [[innerChildren]] innerChildren Method

[source, scala]

innerChildren: Seq[QueryPlan[_]]

NOTE: innerChildren is part of the catalyst/QueryPlan.md#innerChildren[QueryPlan Contract] to define the inner query plans to be displayed as part of this operator's plan tree (e.g. in explain output).

innerChildren gives the <<relation, InMemoryRelation>> logical operator (which is why the cached relation and its child plan are displayed under InMemoryTableScan in the physical plans above).

=== [[metrics]] Performance Metrics

.InMemoryTableScanExec's Performance Metrics
[cols="1,2,2",options="header",width="100%"]
|===
| Key
| Name (in web UI)
| Description

| [[numOutputRows]] numOutputRows
| number of output rows
| Number of output rows
|===
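
A hedged way to read the metric programmatically after a cached query has been executed:

[source, scala]

// Hedged demo: numOutputRows of an InMemoryTableScanExec after execution
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
val q = spark.range(4).cache
q.collect // executes q's physical plan and populates its metrics
val scan = q.queryExecution.executedPlan.collectFirst { case e: InMemoryTableScanExec => e }.get
println(scan.metrics("numOutputRows").value) // expect: 4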

InMemoryTableScanExec in web UI (Details for Query)
