== [[KafkaOffsetReader]] KafkaOffsetReader

`KafkaOffsetReader` relies on the <<consumerStrategy, ConsumerStrategy>> to <<createConsumer, create a Kafka Consumer>>.

`KafkaOffsetReader` creates a Kafka Consumer with the *group.id* (`ConsumerConfig.GROUP_ID_CONFIG`) configuration explicitly set to the <<nextGroupId, next group ID>>.

`KafkaOffsetReader` is <<creating-instance, created>> when:
* `KafkaRelation` is requested to build a distributed data scan with column pruning

* `KafkaSourceProvider` is requested to create a `KafkaSource`, `createMicroBatchReader`, and `createContinuousReader`
[[options]]
.KafkaOffsetReader's Options
[cols="1m,3",options="header",width="100%"]
|===
| Name
| Description

| fetchOffset.numRetries
a| [[fetchOffset.numRetries]] Number of times to retry fetching offsets (before giving up)

Default: `3`

| fetchOffset.retryIntervalMs
a| [[fetchOffset.retryIntervalMs]] How long (in milliseconds) to wait before retrying to fetch offsets

Default: `1000`
|===
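Both options are set like any other option of the Kafka data source, e.g. (a minimal sketch; the bootstrap servers and the topic name are placeholders):

[source, scala]
----
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-offsets-demo").getOrCreate()

// Both options are read by KafkaOffsetReader when fetching offsets on the driver.
val records = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("fetchOffset.numRetries", "5")          // default: 3
  .option("fetchOffset.retryIntervalMs", "2000")  // default: 1000
  .load
----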
[[kafkaSchema]]
`KafkaOffsetReader` defines the predefined fixed schema (of a Kafka source).
[[logging]]
[TIP]
====
Enable `ALL` logging level for `org.apache.spark.sql.kafka010.KafkaOffsetReader` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

----
log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=ALL
----

Refer to the Logging page.
====
=== [[creating-instance]] Creating KafkaOffsetReader Instance
`KafkaOffsetReader` takes the following to be created:
* [[consumerStrategy]] ConsumerStrategy
* [[driverKafkaParams]] Kafka parameters (as name-value pairs) that are used exclusively to <<createConsumer, create a Kafka Consumer>>
* [[readerOptions]] Options (as name-value pairs)
* [[driverGroupIdPrefix]] Prefix of the group ID
`KafkaOffsetReader` initializes the <<internal-properties, internal properties>>.
=== [[nextGroupId]] `nextGroupId` Internal Method
[source, scala]
----
nextGroupId(): String
----
`nextGroupId` sets the <<groupId, groupId>> to be the <<driverGroupIdPrefix, driverGroupIdPrefix>>, `-` followed by the <<nextId, nextId>> (i.e. `[driverGroupIdPrefix]-[nextId]`).

In the end, `nextGroupId` increments the <<nextId, nextId>> and returns the current <<groupId, groupId>>.
NOTE: `nextGroupId` is used exclusively when `KafkaOffsetReader` is requested for a new <<createConsumer, Kafka Consumer>>.
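The logic can be sketched as a small self-contained snippet (`spark-kafka-source` is an illustrative prefix; `groupId` and `nextId` model the <<internal-properties, internal properties>>):

[source, scala]
----
// A self-contained sketch of the group ID generation described above.
class GroupIdGenerator(driverGroupIdPrefix: String) {
  private var nextId = 0
  private var groupId: String = _

  def nextGroupId(): String = {
    groupId = s"$driverGroupIdPrefix-$nextId"
    nextId += 1  // in the end, the nextId counter is incremented
    groupId
  }
}

val gen = new GroupIdGenerator("spark-kafka-source")
assert(gen.nextGroupId() == "spark-kafka-source-0")
assert(gen.nextGroupId() == "spark-kafka-source-1")
----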
=== [[resetConsumer]] `resetConsumer` Internal Method
[source, scala]
----
resetConsumer(): Unit
----
`resetConsumer`...FIXME
NOTE: `resetConsumer` is used when...FIXME
=== [[fetchTopicPartitions]] `fetchTopicPartitions` Method
[source, scala]
----
fetchTopicPartitions(): Set[TopicPartition]
----
CAUTION: FIXME
`fetchTopicPartitions` is used when `KafkaRelation` is requested for `getPartitionOffsets`.
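As the section is incomplete, the following is only a hedged sketch of how the assigned partitions could be discovered with the plain Kafka `Consumer` API (the `poll(0)` / `assignment` / `pause` sequence mirrors the one in <<fetchSpecificOffsets, fetchSpecificOffsets>>):

[source, scala]
----
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Discover the partitions the consumer is assigned without fetching records.
def topicPartitions(
    consumer: Consumer[Array[Byte], Array[Byte]]): Set[TopicPartition] = {
  consumer.poll(0)                    // make sure the assignment has happened
  val partitions = consumer.assignment()
  consumer.pause(partitions)          // offsets only; no records to be fetched
  partitions.asScala.toSet
}
----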
=== [[fetchEarliestOffsets]] Fetching Earliest Offsets -- `fetchEarliestOffsets` Method
[source, scala]
----
fetchEarliestOffsets(): Map[TopicPartition, Long]
fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
----
CAUTION: FIXME
NOTE: `fetchEarliestOffsets` is used when `KafkaSource` is requested to `rateLimit` and to generate a `DataFrame` for a batch (when new partitions have been assigned).
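As a hedged sketch of the underlying Kafka `Consumer` calls (not necessarily the exact implementation), fetching the earliest offsets boils down to `seekToBeginning` followed by `position` per partition:

[source, scala]
----
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Seek to the beginning of every assigned partition and read the positions back.
def earliestOffsets(
    consumer: Consumer[Array[Byte], Array[Byte]]): Map[TopicPartition, Long] = {
  val partitions = consumer.assignment()
  consumer.seekToBeginning(partitions)
  partitions.asScala.map(tp => tp -> consumer.position(tp)).toMap
}
----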
=== [[fetchLatestOffsets]] Fetching Latest Offsets -- `fetchLatestOffsets` Method
[source, scala]
----
fetchLatestOffsets(): Map[TopicPartition, Long]
----
CAUTION: FIXME
NOTE: `fetchLatestOffsets` is used when `KafkaSource` gets offsets or when `initialPartitionOffsets` is initialized.
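Fetching the latest offsets follows the same sketch, with `seekToEnd` instead of `seekToBeginning`:

[source, scala]
----
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Seek to the end of every assigned partition and read the positions back.
def latestOffsets(
    consumer: Consumer[Array[Byte], Array[Byte]]): Map[TopicPartition, Long] = {
  val partitions = consumer.assignment()
  consumer.seekToEnd(partitions)
  partitions.asScala.map(tp => tp -> consumer.position(tp)).toMap
}
----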
=== [[withRetriesWithoutInterrupt]] `withRetriesWithoutInterrupt` Internal Method
[source, scala]
----
withRetriesWithoutInterrupt(
  body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]
----
`withRetriesWithoutInterrupt`...FIXME
NOTE: `withRetriesWithoutInterrupt` is used when...FIXME
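Judging by the method name and the <<maxOffsetFetchAttempts, maxOffsetFetchAttempts>> and <<offsetFetchAttemptIntervalMs, offsetFetchAttemptIntervalMs>> internal properties, the retry part can be sketched as follows (an assumption; the actual method additionally shields `body` from thread interrupts):

[source, scala]
----
// A sketch of retrying a fetch up to maxAttempts times, sleeping between tries.
def withRetries[T](maxAttempts: Int, intervalMs: Long)(body: => T): T = {
  var attempt = 1
  var result: Option[T] = None
  var lastError: Throwable = null
  while (result.isEmpty && attempt <= maxAttempts) {
    try {
      result = Some(body)
    } catch {
      case e: Exception =>
        lastError = e
        attempt += 1
        if (attempt <= maxAttempts) Thread.sleep(intervalMs)
    }
  }
  result.getOrElse(throw lastError)
}

// Example: fails twice, succeeds on the third attempt.
var calls = 0
withRetries(maxAttempts = 3, intervalMs = 10) {
  calls += 1
  if (calls < 3) sys.error("offsets not available yet") else Map.empty[String, Long]
}
assert(calls == 3)
----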
=== [[fetchSpecificOffsets]] Fetching Offsets for Selected TopicPartitions -- `fetchSpecificOffsets` Method
[source, scala]
----
fetchSpecificOffsets(
  partitionOffsets: Map[TopicPartition, Long],
  reportDataLoss: String => Unit): KafkaSourceOffset
----

.KafkaOffsetReader's fetchSpecificOffsets
image::images/KafkaOffsetReader-fetchSpecificOffsets.png[align="center"]
`fetchSpecificOffsets` requests the <<consumer, Kafka Consumer>> to `poll(0)`.

`fetchSpecificOffsets` requests the <<consumer, Kafka Consumer>> for the assigned partitions (using `Consumer.assignment()`).

`fetchSpecificOffsets` requests the <<consumer, Kafka Consumer>> to `pause(partitions)`.
You should see the following DEBUG message in the logs:

----
DEBUG KafkaOffsetReader: Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]
----
For every partition offset in the input `partitionOffsets`, `fetchSpecificOffsets` requests the <<consumer, Kafka Consumer>> to:

* `seekToEnd` for the latest offset (aka `-1`)
* `seekToBeginning` for the earliest offset (aka `-2`)
* `seek` for any other offset
In the end, `fetchSpecificOffsets` creates a collection of Kafka's `TopicPartition` and `position` pairs (using the <<consumer, Kafka Consumer>>) for a new `KafkaSourceOffset`.
`fetchSpecificOffsets` is used when `KafkaSource` fetches and verifies initial partition offsets.
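The seek logic above can be sketched against the plain Kafka `Consumer` API (a hedged approximation; `-1` and `-2` are the sentinels for the latest and earliest offsets described above):

[source, scala]
----
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Seek every partition to its requested offset and report the final positions.
def seekToSpecificOffsets(
    consumer: Consumer[Array[Byte], Array[Byte]],
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  consumer.poll(0)                        // make sure the assignment has happened
  val partitions = consumer.assignment()
  consumer.pause(partitions)              // offsets only; no records to be fetched
  partitionOffsets.foreach {
    case (tp, off) if off == -1L => consumer.seekToEnd(Seq(tp).asJava)       // latest
    case (tp, off) if off == -2L => consumer.seekToBeginning(Seq(tp).asJava) // earliest
    case (tp, off)               => consumer.seek(tp, off)
  }
  partitionOffsets.map { case (tp, _) => tp -> consumer.position(tp) }
}
----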
=== [[createConsumer]] Creating Kafka Consumer -- `createConsumer` Internal Method
[source, scala]
----
createConsumer(): Consumer[Array[Byte], Array[Byte]]
----
`createConsumer` requests the <<consumerStrategy, ConsumerStrategy>> to create a Kafka Consumer with the <<driverKafkaParams, driverKafkaParams>> and the *group.id* (`ConsumerConfig.GROUP_ID_CONFIG`) set to the <<nextGroupId, next group ID>>.
NOTE: `createConsumer` is used when `KafkaOffsetReader` is <<creating-instance, created>> and requested to <<resetConsumer, resetConsumer>>.
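A sketch of preparing the Kafka parameters before handing them to the <<consumerStrategy, ConsumerStrategy>> (`kafkaParamsWithGroupId` is a hypothetical helper name):

[source, scala]
----
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerConfig

// Copy the driver Kafka parameters and explicitly override group.id before
// handing them to the ConsumerStrategy.
def kafkaParamsWithGroupId(
    driverKafkaParams: ju.Map[String, Object],
    groupId: String): ju.Map[String, Object] = {
  val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
  newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
  newKafkaParams
}
----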
=== [[consumer]] Creating Kafka Consumer (Unless Already Available) -- `consumer` Method
[source, scala]
----
consumer: Consumer[Array[Byte], Array[Byte]]
----
`consumer` gives the cached <<_consumer, Kafka Consumer>> or creates one itself.
NOTE: Since the `consumer` method is used (to access the internal <<_consumer, Kafka Consumer>>) in the fetch methods, a new Kafka Consumer is created whenever the internal <<_consumer, Kafka Consumer>> reference becomes `null`, i.e. as in <<resetConsumer, resetConsumer>>.
`consumer`...FIXME
NOTE: `consumer` is used when `KafkaOffsetReader` is requested to fetch offsets (e.g. <<fetchEarliestOffsets, fetchEarliestOffsets>>, <<fetchLatestOffsets, fetchLatestOffsets>>, and <<fetchSpecificOffsets, fetchSpecificOffsets>>).
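The cache-or-create behavior (together with the reset behavior in <<resetConsumer, resetConsumer>>) can be sketched as a self-contained snippet (`CachedResource` is a stand-in for how <<_consumer, _consumer>> is handled):

[source, scala]
----
// A self-contained sketch of the "cached or created" pattern described above.
class CachedResource[T <: AnyRef](create: () => T) {
  private var _cached: T = _  // models the internal _consumer reference

  def get: T = synchronized {
    if (_cached == null) {    // a reset nulls the reference...
      _cached = create()      // ...so the next access creates a fresh resource
    }
    _cached
  }

  def reset(): Unit = synchronized {
    _cached = null.asInstanceOf[T]
  }
}

val cache = new CachedResource[String](() => new String("consumer"))
val first = cache.get
assert(cache.get eq first)    // cached
cache.reset()
assert(cache.get ne first)    // re-created after reset
----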
=== [[close]] Closing -- `close` Method
[source, scala]
----
close(): Unit
----
`close` <<stopConsumer, stops the Kafka Consumer>>.

`close` requests the <<kafkaReaderThread, kafkaReaderThread>> to shut down.
`close` is used when:

* `KafkaContinuousReader`, `KafkaMicroBatchReader`, and `KafkaSource` are requested to stop a streaming reader or source

* `KafkaRelation` is requested to build a distributed data scan with column pruning
=== [[runUninterruptibly]] `runUninterruptibly` Internal Method
[source, scala]
----
runUninterruptibly[T](body: => T): T
----
`runUninterruptibly`...FIXME
NOTE: `runUninterruptibly` is used when...FIXME
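As the section is incomplete, here is only a hedged guess at the shape of such a method: given the <<execContext, execContext>> internal property, a body could be run on a dedicated reader thread and awaited as follows:

[source, scala]
----
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Run the body on a dedicated single-thread executor and block for the result.
val execContext = ExecutionContext.fromExecutorService(
  Executors.newSingleThreadExecutor())

def runOnReaderThread[T](body: => T): T =
  Await.result(Future(body)(execContext), Duration.Inf)

assert(runOnReaderThread(21 * 2) == 42)
execContext.shutdown()
----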
=== [[stopConsumer]] `stopConsumer` Internal Method
[source, scala]
----
stopConsumer(): Unit
----
`stopConsumer`...FIXME
NOTE: `stopConsumer` is used when...FIXME
=== [[toString]] Textual Representation -- `toString` Method
[source, scala]
----
toString: String
----
NOTE: `toString` is part of the ++https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#toString--++[java.lang.Object] contract for the string representation of the object.
`toString`...FIXME
=== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"] |=== | Name | Description
| _consumer a| [[_consumer]] Kafka's https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/Consumer.html[Consumer] (Consumer[Array[Byte], Array[Byte]]
)
<KafkaOffsetReader
is <
Used when KafkaOffsetReader
:
- <
> - <
> - <
> - <
> - <
> - <
>
| execContext a| [[execContext]] https://www.scala-lang.org/api/2.12.8/scala/concurrent/ExecutionContextExecutorService.html[scala.concurrent.ExecutionContextExecutorService]
| groupId a| [[groupId]]
| kafkaReaderThread a| [[kafkaReaderThread]] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html[java.util.concurrent.ExecutorService]
| maxOffsetFetchAttempts a| [[maxOffsetFetchAttempts]]
| nextId a| [[nextId]]
Initially 0
| offsetFetchAttemptIntervalMs a| [[offsetFetchAttemptIntervalMs]]
|===