Skip to content

ParquetFileFormat

ParquetFileFormat is the FileFormat of the parquet data source.

ParquetFileFormat is Serializable.

Short Name

shortName(): String

ParquetFileFormat is a DataSourceRegister with the short name:

parquet

Building Data Reader With Partition Values

buildReaderWithPartitionValues(
  sparkSession: SparkSession,
  dataSchema: StructType,
  partitionSchema: StructType,
  requiredSchema: StructType,
  filters: Seq[Filter],
  options: Map[String, String],
  hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]

buildReaderWithPartitionValues is part of the FileFormat abstraction.

Fixme

Review Me

buildReaderWithPartitionValues sets the following configuration options in the input hadoopConf.

Name Value
parquet.read.support.class ParquetReadSupport
org.apache.spark.sql.parquet.row.requested_schema JSON representation of requiredSchema
org.apache.spark.sql.parquet.row.attributes JSON representation of requiredSchema
spark.sql.session.timeZone spark.sql.session.timeZone
spark.sql.parquet.binaryAsString spark.sql.parquet.binaryAsString
spark.sql.parquet.int96AsTimestamp spark.sql.parquet.int96AsTimestamp

buildReaderWithPartitionValues requests ParquetWriteSupport to setSchema.

buildReaderWithPartitionValues tries to push filters down to create a Parquet FilterPredicate (aka pushed).

With spark.sql.parquet.filterPushdown configuration property enabled, buildReaderWithPartitionValues takes the input Spark data source filters and converts them to Parquet filter predicates if possible. Otherwise, the Parquet filter predicate is not specified.

Note

buildReaderWithPartitionValues creates filter predicates for the following types: BooleanType, IntegerType, (LongType, FloatType, DoubleType, StringType, BinaryType.

buildReaderWithPartitionValues broadcasts the input hadoopConf Hadoop Configuration.

In the end, buildReaderWithPartitionValues gives a function that takes a PartitionedFile and does the following:

  1. Creates a Hadoop FileSplit for the input PartitionedFile

  2. Creates a Parquet ParquetInputSplit for the Hadoop FileSplit created

  3. Gets the broadcast Hadoop Configuration

  4. Creates a flag that says whether to apply timezone conversions to int96 timestamps or not (aka convertTz)

  5. Creates a Hadoop TaskAttemptContextImpl (with the broadcast Hadoop Configuration and a Hadoop TaskAttemptID for a map task)

  6. Sets the Parquet FilterPredicate (only when spark.sql.parquet.filterPushdown configuration property is enabled and it is by default)

The function then branches off on whether Parquet vectorized reader is enabled or not.

With Parquet vectorized reader enabled, the function does the following:

With Parquet vectorized reader disabled, the function does the following:

  • FIXME (since Parquet vectorized reader is enabled by default it's of less interest)

Schema Inference

inferSchema(
  sparkSession: SparkSession,
  parameters: Map[String, String],
  files: Seq[FileStatus]): Option[StructType]

inferSchema is part of the FileFormat abstraction.

inferSchema...FIXME

Splitable

isSplitable(
  sparkSession: SparkSession,
  options: Map[String, String],
  path: Path): Boolean

isSplitable is part of the FileFormat abstraction.

ParquetFileFormat is splitable (isSplitable is always true).

Preparing Write Job

prepareWrite(
  sparkSession: SparkSession,
  job: Job,
  options: Map[String, String],
  dataSchema: StructType): OutputWriterFactory

prepareWrite is part of the FileFormat abstraction.

prepareWrite...FIXME

supportBatch

supportBatch(
  sparkSession: SparkSession,
  schema: StructType): Boolean

supportBatch is part of the FileFormat abstraction.

Fixme

Review Me

supportBatch supports vectorized parquet decoding in whole-stage code generation when the following all hold:

  1. spark.sql.parquet.enableVectorizedReader configuration property is enabled

  2. spark.sql.codegen.wholeStage internal configuration property is enabled

  3. The number of fields in the schema is at most spark.sql.codegen.maxFields internal configuration property

  4. All the fields in the output schema are of AtomicType

supportDataType

supportDataType(
  dataType: DataType): Boolean

supportDataType is part of the FileFormat abstraction.

supportDataType...FIXME

Vector Types

vectorTypes(
  requiredSchema: StructType,
  partitionSchema: StructType,
  sqlConf: SQLConf): Option[Seq[String]]

vectorTypes is part of the FileFormat abstraction.

Fixme

Review Me

vectorTypes creates a collection of the names of OffHeapColumnVector or OnHeapColumnVector when spark.sql.columnVector.offheap.enabled property is enabled or disabled, respectively.

The size of the collection are all the fields of the given requiredSchema and partitionSchema schemas.

Logging

Enable ALL logging level for org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat=ALL

Refer to Logging.

Back to top