
== VectorizedParquetRecordReader

VectorizedParquetRecordReader is a SpecificParquetRecordReaderBase for the Parquet file format that supports vectorized Parquet decoding.

VectorizedParquetRecordReader is created exclusively when ParquetFileFormat is requested for a data reader (with the spark.sql.parquet.enableVectorizedReader property enabled and the read schema with AtomicType data types only).

[[creating-instance]] VectorizedParquetRecordReader takes the following to be created:

VectorizedParquetRecordReader uses the <> attribute for the following:

  • Creating <> when <>

  • Controlling <> when <>

VectorizedParquetRecordReader uses OFF_HEAP memory mode when the spark.sql.columnVector.offheap.enabled internal configuration property is enabled (true).
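The memory-mode decision above boils down to a single boolean property lookup. The following plain-Java sketch illustrates that decision in isolation; the class and method names are illustrative, not Spark's actual code.

```java
// Hypothetical sketch of deriving the memory mode from the
// spark.sql.columnVector.offheap.enabled property.
import java.util.Map;

public class MemoryModeSelector {
    public enum MemoryMode { ON_HEAP, OFF_HEAP }

    // OFF_HEAP only when the property is explicitly "true"; ON_HEAP otherwise.
    public static MemoryMode select(Map<String, String> conf) {
        boolean offHeap = Boolean.parseBoolean(
            conf.getOrDefault("spark.sql.columnVector.offheap.enabled", "false"));
        return offHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
    }

    public static void main(String[] args) {
        assert select(Map.of()) == MemoryMode.ON_HEAP;
        assert select(Map.of("spark.sql.columnVector.offheap.enabled", "true"))
            == MemoryMode.OFF_HEAP;
    }
}
```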

[[internal-registries]]
.VectorizedParquetRecordReader's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,3",options="header",width="100%"]
|===
| Name
| Description

| batchIdx | [[batchIdx]] Current batch index that is the index of an InternalRow in the <>. Used when VectorizedParquetRecordReader is requested to <> with the <> flag disabled

Starts at 0

Increments every <>

Reset to 0 when <>

| columnarBatch | [[columnarBatch]] ColumnarBatch

| columnReaders | [[columnReaders]] VectorizedColumnReaders (one reader per column) to <>

Initialized when <> (when requested to <>)

| columnVectors | [[columnVectors]] Allocated WritableColumnVectors

| MEMORY_MODE a| [[MEMORY_MODE]] Memory mode of the <>

Used exclusively when VectorizedParquetRecordReader is requested to <>.

| missingColumns | [[missingColumns]] Bitmap of columns (per index) that are missing (or simply the ones that the reader should not read)

| numBatched | [[numBatched]]

| returnColumnarBatch | [[returnColumnarBatch]] Optimization flag to control whether VectorizedParquetRecordReader offers rows as the <> or one row at a time only

Default: false

Enabled (true) when VectorizedParquetRecordReader is requested to <>

Used in <> (to <>) and <> (to return the internal <> not a single InternalRow)

| rowsReturned | [[rowsReturned]] Number of rows read so far

| totalCountLoadedSoFar | [[totalCountLoadedSoFar]]

| totalRowCount | [[totalRowCount]] Total number of rows to be read

|===
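The counters in the table above cooperate in a simple pattern: batchIdx walks row by row through the current batch and is reset to 0 whenever a new batch is loaded, while rowsReturned tracks overall progress. The following self-contained simulation sketches that bookkeeping with plain Java (the batch itself is faked as a row count; names mirror the internal registries but this is not Spark code).

```java
// Simplified simulation of the batchIdx / numBatched / rowsReturned bookkeeping.
public class BatchBookkeeping {
    int batchIdx = 0;       // index of the next row in the current batch
    int numBatched = 0;     // number of rows in the current batch
    long rowsReturned = 0;  // rows read so far
    final long totalRowCount;
    final int batchSize;

    BatchBookkeeping(long totalRowCount, int batchSize) {
        this.totalRowCount = totalRowCount;
        this.batchSize = batchSize;
    }

    // Mirrors nextBatch: false when all rows have been read; otherwise loads
    // a new batch and resets batchIdx to 0.
    boolean nextBatch() {
        if (rowsReturned >= totalRowCount) return false;
        int num = (int) Math.min(batchSize, totalRowCount - rowsReturned);
        rowsReturned += num;
        numBatched = num;
        batchIdx = 0;
        return true;
    }

    // Mirrors nextKeyValue with returnColumnarBatch disabled: one row at a time,
    // incrementing batchIdx on every call.
    boolean nextKeyValue() {
        if (batchIdx >= numBatched && !nextBatch()) return false;
        batchIdx++;
        return true;
    }

    public static void main(String[] args) {
        BatchBookkeeping r = new BatchBookkeeping(10, 4);
        int rows = 0;
        while (r.nextKeyValue()) rows++;
        assert rows == 10;
        assert r.rowsReturned == 10;
    }
}
```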

=== [[nextKeyValue]] nextKeyValue Method

[source, java]
----
boolean nextKeyValue() throws IOException
----

NOTE: nextKeyValue is part of Hadoop's https://hadoop.apache.org/docs/r2.7.4/api/org/apache/hadoop/mapreduce/RecordReader.html[RecordReader] to read (key, value) pairs from a Hadoop https://hadoop.apache.org/docs/r2.7.4/api/org/apache/hadoop/mapreduce/InputSplit.html[InputSplit] to present a record-oriented view.

nextKeyValue...FIXME

nextKeyValue is used when:

=== [[resultBatch]] resultBatch Method

[source, java]
----
ColumnarBatch resultBatch()
----

resultBatch gives the <> if one is already available, or <> first.

NOTE: resultBatch is used exclusively when VectorizedParquetRecordReader is requested to <>.

=== [[initialize]] Initializing -- initialize Method

[source, java]
----
void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
----

initialize is part of the SpecificParquetRecordReaderBase abstraction.

initialize...FIXME

=== [[enableReturningBatches]] enableReturningBatches Method

[source, java]
----
void enableReturningBatches()
----

enableReturningBatches simply turns the <> internal flag on.

enableReturningBatches is used when ParquetFileFormat is requested for a data reader (for vectorized parquet decoding in whole-stage codegen).

=== [[initBatch]] Initializing Columnar Batch -- initBatch Method

[source, java]
----
void initBatch(
  StructType partitionColumns,
  InternalRow partitionValues) // <1>

private void initBatch() // <2>

private void initBatch(
  MemoryMode memMode,
  StructType partitionColumns,
  InternalRow partitionValues)
----
<1> Uses <>
<2> Uses <> and no partitionColumns and no partitionValues

initBatch creates the batch schema, i.e. the sparkSchema together with the input partitionColumns schema.

initBatch requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors per the input memMode, i.e. OFF_HEAP or ON_HEAP memory modes, respectively. initBatch records the allocated column vectors as the internal <>.

NOTE: OnHeapColumnVector is used based on the spark.sql.columnVector.offheap.enabled configuration property.

initBatch creates a ColumnarBatch (with the <>) and records it as the internal <>.

initBatch creates new slots in the <> for the input partitionColumns and sets the input partitionValues as constants.

initBatch initializes <> with nulls.
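The partition-column handling above (new slots filled with the same constant value for every row) can be sketched with plain arrays standing in for WritableColumnVectors; the method name and shapes below are illustrative assumptions, not Spark's API.

```java
// Sketch of initBatch's partition-column step: data columns come from the
// Parquet file, partition columns are appended and set to constant values.
import java.util.Arrays;

public class InitBatchSketch {
    static String[][] initBatch(int numRows, int dataCols, String[] partitionValues) {
        String[][] vectors = new String[dataCols + partitionValues.length][numRows];
        // create new slots for the partition columns and fill each with its
        // constant partition value
        for (int i = 0; i < partitionValues.length; i++) {
            Arrays.fill(vectors[dataCols + i], partitionValues[i]);
        }
        return vectors;
    }

    public static void main(String[] args) {
        String[][] v = initBatch(3, 2, new String[]{"2020-11-08"});
        assert v.length == 3;                     // 2 data columns + 1 partition column
        assert v[2][0].equals("2020-11-08");      // constant for every row
        assert v[2][2].equals("2020-11-08");
    }
}
```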

initBatch is used when:

=== [[nextBatch]] Reading Next Rows Into Columnar Batch -- nextBatch Method

[source, java]
----
boolean nextBatch()
----

nextBatch reads at least <> rows and returns true when there are rows available. Otherwise, nextBatch returns false (to "announce" there are no rows available).

Internally, nextBatch firstly requests every WritableColumnVector (in the <> internal registry) to reset itself.

nextBatch requests the <> to specify the number of rows (in batch) as 0 (effectively resetting the batch and making it available for reuse).

When the <> is greater than the <>, nextBatch finishes with (returns) false (to "announce" there are no rows available).

nextBatch <>.

nextBatch calculates the number of rows left to be returned as a minimum of the <> and the <> reduced by the <>.

nextBatch requests every <> to readBatch (with the number of rows left to be returned and associated <>).

NOTE: <> use their own <> for storing values read. The numbers of <> and <> are equal.

NOTE: The number of rows in the internal <> matches the number of rows that <> decoded and stored in corresponding <>.

In the end, nextBatch registers the progress as follows:

  • The number of rows read is added to the <> counter

  • Requests the internal <> to set the number of rows (in batch) to be the number of rows read

  • The <> registry is exactly the number of rows read

  • The <> registry becomes 0

nextBatch finishes with (returns) true (to "announce" there are rows available).

NOTE: nextBatch is used exclusively when VectorizedParquetRecordReader is requested to <>.
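The row-count arithmetic of nextBatch (take the minimum of the batch capacity and the rows decoded but not yet returned, loading the next Parquet row group when needed) can be sketched as follows. Row groups are modeled simply as an array of row counts; this is an illustrative simplification, not Spark's implementation.

```java
// Sketch of nextBatch's arithmetic across Parquet row groups.
public class RowGroupBatching {
    final long[] rowGroupSizes;
    final int capacity;            // max rows per ColumnarBatch
    int nextGroup = 0;
    long totalCountLoadedSoFar = 0;
    long rowsReturned = 0;
    final long totalRowCount;

    RowGroupBatching(long[] rowGroupSizes, int capacity) {
        this.rowGroupSizes = rowGroupSizes;
        this.capacity = capacity;
        long total = 0;
        for (long s : rowGroupSizes) total += s;
        this.totalRowCount = total;
    }

    // Mirrors checkEndOfRowGroup: load the next row group once every row
    // decoded so far has been returned.
    void checkEndOfRowGroup() {
        if (rowsReturned != totalCountLoadedSoFar) return;
        totalCountLoadedSoFar += rowGroupSizes[nextGroup++];
    }

    // Number of rows the next batch would hold, or 0 when no rows are left.
    int nextBatchSize() {
        if (rowsReturned >= totalRowCount) return 0;
        checkEndOfRowGroup();
        int num = (int) Math.min(capacity, totalCountLoadedSoFar - rowsReturned);
        rowsReturned += num;
        return num;
    }

    public static void main(String[] args) {
        RowGroupBatching b = new RowGroupBatching(new long[]{5, 3}, 4);
        assert b.nextBatchSize() == 4; // first 4 rows of row group 1
        assert b.nextBatchSize() == 1; // remaining row of row group 1
        assert b.nextBatchSize() == 3; // all of row group 2
        assert b.nextBatchSize() == 0; // no rows left
    }
}
```

Note how a batch never spans two row groups in this sketch: the second call returns 1 row, not 4, because only one decoded row remains before the next row group is loaded.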

=== [[checkEndOfRowGroup]] checkEndOfRowGroup Internal Method

[source, java]
----
void checkEndOfRowGroup() throws IOException
----

checkEndOfRowGroup...FIXME

NOTE: checkEndOfRowGroup is used exclusively when VectorizedParquetRecordReader is requested to <>.

=== [[getCurrentValue]] Getting Current Value (as Columnar Batch or Single InternalRow) -- getCurrentValue Method

[source, java]
----
Object getCurrentValue()
----

NOTE: getCurrentValue is part of the Hadoop https://hadoop.apache.org/docs/r2.7.5/api/org/apache/hadoop/mapreduce/RecordReader.html[RecordReader] Contract to break the data into key/value pairs for input to a Hadoop Mapper.

getCurrentValue returns the entire <> with the <> flag enabled (true) or requests it for a single row instead.

getCurrentValue is used when:

  • NewHadoopRDD is requested to compute a partition (compute)

  • RecordReaderIterator is requested for the next internal row
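The branching described above is minimal: either the whole batch or the single row just past by batchIdx. The sketch below uses a List of strings as a stand-in for the ColumnarBatch and its rows; it is illustrative, not Spark's code.

```java
// Minimal sketch of getCurrentValue's branching on returnColumnarBatch.
import java.util.List;

public class CurrentValue {
    static Object getCurrentValue(List<String> columnarBatch,
                                  boolean returnColumnarBatch,
                                  int batchIdx) {
        if (returnColumnarBatch) return columnarBatch;   // the entire batch
        return columnarBatch.get(batchIdx - 1);          // batchIdx points past the current row
    }

    public static void main(String[] args) {
        List<String> batch = List.of("row0", "row1");
        assert getCurrentValue(batch, true, 1) == batch;
        assert getCurrentValue(batch, false, 1).equals("row0");
        assert getCurrentValue(batch, false, 2).equals("row1");
    }
}
```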


Last update: 2020-11-08