Mastering FP and OO with Scala

Making use of functional and object-oriented programming on JVM

Publishing Events Using Custom Producer for Apache Kafka


I’m a Scala proponent, so when I found out that the Apache Kafka team had decided to switch to Java as the main language of the new client API, I could hardly believe it. Akka does fine with its dual Java/Scala APIs, so I can’t see why Apache Kafka couldn’t offer similar APIs, too. It’s even more curious given that Apache Kafka itself is written in Scala. Why on earth did they decide to do the migration?!

In order to learn Kafka better, I developed a custom producer using the latest Kafka Producer API in Scala. I built Kafka from the sources, so I’m using the version 0.8.3-SNAPSHOT. It was a pretty surprising experience, especially when I ran across java.util.concurrent.Future, which seems so limited compared to what scala.concurrent.Future offers. No map, flatMap or the like? So far I consider the switch to Java for the client API a big mistake.
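As a workaround for the missing combinators, a java.util.concurrent.Future can be wrapped in a scala.concurrent.Future so that map, flatMap and friends become available again. A minimal sketch (the asScala name is mine, not part of any Kafka or Scala API):

```scala
import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{blocking, Future}
import scala.concurrent.ExecutionContext.Implicits.global

// Bridge a Java future to a Scala one by parking a pooled thread
// on get(); `blocking` hints the pool to compensate with extra threads.
def asScala[T](jf: JFuture[T]): Future[T] =
  Future(blocking(jf.get()))
```

With that in place, producer.send(record) could be wrapped as asScala(producer.send(record)).map(_.offset()). It costs a blocked thread per pending future, so it’s a stopgap, not a real fix.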

Here comes the complete Kafka producer I’ve developed in Scala. It’s supposed to serve as a basis for my future development endeavours using the API that’s going to ship in the 0.8.3 release.

Custom KafkaProducer in Scala

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.RecordMetadata

object KafkaProducer extends App {

  // The topic to publish to - the first command-line argument or a default
  val topic = util.Try(args(0)).getOrElse("my-topic-test")
  println(s"Connecting to $topic")

  import org.apache.kafka.clients.producer.KafkaProducer

  // Minimal producer configuration - where the broker is
  // and how to serialize keys and values
  val props = new java.util.Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("client.id", "KafkaProducer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[Integer, String](props)

  import org.apache.kafka.clients.producer.ProducerRecord

  // The message value - a hello with the current time, Polish-formatted
  val polish = java.time.format.DateTimeFormatter.ofPattern("dd.MM.yyyy H:mm:ss")
  val now = java.time.LocalDateTime.now().format(polish)
  val record = new ProducerRecord[Integer, String](topic, 1, s"hello at $now")
  val metaF: Future[RecordMetadata] = producer.send(record)
  val meta = metaF.get() // blocking!
  val msgLog =
    s"""
       |offset    = ${meta.offset()}
       |partition = ${meta.partition()}
       |topic     = ${meta.topic()}
     """.stripMargin
  println(msgLog)

  producer.close()
}
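Instead of blocking on get(), send also accepts a Callback that the producer invokes once the broker acknowledges the record. A non-blocking variant of the send above could look as follows (a sketch reusing the producer and record values from the listing; it needs a running broker to actually do anything):

```scala
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Asynchronous send - no blocking get(); exactly one of
// metadata/exception is non-null when onCompletion fires.
producer.send(record, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) exception.printStackTrace()
    else println(s"Acked: offset=${metadata.offset()} partition=${metadata.partition()}")
})
```

Remember to keep the application alive (or close the producer, which flushes) long enough for the callback to fire.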

Building Kafka from the sources

In order to run the client, you should first build Kafka from the sources and publish the jars to the local Maven repository. The build is needed because the producer uses the very latest Kafka Producer API.

Building Kafka from the sources is as simple as executing gradle -PscalaVersion=2.11.7 clean releaseTarGz install in the directory where you git cloned https://github.com/apache/kafka.git (the install task publishes the jars to the local Maven repository).

➜  kafka git:(trunk) gradle -PscalaVersion=2.11.7 clean releaseTarGz install
Building project 'core' with Scala version 2.11.7
...
BUILD SUCCESSFUL

Total time: 1 mins 23.233 secs

I was building the distro against Scala 2.11.7.

Once done, core/build/distributions/kafka_2.11-0.9.0.0-SNAPSHOT.tgz is where you find the release package.

➜  kafka git:(trunk) ls -l core/build/distributions/kafka_2.11-0.9.0.0-SNAPSHOT.tgz
-rw-r--r--  1 jacek  staff  17813003 29 wrz 08:32 core/build/distributions/kafka_2.11-0.9.0.0-SNAPSHOT.tgz

Unpack it and cd to it.

➜  kafka git:(trunk) tar -zxf core/build/distributions/kafka_2.11-0.9.0.0-SNAPSHOT.tgz
➜  kafka git:(trunk) ✗ cd kafka_2.11-0.9.0.0-SNAPSHOT
➜  kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ls -l
total 32
-rw-r--r--   1 jacek  staff  11358  9 lis  2014 LICENSE
-rw-r--r--   1 jacek  staff    162  9 lis  2014 NOTICE
drwxr-xr-x  26 jacek  staff    884 29 wrz 08:32 bin
drwxr-xr-x  16 jacek  staff    544 29 wrz 08:32 config
drwxr-xr-x  21 jacek  staff    714 29 wrz 08:32 libs

Zookeeper up and running

Running Zookeeper is the very first step (Kafka uses Zookeeper to coordinate brokers and keep cluster metadata). Use ./bin/zookeeper-server-start.sh config/zookeeper.properties:

➜  kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ./bin/zookeeper-server-start.sh config/zookeeper.properties
[2015-09-29 12:26:41,011] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-09-29 12:26:41,014] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-09-29 12:26:41,014] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-09-29 12:26:41,014] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2015-09-29 12:26:41,014] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2015-09-29 12:26:41,036] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-09-29 12:26:41,036] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2015-09-29 12:26:41,301] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2015-09-29 12:26:41,301] INFO Server environment:host.name=172.20.36.184 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-09-29 12:26:41,301] INFO Server environment:java.version=1.8.0_60 (org.apache.zookeeper.server.ZooKeeperServer)
...
[2015-09-29 12:26:41,333] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Kafka broker up and running

In another terminal, start a Kafka broker using ./bin/kafka-server-start.sh config/server.properties command:

  ➜  kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ./bin/kafka-server-start.sh config/server.properties
...
[2015-07-20 00:18:33,671] INFO starting (kafka.server.KafkaServer)
[2015-07-20 00:18:33,673] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2015-07-20 00:18:33,684] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-07-20 00:18:33,693] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2015-07-20 00:18:33,694] INFO Client environment:host.name=192.168.1.9 (org.apache.zookeeper.ZooKeeper)
[2015-07-20 00:18:33,694] INFO Client environment:java.version=1.8.0_45 (org.apache.zookeeper.ZooKeeper)
[2015-07-20 00:18:33,694] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
...
[2015-09-29 13:18:49,919] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.99.1,9092,PLAINTEXT) (kafka.utils.ZkUtils$)
[2015-09-29 13:18:49,933] INFO Kafka version : 0.9.0.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2015-09-29 13:18:49,933] INFO Kafka commitId : 4e7db39556ba916c (org.apache.kafka.common.utils.AppInfoParser)
[2015-09-29 13:18:49,934] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-09-29 13:18:49,935] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

Creating topic

You’re now going to create the my-topic-test topic where the custom producer is going to publish messages. The name of the topic is arbitrary, but it has to match what the custom producer uses.

➜  kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic-test
Created topic "my-topic-test".

Check out the available topics using ./bin/kafka-topics.sh --list --zookeeper localhost:2181. You should see the newly-created one.

➜  kafka_2.11-0.9.0.0-SNAPSHOT git:(trunk) ✗ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
my-topic-test

kafka-publisher - Scala project

Create a Scala project. The project is managed by sbt with the following build.sbt:

val kafkaVersion = "0.9.0.0-SNAPSHOT"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafkaVersion
resolvers += Resolver.mavenLocal

Use the following project/build.properties:

sbt.version=0.13.9

Sending messages using KafkaProducer - sbt run

With the setup complete, you should now be able to execute sbt run to launch the custom Scala producer for Kafka.

➜  kafka-publisher  sbt run
[info] Loading global plugins from /Users/jacek/.sbt/0.13/plugins
[info] Loading project definition from /Users/jacek/dev/sandbox/kafka-publisher/project
[info] Set current project to kafka-publisher (in build file:/Users/jacek/dev/sandbox/kafka-publisher/)
[info] Compiling 1 Scala source to /Users/jacek/dev/sandbox/kafka-publisher/target/scala-2.11/classes...
[info] Running KafkaProducer
Connecting to my-topic-test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

offset    = 0
partition = 0
topic     = my-topic-test

[success] Total time: 4 s, completed Sep 29, 2015 4:21:14 PM

Executing sbt run again should show a different offset for the same partition and topic:

➜  kafka-publisher  sbt run
[info] Loading global plugins from /Users/jacek/.sbt/0.13/plugins
[info] Loading project definition from /Users/jacek/dev/sandbox/kafka-publisher/project
[info] Set current project to kafka-publisher (in build file:/Users/jacek/dev/sandbox/kafka-publisher/)
[info] Running KafkaProducer
Connecting to my-topic-test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

offset    = 1
partition = 0
topic     = my-topic-test

[success] Total time: 1 s, completed Sep 29, 2015 4:21:47 PM

Using kafkacat as a Kafka message consumer

If you’d like to see the messages on the other, receiving side, I strongly recommend kafkacat. Once installed, consuming boils down to the following command:

➜  ~  kafkacat -C -b localhost:9092 -t my-topic-test
hello at 20.07.2015 0:29:43
hello at 20.07.2015 0:30:46

It will read all the messages already published to the my-topic-test topic and print new ones as they arrive.
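If you don’t want to install kafkacat, the console consumer that ships with the Kafka distribution should do, too (at this Kafka version it still connects via Zookeeper; it requires the broker from the earlier steps to be up):

```shell
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic my-topic-test --from-beginning
```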

That’s it. Congratulations!

Summary

The complete project is on GitHub in kafka-producer repo.

You may also want to read 1.3 Quick Start in the official documentation of Apache Kafka.

Let me know what you think about the topic¹ of the blog post in the Comments section below or contact me at jacek@japila.pl. Follow the author as @jaceklaskowski on Twitter, too.


  1. pun intended
