Skip to content

Kafka Data Source

Kafka Data Source is the streaming data source for Apache Kafka in Spark Structured Streaming.

Kafka Data Source provides a streaming source and a streaming sink for micro-batch and continuous stream processing.

spark-sql-kafka-0-10 External Module

Kafka Data Source is part of the spark-sql-kafka-0-10 external module that is distributed with the official distribution of Apache Spark, but it is not on the CLASSPATH by default.

You should define spark-sql-kafka-0-10 module as part of the build definition in your Spark project, e.g. as a libraryDependency in build.sbt for sbt:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.1.1"

For Spark environments like spark-submit (and "derivatives" like spark-shell), you should use --packages command-line option:

./bin/spark-shell \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1

Note

Replace the version of spark-sql-kafka-0-10 module (e.g. 3.1.1 above) with one of the available versions found at The Central Repository's Search that matches your version of Apache Spark.

Streaming Source

Kafka data source can load streaming data (reading records) from one or more Kafka topics.

val records = spark
  .readStream
  .format("kafka")
  .option("subscribePattern", """topic-\d{2}""") // topics with two digits at the end
  .option("kafka.bootstrap.servers", ":9092")
  .load

Kafka data source supports many options for reading.

Kafka data source for reading is available through KafkaSourceProvider that is a MicroBatchStream and ContinuousReadSupport for micro-batch and continuous stream processing, respectively.

Predefined (Fixed) Schema

Kafka Data Source uses a predefined (fixed) schema.

Name Type
key BinaryType
value BinaryType
topic StringType
partition IntegerType
offset LongType
timestamp TimestampType
timestampType IntegerType
scala> records.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Internally, the fixed schema is defined as part of the DataSourceReader abstraction through MicroBatchReader and ContinuousReader extension contracts for micro-batch and continuous stream processing, respectively.

Column.cast Operator

Use Column.cast operator to cast BinaryType to a StringType (for key and value columns).

scala> :type records
org.apache.spark.sql.DataFrame

val values = records
  .select($"value" cast "string") // deserializing values
scala> values.printSchema
root
|-- value: string (nullable = true)

Streaming Sink

Kafka data source can write streaming data (the result of executing a streaming query) to one or more Kafka topics.

val sq = records
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ":9092")
  .option("topic", "kafka2console-output")
  .option("checkpointLocation", "checkpointLocation-kafka2console")
  .start

Internally, the kafka data source format for writing is available through KafkaSourceProvider.

Micro-Batch Stream Processing

Kafka Data Source supports Micro-Batch Stream Processing using KafkaMicroBatchReader.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .format("kafka")
  .option("subscribepattern", "kafka2console.*")
  .option("kafka.bootstrap.servers", ":9092")
  .load
  .withColumn("value", $"value" cast "string") // deserializing values
  .writeStream
  .format("console")
  .option("truncate", false) // format-specific option
  .option("checkpointLocation", "checkpointLocation-kafka2console") // generic query option
  .trigger(Trigger.ProcessingTime(30.seconds))
  .queryName("kafka2console-microbatch")
  .start

// In the end, stop the streaming query
sq.stop

Kafka Data Source can assign a single task per Kafka partition (using KafkaOffsetRangeCalculator in Micro-Batch Stream Processing).

Kafka Data Source can reuse a Kafka consumer (using KafkaMicroBatchReader in Micro-Batch Stream Processing).

Continuous Stream Processing

Kafka Data Source supports Continuous Stream Processing using KafkaContinuousReader.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .format("kafka")
  .option("subscribepattern", "kafka2console.*")
  .option("kafka.bootstrap.servers", ":9092")
  .load
  .withColumn("value", $"value" cast "string") // convert bytes to string for display purposes
  .writeStream
  .format("console")
  .option("truncate", false) // format-specific option
  .option("checkpointLocation", "checkpointLocation-kafka2console") // generic query option
  .queryName("kafka2console-continuous")
  .trigger(Trigger.Continuous(10.seconds))
  .start

// In the end, stop the streaming query
sq.stop

Configuration Options

Options with kafka. prefix (e.g. kafka.bootstrap.servers) are considered configuration properties for the Kafka consumers used on the driver and executors.

assign

Topic subscription strategy that accepts a JSON with topic names and partitions, e.g.

{"topicA":[0,1],"topicB":[0,1]}

Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).

failOnDataLoss

Default: true

Used when KafkaSourceProvider is requested for failOnDataLoss configuration property

kafka.bootstrap.servers

(required) bootstrap.servers configuration property of the Kafka consumers used on the driver and executors

Default: (empty)

kafkaConsumer.pollTimeoutMs

The time (in milliseconds) spent waiting in Consumer.poll if data is not available in the buffer.

Default: spark.network.timeout or 120s

maxOffsetsPerTrigger

Number of records to fetch per trigger (to limit the number of records to fetch).

Default: (undefined)

Unless defined, KafkaSource requests KafkaOffsetReader for the latest offsets.

minPartitions

Minimum number of partitions per executor (given Kafka partitions)

Default: (undefined)

Must be undefined (default) or greater than 0

When undefined (default) or smaller than the number of TopicPartitions with records to consume from, KafkaMicroBatchReader uses KafkaOffsetRangeCalculator to find the preferred executor for every TopicPartition (and the available executors).

startingOffsets

Starting offsets

Default: latest

Possible values:

  • latest

  • earliest

  • JSON with topics, partitions and their starting offsets, e.g.

    {"topicA":{"part":offset,"p1":-1},"topicB":{"0":-2}}
    

Tip

Use Scala's tripple quotes for the JSON for topics, partitions and offsets.

option(
  "startingOffsets",
  """{"topic1":{"0":5,"4":-1},"topic2":{"0":-2}}""")

subscribe

Topic subscription strategy that accepts topic names as a comma-separated string, e.g.

topic1,topic2,topic3

Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).

subscribepattern

Topic subscription strategy that uses Java's java.util.regex.Pattern for the topic subscription regex pattern of topics to subscribe to, e.g.

topic\d

Tip

Use Scala's tripple quotes for the regular expression for topic subscription regex pattern.

option("subscribepattern", """topic\d""")

Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).

topic

Optional topic name to use for writing a streaming query

Default: (empty)

Unless defined, Kafka data source uses the topic names as defined in the topic field in the incoming data.

Logical Query Plan for Reading

When DataStreamReader is requested to load a dataset with kafka data source format, it creates a DataFrame with a StreamingRelationV2 leaf logical operator.

scala> records.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@1a366d0, kafka, Map(maxOffsetsPerTrigger -> 1, startingOffsets -> latest, subscribepattern -> topic\d, kafka.bootstrap.servers -> :9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@39b3de87,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 1, startingOffsets -> latest, subscribepattern -> topic\d, kafka.bootstrap.servers -> :9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
...

Logical Query Plan for Writing

When DataStreamWriter is requested to start a streaming query with kafka data source format for writing, it requests the StreamingQueryManager to create a streaming query that in turn creates (a StreamingQueryWrapper with) a ContinuousExecution or a MicroBatchExecution for continuous and micro-batch stream processing, respectively.

scala> sq.explain(extended = true)
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@bf98b73
+- Project [key#28 AS key#7, value#29 AS value#8, topic#30 AS topic#9, partition#31 AS partition#10, offset#32L AS offset#11L, timestamp#33 AS timestamp#12, timestampType#34 AS timestampType#13]
   +- Streaming RelationV2 kafka[key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34] (Options: [subscribepattern=kafka2console.*,kafka.bootstrap.servers=:9092])

Demo: Streaming Aggregation with Kafka Data Source

Check out Demo: Streaming Aggregation with Kafka Data Source.

Use the following to publish events to Kafka.

// 1st streaming batch
$ cat /tmp/1
1,1,1
15,2,1

$ kafkacat -P -b localhost:9092 -t topic1 -l /tmp/1

// Alternatively (and slower due to JVM bootup)
$ cat /tmp/1 | ./bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9092

Last update: 2021-02-17