KafkaSourceProvider

KafkaSourceProvider is a DataSourceRegister that registers the kafka alias for the Kafka data source.

Tip

Read up on DataSourceRegister in The Internals of Spark SQL book.

KafkaSourceProvider supports micro-batch stream processing (through the MicroBatchStream abstraction) and uses a specialized KafkaMicroBatchStream.

Properties of Kafka Consumers on Executors

| ConsumerConfig's Key | Value |
| --- | --- |
| KEY_DESERIALIZER_CLASS_CONFIG | ByteArrayDeserializer |
| VALUE_DESERIALIZER_CLASS_CONFIG | ByteArrayDeserializer |
| AUTO_OFFSET_RESET_CONFIG | none |
| GROUP_ID_CONFIG | uniqueGroupId-executor |
| ENABLE_AUTO_COMMIT_CONFIG | false |
| RECEIVE_BUFFER_CONFIG | 65536 |

Required Options

KafkaSourceProvider requires the following options (that you can set using the option method of DataStreamReader or DataStreamWriter):

  • kafka.bootstrap.servers
  • Exactly one of the topic subscription options: assign, subscribe or subscribepattern

Both requirements are enforced by validateGeneralOptions (described below).

Tip

Refer to Kafka Data Source's Options for the supported configuration options.
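As a minimal sketch (the application name, bootstrap server address and topic name below are placeholders), both required options can be set on a DataStreamReader as follows:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KafkaSourceProviderDemo") // placeholder application name
  .getOrCreate()

// kafka.bootstrap.servers plus exactly one subscription strategy
// (subscribe in this sketch) satisfy the required options.
val records = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()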

Creating KafkaTable

getTable(
  options: CaseInsensitiveStringMap): KafkaTable

getTable creates a KafkaTable with the value of the includeheaders option (default: false).

getTable is part of the SimpleTableProvider abstraction (Spark SQL).
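For illustration, the option could be enabled on a streaming query as follows (a sketch reusing the spark session from the earlier example; option names are case-insensitive, so includeHeaders matches includeheaders):

// With includeHeaders enabled, the resulting Dataset carries an
// extra headers column next to the fixed Kafka columns.
val withHeaders = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()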

Creating Streaming Sink

createSink(
  sqlContext: SQLContext,
  parameters: Map[String, String],
  partitionColumns: Seq[String],
  outputMode: OutputMode): Sink

createSink creates a KafkaSink for the topic option (if defined) and the Kafka producer parameters.

createSink is part of the StreamSinkProvider abstraction.
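A sketch of a streaming query that ends up with a KafkaSink (the output topic name and the checkpoint location are placeholders; records is the Dataset from the earlier sketch):

val query = records
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic") // the topic option that createSink reads
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // placeholder
  .start()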

Creating Streaming Source

createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source

createSource validates stream options.

createSource...FIXME

createSource is part of the StreamSourceProvider abstraction.

Validating Options For Batch And Streaming Queries

validateGeneralOptions(
  parameters: Map[String, String]): Unit

Note

Parameters are case-insensitive, i.e. OptioN and option are equal.

validateGeneralOptions makes sure that exactly one of the following topic subscription strategies is used in parameters:

  • subscribe
  • subscribepattern
  • assign

validateGeneralOptions reports an IllegalArgumentException when no subscription strategy is in use or more than one strategy is used.
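For example, the following sketch sets two subscription strategies at once (subscribe and assign), so validateGeneralOptions is expected to reject it:

// Two strategies at once: validateGeneralOptions rejects this
// (at load() or at the latest when the query is started).
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("assign", """{"topic1":[0]}""")
  .load() // throws IllegalArgumentException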

validateGeneralOptions makes sure that the values of the subscription strategies meet the following requirements:

  • assign strategy starts with { (the opening curly brace)
  • subscribe strategy has at least one topic (in a comma-separated list of topics)
  • subscribepattern strategy has the pattern defined

validateGeneralOptions makes sure that group.id has not been specified and reports an IllegalArgumentException otherwise.

Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.

validateGeneralOptions makes sure that auto.offset.reset has not been specified and reports an IllegalArgumentException otherwise.

Kafka option 'auto.offset.reset' is not supported. Instead set the source option 'startingoffsets' to 'earliest' or 'latest' to specify where to start. Structured Streaming manages which offsets are consumed internally, rather than relying on the kafkaConsumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that 'startingoffsets' only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. See the docs for more details.

validateGeneralOptions makes sure that the following options have not been specified and reports an IllegalArgumentException otherwise:

  • kafka.key.deserializer
  • kafka.value.deserializer
  • kafka.enable.auto.commit
  • kafka.interceptor.classes

In the end, validateGeneralOptions makes sure that the kafka.bootstrap.servers option was specified and reports an IllegalArgumentException otherwise.

Option 'kafka.bootstrap.servers' must be specified for configuring Kafka consumer

validateGeneralOptions is used when KafkaSourceProvider validates options for streaming and batch queries.

Creating ConsumerStrategy

strategy(
  caseInsensitiveParams: Map[String, String]): ConsumerStrategy

strategy converts a key (in caseInsensitiveParams) to a ConsumerStrategy.

| Key | ConsumerStrategy |
| --- | --- |
| assign | AssignStrategy |
| subscribe | SubscribeStrategy |
| subscribepattern | SubscribePatternStrategy |

strategy is used when...FIXME

AssignStrategy

AssignStrategy with Kafka TopicPartitions

strategy uses the JsonUtils.partitions method to parse a JSON text with topic names and partitions, e.g.

{"topicA":[0,1],"topicB":[0,1]}

The topic names and partitions are mapped directly to Kafka's TopicPartition objects.

SubscribeStrategy

SubscribeStrategy with topic names

strategy extracts topic names from a comma-separated string, e.g.

topic1,topic2,topic3

SubscribePatternStrategy

SubscribePatternStrategy with a topic subscription regex pattern (a Java java.util.regex.Pattern), e.g.

topic\d
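Put together, the three strategies correspond to the following mutually exclusive reader options (a sketch with placeholder topic names; the def gives a fresh DataStreamReader per query, since exactly one strategy is allowed at a time):

def kafkaReader = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")

// AssignStrategy: explicit topic partitions as a JSON text
val byAssign = kafkaReader.option("assign", """{"topicA":[0,1],"topicB":[0,1]}""").load()

// SubscribeStrategy: a comma-separated list of topic names
val bySubscribe = kafkaReader.option("subscribe", "topic1,topic2,topic3").load()

// SubscribePatternStrategy: a regex of topic names
val byPattern = kafkaReader.option("subscribepattern", """topic\d""").load()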

Name and Schema of Streaming Source

sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)

sourceSchema gives the short name (i.e. kafka) and the fixed schema.

Internally, sourceSchema validates Kafka options and makes sure that the optional input schema is indeed undefined.

When the input schema is defined, sourceSchema reports an IllegalArgumentException.

Kafka source has a fixed schema and cannot be set with a custom one

sourceSchema is part of the StreamSourceProvider abstraction.
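Printing the schema of a Kafka-sourced Dataset shows the fixed columns (a sketch using the records Dataset from the earlier example; an extra headers column appears only when includeheaders is enabled):

records.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)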

Validating Kafka Options for Streaming Queries

validateStreamOptions(
  caseInsensitiveParams: Map[String, String]): Unit

validateStreamOptions makes sure that the endingoffsets option is not used. Otherwise, validateStreamOptions reports an IllegalArgumentException.

ending offset not valid in streaming queries

validateStreamOptions validates the general options.

validateStreamOptions is used when KafkaSourceProvider is requested for the schema for Kafka source and to create a KafkaSource.
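For example, a streaming query with endingoffsets is expected to fail with the message above (a sketch):

// endingoffsets is a batch-only option.
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("endingoffsets", "latest")
  .load() // throws IllegalArgumentException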

Converting Configuration Options to KafkaOffsetRangeLimit

getKafkaOffsetRangeLimit(
  params: Map[String, String],
  offsetOptionKey: String,
  defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit

getKafkaOffsetRangeLimit finds the given offsetOptionKey in the params and does the following conversion:

  • latest becomes LatestOffsetRangeLimit
  • earliest becomes EarliestOffsetRangeLimit
  • A JSON-formatted text (with per-partition offsets) becomes SpecificOffsetRangeLimit
  • When the offsetOptionKey is not found, getKafkaOffsetRangeLimit gives the given defaultOffsets

getKafkaOffsetRangeLimit is used when...FIXME
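As an illustration of the conversion, the startingoffsets option accepts any of the following forms (a sketch reusing the kafkaReader helper from the earlier example; in the JSON form, -2 denotes the earliest and -1 the latest offset):

kafkaReader.option("startingoffsets", "earliest") // EarliestOffsetRangeLimit
kafkaReader.option("startingoffsets", "latest")   // LatestOffsetRangeLimit
kafkaReader.option("startingoffsets",
  """{"topic1":{"0":23,"1":-2},"topic2":{"0":-1}}""") // SpecificOffsetRangeLimit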

Creating Fake BaseRelation

createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation

createRelation...FIXME

createRelation is part of the RelationProvider abstraction (Spark SQL).

Validating Configuration Options for Batch Processing

validateBatchOptions(
  caseInsensitiveParams: Map[String, String]): Unit

validateBatchOptions...FIXME

validateBatchOptions is used when KafkaSourceProvider is requested to createSource.

Configuration Properties

failOnDataLoss

failOnDataLoss(
  caseInsensitiveParams: Map[String, String]): Boolean

failOnDataLoss looks up the failOnDataLoss configuration property (in the caseInsensitiveParams) or defaults to true.
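A sketch disabling the property, so a query tolerates data loss (e.g. deleted topics or aged-out offsets) instead of failing:

val tolerant = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("failOnDataLoss", "false") // default: true
  .load()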

Utilities

kafkaParamsForDriver

kafkaParamsForDriver(
  specifiedKafkaParams: Map[String, String]): Map[String, Object]

kafkaParamsForDriver...FIXME

kafkaParamsForDriver is used when...FIXME

kafkaParamsForExecutors

kafkaParamsForExecutors(
  specifiedKafkaParams: Map[String, String],
  uniqueGroupId: String): Map[String, Object]

kafkaParamsForExecutors sets the Kafka properties for the Kafka consumers on executors (see the Properties of Kafka Consumers on Executors table above).

While setting the properties, kafkaParamsForExecutors prints out the following DEBUG message to the logs:

executor: Set [key] to [value], earlier value: [value]

kafkaParamsForExecutors is used when...FIXME
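The effective executor-side settings can be pictured as follows (a sketch that mirrors the Properties of Kafka Consumers on Executors table above; uniqueGroupId is derived internally and shown here as a placeholder):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val uniqueGroupId = "spark-kafka-source-demo" // placeholder
val executorKafkaParams = Map[String, Object](
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "none",
  ConsumerConfig.GROUP_ID_CONFIG -> s"$uniqueGroupId-executor",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.RECEIVE_BUFFER_CONFIG -> (65536: java.lang.Integer)
)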

Kafka Producer Parameters

kafkaParamsForProducer(
  params: CaseInsensitiveMap[String]): ju.Map[String, Object]

kafkaParamsForProducer...FIXME

kafkaParamsForProducer is used when...FIXME

Logging

Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaSourceProvider logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider=ALL

Refer to Logging.

