
= KafkaRelation

[[schema]] KafkaRelation represents a collection of rows with a predefined schema (BaseRelation) that supports <<buildScan, building a distributed data scan>> (TableScan).

TIP: Read up on https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-BaseRelation.html[BaseRelation] and https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-TableScan.html[TableScan] in https://bit.ly/spark-sql-internals[The Internals of Spark SQL] online book.

KafkaRelation is <<creating-instance, created>> exclusively when KafkaSourceProvider is requested to create a BaseRelation.

[[options]]
.KafkaRelation's Options
[cols="1m,3",options="header",width="100%"]
|===
| Name
| Description

| kafkaConsumer.pollTimeoutMs
a| [[kafkaConsumer.pollTimeoutMs]][[pollTimeoutMs]] The polling timeout (in milliseconds) for Kafka consumers (on executors)

Default: spark.network.timeout configuration if set or 120s

|===
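How the default is resolved can be sketched as follows. This is a simplified, hypothetical helper (resolvePollTimeoutMs is not Spark's actual API), and it assumes spark.network.timeout is given with a plain `s` suffix, whereas Spark accepts several time units.

```scala
// Hypothetical sketch of resolving kafkaConsumer.pollTimeoutMs:
// the source option wins, then spark.network.timeout, then the 120s default.
def resolvePollTimeoutMs(
    sourceOptions: Map[String, String],
    sparkConf: Map[String, String]): Long =
  sourceOptions
    .get("kafkaConsumer.pollTimeoutMs")
    .map(_.toLong)
    .getOrElse {
      // spark.network.timeout is a time string, assumed here to be "<n>s"
      val networkTimeout = sparkConf.getOrElse("spark.network.timeout", "120s")
      networkTimeout.stripSuffix("s").toLong * 1000L
    }
```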

[[logging]]
[TIP]
====
Enable ALL logging levels for org.apache.spark.sql.kafka010.KafkaRelation logger to see what happens inside.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.sql.kafka010.KafkaRelation=ALL
----

Refer to <>.
====

=== [[creating-instance]] Creating KafkaRelation Instance

KafkaRelation takes the following when created:

* [[sqlContext]] SQLContext
* [[strategy]] ConsumerStrategy
* [[sourceOptions]] Source options (Map[String, String])
* [[specifiedKafkaParams]] User-defined Kafka parameters (Map[String, String])
* [[failOnDataLoss]] failOnDataLoss flag
* [[startingOffsets]] Starting offsets
* [[endingOffsets]] Ending offsets

=== [[getPartitionOffsets]] getPartitionOffsets Internal Method

[source, scala]
----
getPartitionOffsets(
  kafkaReader: KafkaOffsetReader,
  kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long]
----

CAUTION: FIXME

NOTE: getPartitionOffsets is used exclusively when KafkaRelation is requested to <<buildScan, build a distributed data scan>>.
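The gist of the method can be sketched as follows, with simplified stand-ins for TopicPartition (from kafka-clients) and the KafkaOffsetRangeLimit family (from spark-sql-kafka-0-10); the sentinel values -2 and -1 are the ones Kafka uses for "earliest" and "latest" offsets. This is an assumed shape, not the actual implementation.

```scala
// Simplified stand-ins for the real types.
case class TopicPartition(topic: String, partition: Int)

sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(offsets: Map[TopicPartition, Long])
    extends KafkaOffsetRangeLimit

// Sentinels for "earliest" and "latest" offsets.
val EARLIEST = -2L
val LATEST = -1L

// Assumed shape of getPartitionOffsets: map every fetched partition to a
// concrete offset (or a sentinel), based on the given range limit.
def getPartitionOffsets(
    partitions: Set[TopicPartition],
    limit: KafkaOffsetRangeLimit): Map[TopicPartition, Long] =
  limit match {
    case EarliestOffsetRangeLimit          => partitions.map(_ -> EARLIEST).toMap
    case LatestOffsetRangeLimit            => partitions.map(_ -> LATEST).toMap
    case SpecificOffsetRangeLimit(offsets) => offsets
  }
```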

=== [[buildScan]] Building Distributed Data Scan with Column Pruning -- buildScan Method

[source, scala]
----
buildScan(): RDD[Row]
----

NOTE: buildScan is part of the https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-TableScan.html[TableScan] contract to build a distributed data scan with column pruning.

buildScan generates a unique group ID of the format spark-kafka-relation-[randomUUID] (to make sure that a streaming query creates a new consumer group).
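The group ID generation can be sketched as a one-liner (the helper name uniqueGroupId is illustrative):

```scala
import java.util.UUID

// A fresh random UUID per scan guarantees a new Kafka consumer group
// of the format spark-kafka-relation-[randomUUID].
def uniqueGroupId(): String = s"spark-kafka-relation-${UUID.randomUUID()}"
```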

buildScan creates a KafkaOffsetReader with the given <<strategy, ConsumerStrategy>> and <<sourceOptions, source options>>, the Kafka parameters for the driver (based on the given <<specifiedKafkaParams, user-defined Kafka parameters>>), and the unique group ID.

buildScan uses the KafkaOffsetReader to <<getPartitionOffsets, get the partition offsets>> for the starting and ending offsets (based on the given <<startingOffsets, starting offsets>> and <<endingOffsets, ending offsets>>, respectively). buildScan requests the KafkaOffsetReader to close afterwards.

buildScan creates offset ranges (a collection of KafkaSourceRDDOffsetRanges, each with a Kafka TopicPartition, beginning and ending offsets, and an undefined preferred location).
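Pairing the starting and ending offsets per topic partition can be sketched as follows, with simplified stand-ins for TopicPartition and KafkaSourceRDDOffsetRange (the real classes live in kafka-clients and spark-sql-kafka-0-10); the "doesn't have a from offset" error matches the IllegalStateException described below.

```scala
// Simplified stand-ins for the real types.
case class TopicPartition(topic: String, partition: Int)
case class OffsetRange(
    tp: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) // undefined preferred location, as in buildScan

// Assumed sketch: every partition with an ending offset must have a
// starting offset; otherwise an IllegalStateException is thrown.
def offsetRanges(
    fromOffsets: Map[TopicPartition, Long],
    untilOffsets: Map[TopicPartition, Long]): Seq[OffsetRange] =
  untilOffsets.toSeq.map { case (tp, until) =>
    val from = fromOffsets.getOrElse(
      tp,
      throw new IllegalStateException(s"$tp doesn't have a from offset"))
    OffsetRange(tp, from, until, preferredLoc = None)
  }
```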

buildScan prints out the following INFO message to the logs:

----
Generating RDD of offset ranges: [offsetRanges]
----

buildScan creates a KafkaSourceRDD with the following:

* Kafka parameters for executors based on the given <<specifiedKafkaParams, user-defined Kafka parameters>> and the unique group ID (spark-kafka-relation-[randomUUID])
* The offset ranges created
* <<pollTimeoutMs, kafkaConsumer.pollTimeoutMs>> configuration
* The given <<failOnDataLoss, failOnDataLoss>> flag
* reuseKafkaConsumer flag off (false)

buildScan requests the KafkaSourceRDD to map Kafka ConsumerRecords to InternalRows.

In the end, buildScan requests the <<sqlContext, SQLContext>> to create a DataFrame (with the name kafka and the predefined <<schema, schema>>) that is immediately converted to an RDD[InternalRow].

buildScan throws an IllegalStateException when the topic partitions of the starting offsets ([[fromTopics]]) and of the ending offsets ([[untilTopics]]) are different:

----
different topic partitions for starting offsets topics[fromTopics] and ending offsets topics[untilTopics]
----

buildScan also throws an IllegalStateException when a topic partition (of the ending offsets) does not have a corresponding starting offset:

----
[tp] doesn't have a from offset
----