Skip to content

KafkaOffsetRangeCalculator

KafkaOffsetRangeCalculator is <> for KafkaMicroBatchReader to <> (when KafkaMicroBatchReader is requested to planInputPartitions).

[[minPartitions]][[creating-instance]] KafkaOffsetRangeCalculator takes an optional minimum number of partitions per executor (minPartitions) to be created (that can either be undefined or greater than 0).

[[apply]] When created with a DataSourceOptions, KafkaOffsetRangeCalculator uses minPartitions option for the <>.

=== [[getRanges]] Offset Ranges -- getRanges Method

[source, scala]

getRanges( fromOffsets: PartitionOffsetMap, untilOffsets: PartitionOffsetMap, executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange]


getRanges finds the common TopicPartitions that are the keys that are used in the fromOffsets and untilOffsets collections (intersection).

For every common TopicPartition, getRanges creates a <> with the from and until offsets from the fromOffsets and untilOffsets collections (and the <> undefined). getRanges filters out the TopicPartitions that <> (i.e. the difference between until and from offsets is not greater than 0).

At this point, getRanges knows the TopicPartitions with records to consume.

getRanges branches off based on the defined <> and the number of KafkaOffsetRanges (TopicPartitions with records to consume).

For the <> undefined or smaller than the number of KafkaOffsetRanges (TopicPartitions to consume records from), getRanges updates every KafkaOffsetRange with the <> based on the TopicPartition and the executorLocations).

Otherwise (with the <> defined and greater than the number of KafkaOffsetRanges), getRanges splits KafkaOffsetRanges into smaller ones.

NOTE: getRanges is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions.

=== [[KafkaOffsetRange]] KafkaOffsetRange -- TopicPartition with From and Until Offsets and Optional Preferred Location

KafkaOffsetRange is a case class with the following attributes:

  • [[topicPartition]] TopicPartition
  • [[fromOffset]] fromOffset offset
  • [[untilOffset]] untilOffset offset
  • [[preferredLoc]] Optional preferred location

[[size]] KafkaOffsetRange knows the size, i.e. the number of records between the <> and <> offsets.

=== [[getLocation]] Selecting Preferred Executor for TopicPartition -- getLocation Internal Method

[source, scala]

getLocation( tp: TopicPartition, executorLocations: Seq[String]): Option[String]


getLocation...FIXME

NOTE: getLocation is used exclusively when KafkaOffsetRangeCalculator is requested to <>.