
FileSourceScanExec Leaf Physical Operator

FileSourceScanExec is a leaf physical operator (as a DataSourceScanExec) that represents a scan over collections of files (incl. Hive tables).

FileSourceScanExec is created exclusively for a LogicalRelation.md[LogicalRelation] logical operator with a HadoopFsRelation when the FileSourceStrategy execution planning strategy is executed.

// Create a bucketed data source table
// It is one of the most complex examples of a LogicalRelation with a HadoopFsRelation
val tableName = "bucketed_4_id"
spark
  .range(100)
  .withColumn("part", $"id" % 2)
  .write
  .partitionBy("part")
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable(tableName)
val q = spark.table(tableName)

val sparkPlan = q.queryExecution.executedPlan
scala> :type sparkPlan
org.apache.spark.sql.execution.SparkPlan

scala> println(sparkPlan.numberedTreeString)
00 *(1) FileScan parquet default.bucketed_4_id[id#7L,part#8L] Batched: true, Format: Parquet, Location: CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

scala> scan.metadata.toSeq.sortBy(_._1).map { case (k, v) => s"$k -> $v" }.foreach(println)
Batched -> true
Format -> Parquet
Location -> CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id]
PartitionCount -> 2
PartitionFilters -> []
PushedFilters -> []
ReadSchema -> struct<id:bigint>
SelectedBucketsCount -> 4 out of 4

[[inputRDDs]] FileSourceScanExec uses the single inputRDD as the input RDDs (in Whole-Stage Java Code Generation).
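Using the scan value from the bucketed-table example above, this can be verified directly (an illustrative snippet, not from the original text):

[source, scala]

// inputRDDs returns a single input RDD (the inputRDD)
assert(scan.inputRDDs.size == 1)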

When executed, the FileSourceScanExec operator creates a FileScanRDD (for bucketed and non-bucketed reads).

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

val rdd = scan.execute
scala> println(rdd.toDebugString)
(6) MapPartitionsRDD[7] at execute at <console>:28 []
 |  FileScanRDD[2] at execute at <console>:27 []

import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(rdd.dependencies.head.rdd.isInstanceOf[FileScanRDD])

FileSourceScanExec supports bucket pruning so it only scans the bucket files required for a query.

scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec

import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]

import org.apache.spark.sql.execution.datasources.FilePartition
val bucketFiles = for {
  FilePartition(bucketId, files) <- rdd.filePartitions
  f <- files
} yield s"Bucket $bucketId => $f"

scala> println(bucketFiles.size)
51

scala> bucketFiles.foreach(println)
Bucket 0 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=0/part-00004-5301d371-01c3-47d4-bb6b-76c3c94f3699_00000.c000.snappy.parquet, range: 0-423, partition values: [0]
Bucket 0 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=0/part-00001-5301d371-01c3-47d4-bb6b-76c3c94f3699_00000.c000.snappy.parquet, range: 0-423, partition values: [0]
...
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00005-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-423, partition values: [1]
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00000-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-431, partition values: [1]
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00007-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-423, partition values: [1]

FileSourceScanExec uses a HashPartitioning or the default UnknownPartitioning as the output partitioning scheme.

FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.

FileSourceScanExec supports data source filters that are printed out to the console (at INFO logging level) and available as metadata (e.g. in web UI or spark-sql-dataset-operators.md#explain[explain]).

Pushed Filters: [pushedDownFilters]

[[metrics]]
.FileSourceScanExec's Performance Metrics
[cols="1m,2,2",options="header",width="100%"]
|===
| Key
| Name (in web UI)
| Description

| metadataTime
| metadata time (ms)
| [[metadataTime]]

| numFiles
| number of files
| [[numFiles]]

| numOutputRows
| number of output rows
| [[numOutputRows]]

| scanTime
| scan time
| [[scanTime]]
|===
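The metric keys above can be listed on the scan value from the example at the top of the page (an illustrative one-liner):

[source, scala]

// the keys from the metrics table above, sorted alphabetically
scan.metrics.keys.toSeq.sorted.foreach(println)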

As a DataSourceScanExec.md[DataSourceScanExec], FileSourceScanExec uses Scan for the prefix of the DataSourceScanExec.md#nodeName[node name].

[source, scala]

val fileScanExec: FileSourceScanExec = ... // see the example earlier
assert(fileScanExec.nodeName startsWith "Scan")


.FileSourceScanExec in web UI (Details for Query)
image::images/spark-sql-FileSourceScanExec-webui-query-details.png[align="center"]

[[nodeNamePrefix]] FileSourceScanExec uses File for DataSourceScanExec.md#nodeNamePrefix[nodeNamePrefix] (that is used for the DataSourceScanExec.md#simpleString[simple node description] in query plans).

[source, scala]

val fileScanExec: FileSourceScanExec = ... // see the example earlier
assert(fileScanExec.nodeNamePrefix == "File")

scala> println(fileScanExec.simpleString)
FileScan csv [id#20,name#21,city#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/datasets/people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct


[[internal-registries]]
.FileSourceScanExec's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,2",options="header",width="100%"]
|===
| Name
| Description

| metadata
a| [[metadata]]

[source, scala]

metadata: Map[String, String]

Metadata

NOTE: metadata is part of DataSourceScanExec.md#metadata[DataSourceScanExec] contract.

| pushedDownFilters
a| [[pushedDownFilters]] Data source filters that are dataFilters expressions converted to their respective filters

TIP: Enable INFO logging level to see the pushed-down filters printed out to the console.

Pushed Filters: [pushedDownFilters]

Used when FileSourceScanExec is requested for the metadata and the input RDD
|===

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.FileSourceScanExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.FileSourceScanExec=ALL

Refer to spark-logging.md[Logging].
====

Creating Instance

FileSourceScanExec takes the following when created:

  • [[relation]] HadoopFsRelation
  • [[output]] Output schema attributes
  • [[requiredSchema]] Schema
  • [[partitionFilters]] partitionFilters expressions
  • [[optionalBucketSet]] Bucket IDs for bucket pruning (Option[BitSet])
  • [[dataFilters]] dataFilters expressions
  • [[tableIdentifier]] Optional TableIdentifier
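These are fields of the FileSourceScanExec case class and can be inspected on the scan value from the example at the top of the page (an illustrative snippet, not from the original text):

[source, scala]

println(scan.relation)          // HadoopFsRelation
println(scan.output)            // output schema attributes
println(scan.requiredSchema)    // required schema
println(scan.partitionFilters)  // partition filter expressions
println(scan.optionalBucketSet) // bucket IDs for bucket pruning, if any
println(scan.dataFilters)       // data filter expressions
println(scan.tableIdentifier)   // optional TableIdentifier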

=== [[createNonBucketedReadRDD]] Creating RDD for Non-Bucketed Reads -- createNonBucketedReadRDD Internal Method

[source, scala]

createNonBucketedReadRDD(
  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Seq[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow]


createNonBucketedReadRDD calculates the maximum size of partitions (maxSplitBytes) based on the spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes configuration properties and the default parallelism of the Spark application.

createNonBucketedReadRDD sums up the size of all the files (adding spark.sql.files.openCostInBytes to every file) in the given selectedPartitions and divides the sum by the "default parallelism" (i.e. the number of CPU cores assigned to the Spark application), which gives bytesPerCore.

The maximum size of partitions is then the minimum of spark.sql.files.maxPartitionBytes and the bigger of spark.sql.files.openCostInBytes and the bytesPerCore.
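A worked example of the calculation with made-up numbers (the property names are the actual Spark SQL configuration keys; the file sizes and the parallelism are assumptions for illustration only):

[source, scala]

// Illustrative values; the real ones come from the configuration properties above
// and from SparkContext.defaultParallelism
val defaultMaxSplitBytes = 128L * 1024 * 1024 // spark.sql.files.maxPartitionBytes (default 128 MB)
val openCostInBytes      = 4L * 1024 * 1024   // spark.sql.files.openCostInBytes (default 4 MB)
val defaultParallelism   = 8                  // CPU cores assigned to the Spark application

// Assume selectedPartitions hold 200 files totalling 1 GB
val totalBytes   = 1L * 1024 * 1024 * 1024 + 200 * openCostInBytes
val bytesPerCore = totalBytes / defaultParallelism // ~228 MB

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
// maxSplitBytes == min(128 MB, max(4 MB, ~228 MB)) == 128 MB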

createNonBucketedReadRDD prints out the following INFO message to the logs:

Planning scan with bin packing, max size: [maxSplitBytes] bytes, open cost is considered as scanning [openCostInBytes] bytes.

For every file (as Hadoop's FileStatus) in every partition (as PartitionDirectory in the given selectedPartitions), createNonBucketedReadRDD gets the block locations (getBlockLocations) to create PartitionedFiles (possibly split per the maximum size of partitions if the FileFormat of the HadoopFsRelation is splittable). The partitioned files are then sorted by the number of bytes to read (aka split size) in decreasing order (from the largest to the smallest).

createNonBucketedReadRDD "compresses" multiple splits into a single partition as long as together they stay below maxSplitBytes ("Next Fit Decreasing"), which gives the necessary partitions (file blocks as FilePartitions); a sketch of the packing follows.

In the end, createNonBucketedReadRDD creates a FileScanRDD (with the given (PartitionedFile) => Iterator[InternalRow] read function and the partitions).

createNonBucketedReadRDD is used when the FileSourceScanExec physical operator is requested for the input RDD (and the HadoopFsRelation has no bucketing specification defined or bucketing is disabled).

Input RDD

inputRDD: RDD[InternalRow]

lazy value

inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.

inputRDD is an input RDD that is used when FileSourceScanExec physical operator is requested for inputRDDs and to execute.

When first computed, inputRDD requests the HadoopFsRelation for the underlying FileFormat that is in turn requested to build a data reader with partition column values appended (with the input parameters from the properties of the HadoopFsRelation and the pushedDownFilters).

In case the HadoopFsRelation has a bucketing specification defined and bucketing support is enabled, inputRDD creates a FileScanRDD with bucketing support (with the bucketing specification, the reader, selectedPartitions and the HadoopFsRelation itself). Otherwise, inputRDD creates a non-bucketed FileScanRDD (createNonBucketedReadRDD).

Dynamically Selected Partitions

dynamicallySelectedPartitions: Array[PartitionDirectory]

lazy value

dynamicallySelectedPartitions is a Scala lazy value which is computed once when accessed and cached afterwards.

dynamicallySelectedPartitions...FIXME

dynamicallySelectedPartitions is used when FileSourceScanExec is requested for inputRDD.

Selected Partitions

selectedPartitions: Seq[PartitionDirectory]

lazy value

selectedPartitions is a Scala lazy value which is computed once when accessed and cached afterwards.

selectedPartitions...FIXME

=== [[outputPartitioning]] Output Partitioning Scheme -- outputPartitioning Attribute

[source, scala]

outputPartitioning: Partitioning

outputPartitioning is part of the SparkPlan abstraction.

outputPartitioning can be one of the following:

  • HashPartitioning (with the bucket column names and the number of buckets of the bucketing specification of the HadoopFsRelation) when bucketing is enabled and the HadoopFsRelation has a bucketing specification defined
  • UnknownPartitioning (with 0 partitions) otherwise
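For the bucketed table from the example at the top of the page (4 buckets on the id column, with bucketed reads enabled), the scan should report a HashPartitioning (an illustrative check, not from the original text):

[source, scala]

import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
assert(scan.outputPartitioning.isInstanceOf[HashPartitioning])
assert(scan.outputPartitioning.numPartitions == 4)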

=== [[createBucketedReadRDD]] Creating FileScanRDD with Bucketing Support -- createBucketedReadRDD Internal Method

[source, scala]

createBucketedReadRDD(
  bucketSpec: BucketSpec,
  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Seq[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow]


createBucketedReadRDD prints the following INFO message to the logs:

Planning with [numBuckets] buckets

createBucketedReadRDD maps the available files of the input selectedPartitions into PartitionedFiles. For every file, createBucketedReadRDD gets the block locations (getBlockLocations) and the block hosts.

createBucketedReadRDD then groups the PartitionedFiles by bucket ID.

NOTE: Bucket ID is of the format _0000n, i.e. the bucket ID prefixed with up to four 0s.

createBucketedReadRDD prunes (filters out) the bucket files for the bucket IDs that are not listed in the optionalBucketSet (the bucket IDs for bucket pruning).

createBucketedReadRDD creates a FilePartition (file block) for every bucket ID and the (pruned) bucket PartitionedFiles.

In the end, createBucketedReadRDD creates a FileScanRDD (with the input readFile as the read function and the file blocks (FilePartitions) per bucket ID as the partitions).

[TIP]
====
Use RDD.toDebugString to see FileScanRDD in the RDD execution plan (aka RDD lineage).

[source, scala]

// Create a bucketed table
spark.range(8).write.bucketBy(4, "id").saveAsTable("b1")

scala> sql("desc extended b1").where($"col_name" like "%Bucket%").show
+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|   Num Buckets|        4|       |
|Bucket Columns|     [id]|       |
+--------------+---------+-------+

val bucketedTable = spark.table("b1")

val lineage = bucketedTable.queryExecution.toRdd.toDebugString
scala> println(lineage)
(4) MapPartitionsRDD[26] at toRdd at <console>:26 []
 |  FileScanRDD[25] at toRdd at <console>:26 []
====

NOTE: createBucketedReadRDD is used exclusively when the FileSourceScanExec physical operator is requested for the input RDD (and the optional bucketing specification of the HadoopFsRelation is defined and bucketing is enabled).

=== [[supportsBatch]] supportsBatch Attribute

[source, scala]

supportsBatch: Boolean

supportsBatch is enabled (true) only when the FileFormat (of the HadoopFsRelation) supports vectorized decoding. Otherwise, supportsBatch is disabled (i.e. false).

NOTE: FileFormat does not support vectorized decoding by default (i.e. the supportBatch flag is disabled). Only ParquetFileFormat and OrcFileFormat have support for it under certain conditions.

supportsBatch is part of the ColumnarBatchScan abstraction.
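For the Parquet-backed scan from the example at the top of the page, the physical plan shows Batched: true, so supportsBatch should hold (an illustrative check, assuming the vectorized Parquet reader is enabled):

[source, scala]

assert(scan.supportsBatch)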

=== [[ColumnarBatchScan]] FileSourceScanExec As ColumnarBatchScan

FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.

FileSourceScanExec has the needsUnsafeRowConversion flag enabled for ParquetFileFormat data sources exclusively.

FileSourceScanExec has vectorTypes...FIXME

==== [[needsUnsafeRowConversion]] needsUnsafeRowConversion Flag

[source, scala]

needsUnsafeRowConversion: Boolean

needsUnsafeRowConversion is enabled (i.e. true) when the following conditions all hold:

  1. FileFormat of the HadoopFsRelation is ParquetFileFormat

  2. spark.sql.parquet.enableVectorizedReader configuration property is enabled

Otherwise, needsUnsafeRowConversion is disabled (i.e. false).
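Both conditions can be checked on the scan from the example at the top of the page (an illustrative snippet; it assumes Spark 2.4-era public accessors on the operator and the HadoopFsRelation):

[source, scala]

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
val isParquet = scan.relation.fileFormat.isInstanceOf[ParquetFileFormat]
val vectorizedEnabled = spark.conf.get("spark.sql.parquet.enableVectorizedReader").toBoolean
assert(scan.needsUnsafeRowConversion == (isParquet && vectorizedEnabled))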

NOTE: needsUnsafeRowConversion is used when FileSourceScanExec is executed (doExecute) and the supportsBatch flag is off.

needsUnsafeRowConversion is part of the ColumnarBatchScan abstraction.

==== [[vectorTypes]] Fully-Qualified Class Names (Types) of Concrete ColumnVectors -- vectorTypes Method

[source, scala]

vectorTypes: Option[Seq[String]]

vectorTypes simply requests the FileFormat of the HadoopFsRelation for vectorTypes.

vectorTypes is part of the ColumnarBatchScan abstraction.
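For a Parquet scan, vectorTypes is typically a Some with the fully-qualified class name of OnHeapColumnVector (or OffHeapColumnVector with off-heap column vectors enabled) per column. An illustrative way to see it for the scan from the example at the top of the page:

[source, scala]

scan.vectorTypes.getOrElse(Nil).foreach(println)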

=== [[doExecute]] Executing Physical Operator (Generating RDD[InternalRow]) -- doExecute Method

[source, scala]

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan abstraction.

doExecute branches off per the supportsBatch flag.

NOTE: supportsBatch flag can be enabled for ParquetFileFormat and OrcFileFormat built-in file formats (under certain conditions).

With the supportsBatch flag enabled, doExecute creates a WholeStageCodegenExec physical operator (with the FileSourceScanExec for the child and WholeStageCodegenExec.md#codegenStageId[codegenStageId] as 0) and SparkPlan.md#execute[executes] it right after.

With the supportsBatch flag disabled, doExecute creates an unsafeRows RDD to scan over, which is different per the needsUnsafeRowConversion flag.

If the needsUnsafeRowConversion flag is on, doExecute takes the inputRDD and creates a new RDD by applying a function to each partition (using RDD.mapPartitionsWithIndexInternal) that does the following (see the sketch after the list):

  1. Creates a UnsafeProjection for the schema

  2. Initializes the UnsafeProjection

  3. Maps over the rows in a partition iterator using the UnsafeProjection projection
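A simplified sketch of that branch (it uses the public RDD.mapPartitionsWithIndex for illustration; Spark itself uses an internal variant, and the helper name toUnsafeRows is made up):

[source, scala]

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType

def toUnsafeRows(inputRDD: RDD[InternalRow], schema: StructType): RDD[InternalRow] =
  inputRDD.mapPartitionsWithIndex { (index, iter) =>
    val proj = UnsafeProjection.create(schema) // 1. create an UnsafeProjection for the schema
    proj.initialize(index)                     // 2. initialize it with the partition index
    iter.map(proj)                             // 3. convert every row to an UnsafeRow
  }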

Otherwise, doExecute simply takes the inputRDD as the unsafeRows RDD (with no changes).

doExecute takes the numOutputRows metric and creates a new RDD by mapping every element in the unsafeRows and incrementing the numOutputRows metric.

[TIP]
====
Use RDD.toDebugString to review the RDD lineage and "reverse-engineer" the values of the supportsBatch and needsUnsafeRowConversion flags given the number of RDDs.

With supportsBatch off and needsUnsafeRowConversion on you should see two more RDDs in the RDD lineage.
====

=== [[outputOrdering]] Output Data Ordering -- outputOrdering Attribute

[source, scala]

outputOrdering: Seq[SortOrder]

outputOrdering is part of the SparkPlan abstraction.

outputOrdering is a SortOrder expression for every sort column in Ascending order only when all the following hold:

  • bucketing is enabled
  • the HadoopFsRelation has a bucketing specification defined with sort columns
  • every bucket has at most a single data file (so the per-bucket sort order is preserved)

Otherwise, outputOrdering is simply empty (Nil).

=== [[updateDriverMetrics]] updateDriverMetrics Internal Method

[source, scala]

updateDriverMetrics(): Unit

updateDriverMetrics updates the following performance metrics:

  • numFiles metric with the total number of files in the selectedPartitions

  • metadataTime metric with the time spent computing the selectedPartitions

In the end, updateDriverMetrics requests the SQLMetrics object to spark-sql-SQLMetric.md#postDriverMetricUpdates[post the metric updates].

NOTE: updateDriverMetrics is used exclusively when the FileSourceScanExec physical operator is requested for the input RDD (the very first time).

=== [[getBlockLocations]] getBlockLocations Internal Method

[source, scala]

getBlockLocations(file: FileStatus): Array[BlockLocation]

getBlockLocations simply requests the given Hadoop https://hadoop.apache.org/docs/r2.7.3/api/index.html?org/apache/hadoop/fs/FileStatus.html[FileStatus] for the block locations (getBlockLocations) if it is a Hadoop https://hadoop.apache.org/docs/r2.7.3/api/index.html?org/apache/hadoop/fs/LocatedFileStatus.html[LocatedFileStatus]. Otherwise, getBlockLocations returns an empty array.
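A minimal sketch of that behavior (a standalone helper for illustration, not the actual Spark source):

[source, scala]

import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus}

def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
  case f: LocatedFileStatus => f.getBlockLocations // block locations are already attached
  case _                    => Array.empty[BlockLocation]
}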

NOTE: getBlockLocations is used when the FileSourceScanExec physical operator is requested to createBucketedReadRDD and createNonBucketedReadRDD.

