I wrote quite a few tutorials about Kafka, so now is the time to look at more advanced problems. In this post, we will see how to perform windowed aggregations and how to deal with late events.
In the tutorials, we were processing messages, but we will now start dealing with events. Events are things that happened at a particular time.
Before processing our events, we need to understand the semantics of a timestamp. There are two notions to distinguish:
- the event time: the time at which the event actually happened
- the processing time: the time at which the event is processed by our system.

The processing time will happen after the event time for 2 reasons: the event takes some time to reach our system (e.g. network transit), and our system takes some time to pick it up and process it.
In any case, the first thing is to make sure our events have a timestamp. Luckily enough, since Kafka 0.10, messages stored in Kafka are associated with a timestamp. This timestamp can either be assigned by the producer, or assigned by the broker if none was provided by the producer. In this example, we will make sure to assign timestamps at the source, i.e. in the producer code.
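As a quick illustration, the producer API exposes both options; a minimal sketch (the topic, key and value are placeholders, and we will use the timestamped form later in this post):

import org.apache.kafka.clients.producer.ProducerRecord

// Timestamp assigned explicitly by the producer: (topic, partition, timestamp, key, value)
val withTimestamp = ProducerRecord("events", null, System.currentTimeMillis(), "key", "value")

// No timestamp provided: one is assigned automatically, as described above
val withoutTimestamp = ProducerRecord("events", "key", "value")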
Suppose we have events arriving in our system, and we want to calculate a basic metric: how many events happened per unit of time. This could look like this, where tN represents the Nth time unit, and * represents a single event:
| t1 | t2 | t3 | ...
|* * * * |** * | * *** * *| ...
We would produce the following aggregations:
- t1 = 4
- t2 = 3
- t3 = 6
Now, suppose one of the events that happened during t1 actually took a little bit of time to reach our system, and only came during t2:
| t1 | t2 | t3 | ...
|* * * . |** * | * *** * *| ...
\-----> *(t1)
If we use the processing time as a reference, we would generate the following results:
- t1 = 3
- t2 = 4
- t3 = 6

Now, you may have noticed I wrote we want to calculate "how many events happened per unit of time", so these results would be incorrect: the late event really belongs to t1, not t2.
The event that arrived at t2 is called a late event and we basically have 2 options:
- discard the late event: the result for t1 would be incorrect but the result for t2 would remain correct
- count the late event in the window it belongs to (t1): both results would then be correct, but this requires updating a result that may already have been produced.

The latter can be tricky: when you are performing a streaming aggregation, you need to decide when you will produce aggregates, and whether you can update these aggregates. For instance, we could say: once t1 is over, produce a result ("result for t1 = 3"), then produce another result when we receive a late event ("updated result for t1 = 4"). Frameworks offer different options (the model offered by Apache Beam being probably one of the most advanced), and we will see how Kafka Streams behaves in this case.
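To make this concrete, here is a toy sketch of the "produce, then update" idea, with no Kafka involved (the window size and names are just for illustration): each incoming event bumps the count of the 10-second window it belongs to, and the result of that window is re-emitted every time it changes, even if it was already reported:

// Toy example: windows keyed by their start time; a late event simply updates
// (and re-emits) the result of the window it belongs to
val countsByWindow = mutableMapOf<Long, Int>()

fun onEvent(eventTimeMs: Long) {
    val window = (eventTimeMs / 10_000) * 10_000
    val count = (countsByWindow[window] ?: 0) + 1
    countsByWindow[window] = count
    println("result for window $window = $count")
}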
To test our system, we need a data generator that will send events at a fixed rate, and that will sometimes generate late events. Our generator will generate data with the following pattern:
- one event per second, except at the 58th second of each minute
- the event of the 58th second is sent 4 seconds late, i.e. during the 2nd second of the next minute, with a timestamp pointing back to the 58th second.
On the consumer side, we will perform aggregations over windows of 10 seconds. Our goal will therefore be to make sure that the late event gets counted in the correct window.
The code is pretty simple (and it's in Kotlin, by the way). We first need a timer that fires every second. We make it fire 0.2 seconds after each full second, so that the timer can drift slightly without a message ever being sent at, say, 0.995 seconds, which would invalidate the results:
import java.util.Timer
import java.util.TimerTask

val now = System.currentTimeMillis()
// fire 200 ms past the next second boundary, then every second
val delay = 1200 - Math.floorMod(now, 1000)
val timer = Timer()
timer.schedule(object : TimerTask() {
    override fun run() {
        // sending logic
    }
}, delay, 1000L)
When the timer fires, we send an event every second except at the 58th second, and we send that missing event 4 seconds late, at the 2nd second of the next minute, with a timestamp that points back to the 58th second:
val ts = System.currentTimeMillis()
val second = Math.floorMod(ts / 1000, 60)
if (second != 58L) {
    // skip the event of the 58th second...
    sendMessage("$second", ts, "on time")
}
if (second == 2L) {
    // ...and send it late instead, with a timestamp pointing back to the 58th second
    sendMessage("58", ts - 4000, "late")
}
In the sendMessage function, just make sure to assign the timestamp of the message by using the appropriate constructor:
fun sendMessage(id: String, ts: Long, info: String) {
    // the value embeds the expected 10-second window so the results can be checked easily
    val window = (ts / 10000) * 10000
    val value = "window=$window - n=$id - $info"
    // ProducerRecord(topic, partition, timestamp, key, value): the timestamp is set explicitly
    val futureResult = producer.send(ProducerRecord("events", null, ts, "$window", value))
    logger.debug("Sent a record: $value")
    futureResult.get()
}
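The snippet above assumes a producer of string keys and values and a logger defined elsewhere. The producer could be created along these lines (a sketch with a placeholder broker address; the complete setup is in the linked repository):

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer

val producerProps = Properties().apply {
    put("bootstrap.servers", "localhost:9092")  // placeholder broker address
    put("key.serializer", StringSerializer::class.java.name)
    put("value.serializer", StringSerializer::class.java.name)
}
val producer = KafkaProducer<String, String>(producerProps)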
Let's now create a topic for these events:
$ kafka-topics --zookeeper localhost:2181 --create --topic events --replication-factor 1 --partitions 4
Now, if we run the producer, we can see that event #58 does not arrive between #57 and #59, but instead arrives shortly after #2:
15:37:55.304 ... Sent a record: window=1535398670000 - n=55 - on time
15:37:56.304 ... Sent a record: window=1535398670000 - n=56 - on time
15:37:57.305 ... Sent a record: window=1535398670000 - n=57 - on time
<-- missing event
15:37:59.308 ... Sent a record: window=1535398670000 - n=59 - on time
15:38:00.311 ... Sent a record: window=1535398680000 - n=0 - on time
15:38:01.313 ... Sent a record: window=1535398680000 - n=1 - on time
15:38:02.317 ... Sent a record: window=1535398680000 - n=2 - on time
15:38:02.319 ... Sent a record: window=1535398670000 - n=58 - late <-- late event
15:38:03.318 ... Sent a record: window=1535398680000 - n=3 - on time
15:38:04.318 ... Sent a record: window=1535398680000 - n=4 - on time
Let's build a Kafka Streams application to perform a streaming aggregation: we want to count how many events happened per window of 10 seconds. The first step is to create a KStream from our topic:
val streamsBuilder = StreamsBuilder()
val eventStream: KStream<String, String> = streamsBuilder
.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
We then need to aggregate the number of events per window of 10 seconds. Notice that the groupBy function is the entry point to aggregate events of a KStream, and we are providing a dummy key as we want to aggregate all the events together:
val aggregates: KTable<Windowed<String>, Long> = eventStream
.groupBy({ k, v -> "dummy" }, Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
Notice the type of the resulting KTable: KTable<Windowed<String>, Long>. The table would therefore look like:
key | value
--------------------|-------
1535398670000:dummy | 10
1535398680000:dummy | 7
The key is a compound value, with the window and the (dummy) aggregation key.
We then need to output the KTable to another topic. To do so, we need to convert the KTable to a KStream:
aggregates
.toStream()
.map { ws, i -> KeyValue("${ws.window().start()}", "$i") }
.to("aggs", Produced.with(Serdes.String(), Serdes.String()))
Finally, we can build the topology and start the application:
val topology = streamsBuilder.build()
val props = Properties()
... // at minimum, application.id and bootstrap.servers must be set
val streams = KafkaStreams(topology, props)
streams.start()
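The aggregates end up in the aggs topic. To keep an eye on them, one convenient option (not necessarily how the outputs below were captured) is the console consumer with key printing enabled:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic aggs --property print.key=true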
By default, Kafka Streams will output results every 30 seconds, and the output will look like this:
1535401900000 10
1535401910000 10
1535401920000 10
1535401930000 2 <-- partial result
... <-- 30 second gap
1535401930000 10 <-- updated result
1535401940000 10
1535401950000 10
1535401960000 2
...
The interesting thing is that all the results that were updated since the last output are printed. In this case, the aggregate for window 1535401930000 was 2 when it was first printed, and an updated result of 10 was printed later on.
Keep in mind this is a streaming aggregation: because the stream is unbounded, it is difficult to know when a result is final. A printed result is not invalid; it was valid at the time it was emitted, and it may be updated later on.
To output results more frequently than every 30 seconds, you can either change the commit interval or change the size of the cache, as indicated in Record caches in the DSL (see also this KIP that aims at offering more options). Let's change the commit interval for this example:
props["commit.interval.ms"] = 0
We would now get an update every time an event is received:
1535402380000 1
1535402380000 2
1535402380000 3
1535402380000 4
1535402380000 5
1535402380000 6
1535402380000 7
1535402380000 8
1535402380000 9
1535402380000 10
1535402390000 1 <---- beginning of a new window
1535402390000 2
1535402390000 3
1535402390000 4
1535402390000 5
1535402390000 6
1535402390000 7
1535402390000 8
1535402390000 9
1535402400000 1 <---- beginning of a new window
1535402400000 2
1535402400000 3
1535402390000 10 <----- late event
1535402400000 4
1535402400000 5
1535402400000 6
1535402400000 7
1535402400000 8
1535402400000 9
1535402400000 10
Here, it is very easy to spot the impact of the late event: the count for window 1535402390000 had stopped at 9, and it was bumped to 10 only after window 1535402400000 had already started.
The good thing is that the window during which the late event arrived (window 1535402400000) does not include the late event: Kafka Streams rightly applied event-time semantics to perform the aggregation!
Processing a stream of events is much more complex than processing a fixed set of records. Events can arrive late or out of order, and it is virtually impossible to know when all the data has arrived. The capabilities of the processing framework therefore make a big difference in how you can process the data: you have to think about when you want the results to be emitted, what to do when data arrives late, etc.
Although Kafka Streams did a good job at handling late events, we saw we had to change the commit interval or the size of the cache. However, this change can have negative side effects on the rest of your application. Apache Beam actually offers a more advanced model based on triggers.
We did not explore in this post how to discard late events. The process is called watermarking, and in Kafka Streams it is controlled through the retention period of the aggregation windows. Here as well, Apache Beam offers a more advanced model: Watermarks and late data.
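To give an idea of the Kafka Streams side, with the API version used in this post the retention is set on the windows themselves; a sketch (the 1-minute value is arbitrary), where events arriving after the retention has expired are dropped from the aggregation:

import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.TimeWindows

// keep the 10-second windows around for 1 minute so that late events can still update them
val windows = TimeWindows.of(TimeUnit.SECONDS.toMillis(10))
    .until(TimeUnit.MINUTES.toMillis(1))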
I do hope that, one day, Kafka Streams will implement the Apache Beam model. This would allow Kafka Streams to offer a great processing model as well as a simple deployment model. The best of both worlds.
The code used in this post can be found here. Feel free to ask questions in the comments section below!