I’m a Scala proponent, so when I found out that the Apache Kafka team had decided to switch to Java as the main language of the new client API, it was beyond my imagination. Akka does fine with its separate Java and Scala APIs, so I can’t believe Apache Kafka couldn’t offer similar APIs, too. It’s even weirder when one finds out that Apache Kafka itself is written in Scala. Why on earth did they decide to do the migration?!
In order to learn Kafka better, I developed a custom producer in Scala using the latest Kafka Producer API. I built Kafka from the sources, so I’m using version 0.9.0.0-SNAPSHOT (the development line originally planned as 0.8.3). It was a pretty surprising experience, especially when I ran across java.util.concurrent.Future, which seems so limited compared to what scala.concurrent.Future offers. No map, flatMap or such? So far I consider the switch to Java for the client API a big mistake.
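If you want map and flatMap anyway, you can adapt the Java future yourself. The following is a minimal sketch that assumes only the JDK and the Scala standard library. fromJava blocks on get() inside a Scala Future, which works for any java.util.concurrent.Future but holds a thread while waiting. fromCallback completes a Promise from a completion callback shaped like Kafka’s Callback.onCompletion(metadata, exception), which KafkaProducer.send(record, callback) accepts. Both helper names and the fake callback API are hypothetical; they are not part of Kafka.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.concurrent.{Await, ExecutionContext, Future, Promise, blocking}
import scala.concurrent.duration._

object FutureBridges {

  // Option 1 (hypothetical helper): wrap a java.util.concurrent.Future by
  // blocking on get() inside a Scala Future. Simple, but a thread of the
  // ExecutionContext is tied up until the Java future completes.
  def fromJava[T](jf: java.util.concurrent.Future[T])(implicit ec: ExecutionContext): Future[T] =
    Future(blocking(jf.get()))

  // Option 2 (hypothetical helper): complete a Promise from a callback that
  // receives either a result or an exception, mirroring the shape of Kafka's
  // Callback.onCompletion(metadata, exception). No thread blocks here.
  def fromCallback[T](register: ((T, Exception) => Unit) => Unit): Future[T] = {
    val p = Promise[T]()
    register { (result, error) =>
      if (error != null) p.failure(error) else p.success(result)
    }
    p.future
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val pool = Executors.newSingleThreadExecutor()

    // A plain JDK Future standing in for the Future[RecordMetadata] from send().
    val jf: java.util.concurrent.Future[Int] = pool.submit(new Callable[Int] {
      def call(): Int = 41
    })
    val viaGet: Future[Int] = fromJava(jf).map(_ + 1)

    // A fake callback-based API that succeeds immediately with offset 7.
    val viaCallback: Future[String] =
      fromCallback[Long](cb => cb(7L, null)).map(offset => s"offset = $offset")

    println(Await.result(viaGet, 5.seconds))      // 42
    println(Await.result(viaCallback, 5.seconds)) // offset = 7
    pool.shutdown()
  }
}
```

The callback variant is usually the better fit for a producer, since send already offers a callback and nothing has to block on get().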
Here comes the complete Kafka producer I’ve developed in Scala. It’s supposed to serve as a basis for my future development endeavours using the API in the upcoming release.
Custom KafkaProducer in Scala
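The new producer is configured through a java.util.Properties object that is passed to the KafkaProducer constructor. A minimal sketch of such a configuration follows, assuming a single broker on localhost:9092 (the address kafkacat uses further down) and String keys and values. The object and method names are hypothetical; bootstrap.servers, key.serializer, value.serializer and acks are standard producer settings.

```scala
import java.util.Properties

object ProducerConfigSketch {

  // Hypothetical helper: builds the settings a String/String producer needs.
  def producerProps(bootstrapServers: String = "localhost:9092"): Properties = {
    val props = new Properties()
    // Broker(s) used for the initial connection to the cluster.
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Serializers turning keys and values into bytes.
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Consider a send successful once the partition leader has written it.
    props.setProperty("acks", "1")
    props
  }

  def main(args: Array[String]): Unit = {
    val props = producerProps()
    // With kafka-clients on the classpath, these would be passed to
    // new org.apache.kafka.clients.producer.KafkaProducer[String, String](props)
    println(props.getProperty("bootstrap.servers"))
  }
}
```

Creating the actual KafkaProducer requires the kafka-clients dependency, which the build.sbt below pulls in.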
Building Kafka from the sources
In order to run the client, you should first build Kafka from the sources and publish the jars to the local Maven repository. The build is needed because the producer uses the very latest Kafka Producer API.
Building Kafka from the sources is as simple as executing gradle -PscalaVersion=2.11.7 clean releaseTarGz install in the directory where you cloned the Kafka repository from GitHub (git clone https://github.com/apache/kafka.git). The install task is what publishes the jars to the local Maven repository.
➜ kafka git:(trunk) gradle -PscalaVersion=2.11.7 clean releaseTarGz install
Building project 'core' with Scala version 2.11.7
...
BUILD SUCCESSFUL
Total time: 1 mins 23.233 secs
I built the distribution against Scala 2.11.7. Once done, you’ll find the release package at core/build/distributions/kafka_2.11-0.9.0.0-SNAPSHOT.tgz.
➜ kafka git:(trunk) ls -l core/build/distributions/kafka_2.11-0.9.0.0-SNAPSHOT.tgz
-rw-r--r-- 1 jacek staff 17813003 29 wrz 08:32 core/build/distributions/kafka_2.11-0.9.0.0-SNAPSHOT.tgz
Unpack it and cd into the resulting directory.
➜ kafka git:(trunk) tar -zxf core/build/distributions/kafka_2.11-0.9.0.0-SNAPSHOT.tgz
➜ kafka git:(trunk) ✗ cd kafka_2.11-0.9.0.0-SNAPSHOT
➜ kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ls -l
total 32
-rw-r--r-- 1 jacek staff 11358 9 lis 2014 LICENSE
-rw-r--r-- 1 jacek staff 162 9 lis 2014 NOTICE
drwxr-xr-x 26 jacek staff 884 29 wrz 08:32 bin
drwxr-xr-x 16 jacek staff 544 29 wrz 08:32 config
drwxr-xr-x 21 jacek staff 714 29 wrz 08:32 libs
Zookeeper up and running
Running Zookeeper is the very first step, as Kafka relies on it for cluster coordination, e.g. registering brokers and electing a leader (you can see both in the broker logs below). Use ./bin/zookeeper-server-start.sh config/zookeeper.properties:
➜ kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ./bin/zookeeper-server-start.sh config/zookeeper.properties
[2015-09-29 12:26:41,011] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-09-29 12:26:41,014] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-09-29 12:26:41,014] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-09-29 12:26:41,014] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-09-29 12:26:41,014] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2015-09-29 12:26:41,036] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-09-29 12:26:41,036] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2015-09-29 12:26:41,301] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2015-09-29 12:26:41,301] INFO Server environment:host.name=172.20.36.184 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-09-29 12:26:41,301] INFO Server environment:java.version=1.8.0_60 (org.apache.zookeeper.server.ZooKeeperServer)
...
[2015-09-29 12:26:41,333] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Kafka broker up and running
In another terminal, start a Kafka broker using the ./bin/kafka-server-start.sh config/server.properties command:
➜ kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ./bin/kafka-server-start.sh config/server.properties
...
[2015-07-20 00:18:33,671] INFO starting (kafka.server.KafkaServer)
[2015-07-20 00:18:33,673] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2015-07-20 00:18:33,684] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-07-20 00:18:33,693] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2015-07-20 00:18:33,694] INFO Client environment:host.name=192.168.1.9 (org.apache.zookeeper.ZooKeeper)
[2015-07-20 00:18:33,694] INFO Client environment:java.version=1.8.0_45 (org.apache.zookeeper.ZooKeeper)
[2015-07-20 00:18:33,694] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
...
[2015-09-29 13:18:49,919] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.99.1,9092,PLAINTEXT) (kafka.utils.ZkUtils$)
[2015-09-29 13:18:49,933] INFO Kafka version : 0.9.0.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2015-09-29 13:18:49,933] INFO Kafka commitId : 4e7db39556ba916c (org.apache.kafka.common.utils.AppInfoParser)
[2015-09-29 13:18:49,934] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-09-29 13:18:49,935] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
Creating topic
You’re now going to create the my-topic-test topic that the custom producer publishes messages to. Of course, the name of the topic is arbitrary, but it should match what the custom producer uses.
➜ kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic-test
Created topic "my-topic-test".
Check out the available topics using ./bin/kafka-topics.sh --list --zookeeper localhost:2181. You should see one: my-topic-test.
➜ kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
my-topic-test
kafka-publisher - Scala project
Create a Scala project managed by sbt with the following build.sbt:
val kafkaVersion = "0.9.0.0-SNAPSHOT"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafkaVersion
resolvers += Resolver.mavenLocal
Use the following project/build.properties:
sbt.version=0.13.9
Sending messages using KafkaProducer - sbt run
With this setup in place, you should now be able to execute sbt run to run the custom Scala producer for Kafka.
➜ kafka-publisher sbt run
[info] Loading global plugins from /Users/jacek/.sbt/0.13/plugins
[info] Loading project definition from /Users/jacek/dev/sandbox/kafka-publisher/project
[info] Set current project to kafka-publisher (in build file:/Users/jacek/dev/sandbox/kafka-publisher/)
[info] Compiling 1 Scala source to /Users/jacek/dev/sandbox/kafka-publisher/target/scala-2.11/classes...
[info] Running KafkaProducer
Connecting to my-topic-test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
offset = 0
partition = 0
topic = my-topic-test
[success] Total time: 4 s, completed Sep 29, 2015 4:21:14 PM
Executing sbt run again should show a different offset for the same partition and topic:
➜ kafka-publisher sbt run
[info] Loading global plugins from /Users/jacek/.sbt/0.13/plugins
[info] Loading project definition from /Users/jacek/dev/sandbox/kafka-publisher/project
[info] Set current project to kafka-publisher (in build file:/Users/jacek/dev/sandbox/kafka-publisher/)
[info] Running KafkaProducer
Connecting to my-topic-test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
offset = 1
partition = 0
topic = my-topic-test
[success] Total time: 1 s, completed Sep 29, 2015 4:21:47 PM
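The offset goes up because each record appended to a partition gets the next offset in that partition’s log, so in this simple setup the second run lands at offset 1 in partition 0. The toy model below only illustrates that numbering for a single partition; PartitionLog is a hypothetical name, and the broker’s real log storage is far more involved.

```scala
import scala.collection.mutable

object PartitionLogModel {

  // Toy model of one partition: an append-only log where every appended
  // record receives the next offset, starting at 0.
  final class PartitionLog {
    private val records = mutable.ArrayBuffer.empty[String]

    // Appends a record and returns the offset it was stored at.
    def append(value: String): Long = {
      records += value
      records.size - 1L
    }
  }

  def main(args: Array[String]): Unit = {
    val log = new PartitionLog
    val first = log.append("hello from the first sbt run")
    val second = log.append("hello from the second sbt run")
    println(s"offset = $first")  // offset = 0
    println(s"offset = $second") // offset = 1
  }
}
```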
Using kafkacat as a Kafka message consumer
If you would like to see the messages on the other, receiving side, I strongly recommend kafkacat, which, once installed, boils down to the following command:
➜ ~ kafkacat -C -b localhost:9092 -t my-topic-test
hello at 20.07.2015 0:29:43
hello at 20.07.2015 0:30:46
It reads all the messages already published to the my-topic-test topic and prints out new ones as they arrive.
That’s it. Congratulations!
Summary
The complete project is on GitHub in the kafka-producer repo.
You may also want to read 1.3 Quick Start in the official documentation of Apache Kafka.
Let me know what you think about the topic[1] of the blog post in the Comments section below or contact me at jacek@japila.pl. Follow the author as @jaceklaskowski on Twitter, too.

[1] Pun intended.