VectorizedParquetRecordReader

VectorizedParquetRecordReader is a SpecificParquetRecordReaderBase for the parquet file format for Vectorized Parquet Decoding.

VectorizedParquetRecordReader is created when ParquetFileFormat is requested for a data reader (with spark.sql.parquet.enableVectorizedReader property enabled and the read schema with AtomicType data types only).
[[creating-instance]]
VectorizedParquetRecordReader takes the following to be created:

- [[convertTz]] TimeZone (null when no timezone conversion is expected)
- [[useOffHeap]] useOffHeap flag (based on spark.sql.columnVector.offheap.enabled configuration property)
- [[capacity]] Capacity (based on spark.sql.parquet.columnarReaderBatchSize configuration property)
VectorizedParquetRecordReader uses the capacity attribute for the following:

- Creating WritableColumnVectors (the columnVectors internal registry) when initializing a columnar batch (initBatch)
- Controlling the number of rows to read per batch (nextBatch)

VectorizedParquetRecordReader uses OFF_HEAP memory mode when the useOffHeap flag is enabled (true), i.e. when spark.sql.columnVector.offheap.enabled configuration property is enabled.
[[internal-registries]]
.VectorizedParquetRecordReader's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1m,3",options="header",width="100%"]
|===
| Name
| Description

| batchIdx
a| [[batchIdx]] Current batch index, i.e. the index of an InternalRow in the columnarBatch, used when VectorizedParquetRecordReader is requested for the current value (getCurrentValue)

Starts at 0

Incremented on every nextKeyValue

Reset to 0 when reading next rows into a columnar batch (nextBatch)

| columnarBatch
| [[columnarBatch]] ColumnarBatch

| columnReaders
a| [[columnReaders]] VectorizedColumnReaders (one reader per column) to read rows as batches

Initialized when checking the end of a row group (checkEndOfRowGroup)

| columnVectors
| [[columnVectors]] Allocated WritableColumnVectors

| MEMORY_MODE
a| [[MEMORY_MODE]] Memory mode of the columnarBatch:

- [[OFF_HEAP]] OFF_HEAP (when the useOffHeap flag is on, as based on spark.sql.columnVector.offheap.enabled configuration property)
- [[ON_HEAP]] ON_HEAP (otherwise)

Used exclusively when VectorizedParquetRecordReader is requested to initialize a columnar batch (initBatch)

| missingColumns
| [[missingColumns]] Bitmap of columns (per index) that are missing (or simply the ones that the reader should not read)

| numBatched
| [[numBatched]]

| returnColumnarBatch
a| [[returnColumnarBatch]] Optimization flag to control whether VectorizedParquetRecordReader offers rows as the entire columnarBatch or one row at a time (as an InternalRow)

Default: false

Enabled (true) when VectorizedParquetRecordReader is requested to enableReturningBatches

Used in nextKeyValue and getCurrentValue

| rowsReturned
| [[rowsReturned]] Number of rows read already

| totalCountLoadedSoFar
| [[totalCountLoadedSoFar]]

| totalRowCount
| [[totalRowCount]] Total number of rows to be read

|===
=== [[nextKeyValue]] nextKeyValue Method

[source, java]
----
boolean nextKeyValue() throws IOException
----

NOTE: nextKeyValue is part of Hadoop's https://hadoop.apache.org/docs/r2.7.4/api/org/apache/hadoop/mapred/RecordReader.html[RecordReader] to read (key, value) pairs from a Hadoop https://hadoop.apache.org/docs/r2.7.4/api/org/apache/hadoop/mapred/InputSplit.html[InputSplit] to present a record-oriented view.

nextKeyValue...FIXME

nextKeyValue is used when:

- NewHadoopRDD is requested to compute a partition (compute)
- RecordReaderIterator is requested to check whether or not there are more internal rows
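While the walkthrough above is still a FIXME, the behavior described by the internal registries (batchIdx, numBatched, rowsReturned, returnColumnarBatch) can be modeled in plain Java. The sketch below is hypothetical and heavily simplified (no ColumnarBatch, no Parquet I/O, all names illustrative); it only shows how nextKeyValue advances per row or per batch depending on the returnColumnarBatch flag.

```java
// Simplified, hypothetical model of VectorizedParquetRecordReader's
// row/batch bookkeeping (illustrative names, not Spark's actual code).
class BatchedReaderSketch {
    private final int capacity;        // cf. spark.sql.parquet.columnarReaderBatchSize
    private final long totalRowCount;  // total number of rows to be read
    private long rowsReturned = 0;     // number of rows read already
    private int numBatched = 0;        // number of rows in the current batch
    private int batchIdx = 0;          // current row index within the batch
    private boolean returnColumnarBatch = false;

    BatchedReaderSketch(int capacity, long totalRowCount) {
        this.capacity = capacity;
        this.totalRowCount = totalRowCount;
    }

    // cf. enableReturningBatches: switch to whole-batch mode
    void enableReturningBatches() { returnColumnarBatch = true; }

    // cf. nextBatch: load up to `capacity` rows; false when exhausted
    boolean nextBatch() {
        if (rowsReturned >= totalRowCount) return false;
        int num = (int) Math.min(capacity, totalRowCount - rowsReturned);
        rowsReturned += num;
        numBatched = num;
        batchIdx = 0;
        return true;
    }

    // cf. nextKeyValue: one call per batch in batch mode,
    // one call per row (loading new batches as needed) otherwise
    boolean nextKeyValue() {
        if (returnColumnarBatch) return nextBatch();
        if (batchIdx >= numBatched && !nextBatch()) return false;
        batchIdx++;
        return true;
    }

    long rowsReturned() { return rowsReturned; }
}
```

For instance, with a capacity of 4 and 10 rows in total, row mode answers true for 10 nextKeyValue calls, while batch mode answers true for just 3 (batches of 4, 4 and 2 rows).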
=== [[resultBatch]] resultBatch Method

[source, java]
----
ColumnarBatch resultBatch()
----

resultBatch gives the columnarBatch (initializing it first with initBatch when not created yet).

NOTE: resultBatch is used exclusively when VectorizedParquetRecordReader is requested to nextKeyValue.
=== [[initialize]] Initializing -- initialize Method

[source, java]
----
void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
----

NOTE: initialize is part of the SpecificParquetRecordReaderBase abstraction.

initialize...FIXME
=== [[enableReturningBatches]] enableReturningBatches Method

[source, java]
----
void enableReturningBatches()
----

enableReturningBatches simply turns the returnColumnarBatch flag on.

enableReturningBatches is used when ParquetFileFormat is requested for a data reader (for vectorized parquet decoding in whole-stage codegen).
=== [[initBatch]] Initializing Columnar Batch -- initBatch Method

[source, java]
----
void initBatch(StructType partitionColumns, InternalRow partitionValues) // <1>

// private
private void initBatch() // <2>
private void initBatch(
  MemoryMode memMode,
  StructType partitionColumns,
  InternalRow partitionValues)
----
<1> Uses the MEMORY_MODE
<2> Uses the MEMORY_MODE with no partitionColumns and no partitionValues

initBatch creates the batch schema that is sparkSchema together with the input partitionColumns schema.

initBatch requests OffHeapColumnVector or OnHeapColumnVector to allocate column vectors per the input memMode, i.e. OFF_HEAP or ON_HEAP memory modes, respectively. initBatch records the allocated column vectors as the internal columnVectors registry.

NOTE: OffHeapColumnVector or OnHeapColumnVector is chosen based on spark.sql.columnVector.offheap.enabled configuration property.

initBatch creates a ColumnarBatch (with the allocated columnVectors).

initBatch creates new slots in the columnVectors for the input partitionColumns and sets the input partitionValues as constants.

initBatch initializes missing columns with nulls.

initBatch is used when:

- VectorizedParquetRecordReader is requested for resultBatch
- ParquetFileFormat is requested to build a data reader with partition column values appended
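The allocation steps above can be sketched with plain long[] arrays in place of Spark's WritableColumnVectors. The sketch is hypothetical (all names are illustrative, not Spark's actual API): the memory mode is derived from the useOffHeap flag, and partition columns are appended after the data columns and filled with constant values.

```java
import java.util.Arrays;

// Hypothetical sketch of initBatch: choose a memory mode, allocate one
// vector per batch-schema column, and fill partition columns with constants.
class InitBatchSketch {
    enum MemoryMode { ON_HEAP, OFF_HEAP }

    // Mirrors how MEMORY_MODE follows spark.sql.columnVector.offheap.enabled
    static MemoryMode memoryMode(boolean useOffHeap) {
        return useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
    }

    // Models a columnar batch: data columns are left empty (to be filled by
    // the column readers); partition columns hold one constant per column.
    static long[][] initBatch(int capacity, int dataColumns, long[] partitionValues) {
        long[][] vectors = new long[dataColumns + partitionValues.length][capacity];
        for (int p = 0; p < partitionValues.length; p++) {
            Arrays.fill(vectors[dataColumns + p], partitionValues[p]);
        }
        return vectors;
    }
}
```

Partition values are constants because every row read from a given Parquet file shares the same partition-directory values, so they never need to be re-read per row.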
=== [[nextBatch]] Reading Next Rows Into Columnar Batch -- nextBatch Method

[source, java]
----
boolean nextBatch() throws IOException
----

nextBatch reads up to capacity rows and returns true when there are rows available. Otherwise, nextBatch returns false (to "announce" there are no rows available).

Internally, nextBatch firstly requests every WritableColumnVector (in the columnVectors internal registry) to reset itself.

nextBatch requests the columnarBatch to specify the number of rows (in batch) as 0 (effectively resetting the batch and making it available for reuse).

When all the rows were already read (the rowsReturned counter has reached the totalRowCount), nextBatch finishes with (returns) false (to "announce" there are no rows available).

nextBatch checkEndOfRowGroup.

nextBatch calculates the number of rows left to be returned as a minimum of the capacity and the totalCountLoadedSoFar reduced by the rowsReturned.

nextBatch requests every VectorizedColumnReader (in the columnReaders internal registry) to read that number of rows into the associated WritableColumnVector.

NOTE: Missing columns (missingColumns) are simply skipped.

NOTE: The number of rows in the internal columnarBatch matches the number of rows read.

In the end, nextBatch registers the progress as follows:

- The number of rows read is added to the rowsReturned counter
- Requests the internal columnarBatch to set the number of rows (in batch) to be the number of rows read
- The numBatched registry is exactly the number of rows read
- The batchIdx registry becomes 0

nextBatch finishes with (returns) true (to "announce" there are rows available).

NOTE: nextBatch is used exclusively when VectorizedParquetRecordReader is requested to nextKeyValue.
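The row-count arithmetic above can be illustrated in isolation. This hypothetical sketch (illustrative names, no column vectors or Parquet I/O) models totalCountLoadedSoFar advancing one row group at a time and the per-batch row count as the minimum of the capacity and the rows loaded but not yet returned.

```java
// Hypothetical sketch of nextBatch's progress bookkeeping across row groups.
class NextBatchSketch {
    final int capacity;
    final long[] rowGroupCounts;     // rows per Parquet row group
    long totalCountLoadedSoFar = 0;  // rows made available by loaded row groups
    long rowsReturned = 0;           // rows handed out so far
    int nextRowGroup = 0;

    NextBatchSketch(int capacity, long[] rowGroupCounts) {
        this.capacity = capacity;
        this.rowGroupCounts = rowGroupCounts;
    }

    // Models checkEndOfRowGroup: load the next row group once the
    // current one is fully consumed.
    void checkEndOfRowGroup() {
        if (rowsReturned == totalCountLoadedSoFar && nextRowGroup < rowGroupCounts.length) {
            totalCountLoadedSoFar += rowGroupCounts[nextRowGroup++];
        }
    }

    // Returns the number of rows in the next batch, 0 when exhausted:
    // min(capacity, totalCountLoadedSoFar - rowsReturned)
    int nextBatchSize() {
        checkEndOfRowGroup();
        int num = (int) Math.min(capacity, totalCountLoadedSoFar - rowsReturned);
        rowsReturned += num;
        return num;
    }
}
```

With row groups of 5 and 3 rows and a capacity of 4, the successive batch sizes are 4, 1, 3 and then 0: in this model a batch never spans a row group boundary.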
=== [[checkEndOfRowGroup]] checkEndOfRowGroup Internal Method

[source, java]
----
void checkEndOfRowGroup() throws IOException
----

checkEndOfRowGroup...FIXME

NOTE: checkEndOfRowGroup is used exclusively when VectorizedParquetRecordReader is requested to nextBatch (read next rows into a columnar batch).
=== [[getCurrentValue]] Getting Current Value (as Columnar Batch or Single InternalRow) -- getCurrentValue Method

[source, java]
----
Object getCurrentValue()
----

NOTE: getCurrentValue is part of the Hadoop https://hadoop.apache.org/docs/r2.7.5/api/org/apache/hadoop/mapreduce/RecordReader.html[RecordReader] Contract to break the data into key/value pairs for input to a Hadoop Mapper.

getCurrentValue returns the entire columnarBatch (when the returnColumnarBatch flag is enabled, i.e. true) or requests it for a single row (an InternalRow) instead.

getCurrentValue is used when:

- NewHadoopRDD is requested to compute a partition (compute)
- RecordReaderIterator is requested for the next internal row
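A tiny hypothetical model of the either-or behavior above, with a List<String> standing in for the ColumnarBatch and its rows (the names are illustrative, not Spark's API):

```java
import java.util.List;

// Hypothetical sketch of getCurrentValue: the whole batch in batch mode,
// otherwise the single row the reader currently points at.
class GetCurrentValueSketch {
    final List<String> batch;          // stands in for the ColumnarBatch
    final boolean returnColumnarBatch; // cf. the returnColumnarBatch flag
    int batchIdx = 0;                  // advanced by each row-mode nextKeyValue

    GetCurrentValueSketch(List<String> batch, boolean returnColumnarBatch) {
        this.batch = batch;
        this.returnColumnarBatch = returnColumnarBatch;
    }

    // Either the entire batch or the row at batchIdx - 1
    // (batchIdx was already incremented by nextKeyValue).
    Object getCurrentValue() {
        if (returnColumnarBatch) return batch;
        return batch.get(batchIdx - 1);
    }
}
```

The asymmetry is the point of the returnColumnarBatch optimization: whole-stage codegen can consume entire batches, while row-oriented consumers get one InternalRow per nextKeyValue call.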