Kafka tutorial #6 - Kafka Streams in Kotlin

This is the sixth post in this series where we go through the basics of using Kafka. In this post, instead of using the Java client (producer and consumer API), we are going to use Kafka Streams, a powerful library to process streaming data.

A few words about Kafka Streams

Kafka Streams is a library that allows you to process data from and to Kafka. It provides a DSL with similar functions as to what we can find in Spark: map(), flatMap(), filter(), groupBy(), join(), etc. Stateful transformations are available, as well as powerful windowing functions, including support for late arrival data, etc.

Since Kafka Streams is a library, a Kafka Streams applications can be deployed by just executing the Jar of your application. There is no server in which to deploy your application, meaning you can use any tool you like to run your Kafka Streams applications: Docker, Kubernetes, bare-metal, etc.

Kafka Streams allows you to scale out by running multiple threads in the same JVM, and/or by starting multiple instances of your application (multiple JVMs).

Although Kafka Streams is part of the Apache Kafka project, I highly recommend reading the documentation provided by Confluent. You can also watch the talk I gave at Kafka Summit last year: Microservices with Kafka: An Introduction to Kafka Streams with a Real-Life Example.

Setting up our project

We just need one dependency for Kafka Streams. The Kafka client is a transitive dependency, so you don't need to add that one explicitly:

dependencies {
    compile 'org.apache.kafka:kafka-streams:2.0.0'

We will keep the dependencies for Jackson and its Kotlin module.

We said we could just run a Kafka Streams application as a jar, so let's modify our build to generate a "fat jar":

jar {
    from {
        configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }

The consumer code

To write a Kafka Streams application, the first thing to do is to get a StreamsBuilder. This is what will allow us to build a topology:

val streamsBuilder = StreamsBuilder()

We can use this builder to consume data from a topic. Here, we want to use a KStream (a representation of a stream of records), not a KTable (a changelog stream), so we use the stream() method.

val personJsonStream: KStream<String, String> = streamsBuilder
        .stream<String, String>(personsTopic, Consumed.with(Serdes.String(), Serdes.String()))

It is a good practice to specify explicitly how to deserialize the messages at this level (Consumed.with(...)), rather than through application-wide properties, because a Kafka Streams application may read from multiple sources of data with different formats.

Because we are reading JSON data as strings, we need to deserialize the value of our messages into Person objects:

val personStream: KStream<String, Person> = personJsonStream.mapValues { v ->
    jsonMapper.readValue(v, Person::class.java)

Notice that the mapValues() method has 2 implementations with different lambdas, so we are explicitly specifying that we are using the lambda with a single argument (v -> ...).

We can now process the Person objects to calculate the age of the persons:

val resStream: KStream<String, String> = personStream.map { _, p ->
    val birthDateLocal = p.birthDate.toInstant().atZone(ZoneId.systemDefault()).toLocalDate()
    val age = Period.between(birthDateLocal, LocalDate.now()).getYears()
    KeyValue("${p.firstName} ${p.lastName}", "$age")

In this case, map() only has one implementation with a lambda that takes 2 parameters. We don't need the key (it is null) so _ is a way to indicate we know the parameter is here, but we don't need a variable for this value.

Now comes the time to write the result back to the output topic. Again, it is a good practice to be explicit with the serializers to use:

resStream.to(agesTopic, Produced.with(Serdes.String(), Serdes.String()))

Out topology is built, so let's get a reference to it:

val topology = streamsBuilder.build()

We can now start the Kafka Streams engine, passing it the topology and a couple of properties:

val props = Properties()
props["bootstrap.servers"] = brokers
props["application.id"] = "kafka-tutorial"
val streams = KafkaStreams(topology, props)

Notice that, unlike how we used the Java client, we don't need to specify serializers or deserializers here. Instead, we have defined these explicitly when reading from or writing to topics. If we need to use additional topics with different formats, there will be no ambiguity as to what serde to use.

Testing the code

We can reuse the code of part 1 to produce data, so let's go ahead and run this producer, and run the console consumer to visualize the data:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic persons

Now, we can either run the Kafka Streams application straight from the IDE, or build it and run it from the command line:

$ gradle build
$ java -cp build/libs/kafka-streams.jar com.ippontech.kafkastreams.StreamsProcessorKt

Now, if you start the console consumer on the output topic, you should see similar results as to what we had seen in part 2:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic ages
Patrick Rempel	30
Charlotte Windler	39


Scaling out

We said that Kafka Streams can easily scale out by running multiple instances of the same application. Let's verify this by running a second instance in another window (I am shortening the logs for better readability):

$ java -cp build/libs/kafka-streams.jar com.ippontech.kafkastreams.StreamsProcessorKt
18/08/03 18:43:56.039 INFO ... Setting newly assigned partitions [persons-2, persons-3]

This new instance was just assigned 2 partitions of our topic (2 and 3), out of a total of 4 partitions.

Let's look at the log of the first instance:

18/08/03 18:43:56.014 INFO ... Revoking previously assigned partitions [persons-0, persons-1, persons-2, persons-3]
18/08/03 18:43:56.014 INFO ... State transition from RUNNING to PARTITIONS_REVOKED
18/08/03 18:43:56.014 INFO ... State transition from RUNNING to REBALANCING
18/08/03 18:43:56.035 INFO ... Setting newly assigned partitions [persons-0, persons-1]

The first instance was processing the 4 partitions. It entered a state where the partitions got reassigned to consumers, ending up in this instance being assigned 2 partitions (0 and 1).

We just found a way to easily scale out! This works because our consumers are part of the same consumer group, which is controlled here through the application.id property.

We could go further and assign multiple threads to each instance of our application by defining the num.stream.threads property. Each thread would be independent, with its own consumer and producer. This makes it easy to consume the resources of our servers.


I can't say enough how Kafka Streams is a great library. It is my tool of choice when building data pipelines with Kafka. Kafka Streams removes a lot of the work that you would have to do with the plain Java client, while being a lot simpler to deploy and manage than a Spark or Flink application.

There is a lot more to know about Kafka Streams, so let me know in the comments section below if there is something specific you would like me to expose in a further post.

The code of these tutorials can be found here.

Found this post useful? Kindly tap
Author image
Big Data Engineer & Managing Consultant - I work with Spark, Kafka and Cassandra. My preferred language is Scala!
Washington, DC, USA LinkedIn
Ippon Technologies is an international consulting firm that specializes in Agile Development, Big Data and DevOps / Cloud. Our 300+ highly skilled consultants are located in the US, France and Australia. Ippon technologies has a $32 million revenue and a 20% annual growth rate.