KafkaMicroBatchReader

KafkaMicroBatchReader is the MicroBatchReader for the kafka data source in Micro-Batch Stream Processing.

KafkaMicroBatchReader is created when KafkaSourceProvider is requested to create a MicroBatchReader.

[[pollTimeoutMs]] KafkaMicroBatchReader uses the DataSourceOptions to access the kafkaConsumer.pollTimeoutMs option (default: the spark.network.timeout configuration property, or 120s when undefined).

[[maxOffsetsPerTrigger]] KafkaMicroBatchReader uses the DataSourceOptions to access the maxOffsetsPerTrigger option (default: (undefined)).
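As an illustration, here is a minimal sketch of setting both options on a streaming query over the kafka data source (the bootstrap servers and topic name are placeholders):

[source, scala]

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-options-demo").getOrCreate()

val records = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "topic1")                       // placeholder
  // how long to poll Kafka for new records on executors
  .option("kafkaConsumer.pollTimeoutMs", "5000")
  // soft limit on the total number of records per micro-batch
  .option("maxOffsetsPerTrigger", "10000")
  .load()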

KafkaMicroBatchReader uses the Kafka properties for executors to create KafkaMicroBatchInputPartitions when requested to planInputPartitions.

Creating Instance

KafkaMicroBatchReader takes the following to be created:

  • [[kafkaOffsetReader]] KafkaOffsetReader
  • [[executorKafkaParams]] Kafka properties for executors (Map[String, Object])
  • [[options]] DataSourceOptions
  • [[metadataPath]] Metadata Path
  • [[startingOffsets]] Desired starting KafkaOffsetRangeLimit
  • [[failOnDataLoss]] failOnDataLoss option

KafkaMicroBatchReader initializes the internal properties.

=== [[readSchema]] readSchema Method

[source, scala]

readSchema(): StructType

NOTE: readSchema is part of the DataSourceReader contract to define the schema of the data to be read.

readSchema simply returns the predefined fixed schema.
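For reference, the fixed schema of the kafka data source is equivalent to the following StructType (a sketch; the actual constant is defined internally by the kafka data source):

[source, scala]

import org.apache.spark.sql.types._

// The fixed schema of every Dataset loaded from the kafka data source
val kafkaSchema = StructType(Seq(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)))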

=== [[planInputPartitions]] Plan Input Partitions -- planInputPartitions Method

[source, scala]

planInputPartitions(): java.util.List[InputPartition[InternalRow]]

NOTE: planInputPartitions is part of the DataSourceReader contract in Spark SQL to define the InputPartitions to use as RDD partitions (when the DataSourceV2ScanExec physical operator is requested for the partitions of the input RDD).

planInputPartitions first finds the new partitions (TopicPartitions that are in the endPartitionOffsets but not in the startPartitionOffsets internal registry) and requests the KafkaOffsetReader to fetch their earliest offsets.

planInputPartitions prints out the following INFO message to the logs:

Partitions added: [newPartitionInitialOffsets]

planInputPartitions then prints out the following DEBUG message to the logs:

TopicPartitions: [comma-separated list of TopicPartitions]

planInputPartitions requests the KafkaOffsetRangeCalculator for offset ranges (given the startPartitionOffsets and the newly-calculated newPartitionInitialOffsets as the fromOffsets, the endPartitionOffsets as the untilOffsets, and the available executors).

In the end, planInputPartitions creates a KafkaMicroBatchInputPartition for every offset range (with the executorKafkaParams, the pollTimeoutMs, the failOnDataLoss flag, and whether to reuse a Kafka consumer among Spark tasks).

planInputPartitions reports a data loss when...FIXME
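To make the flow above concrete, here is a simplified, self-contained sketch of the planning steps (fetchEarliestOffsets is a hypothetical stand-in for the KafkaOffsetReader, and the executor-aware splitting done by KafkaOffsetRangeCalculator is left out):

[source, scala]

import org.apache.kafka.common.TopicPartition

// Toy data standing in for the internal registries
val startPartitionOffsets = Map(new TopicPartition("t", 0) -> 10L)
val endPartitionOffsets   = Map(
  new TopicPartition("t", 0) -> 42L,
  new TopicPartition("t", 1) -> 17L)

// Hypothetical stand-in for KafkaOffsetReader fetching earliest offsets
def fetchEarliestOffsets(ps: Set[TopicPartition]): Map[TopicPartition, Long] =
  ps.map(_ -> 0L).toMap

// 1. New partitions are in endPartitionOffsets but not in startPartitionOffsets
val newPartitions = endPartitionOffsets.keySet -- startPartitionOffsets.keySet

// 2. Their initial offsets are the earliest offsets available in Kafka
val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions)
println(s"Partitions added: $newPartitionInitialOffsets")

// 3. Ranges go from (startPartitionOffsets ++ new initial offsets) up to
//    endPartitionOffsets, one (partition, from, until) triple per partition
val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
val offsetRanges = endPartitionOffsets.map { case (tp, until) =>
  (tp, fromOffsets(tp), until)
}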

=== [[getSortedExecutorList]] Available Executors in Spark Cluster (Sorted By Host and Executor ID in Descending Order) -- getSortedExecutorList Internal Method

[source, scala]

getSortedExecutorList(): Array[String]

getSortedExecutorList requests the BlockManager to request the BlockManagerMaster to get the peers (the other nodes in a Spark cluster), creates an ExecutorCacheTaskLocation for every pair of host and executor ID, and in the end, sorts them in descending order (by host and then executor ID).

NOTE: getSortedExecutorList is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (and calculates offset ranges).
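A self-contained sketch of that descending (host, executor ID) ordering follows; ExecutorLocation is a toy stand-in for Spark's internal ExecutorCacheTaskLocation:

[source, scala]

case class ExecutorLocation(host: String, executorId: String) {
  override def toString = s"executor_${host}_$executorId"
}

def getSortedExecutorList(peers: Seq[ExecutorLocation]): Array[String] =
  peers
    .sortWith { (a, b) =>
      if (a.host == b.host) a.executorId > b.executorId else a.host > b.host
    }
    .map(_.toString)
    .toArray

println(getSortedExecutorList(Seq(
  ExecutorLocation("host1", "1"),
  ExecutorLocation("host2", "2"),
  ExecutorLocation("host1", "3"))).mkString(", "))
// executor_host2_2, executor_host1_3, executor_host1_1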

=== [[getOrCreateInitialPartitionOffsets]] getOrCreateInitialPartitionOffsets Internal Method

[source, scala]

getOrCreateInitialPartitionOffsets(): PartitionOffsetMap

getOrCreateInitialPartitionOffsets...FIXME

NOTE: getOrCreateInitialPartitionOffsets is used exclusively for the initialPartitionOffsets internal registry.
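While the details remain to be described, a minimal sketch of the get-or-create pattern follows, assuming the initial offsets are persisted under the metadataPath so query restarts resume from the very same offsets (readOffsetsFromLog, writeOffsetsToLog and fetchInitialOffsets are hypothetical helpers):

[source, scala]

import org.apache.kafka.common.TopicPartition

// Hypothetical helpers standing in for a metadata log under metadataPath
var persisted: Option[Map[TopicPartition, Long]] = None
def readOffsetsFromLog(): Option[Map[TopicPartition, Long]] = persisted
def writeOffsetsToLog(o: Map[TopicPartition, Long]): Unit = persisted = Some(o)

// Hypothetical stand-in for resolving the desired starting KafkaOffsetRangeLimit
// (earliest, latest or specific offsets) against Kafka
def fetchInitialOffsets(): Map[TopicPartition, Long] =
  Map(new TopicPartition("t", 0) -> 0L)

// Reuse persisted initial offsets when available; otherwise resolve and persist
def getOrCreateInitialPartitionOffsets(): Map[TopicPartition, Long] =
  readOffsetsFromLog().getOrElse {
    val offsets = fetchInitialOffsets()
    writeOffsetsToLog(offsets)
    offsets
  }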

=== [[getStartOffset]] getStartOffset Method

[source, scala]

getStartOffset: Offset

getStartOffset is part of the MicroBatchReader abstraction.

getStartOffset...FIXME

=== [[getEndOffset]] getEndOffset Method

[source, scala]

getEndOffset: Offset

getEndOffset is part of the MicroBatchReader abstraction.

getEndOffset...FIXME
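A sketch covering both getStartOffset (above) and getEndOffset, assuming each simply wraps the corresponding internal registry in a Kafka offset (KafkaSourceOffsetSketch is a toy stand-in for Spark's internal KafkaSourceOffset):

[source, scala]

import org.apache.kafka.common.TopicPartition

case class KafkaSourceOffsetSketch(partitionToOffsets: Map[TopicPartition, Long])

// Toy registries (see Internal Properties below)
val startPartitionOffsets = Map(new TopicPartition("t", 0) -> 0L)
val endPartitionOffsets   = Map(new TopicPartition("t", 0) -> 42L)

def getStartOffset = KafkaSourceOffsetSketch(startPartitionOffsets)
def getEndOffset   = KafkaSourceOffsetSketch(endPartitionOffsets)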

=== [[deserializeOffset]] deserializeOffset Method

[source, scala]

deserializeOffset(json: String): Offset

deserializeOffset is part of the MicroBatchReader abstraction.

deserializeOffset...FIXME
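The serialized form of a Kafka offset is a JSON map of topics to partition-offset maps (as seen in checkpoint offset files). A hedged sketch of parsing that JSON with json4s (bundled with Spark); wrapping the result in a KafkaSourceOffset is left out:

[source, scala]

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats

// topic -> (partition -> offset), e.g. from an offset file in a checkpoint
val json = """{"topic1":{"0":23,"1":45},"topic2":{"0":7}}"""

val partitionOffsets: Map[String, Map[String, Long]] =
  parse(json).extract[Map[String, Map[String, Long]]]

println(partitionOffsets)
// Map(topic1 -> Map(0 -> 23, 1 -> 45), topic2 -> Map(0 -> 7))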

=== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name | Description

| endPartitionOffsets a| [[endPartitionOffsets]] Ending offsets for the assigned partitions (Map[TopicPartition, Long])

Used when...FIXME

| initialPartitionOffsets a| [[initialPartitionOffsets]] Initial offsets for the assigned partitions

[source, scala]

initialPartitionOffsets: Map[TopicPartition, Long]

Created via getOrCreateInitialPartitionOffsets

| rangeCalculator a| [[rangeCalculator]] KafkaOffsetRangeCalculator (for the given DataSourceOptions)

Used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (to calculate offset ranges)

| startPartitionOffsets a| [[startPartitionOffsets]] Starting offsets for the assigned partitions (Map[TopicPartition, Long])

Used when...FIXME

|===

Logging

Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaMicroBatchReader logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchReader=ALL

Refer to Logging.

