FileScanRDD — Input RDD of FileSourceScanExec Physical Operator

FileScanRDD is the input RDD of FileSourceScanExec leaf physical operator (for Whole-Stage Java Code Generation).

Tip

Find out more on the RDD abstraction in The Internals of Apache Spark.

Creating Instance

FileScanRDD takes the following to be created:

* SparkSession
* Read function that takes a PartitionedFile and gives internal rows back ((PartitionedFile) => Iterator[InternalRow])
* FilePartitions (file blocks)

FileScanRDD is created when FileSourceScanExec physical operator is requested to createBucketedReadRDD or createNonBucketedReadRDD (i.e. when FileSourceScanExec operator is requested for the input RDD while WholeStageCodegenExec physical operator is executed).

Configuration Properties

FileScanRDD uses the following configuration properties (when requested to compute a partition):

* spark.sql.files.ignoreCorruptFiles
* spark.sql.files.ignoreMissingFiles
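For example, in spark-shell you can inspect or override both at the session level (they default to false; a quick demo only, not specific to FileScanRDD):

// The fault-tolerance flags FileScanRDD consults while computing a partition
println(spark.conf.get("spark.sql.files.ignoreCorruptFiles"))
println(spark.conf.get("spark.sql.files.ignoreMissingFiles"))

// Skip corrupt or missing files instead of failing the scan
spark.conf.set("spark.sql.files.ignoreCorruptFiles", true)
spark.conf.set("spark.sql.files.ignoreMissingFiles", true)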

FilePartition

When created, FileScanRDD is given FilePartitions, which are custom RDD partitions with one or more PartitionedFiles (file blocks).
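A quick way to peek at the FilePartitions of a scan in spark-shell (assuming a README.md file in the current directory, as in the Demo below):

import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FilePartition

val q = spark.read.text("README.md")
val scan = q.queryExecution.executedPlan.collectFirst { case e: FileSourceScanExec => e }.get
val fileScanRDD = scan.inputRDDs.head

// Every partition of a FileScanRDD is a FilePartition with one or more PartitionedFiles
fileScanRDD.partitions.foreach { p =>
  val fp = p.asInstanceOf[FilePartition]
  println(s"FilePartition ${fp.index}: ${fp.files.mkString(", ")}")
}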

Placement Preferences of Partition (Preferred Locations)

getPreferredLocations(
  split: RDDPartition): Seq[String]

getPreferredLocations assumes that the given RDDPartition is actually a FilePartition and requests it for preferredLocations.

getPreferredLocations is part of Spark Core's RDD abstraction.
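Continuing the spark-shell snippet from the FilePartition section, the public preferredLocations method (which uses getPreferredLocations under the covers) shows the hosts per partition. For a local file there are no remote block locations, so the result is typically empty:

fileScanRDD.partitions.foreach { p =>
  println(s"Partition ${p.index}: ${fileScanRDD.preferredLocations(p).mkString(", ")}")
}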

RDD Partitions

getPartitions: Array[RDDPartition]

getPartitions simply returns the FilePartitions the FileScanRDD was created with.

getPartitions is part of Spark Core's RDD abstraction.
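Continuing the same spark-shell snippet, the single small README.md file gives exactly one FilePartition:

// One small file ends up in a single FilePartition
assert(fileScanRDD.getNumPartitions == 1)
assert(fileScanRDD.partitions.forall(_.isInstanceOf[FilePartition]))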

Computing Partition

compute(
  split: RDDPartition,
  context: TaskContext): Iterator[InternalRow]

Note

The RDDPartition given is actually a FilePartition with one or more PartitionedFiles (that getPartitions returned).

compute is part of Spark Core's RDD abstraction.
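In essence, compute builds an iterator over the rows of all the PartitionedFiles of the given FilePartition, reading every file with the read function the FileScanRDD was created with. A simplified sketch only (the actual implementation also maintains the input metrics, honours the ignoreCorruptFiles and ignoreMissingFiles flags, and updates InputFileBlockHolder per file):

// Simplified sketch of compute (not the verbatim Spark code)
override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
  val filePartition = split.asInstanceOf[FilePartition]
  // Read the files one after another with the readFunction given at creation time
  filePartition.files.iterator.flatMap(readFunction)
}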

Retrieving Next Element

next(): Object

next takes the next element from the current iterator over the rows of a file block (PartitionedFile).

next then increments the metrics of the bytes and the number of rows read (which, for vectorized reads, is the number of rows in a ColumnarBatch).

next is part of Scala's Iterator abstraction.
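A simplified sketch of that bookkeeping (currentIterator and inputMetrics are internal names in this sketch; the actual code also updates the bytes-read metric only every so many records to keep the overhead low):

// Simplified sketch of next (not the verbatim Spark code)
def next(): Object = {
  val nextElement = currentIterator.next()
  nextElement match {
    // Vectorized reads hand out whole ColumnarBatches, so count all their rows
    case batch: ColumnarBatch => inputMetrics.incRecordsRead(batch.numRows())
    case _ => inputMetrics.incRecordsRead(1)
  }
  nextElement
}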

Demo

val q = spark.read.text("README.md")
val sparkPlan = q.queryExecution.executedPlan
scala> println(sparkPlan.numberedTreeString)
00 FileScan text [value#0] Batched: false, DataFilters: [], Format: Text, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>

import org.apache.spark.sql.execution.FileSourceScanExec
val scan = sparkPlan.collectFirst { case exec: FileSourceScanExec => exec }.get
val inputRDD = scan.inputRDDs.head

import org.apache.spark.sql.execution.datasources.FileScanRDD
assert(inputRDD.isInstanceOf[FileScanRDD])

val rdd = scan.execute
scala> println(rdd.toDebugString)
(1) MapPartitionsRDD[1] at execute at <console>:27 []
 |  FileScanRDD[0] at inputRDDs at <console>:26 []

val fileScanRDD = rdd.dependencies.head.rdd
assert(fileScanRDD.isInstanceOf[FileScanRDD])

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.FileScanRDD logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.FileScanRDD=ALL

Refer to Logging.

