This post is the first in a series of posts in which we will learn how to send messages in the Avro format into Kafka so that they can be consumed by Spark Streaming. There will be 3 posts:
Let’s start this post with Kafka. Kafka is a publish-subscribe messaging system originally written at LinkedIn. It is an Apache project – hence open-source. Kafka has gained a lot of traction for its simplicity and its ability to handle huge amounts of messages.
I’m not going to get too much into describing Kafka. There are great articles out there such as Running Kafka at Scale and Apache Kafka for Beginners. Instead, I will focus on the key aspects of Kafka and how to use it from a developer’s perspective.
The key features of Kafka are:
The picture below shows a topic with 2 partitions. The producer writes messages to both partitions (with some partitioning logic or by round-robin) and they are simply added to the end of each partition while being assigned an offset by Kafka. At the same time, a consumer reads messages from each partition.
The goal here is not to provide documentation on how to operate Kafka in production. Instead, we will set up a quick-and-dirty installation with a single Kafka node and ZooKeeper (ZooKeeper is used to maintain the state of the Kafka cluster).
Start by downloading Kafka 0.9.0.0 and un-tarring it:
> tar -xzf kafka_2.11-0.9.0.0.tgz
> cd kafka_2.11-0.9.0.0
Now, start ZooKeeper:
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2016-02-29 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
You can now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2016-02-29 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2016-02-29 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
We can finally create a topic named “mytopic” with a single partition and no replication. The script connects to ZooKeeper to find out which Kafka instance it should talk to.
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
You may find additional details in the Quick Start of Kafka’s documentation.
Kafka’s Quick Start describes how to use built-in scripts to publish and consume simple messages. Instead, we will be writing Java code.
Let’s first add a dependency to our Maven pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
The 2 main entry points you will find in this library are KafkaProducer and KafkaConsumer.
Here is the complete code to send 100 messages to the “mytopic” topic we created earlier:
package com.ipponusa;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleStringProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "value-" + i);
            producer.send(record);
        }
        producer.close();
    }
}
Let’s describe this code:
- bootstrap.servers is the host and port of our Kafka server.
- key.serializer is the name of the class used to serialize the key of the messages. Messages have a key and a value; even though the key is optional, a serializer needs to be provided (a keyed variant of the loop is sketched right after this list).
- value.serializer is the name of the class used to serialize the value of the messages. We’re going to send strings, hence the StringSerializer.
- We create an instance of KafkaProducer, providing the type of the key and of the value as generic parameters, and the properties object we’ve just prepared.
- We create a ProducerRecord with generic parameters (type of the key and type of the value). We specify the name of the topic and the value, thus omitting the key. The message is then sent using the send method of the KafkaProducer.
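For reference, if we wanted each message to carry a key, we could use the three-argument constructor of ProducerRecord. This is only a sketch of what the loop above could look like; the key values are made up for illustration:

// Variant of the loop: each record carries a key. Records with the same key
// always end up in the same partition.
for (int i = 0; i < 100; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "key-" + i, "value-" + i);
    producer.send(record);
}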
Before launching the producer, let’s see the code of the consumer:
package com.ipponusa;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class SimpleStringConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mygroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytopic"));

        boolean running = true;
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Value=" + record.value());
            }
        }
        consumer.close();
    }
}
The structure is very similar to the producer’s code. Let’s see the differences:
- We provide key.deserializer and value.deserializer properties, in this case Kafka’s StringDeserializer class.
- We set a group.id property. It is required to tell Kafka which consumer group this consumer belongs to. The principle is that the consumers within a group will each receive a subset of the messages that are published, thus sharing the load.
- Instead of a KafkaProducer, we create a KafkaConsumer.
- We subscribe to the topic with the subscribe method.
- Messages are read by calling the poll method of the KafkaConsumer instance (the parameter value 100 is a timeout in ms). This returns a ConsumerRecords object, which is basically a list of ConsumerRecord objects. Each ConsumerRecord object is a message whose key and value have been deserialized using the deserializer classes provided as properties.
Time to run the code! Start by running the consumer. You should get a few lines of logs, and the process should then wait for messages:
2016-02-29 15:27:12,528 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
...
2016-02-29 15:27:12,707 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
2016-02-29 15:27:12,707 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
Then, and only then, launch the producer. The output should be very similar, and then end with a line saying the producer has been shut down:
2016-02-29 15:27:54,205 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
If you go back to the consumer, you should see some output on the console:
Value=value-0
Value=value-1
...
This seems to work as we would have expected. A few things to note, though. First, we said earlier that the offsets were managed by the consumer, but we didn’t do anything regarding offsets. That’s because, by default, the KafkaConsumer uses Kafka’s API to manage offsets for us. To manage offsets by hand, you should set the enable.auto.commit property to false and use the seek method when initializing the KafkaConsumer.
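To give an idea of what manual offset management could look like, here is a minimal sketch. It assumes we assign the partition explicitly, start reading from offset 0, and commit after processing each batch; it reuses the props object from above and needs an import of org.apache.kafka.common.TopicPartition:

props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Assign the partition explicitly (instead of subscribing) so that we can seek
TopicPartition partition = new TopicPartition("mytopic", 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, 0L); // start reading from offset 0

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + " -> " + record.value());
    }
    consumer.commitSync(); // commit the offsets of the records we just processed
}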
In our example, the consumer queries Kafka for the highest offset of each partition, and then only waits for new messages. If we had started the producer before the consumer, the messages would have been silently ignored.
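If we wanted the consumer to also read messages produced before it started, one option (rather than seeking by hand) is to set the auto.offset.reset property before creating the consumer. Note that this only applies when the consumer group has no committed offset yet:

// "latest" is the default and only waits for new messages;
// "earliest" makes the consumer start from the beginning of each partition
// when no offset has been committed for this group yet.
props.put("auto.offset.reset", "earliest");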
Now, this was a very basic example as we were only using one partition. We can update the topic so that it uses 2 partitions:
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 2 --topic mytopic
As we said earlier, the producer can decide which partition it wants to write to. This is done with an extra parameter to the ProducerRecord constructor. The documentation is pretty clear about the behavior:
If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.
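To make this concrete, here is a sketch of the three cases using the constructors of ProducerRecord (the partition number and keys are arbitrary examples):

// Explicit partition: the record goes to partition 1 of "mytopic"
ProducerRecord<String, String> toPartition = new ProducerRecord<>("mytopic", 1, "some-key", "some-value");

// Key but no partition: the partition is chosen by hashing the key
ProducerRecord<String, String> byKey = new ProducerRecord<>("mytopic", "some-key", "some-value");

// Neither key nor partition: partitions are assigned in a round-robin fashion
ProducerRecord<String, String> roundRobin = new ProducerRecord<>("mytopic", "some-value");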
On the consumer side, all the consumers that have subscribed to the topic and that belong to the same group will receive a subset of the messages. If we were to create 2 threads, each with its own KafkaConsumer, each consumer would most likely receive the messages of one partition.
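As an illustration, here is a minimal sketch of what that could look like, using Java 8 lambda syntax and assuming consumerProps is the same Properties object as in SimpleStringConsumer (a KafkaConsumer is not thread-safe, so each thread creates its own instance):

// Two consumers in the same group ("mygroup"): Kafka assigns one partition
// of "mytopic" to each of them, so they share the load.
Runnable consumerTask = () -> {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Arrays.asList("mytopic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(Thread.currentThread().getName()
                    + " - partition " + record.partition() + " - " + record.value());
        }
    }
};
new Thread(consumerTask, "consumer-1").start();
new Thread(consumerTask, "consumer-2").start();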
We’ve seen the basics of Kafka, how we can publish and consume messages with a simple Java API, and how messages can be distributed into partitions. This partitioning is one of the features that allows Kafka to scale horizontally.
In the next post, we will see how to use Spark Streaming to consume messages from Kafka.