

[[shortName]] ParquetFileFormat is the FileFormat of the parquet data source, i.e. it registers itself to handle files in parquet format and converts them to Spark SQL rows.

NOTE: parquet is the default data source format in Spark SQL.


Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data.

[source, scala]
----
// All the following queries are equivalent
// schema has to be specified manually
import org.apache.spark.sql.types.StructType
val schema = StructType($"id".int :: Nil)
spark.read.schema(schema).format("parquet").load("parquet-datasets")

// The above is equivalent to the following shortcut
// Implicitly does format("parquet").load
spark.read.schema(schema).parquet("parquet-datasets")

// parquet is the default data source format
spark.read.schema(schema).load("parquet-datasets")
----

[[isSplitable]] ParquetFileFormat is splitable, i.e. FIXME

[[supportBatch]] ParquetFileFormat supports vectorized parquet decoding in whole-stage code generation when all of the following hold:

. spark.sql.parquet.enableVectorizedReader configuration property is enabled
. spark.sql.codegen.wholeStage internal configuration property is enabled
. The number of fields in the schema is at most spark.sql.codegen.maxFields
. All the fields in the schema are of AtomicType
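A plain-Scala sketch of such a support check (the schema is reduced to a list of type names, and the type set and threshold below are illustrative, not Spark's actual classes):

```scala
// Illustrative stand-in for the schema: just the field type names
def supportBatch(
    vectorizedReaderEnabled: Boolean,
    wholeStageEnabled: Boolean,
    fieldTypes: Seq[String],
    maxFields: Int = 100): Boolean = {
  // Illustrative subset of atomic types
  val atomicTypes = Set("BooleanType", "IntegerType", "LongType", "DoubleType", "StringType")
  vectorizedReaderEnabled &&
    wholeStageEnabled &&
    fieldTypes.length <= maxFields &&
    fieldTypes.forall(atomicTypes)
}
```

A schema with a nested (non-atomic) field disables vectorized whole-stage decoding even when the reader itself is enabled.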

ParquetFileFormat supports filter predicate push-down optimization (via ParquetFilters.createFilter) as per the following <<ParquetFilters, conversions table>>.

[[ParquetFilters]]
.Spark Data Source Filters to Parquet Filter Predicates Conversions (aka ParquetFilters.createFilter)
[cols="1m,2",options="header",width="100%"]
|===
| Data Source Filter | Parquet FilterPredicate

| IsNull | [[IsNull]] FilterApi.eq

| IsNotNull | [[IsNotNull]] FilterApi.notEq

| EqualTo | [[EqualTo]] FilterApi.eq

| Not EqualTo | [[NotEqualTo]] FilterApi.notEq

| EqualNullSafe | [[EqualNullSafe]] FilterApi.eq

| Not EqualNullSafe | [[NotEqualNullSafe]] FilterApi.notEq

| LessThan | [[LessThan]] FilterApi.lt

| LessThanOrEqual | [[LessThanOrEqual]] FilterApi.ltEq

| GreaterThan | [[GreaterThan]] FilterApi.gt

| GreaterThanOrEqual | [[GreaterThanOrEqual]] FilterApi.gtEq

| And | [[And]] FilterApi.and

| Or | [[Or]] FilterApi.or

| Not | [[Not]] FilterApi.not
|===
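The conversions above can be sketched as a pattern match over simplified data source filters (the Filter hierarchy below is an illustrative stand-in for Spark's org.apache.spark.sql.sources.Filter, and the predicates are rendered as strings rather than real Parquet FilterPredicate objects):

```scala
// Simplified stand-ins for Spark's data source filters (illustrative only)
sealed trait Filter
case class IsNull(attr: String) extends Filter
case class IsNotNull(attr: String) extends Filter
case class EqualTo(attr: String, value: Any) extends Filter
case class LessThan(attr: String, value: Any) extends Filter
case class GreaterThan(attr: String, value: Any) extends Filter
case class And(left: Filter, right: Filter) extends Filter
case class Or(left: Filter, right: Filter) extends Filter
case class Not(child: Filter) extends Filter

// Maps a data source filter to the name of the Parquet FilterApi call,
// mirroring the conversion table above; None means "not convertible"
def toParquetPredicate(f: Filter): Option[String] = f match {
  case IsNull(_)          => Some("FilterApi.eq")
  case IsNotNull(_)       => Some("FilterApi.notEq")
  case EqualTo(_, _)      => Some("FilterApi.eq")
  case Not(EqualTo(_, _)) => Some("FilterApi.notEq")
  case LessThan(_, _)     => Some("FilterApi.lt")
  case GreaterThan(_, _)  => Some("FilterApi.gt")
  case And(l, r) =>
    for (lp <- toParquetPredicate(l); rp <- toParquetPredicate(r))
      yield s"FilterApi.and($lp, $rp)"
  case Or(l, r) =>
    for (lp <- toParquetPredicate(l); rp <- toParquetPredicate(r))
      yield s"FilterApi.or($lp, $rp)"
  case Not(child) => toParquetPredicate(child).map(p => s"FilterApi.not($p)")
  case _          => None
}
```

Note how And and Or are only convertible when both children are, which is why unsupported leaf filters prevent whole subtrees from being pushed down.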

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.

Add the following line to conf/

Refer to <>.
====

=== [[prepareWrite]] Preparing Write Job -- prepareWrite Method

[source, scala]
----
prepareWrite(
  sparkSession: SparkSession,
  job: Job,
  options: Map[String, String],
  dataSchema: StructType): OutputWriterFactory
----


prepareWrite is part of the FileFormat abstraction.

=== [[inferSchema]] inferSchema Method

[source, scala]
----
inferSchema(
  sparkSession: SparkSession,
  parameters: Map[String, String],
  files: Seq[FileStatus]): Option[StructType]
----


inferSchema is part of the FileFormat abstraction.

=== [[vectorTypes]] vectorTypes Method

[source, scala]
----
vectorTypes(
  requiredSchema: StructType,
  partitionSchema: StructType,
  sqlConf: SQLConf): Option[Seq[String]]
----

vectorTypes creates a collection of the names of OffHeapColumnVector or OnHeapColumnVector when spark.sql.columnVector.offheap.enabled property is enabled or disabled, respectively.

The size of the collection is the total number of fields in the given requiredSchema and partitionSchema schemas.

vectorTypes is part of the FileFormat abstraction.
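The logic can be sketched in plain Scala, with the two schemas reduced to field counts (the class names are Spark's actual ColumnVector implementations, but the helper itself is illustrative):

```scala
// Fully-qualified names of Spark's two ColumnVector implementations
val OffHeap = "org.apache.spark.sql.execution.vectorized.OffHeapColumnVector"
val OnHeap  = "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"

// One vector type name per field of requiredSchema ++ partitionSchema;
// off-heap vs on-heap follows spark.sql.columnVector.offheap.enabled
def vectorTypes(
    requiredFields: Int,
    partitionFields: Int,
    offHeapEnabled: Boolean): Option[Seq[String]] = {
  val vectorType = if (offHeapEnabled) OffHeap else OnHeap
  Some(Seq.fill(requiredFields + partitionFields)(vectorType))
}
```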

=== [[buildReaderWithPartitionValues]] Building Data Reader With Partition Column Values Appended -- buildReaderWithPartitionValues Method

[source, scala]
----
buildReaderWithPartitionValues(
  sparkSession: SparkSession,
  dataSchema: StructType,
  partitionSchema: StructType,
  requiredSchema: StructType,
  filters: Seq[Filter],
  options: Map[String, String],
  hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
----

buildReaderWithPartitionValues is part of the FileFormat abstraction.

buildReaderWithPartitionValues sets the following <<options, Hadoop configuration options>> in the input hadoopConf.

[[options]]
.Hadoop Configuration Options
[cols="1m,3",options="header",width="100%"]
|===
| Name | Value

| parquet.read.support.class | [[parquet.read.support.class]] ParquetReadSupport

| org.apache.spark.sql.parquet.row.requested_schema | [[org.apache.spark.sql.parquet.row.requested_schema]] JSON representation of requiredSchema

| org.apache.spark.sql.parquet.row.attributes | [[org.apache.spark.sql.parquet.row.attributes]] JSON representation of requiredSchema

| spark.sql.session.timeZone | [[spark.sql.session.timeZone]] spark.sql.session.timeZone

| spark.sql.parquet.binaryAsString | [[spark.sql.parquet.binaryAsString]] spark.sql.parquet.binaryAsString

| spark.sql.parquet.int96AsTimestamp | [[spark.sql.parquet.int96AsTimestamp]] spark.sql.parquet.int96AsTimestamp
|===


buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.

buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).


Filter predicate push-down optimization for parquet data sources uses spark.sql.parquet.filterPushdown configuration property (default: enabled).

With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible (as described in the <>). Otherwise, the Parquet filter predicate is not specified.
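A minimal sketch of this pushdown step in plain Scala (with filters and predicates reduced to strings; the real code maps Spark filters through ParquetFilters.createFilter and combines the converted predicates with FilterApi.and):

```scala
// Illustrative stand-in: a filter either converts to a predicate or not
def createFilter(f: String): Option[String] =
  if (f.startsWith("unsupported")) None else Some(s"pred($f)")

// Convert what we can, then AND the converted predicates together;
// None means no predicate is pushed down at all
def pushedPredicate(
    pushdownEnabled: Boolean,
    filters: Seq[String]): Option[String] =
  if (!pushdownEnabled) None
  else filters.flatMap(createFilter)
         .reduceOption((l, r) => s"and($l, $r)")
```

Unconvertible filters are simply skipped here; they are still evaluated by Spark after the parquet reader returns rows, so pushdown only reduces I/O, never correctness.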


buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.

buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.

In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:

. Creates a Hadoop FileSplit for the input PartitionedFile

. Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created

. Gets the broadcast Hadoop Configuration

. Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)

. Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)

. Sets the Parquet FilterPredicate (only when the spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)

The function then branches off on whether Parquet vectorized reader is enabled or not.

With Parquet vectorized reader enabled, the function does the following:

  • Creates a VectorizedParquetRecordReader and a RecordReaderIterator

  • Requests VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)

  • Prints out the following DEBUG message to the logs:

    Appending [partitionSchema] [partitionValues]
  • Requests VectorizedParquetRecordReader to initBatch

  • (only with <> enabled) Requests VectorizedParquetRecordReader to enableReturningBatches

  • In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]

With Parquet vectorized reader disabled, the function does the following:

  • FIXME (since Parquet vectorized reader is enabled by default it's of less interest currently)
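The core of the vectorized branch is adapting a Hadoop-style record reader to Iterator[InternalRow]. The adapter below is a minimal stand-in for Spark's RecordReaderIterator (the StubRecordReader class is invented here so the sketch runs without Hadoop or Spark on the classpath):

```scala
// Minimal stand-in for a Hadoop RecordReader over a fixed batch of values
class StubRecordReader[T](values: Seq[T]) {
  private var i = -1
  def nextKeyValue(): Boolean = { i += 1; i < values.length }
  def getCurrentValue: T = values(i)
}

// Adapts the reader's nextKeyValue/getCurrentValue protocol to Iterator[T],
// in the spirit of Spark's RecordReaderIterator
class RecordReaderIterator[T](reader: StubRecordReader[T]) extends Iterator[T] {
  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      // Advance the underlying reader at most once per consumed element
      finished = !reader.nextKeyValue()
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrentValue
  }
}
```

The havePair flag makes repeated hasNext calls idempotent: the reader only advances when the previously fetched value has been handed out by next.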

=== [[mergeSchemasInParallel]] mergeSchemasInParallel Method

[source, scala]
----
mergeSchemasInParallel(
  filesToTouch: Seq[FileStatus],
  sparkSession: SparkSession): Option[StructType]
----


NOTE: mergeSchemasInParallel is used when...FIXME

Last update: 2020-11-16