

[[shortName]] ParquetFileFormat is the FileFormat of the parquet data source, i.e. it registers itself to handle files in parquet format and converts them to Spark SQL rows.

NOTE: parquet is the default data source format in Spark SQL.


Apache Parquet is a columnar storage format for the Apache Hadoop ecosystem with support for efficient storage and encoding of data.

[source, scala]
----
// All the following queries are equivalent
// schema has to be specified manually
import org.apache.spark.sql.types.StructType
val schema = StructType($"id".int :: Nil)
spark.read.schema(schema).format("parquet").load("parquet-datasets")

// The above is equivalent to the following shortcut
// Implicitly does format("parquet").load
spark.read.schema(schema).parquet("parquet-datasets")

// parquet is the default data source format
spark.read.schema(schema).load("parquet-datasets")
----

[[isSplitable]] ParquetFileFormat is splitable, i.e. FIXME

[[supportBatch]] ParquetFileFormat supports vectorized parquet decoding in whole-stage code generation when all of the following hold:

. spark.sql.parquet.enableVectorizedReader configuration property is enabled
. spark.sql.codegen.wholeStage internal configuration property is enabled
. The number of fields in the schema is at most spark.sql.codegen.maxFields
. All the fields in the schema are of AtomicType
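A plain-Scala sketch of such a support check (the schema is reduced to a list of type names, and the type set and threshold below are illustrative, not Spark's actual classes):

```scala
// Illustrative stand-in for the schema: just the field type names
def supportBatch(
    vectorizedReaderEnabled: Boolean,
    wholeStageEnabled: Boolean,
    fieldTypes: Seq[String],
    maxFields: Int = 100): Boolean = {
  // Illustrative subset of atomic types
  val atomicTypes = Set("BooleanType", "IntegerType", "LongType", "DoubleType", "StringType")
  vectorizedReaderEnabled &&
    wholeStageEnabled &&
    fieldTypes.length <= maxFields &&
    fieldTypes.forall(atomicTypes)
}
```

A schema with a nested (non-atomic) field disables vectorized whole-stage decoding even when the reader itself is enabled.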

ParquetFileFormat supports filter predicate push-down optimization (via ParquetFilters.createFilter) as per the following <<ParquetFilters, conversions table>>.

[[ParquetFilters]]
.Spark Data Source Filters to Parquet Filter Predicates Conversions (aka ParquetFilters.createFilter)
[cols="1m,2",options="header",width="100%"]
|===
| Data Source Filter | Parquet FilterPredicate

| IsNull | [[IsNull]] FilterApi.eq

| IsNotNull | [[IsNotNull]] FilterApi.notEq

| EqualTo | [[EqualTo]] FilterApi.eq

| Not EqualTo | [[NotEqualTo]] FilterApi.notEq

| EqualNullSafe | [[EqualNullSafe]] FilterApi.eq

| Not EqualNullSafe | [[NotEqualNullSafe]] FilterApi.notEq

| LessThan | [[LessThan]] FilterApi.lt

| LessThanOrEqual | [[LessThanOrEqual]] FilterApi.ltEq

| GreaterThan | [[GreaterThan]] FilterApi.gt

| GreaterThanOrEqual | [[GreaterThanOrEqual]] FilterApi.gtEq

| And | [[And]] FilterApi.and

| Or | [[Or]] FilterApi.or

| Not | [[Not]] FilterApi.not
|===
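The conversions above can be sketched as a pattern match over simplified data source filters (the Filter hierarchy below is an illustrative stand-in for Spark's org.apache.spark.sql.sources.Filter, and the predicates are rendered as strings rather than real Parquet FilterPredicate objects):

```scala
// Simplified stand-ins for Spark's data source filters (illustrative only)
sealed trait Filter
case class IsNull(attr: String) extends Filter
case class IsNotNull(attr: String) extends Filter
case class EqualTo(attr: String, value: Any) extends Filter
case class LessThan(attr: String, value: Any) extends Filter
case class GreaterThan(attr: String, value: Any) extends Filter
case class And(left: Filter, right: Filter) extends Filter
case class Or(left: Filter, right: Filter) extends Filter
case class Not(child: Filter) extends Filter

// Maps a data source filter to the name of the Parquet FilterApi call,
// mirroring the conversion table above; None means "not convertible"
def toParquetPredicate(f: Filter): Option[String] = f match {
  case IsNull(_)          => Some("FilterApi.eq")
  case IsNotNull(_)       => Some("FilterApi.notEq")
  case EqualTo(_, _)      => Some("FilterApi.eq")
  case Not(EqualTo(_, _)) => Some("FilterApi.notEq")
  case LessThan(_, _)     => Some("FilterApi.lt")
  case GreaterThan(_, _)  => Some("FilterApi.gt")
  case And(l, r) =>
    for (lp <- toParquetPredicate(l); rp <- toParquetPredicate(r))
      yield s"FilterApi.and($lp, $rp)"
  case Or(l, r) =>
    for (lp <- toParquetPredicate(l); rp <- toParquetPredicate(r))
      yield s"FilterApi.or($lp, $rp)"
  case Not(child) => toParquetPredicate(child).map(p => s"FilterApi.not($p)")
  case _          => None
}
```

Note how And and Or are only convertible when both children are, which is why unsupported leaf filters prevent whole subtrees from being pushed down.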

[[logging]]
[TIP]
====
Enable ALL logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.

Add the following line to conf/

Refer to <>.
====

=== [[prepareWrite]] Preparing Write Job -- prepareWrite Method

[source, scala]
----
prepareWrite(
  sparkSession: SparkSession,
  job: Job,
  options: Map[String, String],
  dataSchema: StructType): OutputWriterFactory
----


prepareWrite is part of the FileFormat abstraction.

=== [[inferSchema]] inferSchema Method

[source, scala]
----
inferSchema(
  sparkSession: SparkSession,
  parameters: Map[String, String],
  files: Seq[FileStatus]): Option[StructType]
----


inferSchema is part of the FileFormat abstraction.

=== [[vectorTypes]] vectorTypes Method

[source, scala]
----
vectorTypes(
  requiredSchema: StructType,
  partitionSchema: StructType,
  sqlConf: SQLConf): Option[Seq[String]]
----

vectorTypes creates a collection of the names of OffHeapColumnVector or OnHeapColumnVector when spark.sql.columnVector.offheap.enabled property is enabled or disabled, respectively.

The size of the collection is the total number of fields in the given requiredSchema and partitionSchema schemas.

vectorTypes is part of the FileFormat abstraction.
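The logic can be sketched in plain Scala, with the two schemas reduced to field counts (the class names are Spark's actual ColumnVector implementations, but the helper itself is illustrative):

```scala
// Fully-qualified names of Spark's two ColumnVector implementations
val OffHeap = "org.apache.spark.sql.execution.vectorized.OffHeapColumnVector"
val OnHeap  = "org.apache.spark.sql.execution.vectorized.OnHeapColumnVector"

// One vector type name per field of requiredSchema ++ partitionSchema;
// off-heap vs on-heap follows spark.sql.columnVector.offheap.enabled
def vectorTypes(
    requiredFields: Int,
    partitionFields: Int,
    offHeapEnabled: Boolean): Option[Seq[String]] = {
  val vectorType = if (offHeapEnabled) OffHeap else OnHeap
  Some(Seq.fill(requiredFields + partitionFields)(vectorType))
}
```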

=== [[buildReaderWithPartitionValues]] Building Data Reader With Partition Column Values Appended -- buildReaderWithPartitionValues Method

[source, scala]
----
buildReaderWithPartitionValues(
  sparkSession: SparkSession,
  dataSchema: StructType,
  partitionSchema: StructType,
  requiredSchema: StructType,
  filters: Seq[Filter],
  options: Map[String, String],
  hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
----

buildReaderWithPartitionValues is part of the FileFormat abstraction.

buildReaderWithPartitionValues sets the following <<options, Hadoop configuration options>> in the input hadoopConf.

[[options]]
.Hadoop Configuration Options
[cols="1m,3",options="header",width="100%"]
|===
| Name | Value

| parquet.read.support.class | [[parquet.read.support.class]] ParquetReadSupport

| org.apache.spark.sql.parquet.row.requested_schema | [[org.apache.spark.sql.parquet.row.requested_schema]] JSON representation of requiredSchema

| org.apache.spark.sql.parquet.row.attributes | [[org.apache.spark.sql.parquet.row.attributes]] JSON representation of requiredSchema

| spark.sql.session.timeZone | [[spark.sql.session.timeZone]] spark.sql.session.timeZone

| spark.sql.parquet.binaryAsString | [[spark.sql.parquet.binaryAsString]] spark.sql.parquet.binaryAsString

| spark.sql.parquet.int96AsTimestamp | [[spark.sql.parquet.int96AsTimestamp]] spark.sql.parquet.int96AsTimestamp
|===


buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.

buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).


Filter predicate push-down optimization for parquet data sources uses spark.sql.parquet.filterPushdown configuration property (default: enabled).

With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible (as described in the <>). Otherwise, the Parquet filter predicate is not specified.
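A minimal sketch of this pushdown step in plain Scala (with filters and predicates reduced to strings; the real code maps Spark filters through ParquetFilters.createFilter and combines the converted predicates with FilterApi.and):

```scala
// Illustrative stand-in: a filter either converts to a predicate or not
def createFilter(f: String): Option[String] =
  if (f.startsWith("unsupported")) None else Some(s"pred($f)")

// Convert what we can, then AND the converted predicates together;
// None means no predicate is pushed down at all
def pushedPredicate(
    pushdownEnabled: Boolean,
    filters: Seq[String]): Option[String] =
  if (!pushdownEnabled) None
  else filters.flatMap(createFilter)
         .reduceOption((l, r) => s"and($l, $r)")
```

Unconvertible filters are simply skipped here; they are still evaluated by Spark after the parquet reader returns rows, so pushdown only reduces I/O, never correctness.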


buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, LongType, FloatType, DoubleType, StringType, BinaryType.

buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.

In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:

. Creates a Hadoop FileSplit for the input PartitionedFile

. Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created

. Gets the broadcast Hadoop Configuration

. Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)

. Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)

. Sets the Parquet FilterPredicate (only when the spark.sql.parquet.filterPushdown configuration property is enabled, which it is by default)

The function then branches off on whether Parquet vectorized reader is enabled or not.

With Parquet vectorized reader enabled, the function does the following:

  • Creates a VectorizedParquetRecordReader and a RecordReaderIterator

  • Requests VectorizedParquetRecordReader to initialize (with the Parquet ParquetInputSplit and the Hadoop TaskAttemptContextImpl)

  • Prints out the following DEBUG message to the logs:

    Appending [partitionSchema] [partitionValues]
  • Requests VectorizedParquetRecordReader to initBatch

  • (only with <> enabled) Requests VectorizedParquetRecordReader to enableReturningBatches

  • In the end, the function gives the RecordReaderIterator (over the VectorizedParquetRecordReader) as the Iterator[InternalRow]

With Parquet vectorized reader disabled, the function does the following:

  • FIXME (since Parquet vectorized reader is enabled by default it's of less interest currently)
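The core of the vectorized branch is adapting a Hadoop-style record reader to Iterator[InternalRow]. The adapter below is a minimal stand-in for Spark's RecordReaderIterator (the StubRecordReader class is invented here so the sketch runs without Hadoop or Spark on the classpath):

```scala
// Minimal stand-in for a Hadoop RecordReader over a fixed batch of values
class StubRecordReader[T](values: Seq[T]) {
  private var i = -1
  def nextKeyValue(): Boolean = { i += 1; i < values.length }
  def getCurrentValue: T = values(i)
}

// Adapts the reader's nextKeyValue/getCurrentValue protocol to Iterator[T],
// in the spirit of Spark's RecordReaderIterator
class RecordReaderIterator[T](reader: StubRecordReader[T]) extends Iterator[T] {
  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      // Advance the underlying reader at most once per consumed element
      finished = !reader.nextKeyValue()
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrentValue
  }
}
```

The havePair flag makes repeated hasNext calls idempotent: the reader only advances when the previously fetched value has been handed out by next.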

=== [[mergeSchemasInParallel]] mergeSchemasInParallel Method

[source, scala]
----
mergeSchemasInParallel(
  filesToTouch: Seq[FileStatus],
  sparkSession: SparkSession): Option[StructType]
----


NOTE: mergeSchemasInParallel is used when...FIXME

Last update: 2020-11-16