It’s been a while since I worked with Spark Streaming. That was back when I was working on a pet project that ultimately ended up as the Typesafe Activator template Spark Streaming with Scala and Akka, meant to get people going with the technologies.
Time flies by very quickly, and as the other blog posts may have shown, I’m evaluating Apache Kafka as a potential messaging and integration platform for my future projects. A lot is happening in the so-called big data space, and Apache Kafka fits the bill so well in many dataflows around me. I’m very glad it’s mostly all Scala, which we all love and spend our time with. Don’t we?
From the Spark Streaming documentation (Kafka bolded on purpose):
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window.
Since Apache Kafka aims at being the central hub for real-time streams of data (see 1.2 Use Cases and Putting Apache Kafka To Use: A Practical Guide to Building a Stream Data Platform (Part 1)) I couldn’t deny myself the simple pleasure of giving it a go.
Buckle up and ingest some data using Apache Kafka and Spark Streaming! You’ll surely love the infrastructure (if you don’t already). Be sure to type fast to see the potential of the platform at your fingertips.
The project (using sbt)
Here is the sbt build file build.sbt for the project:
val sparkVersion = "1.4.1"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)
It uses the latest released versions of Spark Streaming 1.4.1 and Scala 2.11.7.
Setting up Kafka broker
The article assumes you’ve already installed Apache Kafka. You may want to use Docker (see Apache Kafka on Docker) or build Kafka from the sources. Whichever approach you choose, start Zookeeper and Kafka.
Starting Zookeeper
I’m using the version of Kafka built from the sources and the ./bin/zookeeper-server-start.sh script that comes with it.
➜ kafka_2.11-0.8.3-SNAPSHOT git:(trunk) ✗ ./bin/zookeeper-server-start.sh config/zookeeper.properties
...
[2015-07-21 06:51:39,614] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Starting Kafka broker
Once Zookeeper is up and running (in the above case, listening on port 2181), run a Kafka broker.
➜ kafka_2.11-0.8.3-SNAPSHOT git:(trunk) ✗ ./bin/kafka-server-start.sh config/server.properties
...
[2015-07-21 06:53:17,051] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.1.9,9092,PLAINTEXT) (kafka.utils.ZkUtils$)
[2015-07-21 06:53:17,058] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
It takes merely two commands to boot the entire environment up, and that’s it.
Creating topic - spark-topic
A topic is where you’re going to send messages to, and where Spark Streaming will consume them from later on. It’s the communication channel between producers and consumers, and you’ve got to have one.
Create the spark-topic topic. The name is arbitrary, so pick whatever makes you happy, but use it consistently in the other places where the name’s used.
➜ kafka_2.11-0.8.3-SNAPSHOT git:(trunk) ✗ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic
Created topic "spark-topic".
You may want to check out the available topics.
➜ kafka_2.11-0.8.3-SNAPSHOT git:(trunk) ✗ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
spark-topic
You’re now done with setting up Kafka for the demo.
(optional) Sending messages to and receiving them from Kafka
Apache Kafka comes with two shell scripts to send messages to and receive messages from topics. They’re kafka-console-producer.sh and kafka-console-consumer.sh, respectively. They both use the console for input (stdin) and output (stdout).
Let’s publish a few messages to the spark-topic topic using ./bin/kafka-console-producer.sh.
➜ kafka_2.11-0.8.3-SNAPSHOT git:(trunk) ✗ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic
hello
hi
^D
You can keep the producer up in one terminal and use another terminal to consume the messages, or just send a couple of messages and close the session. Kafka persists messages for a period of time, so they’ll still be there.
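How long they’re kept is governed by the broker’s log retention settings in config/server.properties; the stock configuration keeps messages for a week:

# config/server.properties (excerpt)
log.retention.hours=168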
Consuming messages is as simple as running ./bin/kafka-console-consumer.sh. Mind the --zookeeper option that points to the Zookeeper instance where Kafka stores its configuration, and --from-beginning, which tells the consumer to process all the persisted messages.
➜ kafka_2.11-0.8.3-SNAPSHOT git:(trunk) ✗ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning
hello
hi
^D
Consumed 2 messages
Spark time!
Remember the build.sbt above that sets the Scala/sbt project up with the appropriate Scala version and Spark Streaming dependencies?
val sparkVersion = "1.4.1"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)
To learn a little about the integration between Spark Streaming and Apache Kafka, you’re going to use sbt console and type all the integration code line by line in the interactive console. You could have a simple Scala application instead, but I’m leaving it to you as an exercise (a minimal sketch appears at the end of this section).
You may want to read the scaladoc of org.apache.spark.SparkConf and org.apache.spark.streaming.StreamingContext to learn about their purpose in the sample.
scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
scala> val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2f8bf5fc
scala> import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext
scala> import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.Seconds
scala> val ssc = new StreamingContext(conf, Seconds(10))
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/21 09:08:39 INFO SparkContext: Running Spark version 1.4.1
...
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@2ce5cc3
scala> import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka.KafkaUtils
// Note the name of the topic in use - spark-topic - and the value in the Map,
// i.e. the number of partitions to consume (each consumed in its own thread)
scala> val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
kafkaStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)] = org.apache.spark.streaming.kafka.KafkaInputDStream@4ab601ac
// The very complex BIG data analytics
scala> kafkaStream.print()
// Start the streaming context so Spark Streaming polls for messages
scala> ssc.start
15/07/21 09:11:31 INFO ReceiverTracker: ReceiverTracker started
15/07/21 09:11:31 INFO ForEachDStream: metadataCleanupDelay = -1
...
15/07/21 09:11:31 INFO StreamingContext: StreamingContext started
Spark Streaming is now connected to Apache Kafka and processes the incoming messages in 10-second batches. Leave it running and switch to another terminal.
Open a terminal to run a Kafka producer. You may want to use kafkacat (highly recommended) or the producer that comes with Apache Kafka - kafka-console-producer.sh.
➜ kafka_2.11-0.8.3-SNAPSHOT git:(trunk) ✗ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic
hey spark, how are you doing today?
Switch to the terminal with Spark Streaming running and see the message printed out.
15/07/21 09:12:00 INFO DAGScheduler: ResultStage 1 (print at <console>:18) finished in 0.016 s
15/07/21 09:12:00 INFO DAGScheduler: Job 1 finished: print at <console>:18, took 0.030530 s
-------------------------------------------
Time: 1437462720000 ms
-------------------------------------------
(null,hey spark, how are you doing today?)
15/07/21 09:12:00 INFO JobScheduler: Finished job streaming job 1437462720000 ms.1 from job set of time 1437462720000 ms
Congratulations! You’ve earned the Spark Streaming and Apache Kafka integration badge! Close Spark Streaming’s context using
scala> ssc.stop
or simply press Ctrl+C. Shut down Apache Kafka and Zookeeper, too. Done.
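As for the exercise mentioned earlier, here is a minimal sketch of the same session as a standalone Scala application. The object name is mine, and I’ve swapped print() for a per-batch word count over the message values to make the “very complex BIG data analytics” marginally more interesting. It builds with the build.sbt above; ssc.awaitTermination() keeps the application running until you stop it.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverApp extends App {
  val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
  val ssc = new StreamingContext(conf, Seconds(10))

  // (streaming context, Zookeeper quorum, consumer group, Map of topic -> partitions to consume)
  val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181",
    "spark-streaming-consumer-group", Map("spark-topic" -> 5))

  // Count words across the messages received in every 10-second batch
  val wordCounts = kafkaStream
    .map { case (_, message) => message }
    .flatMap(_.split("\\s+"))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
  wordCounts.print()

  ssc.start()
  ssc.awaitTermination()
}

Run it with sbt run while the Kafka broker is up and feed it messages with the console producer as before.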
(bonus) Building Apache Spark from the sources
You could also use the very latest version of Spark Streaming, where the latest and greatest development happens; it lives on the master branch.
Following the official documentation (Building Spark), execute the following two commands. Note the versions in the build: it uses Scala 2.11.7 and Hadoop 2.7.1.
➜ spark git:(master) ./dev/change-scala-version.sh 2.11
➜ spark git:(master) ✗ ./build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.7.1 -Dscala-2.11 -DskipTests clean install
...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [ 6.187 s]
[INFO] Spark Project Launcher ............................. SUCCESS [ 10.096 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 8.650 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 6.085 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [ 10.308 s]
[INFO] Spark Project Core ................................. SUCCESS [02:08 min]
[INFO] Spark Project Bagel ................................ SUCCESS [ 6.750 s]
[INFO] Spark Project GraphX ............................... SUCCESS [ 15.942 s]
[INFO] Spark Project Streaming ............................ SUCCESS [ 32.429 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [ 55.389 s]
[INFO] Spark Project SQL .................................. SUCCESS [ 56.297 s]
[INFO] Spark Project ML Library ........................... SUCCESS [01:05 min]
[INFO] Spark Project Tools ................................ SUCCESS [ 4.702 s]
[INFO] Spark Project Hive ................................. SUCCESS [ 47.624 s]
[INFO] Spark Project REPL ................................. SUCCESS [ 5.686 s]
[INFO] Spark Project YARN Shuffle Service ................. SUCCESS [ 7.479 s]
[INFO] Spark Project YARN ................................. SUCCESS [ 11.903 s]
[INFO] Spark Project Assembly ............................. SUCCESS [ 59.155 s]
[INFO] Spark Project External Twitter ..................... SUCCESS [ 7.177 s]
[INFO] Spark Project External Flume Sink .................. SUCCESS [ 6.205 s]
[INFO] Spark Project External Flume ....................... SUCCESS [ 9.151 s]
[INFO] Spark Project External Flume Assembly .............. SUCCESS [ 1.896 s]
[INFO] Spark Project External MQTT ........................ SUCCESS [ 15.044 s]
[INFO] Spark Project External MQTT Assembly ............... SUCCESS [ 3.593 s]
[INFO] Spark Project External ZeroMQ ...................... SUCCESS [ 6.658 s]
[INFO] Spark Project External Kafka ....................... SUCCESS [ 11.207 s]
[INFO] Spark Project Examples ............................. SUCCESS [01:16 min]
[INFO] Spark Project External Kafka Assembly .............. SUCCESS [ 5.115 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11:21 min
[INFO] Finished at: 2015-08-22T15:08:02+02:00
[INFO] Final Memory: 431M/1960M
[INFO] ------------------------------------------------------------------------
The jars for that version are now at your command in your local Maven repository. Switch the version of Spark Streaming in build.sbt to 1.5.0-SNAPSHOT and start over.
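A minimal sketch of what changes in build.sbt (Resolver.mavenLocal makes sbt resolve the freshly built snapshot jars from the local Maven repository):

val sparkVersion = "1.5.0-SNAPSHOT"
scalaVersion := "2.11.7"
resolvers += Resolver.mavenLocal
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)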
Summary
As it turns out, setting up a working configuration of Apache Kafka and Spark Streaming is just a few commands away. There are a couple of places that need improvement, but what the article has shown could be a good starting point for other real-time big data analytics, with Apache Kafka as the central hub for real-time streams of data that are then processed using complex algorithms in Spark Streaming.
Once the data’s processed, Spark Streaming could publish the results to yet another Kafka topic and/or store them in Hadoop for later. It seems I’ve got a very powerful setup and I’m not yet fully aware of where I should apply it.
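For the first idea, here is a hedged sketch of what publishing results back to Kafka could look like, using the producer API from the kafka-clients library (which the Kafka dependency of spark-streaming-kafka should pull in; add it to build.sbt explicitly if not). The results-topic name and the toUpperCase “processing” are made up for illustration; the snippet would go in place of kafkaStream.print() in the session above, and the topic would have to be created the same way as spark-topic.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

kafkaStream
  .map { case (_, message) => message.toUpperCase }  // stand-in for the real processing
  .foreachRDD { rdd =>
    // Create one producer per partition so nothing needs to be serialized and shipped around
    rdd.foreachPartition { records =>
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      records.foreach(record => producer.send(new ProducerRecord[String, String]("results-topic", record)))
      producer.close()
    }
  }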
Let me know what you think about the topic of the blog post in the Comments section below or contact me at jacek@japila.pl. Follow the author as @jaceklaskowski on Twitter, too.