This is post number 8 in this series, where we go through the basics of using Kafka. So far, we have been using the Java client for Kafka, and Kafka Streams. This time, we are going to use Spark Structured Streaming (the counterpart of Spark Streaming that provides a Dataframe API).
Although we used Kotlin in the previous posts, we are going to code in Scala this time. This is because Spark's Java API is more complicated to use than the Scala API.
We are going to reuse the example from part 1 and part 2 of this tutorial. In part 1, we created a producer that sends data in JSON format to a topic:
{"firstName":"Quentin","lastName":"Corkery","birthDate":"1984-10-26T03:52:14.449+0000"}
{"firstName":"Neil","lastName":"Macejkovic","birthDate":"1971-08-06T18:03:11.533+0000"}
...
We are going to build the consumer that processes the data to calculate the age of the persons, as we did in part 2:
Quentin Corkery 33
Lysanne Beer 20
...
We are going to create a separate project using SBT (Scala Build Tool). Let's create the build.sbt file:
name := "kafka-tutorials"
organization := "com.ippontech"
version := "0.0.1-SNAPSHOT"
scalaVersion := "2.11.12"
val sparkVersion = "2.2.2"
libraryDependencies ++= Seq(
  "org.scala-lang" % "scala-library" % scalaVersion.value,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
We added dependencies for Spark SQL - necessary for Spark Structured Streaming - and for the Kafka connector. Note that the %% operator appends the Scala binary version (here, _2.11) to the artifact names, which is why the Scala version declared above must be consistent with the one the Spark artifacts are built for.
We can now add a log4j.properties file under the src/main/resources directory to configure the logging, and we can start coding.
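For example, a minimal configuration could look like this (the exact levels are a matter of taste; turning org.apache.spark down to WARN keeps the console output readable):

```properties
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1} - %m%n

# Spark is very chatty at INFO level
log4j.logger.org.apache.spark=WARN
```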
Let's create a class called StreamsProcessor with a constructor to initialize the brokers field, and add a process() method where we will do all the work:
class StreamsProcessor(brokers: String) {

  def process(): Unit = {
    ...
  }
}
We can create the entry point of our application by writing a main function. In Scala, a static method needs to be in an object, not in a class, so let's create one:
object StreamsProcessor {

  def main(args: Array[String]): Unit = {
    new StreamsProcessor("localhost:9092").process()
  }
}
We can now initialize Spark in the process() method:
val spark = SparkSession.builder()
  .appName("kafka-tutorials")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._
We are going to work locally, running the application straight from our IDE, so we are setting the master to local[*], meaning we are creating as many threads as there are cores on the machine. We also imported Spark's implicit conversions to make it easier to work with Dataframes, in particular for column selectors ($"<column name>").
Thanks to the Kafka connector that we added as a dependency, Spark Structured Streaming can read a stream from Kafka:
val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "persons")
  .load()
Let's see the structure of the Dataframe by calling .printSchema() on it:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Each message will have columns for the key, the value, the topic, the partition, the offset, the timestamp, and the timestamp type (0 meaning the timestamp is the time when the record was created). Now, if we want to visualize the data, we cannot just call .show() on the Dataframe. Instead, we have to write a streaming query that outputs the content of the Dataframe to the console:
val consoleOutput = inputDf.writeStream
  .outputMode("append")
  .format("console")
  .start()
consoleOutput.awaitTermination()
And the output would look like:
-------------------------------------------
Batch: 8
-------------------------------------------
+----+--------------------+-------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+-------+---------+------+--------------------+-------------+
|null|[7B 22 66 69 72 7...|persons| 0| 25582|2018-08-14 12:02:...| 0|
+----+--------------------+-------+---------+------+--------------------+-------------+
-------------------------------------------
Batch: 9
-------------------------------------------
+----+--------------------+-------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+-------+---------+------+--------------------+-------------+
|null|[7B 22 66 69 72 7...|persons| 0| 25583|2018-08-14 12:02:...| 0|
+----+--------------------+-------+---------+------+--------------------+-------------+
...
We can see that the value of the messages is indeed binary data.
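As a quick sanity check, we can decode the first bytes shown by the console sink ([7B 22 66 69 72 7...]) and verify that they are indeed the beginning of our JSON payload:

```scala
// First bytes of the value column as printed by the console sink
val firstBytes = Array[Byte](0x7B, 0x22, 0x66, 0x69, 0x72)

// 0x7B is '{', 0x22 is '"', etc.
val decoded = new String(firstBytes, "UTF-8")  // "{\"fir" - the start of {"firstName":...
```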
Spark sees the data in binary format, but we know it is actually JSON data. Let's first convert the messages to strings:
val personJsonDf = inputDf.selectExpr("CAST(value AS STRING)")
The output now looks like:
+--------------------+
| value|
+--------------------+
|{"firstName":"Rey...|
+--------------------+
We can now deserialize the JSON. The easiest way is to use Spark's from_json() function from the org.apache.spark.sql.functions object. We need to provide the structure (list of fields) of the JSON data so that the Dataframe can reflect this structure:
val struct = new StructType()
  .add("firstName", DataTypes.StringType)
  .add("lastName", DataTypes.StringType)
  .add("birthDate", DataTypes.StringType)
val personNestedDf = personJsonDf.select(from_json($"value", struct).as("person"))
At this point, the Dataframe contains nested columns, as indicated by the schema:
root
|-- person: struct (nullable = true)
| |-- firstName: string (nullable = true)
| |-- lastName: string (nullable = true)
| |-- birthDate: string (nullable = true)
Let's flatten this Dataframe:
val personFlattenedDf = personNestedDf.selectExpr("person.firstName", "person.lastName", "person.birthDate")
root
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)
|-- birthDate: string (nullable = true)
As you may have noticed, we read the birth date as a string. This is because, even though the from_json() function relies on Jackson, there is no way to specify the format of the date to parse (we used an ISO-8601 format). Instead, we are going to use Spark's to_timestamp() function (there is also a to_date() function if you want to read a date without a time indication). Notice that, in this case, we are overriding the birthDate column by creating a new column with the same name:
val personDf = personFlattenedDf.withColumn("birthDate", to_timestamp($"birthDate", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
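In Spark 2.x, the pattern passed to to_timestamp() follows java.text.SimpleDateFormat conventions, so we can check the pattern against one of our sample dates outside of Spark:

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Same pattern as the one passed to to_timestamp() above
val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
fmt.setTimeZone(TimeZone.getTimeZone("UTC"))

// One of the sample birth dates produced in part 1
val parsed = fmt.parse("1984-10-26T03:52:14.449+0000")
```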
We can now calculate the age of the persons. Spark doesn't have a built-in function to calculate the number of years between two dates, so we are going to create a User Defined Function (UDF).
We start by creating a regular Scala function (a lambda, in this case) taking a java.sql.Timestamp as input (this is how timestamps are represented in a Spark Dataframe), and returning an Int:
val ageFunc: java.sql.Timestamp => Int = birthDate => {
  val birthDateLocal = birthDate.toLocalDateTime().toLocalDate()
  val age = Period.between(birthDateLocal, LocalDate.now()).getYears()
  age
}
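Since this lambda is plain Scala with no Spark dependency, we can sanity-check it on its own before wrapping it in a UDF, e.g. with one of the sample birth dates from part 1:

```scala
import java.time.{LocalDate, Period}

// Same lambda as above: full years between the birth date and today
val ageFunc: java.sql.Timestamp => Int = birthDate => {
  val birthDateLocal = birthDate.toLocalDateTime().toLocalDate()
  Period.between(birthDateLocal, LocalDate.now()).getYears()
}

// Quentin Corkery, born 1984-10-26 (sample data from part 1)
val age = ageFunc(java.sql.Timestamp.valueOf("1984-10-26 03:52:14"))
```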
We can now wrap this function in a Spark UDF:
val ageUdf: UserDefinedFunction = udf(ageFunc, DataTypes.IntegerType)
We can finally apply the UDF to the birthDate column to create a new column called age:
val processedDf = personDf.withColumn("age", ageUdf.apply($"birthDate"))
Now that we have processed the data to calculate the age of the persons, we need to get ready to output the data to another Kafka topic.
The Kafka connector supports writing to Kafka. The requirement is for the Dataframe to have columns named key and value, both either of type string or binary.
In our case, we want the key to be in the form <first name><space><last name>. We can use the concat() function, as well as the lit() function for the space. For the value, we will need to convert the age column to a string. We then use .select() on the Dataframe to compute these two columns and assign them names thanks to the as() method (alias() would do the same):
val resDf = processedDf.select(
  concat($"firstName", lit(" "), $"lastName").as("key"),
  processedDf.col("age").cast(DataTypes.StringType).as("value"))
Our Dataframe is ready, in the format that is expected by the Kafka connector, so we can instruct Spark to output the stream of data to Kafka:
val kafkaOutput = resDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("topic", "ages")
  .option("checkpointLocation", "/Users/aseigneurin/dev/kafka-tutorials/spark/checkpoints")
  .start()
kafkaOutput.awaitTermination()
Notice that we have to specify a location for checkpoints: Spark uses it to save the progress of the query, so that processing can resume where it left off after a failure or restart. This does not mean that the messages will be written exactly once, though: Spark only provides an at-least-once delivery guarantee when writing to Kafka.
To test this code, let's first start the producer we created in part 1 and run a console consumer to visualize the data:
{"firstName":"Corene","lastName":"Glover","birthDate":"1999-12-26T17:07:18.279+0000"}
{"firstName":"Forest","lastName":"Spencer","birthDate":"1979-04-29T08:07:12.123+0000"}
...
Now, let's run the Spark application straight from our IDE and run a console consumer on the ages topic:
Corene Glover 18
Forest Spencer 39
...
It all looks good!
One thing to know, if you left a console output in your code, is that you need to start all the queries before you call awaitTermination() on any of them, e.g.:
val consoleOutput = processedDf.writeStream
  ...
  .start()

val kafkaOutput = resDf.writeStream
  ...
  .start()

kafkaOutput.awaitTermination()
consoleOutput.awaitTermination()
If you already use Spark to process data in batch with Spark SQL, Spark Structured Streaming is appealing: it offers the same Dataframe API as its batch counterpart. My personal opinion is more nuanced, though. It is up to you to form your own opinion but, at the time of this writing, Kafka Streams is my preferred option for building a streaming application.
The code of this tutorial can be found here.
Feel free to ask questions in the comments section below!