FileSourceScanExec Leaf Physical Operator
FileSourceScanExec is a leaf physical operator (as a DataSourceScanExec) that represents a scan over collections of files (incl. Hive tables).
FileSourceScanExec is created exclusively when the FileSourceStrategy execution planning strategy plans a LogicalRelation logical operator with a HadoopFsRelation.
// Create a bucketed data source table
// It is one of the most complex examples of a LogicalRelation with a HadoopFsRelation
val tableName = "bucketed_4_id"
spark
.range(100)
.withColumn("part", $"id" % 2)
.write
.partitionBy("part")
.bucketBy(4, "id")
.sortBy("id")
.mode("overwrite")
.saveAsTable(tableName)
val q = spark.table(tableName)
val sparkPlan = q.queryExecution.executedPlan
scala> :type sparkPlan
org.apache.spark.sql.execution.SparkPlan
scala> println(sparkPlan.numberedTreeString)
00 *(1) FileScan parquet default.bucketed_4_id[id#7L,part#8L] Batched: true, Format: Parquet, Location: CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4
import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get
scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec
scala> scan.metadata.toSeq.sortBy(_._1).map { case (k, v) => s"$k -> $v" }.foreach(println)
Batched -> true
Format -> Parquet
Location -> CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id]
PartitionCount -> 2
PartitionFilters -> []
PushedFilters -> []
ReadSchema -> struct<id:bigint>
SelectedBucketsCount -> 4 out of 4
[[inputRDDs]] FileSourceScanExec uses the single input RDD as the input RDDs (in Whole-Stage Java Code Generation).

When executed, the FileSourceScanExec operator creates a FileScanRDD (for bucketed and non-bucketed reads).
scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec
val rdd = scan.execute
scala> println(rdd.toDebugString)
(6) MapPartitionsRDD[7] at execute at <console>:28 []
| FileScanRDD[2] at execute at <console>:27 []
import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(rdd.dependencies.head.rdd.isInstanceOf[FileScanRDD])
FileSourceScanExec supports bucket pruning so it only scans the bucket files required for a query.
scala> :type scan
org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucketFiles = for {
FilePartition(bucketId, files) <- rdd.filePartitions
f <- files
} yield s"Bucket $bucketId => $f"
scala> println(bucketFiles.size)
51
scala> bucketFiles.foreach(println)
Bucket 0 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=0/part-00004-5301d371-01c3-47d4-bb6b-76c3c94f3699_00000.c000.snappy.parquet, range: 0-423, partition values: [0]
Bucket 0 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=0/part-00001-5301d371-01c3-47d4-bb6b-76c3c94f3699_00000.c000.snappy.parquet, range: 0-423, partition values: [0]
...
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00005-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-423, partition values: [1]
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00000-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-431, partition values: [1]
Bucket 3 => path: file:///Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id/part=1/part-00007-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet, range: 0-423, partition values: [1]
FileSourceScanExec uses a HashPartitioning or the default UnknownPartitioning as the output partitioning scheme.
FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.
FileSourceScanExec supports data source filters that are printed out to the console (at INFO logging level) and available as metadata (e.g. in web UI or explain):

Pushed Filters: [pushedDownFilters]
[[metrics]]
.FileSourceScanExec's Performance Metrics
[cols="1m,2,2",options="header",width="100%"]
|===
| Key | Name (in web UI) | Description

| metadataTime | metadata time (ms) | [[metadataTime]]
| numFiles | number of files | [[numFiles]]
| numOutputRows | number of output rows | [[numOutputRows]]
| scanTime | scan time | [[scanTime]]
|===
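These metrics can be inspected programmatically once the plan has been executed. The following sketch uses the scan and sparkPlan values from the example above (the exact metric values depend on the environment):

```scala
// Execute the physical plan so the metrics get updated
sparkPlan.executeCollect()

// Print out the performance metrics of the FileSourceScanExec operator
scan.metrics.toSeq.sortBy(_._1).foreach { case (name, metric) =>
  println(s"$name -> ${metric.value}")
}
```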
As a DataSourceScanExec.md[DataSourceScanExec], FileSourceScanExec uses Scan for the prefix of the DataSourceScanExec.md#nodeName[node name].
[source, scala]
val fileScanExec: FileSourceScanExec = ... // see the example earlier
assert(fileScanExec.nodeName startsWith "Scan")
.FileSourceScanExec in web UI (Details for Query)
image::images/spark-sql-FileSourceScanExec-webui-query-details.png[align="center"]
[[nodeNamePrefix]] FileSourceScanExec uses File for DataSourceScanExec.md#nodeNamePrefix[nodeNamePrefix] (that is used for the DataSourceScanExec.md#simpleString[simple node description] in query plans).
[source, scala]
val fileScanExec: FileSourceScanExec = ... // see the example earlier
assert(fileScanExec.nodeNamePrefix == "File")

scala> println(fileScanExec.simpleString)
FileScan csv [id#20,name#21,city#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/datasets/people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
[[internal-registries]]
.FileSourceScanExec's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,2",options="header",width="100%"]
|===
| Name | Description

| metadata
a| [[metadata]]

[source, scala]
metadata: Map[String, String]

Metadata

NOTE: metadata is part of the DataSourceScanExec.md#metadata[DataSourceScanExec] contract.
| pushedDownFilters
a| [[pushedDownFilters]] Data source filters that are dataFilters expressions converted to their respective data source filters

[TIP]
====
Enable INFO logging level to see the pushed-down filters printed out to the logs:

Pushed Filters: [pushedDownFilters]
====

Used when FileSourceScanExec is requested for the metadata and the input RDD

|===
[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.FileSourceScanExec logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.FileSourceScanExec=ALL

Refer to spark-logging.md[Logging].
====
Creating Instance

FileSourceScanExec takes the following when created:

- [[relation]] HadoopFsRelation
- [[output]] Output schema attributes
- [[requiredSchema]] Schema
- [[partitionFilters]] partitionFilters expressions
- [[optionalBucketSet]] Bucket IDs for bucket pruning (Option[BitSet])
- [[dataFilters]] dataFilters expressions
- [[tableIdentifier]] Optional TableIdentifier
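All of these are available as fields on the operator, so the scan value collected in the example above can be inspected directly (a small sketch; the printed values will vary):

```scala
// Inspect the constructor arguments of the FileSourceScanExec collected earlier
println(scan.relation)         // the HadoopFsRelation being scanned
println(scan.requiredSchema)   // the (pruned) read schema
println(scan.partitionFilters) // partition-pruning predicates
println(scan.dataFilters)      // data-column predicates
println(scan.tableIdentifier)  // the catalog table, if any
```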
=== [[createNonBucketedReadRDD]] Creating RDD for Non-Bucketed Reads -- createNonBucketedReadRDD Internal Method
[source, scala]
createNonBucketedReadRDD(
  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Seq[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow]
createNonBucketedReadRDD calculates the maximum size of partitions (maxSplitBytes) based on the following properties:

- spark.sql.files.maxPartitionBytes
- spark.sql.files.openCostInBytes
createNonBucketedReadRDD sums up the size of all the files (with the extra spark.sql.files.openCostInBytes) for the given selectedPartitions and divides the sum by the "default parallelism" (i.e. the number of CPU cores assigned to the Spark application), which gives bytesPerCore.

The maximum size of partitions is then the minimum of spark.sql.files.maxPartitionBytes and the bigger of spark.sql.files.openCostInBytes and bytesPerCore.
createNonBucketedReadRDD prints out the following INFO message to the logs:
Planning scan with bin packing, max size: [maxSplitBytes] bytes, open cost is considered as scanning [openCostInBytes] bytes.
For every file (as Hadoop's FileStatus) in every partition (as PartitionDirectory in the given selectedPartitions), createNonBucketedReadRDD gets the block locations and creates PartitionedFiles (splitting a file into chunks of at most maxSplitBytes if needed).
createNonBucketedReadRDD "compresses" multiple splits per partition if together they are smaller than maxSplitBytes ("Next Fit Decreasing"), which gives the necessary partitions (file blocks as FilePartitions).
In the end, createNonBucketedReadRDD creates a FileScanRDD (with the given (PartitionedFile) => Iterator[InternalRow] read function and the partitions).
createNonBucketedReadRDD is used when the FileSourceScanExec physical operator is requested for the input RDD (and neither the optional bucketing specification of the HadoopFsRelation is defined nor bucketing is enabled).
Input RDD

inputRDD: RDD[InternalRow]

inputRDD is a Scala lazy value which is computed once when accessed and cached afterwards.
inputRDD is the input RDD that is used when the FileSourceScanExec physical operator is requested for inputRDDs and to execute.
When created, inputRDD requests the HadoopFsRelation for the underlying FileFormat that is in turn requested to build a data reader with partition column values appended (with the input parameters from the properties of the HadoopFsRelation and pushedDownFilters).
If the HadoopFsRelation has a bucketing specification defined and bucketing support is enabled, inputRDD creates a FileScanRDD with bucketing (with the bucketing specification, the reader, selectedPartitions and the HadoopFsRelation itself). Otherwise, inputRDD uses createNonBucketedReadRDD.
Dynamically Selected Partitions

dynamicallySelectedPartitions: Array[PartitionDirectory]

dynamicallySelectedPartitions is a Scala lazy value which is computed once when accessed and cached afterwards.
dynamicallySelectedPartitions...FIXME

dynamicallySelectedPartitions is used when FileSourceScanExec is requested for the inputRDD.
Selected Partitions

selectedPartitions: Seq[PartitionDirectory]

selectedPartitions is a Scala lazy value which is computed once when accessed and cached afterwards.

selectedPartitions...FIXME
=== [[outputPartitioning]] Output Partitioning Scheme -- outputPartitioning Attribute

[source, scala]
outputPartitioning: Partitioning

NOTE: outputPartitioning is part of the SparkPlan contract to specify the output data partitioning.
outputPartitioning can be one of the following:

- HashPartitioning (with the bucket column names and the number of buckets of the bucketing specification of the HadoopFsRelation) when bucketing is enabled and the HadoopFsRelation has a bucketing specification defined
- UnknownPartitioning (with 0 partitions) otherwise
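For the bucketed-table example above (4 buckets and bucketing enabled by default), the scan should report a HashPartitioning on the bucketing column:

```scala
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

// The scan of the bucketed_4_id table is hash-partitioned,
// with as many partitions as buckets
assert(scan.outputPartitioning.isInstanceOf[HashPartitioning])
assert(scan.outputPartitioning.numPartitions == 4)
```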
=== [[createBucketedReadRDD]] Creating FileScanRDD with Bucketing Support -- createBucketedReadRDD Internal Method
[source, scala]
createBucketedReadRDD(
  bucketSpec: BucketSpec,
  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Seq[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow]
createBucketedReadRDD prints the following INFO message to the logs:

Planning with [numBuckets] buckets
createBucketedReadRDD maps the available files of the input selectedPartitions into PartitionedFiles. For every file, createBucketedReadRDD requests its block locations (getBlockLocations).
createBucketedReadRDD then groups the PartitionedFiles by bucket ID.
NOTE: Bucket ID is of the format _0000n, i.e. the bucket ID prefixed with up to four 0s.
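The bucket ID is encoded right before the file extension, e.g. _00003 in the file names of the bucketed_4_id example. A hypothetical helper (not Spark's own code) that extracts it the same way could look as follows:

```scala
// Extract the bucket ID from a bucketed data file name (illustrative helper)
val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

def bucketIdFromFileName(name: String): Option[Int] = name match {
  case bucketedFileName(bucketId) => Some(bucketId.toInt)
  case _                          => None
}

assert(bucketIdFromFileName(
  "part-00005-5301d371-01c3-47d4-bb6b-76c3c94f3699_00003.c000.snappy.parquet") == Some(3))
```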
createBucketedReadRDD prunes (filters out) the bucket files for the bucket IDs that are not listed in the bucket IDs for bucket pruning (optionalBucketSet).
createBucketedReadRDD creates a FilePartition (file block) for every bucket ID and the (pruned) bucket PartitionedFiles.
In the end, createBucketedReadRDD creates a FileScanRDD (with the input readFile as the read function and the file blocks (FilePartitions), one per bucket ID, as the partitions).
[TIP]
====
Use RDD.toDebugString to see FileScanRDD in the RDD execution plan (aka RDD lineage).

[source, scala]
// Create a bucketed table
spark.range(8).write.bucketBy(4, "id").saveAsTable("b1")

scala> sql("desc extended b1").where($"col_name" like "%Bucket%").show
+--------------+---------+-------+
|      col_name|data_type|comment|
+--------------+---------+-------+
|   Num Buckets|        4|       |
|Bucket Columns|   [`id`]|       |
+--------------+---------+-------+

val bucketedTable = spark.table("b1")
val lineage = bucketedTable.queryExecution.toRdd.toDebugString

scala> println(lineage)
(4) MapPartitionsRDD[26] at toRdd at <console>:26 []
 |  FileScanRDD[25] at toRdd at <console>:26 []
====
NOTE: createBucketedReadRDD is used exclusively when the FileSourceScanExec physical operator is requested for the input RDD (and the HadoopFsRelation has a bucketing specification defined and bucketing is enabled).
=== [[supportsBatch]] supportsBatch Attribute

[source, scala]
supportsBatch: Boolean
supportsBatch is enabled (true) only when the FileFormat (of the HadoopFsRelation) supports vectorized decoding. Otherwise, supportsBatch is disabled (i.e. false).
Note
FileFormat does not support vectorized decoding by default (i.e. supportBatch flag is disabled). Only ParquetFileFormat and OrcFileFormat have support for it under certain conditions.
supportsBatch is part of the ColumnarBatchScan abstraction.
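For the Parquet-based example table (with the vectorized reader and whole-stage codegen at their defaults) the flag should be on, which matches the Batched: true entry in the plan's metadata:

```scala
// Vectorized (batch) decoding is supported for the Parquet scan above
assert(scan.supportsBatch)
```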
=== [[ColumnarBatchScan]] FileSourceScanExec As ColumnarBatchScan
FileSourceScanExec is a ColumnarBatchScan and supports batch decoding only when the FileFormat (of the HadoopFsRelation) supports it.
FileSourceScanExec has the needsUnsafeRowConversion flag enabled for ParquetFileFormat data sources exclusively.
FileSourceScanExec determines the vectorTypes (the fully-qualified class names of the concrete ColumnVectors) using the FileFormat of the HadoopFsRelation.
==== [[needsUnsafeRowConversion]] needsUnsafeRowConversion Flag

[source, scala]
needsUnsafeRowConversion: Boolean
needsUnsafeRowConversion is enabled (i.e. true) when the following conditions all hold:

- FileFormat of the HadoopFsRelation is ParquetFileFormat
- spark.sql.parquet.enableVectorizedReader configuration property is enabled

Otherwise, needsUnsafeRowConversion is disabled (i.e. false).
NOTE: needsUnsafeRowConversion is used when FileSourceScanExec is executed (with the supportsBatch flag off).
needsUnsafeRowConversion is part of the ColumnarBatchScan abstraction.
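With the Parquet example table and the default configuration both conditions hold, so the flag is on (a quick check, assuming the defaults):

```scala
// ParquetFileFormat + vectorized reader enabled (the default)
assert(spark.conf.get("spark.sql.parquet.enableVectorizedReader") == "true")
assert(scan.needsUnsafeRowConversion)
```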
==== [[vectorTypes]] Fully-Qualified Class Names (Types) of Concrete ColumnVectors -- vectorTypes Method

[source, scala]
vectorTypes: Option[Seq[String]]
vectorTypes simply requests the FileFormat of the HadoopFsRelation for vectorTypes.

vectorTypes is part of the ColumnarBatchScan abstraction.
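For the Parquet example the file format reports one concrete ColumnVector class name per output column (OnHeapColumnVector unless off-heap column vectors are enabled), which can be inspected like this:

```scala
// One concrete ColumnVector class name per column of the schema
scan.vectorTypes.getOrElse(Seq.empty).foreach(println)
```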
=== [[doExecute]] Executing Physical Operator (Generating RDD[InternalRow]) -- doExecute Method

[source, scala]
doExecute(): RDD[InternalRow]
doExecute is part of the SparkPlan abstraction.
doExecute branches off per the supportsBatch flag.
Note
supportsBatch flag can be enabled for ParquetFileFormat and OrcFileFormat built-in file formats (under certain conditions).
With the supportsBatch flag on, doExecute creates a WholeStageCodegenExec physical operator (with the FileSourceScanExec as the child physical operator and the codegen stage ID as 0) and SparkPlan.md#execute[executes] it right after.
With the supportsBatch flag off, doExecute creates an unsafeRows RDD to scan over, which is different per the needsUnsafeRowConversion flag.
If the needsUnsafeRowConversion flag is on, doExecute takes the inputRDD and creates a new RDD by applying a function to each partition (using RDD.mapPartitionsWithIndexInternal):
- Creates a UnsafeProjection for the schema
- Initializes the UnsafeProjection
- Maps over the rows in a partition iterator using the UnsafeProjection projection
Otherwise, doExecute simply takes the inputRDD as the unsafeRows RDD (with no changes).
doExecute takes the numOutputRows metric and creates a new RDD by mapping every element in the unsafeRows RDD and incrementing the numOutputRows metric.
[TIP]
====
Use RDD.toDebugString to review the RDD lineage and "reverse-engineer" the values of the supportsBatch and needsUnsafeRowConversion flags given the number of RDDs.

With supportsBatch off and needsUnsafeRowConversion on you should see two more RDDs in the RDD lineage.
====
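As an experiment (assuming that Parquet batch support also requires whole-stage codegen, as in Spark 2.x), turning whole-stage codegen off produces a scan with supportsBatch off and needsUnsafeRowConversion still on, whose lineage can then be compared with the one shown earlier:

```scala
// Disable whole-stage codegen so the Parquet scan is no longer batched,
// while the vectorized reader (and thus needsUnsafeRowConversion) stays on
spark.conf.set("spark.sql.codegen.wholeStage", false)

val nonBatchedScan = spark.table(tableName).queryExecution.executedPlan
  .collectFirst { case exec: FileSourceScanExec => exec }
  .get
assert(!nonBatchedScan.supportsBatch && nonBatchedScan.needsUnsafeRowConversion)

println(nonBatchedScan.execute.toDebugString)

// Restore the default
spark.conf.set("spark.sql.codegen.wholeStage", true)
```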
=== [[outputOrdering]] Output Data Ordering -- outputOrdering Attribute

[source, scala]
outputOrdering: Seq[SortOrder]
NOTE: outputOrdering is part of the SparkPlan contract to specify the output data ordering.
outputOrdering is a SortOrder expression for every sort column (of the bucketing specification) in Ascending order only when all the following hold:

- The HadoopFsRelation has a bucketing specification defined
- All the buckets have a single file in it

Otherwise, outputOrdering is simply empty (Nil).
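In the bucketed_4_id example the table was bucketed and sorted by id, but every bucket spans several files, so the last condition fails and no ordering is reported:

```scala
// More than one file per bucket => no output ordering
assert(scan.outputOrdering == Nil)
```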
=== [[updateDriverMetrics]] updateDriverMetrics Internal Method

[source, scala]
updateDriverMetrics(): Unit
updateDriverMetrics updates the following performance metrics:

- numFiles metric with the total of the sizes of all the files in the selectedPartitions
- metadataTime metric with the time spent in selectedPartitions
In the end, updateDriverMetrics requests the SQLMetrics object to spark-sql-SQLMetric.md#postDriverMetricUpdates[post the metric updates].
NOTE: updateDriverMetrics is used exclusively when the FileSourceScanExec physical operator is requested for the input RDD (the very first time).
=== [[getBlockLocations]] getBlockLocations Internal Method

[source, scala]
getBlockLocations(file: FileStatus): Array[BlockLocation]
getBlockLocations simply requests the given Hadoop https://hadoop.apache.org/docs/r2.7.3/api/index.html?org/apache/hadoop/fs/LocatedFileStatus.html[FileStatus] for the block locations (getBlockLocations) if it is a Hadoop https://hadoop.apache.org/docs/r2.7.3/api/index.html?org/apache/hadoop/fs/LocatedFileStatus.html[LocatedFileStatus]. Otherwise, getBlockLocations returns an empty array.
NOTE: getBlockLocations is used when the FileSourceScanExec physical operator is requested to createBucketedReadRDD and createNonBucketedReadRDD.
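A minimal sketch of that logic (not the actual Spark source, just the same pattern match on the Hadoop types):

```scala
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus}

// Return the block locations only when the FileStatus actually carries them
def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
  case located: LocatedFileStatus => located.getBlockLocations
  case _                          => Array.empty[BlockLocation]
}
```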