This is a post in 3 parts in which I explain how we started a project on Kafka Streams, and why we had to stop using this library because it had a scalability issue. I took a lot of inspiration from Streams, though, and I will explain how we re-implemented our application on top of plain Kafka Consumers.
My goal here is not to say that Kafka Streams is not a good library (I still encourage you to use it, if it is appropriate for your use case), but to describe the engineering that is required to implement a fault-tolerant application that performs a simple aggregation.
Our goal is fairly simple: we want to monitor the quality of the messages in all the topics of a Kafka cluster. To simplify a little, for each message, we want to say if it is valid
or invalid
, and we want to publish metrics: the number of valid
or invalid
messages per second.
The metrics are published to another Kafka topic (one topic for all the metrics of the cluster). We then have another application that reads from this topic, and pushes the metrics to InfluxDB. Finally, we are using Grafana to visualize the metrics:
This sounds simple but there is a lot of complexity to handle:
Let's assume we're using a topic called tx
with 2 partitions (tx-0
and tx-1
), with valid (v
) and invalid (i
) messages, and windows of 1 second (t0
= from second 0 to 1, t1
= from second 1 to 2, etc.).
t0 t1 t2
----------- ----------- -----------
tx-0 |v|v|v|i|v| |v|i| |i|v|v|
----------- ----------- -----------
tx-1 |v|i|v|v| |v|v|v| |i|i|v|v|
----------- ----------- -----------
We would expect the following results:
t0
, 7 valid messages (4 in partition tx-0
+ 3 in partition tx-1
) and 2 valid messages (1 in each partition)t1
, 4 valid messages and 1 valid messaget2
, 4 valid messages and 3 valid messagesThis would materialize as follows in the metrics
topic, assuming that t0=1501273548000
:
$ kafka-console-consumer --topic metrics --property print.key=true ...
{"topic":"tx","window":1501273548000,"status":"valid"} 7
{"topic":"tx","window":1501273548000,"status":"invalid"} 2
{"topic":"tx","window":1501273549000,"status":"valid"} 4
{"topic":"tx","window":1501273549000,"status":"invalid"} 1
{"topic":"tx","window":1501273550000,"status":"valid"} 4
{"topic":"tx","window":1501273550000,"status":"invalid"} 3
And, in InfluxDB, we would have a measurement called tx
with two fields, valid
and invalid
:
> select valid, invalid from tx
name: tx
time valid invalid
---- ----- -------
1501273548000000000 7 2
1501273549000000000 4 1
1501273550000000000 4 3
Now, in the case of late-arrival / late-processed data, we would simply send an update to the metrics
topic. For instance, if we received one more valid message for t1
, we would output the following update:
{"topic":"tx","window":1501273549000,"status":"valid"} 5
In InfluxDB, this would update the existing value of field valid
for the given timestamp:
> select valid, invalid from tx where time=1501273549000000000
name: tx
time valid invalid
---- ----- -------
1501273549000000000 5 1
This is all for now. In the next post, we will see how we implemented this using Kafka Streams.