Apache Spark: MapReduce and RDD manipulations with keys

In a previous article, we saw that Apache Spark allows us to perform aggregations on every value of an RDD. We will now perform aggregations and other manipulations with keys.

Approach

A key-based reduction operation aggregates the values for each key of the RDD. This type of operation can only be performed on an RDD of type JavaPairRDD, which is an RDD whose elements are key-value tuples. Note that, unlike with Java Map objects, there is no uniqueness constraint on keys: multiple tuples with the same key may exist in the RDD.

The reduction operation is applied to the values sharing the same key until only one value remains per key. The resulting RDD then behaves like a true key-value map, with unique keys.

Let’s take the following RDD as example (key, value):

(A, 3)
(A, 5)
(B, 2)
(A, 4)
(B, 7)

After application of a reduction by keys, summing values, we obtain:

(A, 12)
(B, 9)

The RDD contains as many tuples as there were distinct keys in the original RDD.
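
To make this concrete, here is a minimal sketch of this toy example with the Java API. It assumes an existing JavaSparkContext named sc; the RDD is built with parallelizePairs():

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Build the (key, value) RDD from the toy data above
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
    new Tuple2<String, Integer>("A", 3),
    new Tuple2<String, Integer>("A", 5),
    new Tuple2<String, Integer>("B", 2),
    new Tuple2<String, Integer>("A", 4),
    new Tuple2<String, Integer>("B", 7)));

// Sum the values sharing the same key: prints (A, 12) and (B, 9)
pairs.reduceByKey((x, y) -> x + y)
     .foreach(t -> System.out.println("(" + t._1 + ", " + t._2 + ")"));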

Sample data

We are going to use Wikipedia traffic metrics as a dataset. These files are public and free. One file is produced every hour. Each file is about 300 MB, once uncompressed.

Here is an example file extract:

fr Louvoy%C3%A8rent 1 7766  
fr Louvoyer 2 24276  
fr Louvre-Lens 1 39497  
fr Louvres 1 36541  
fr Louxor 2 33183  

Each line represents a record with 4 space-delimited fields:

  • the name of the Wikipedia project: country code, sometimes followed by a suffix indicating if the site is Wikipedia, Wikibooks, Wiktionary, etc.
  • the URL-encoded page title
  • the number of requests received
  • the size of the page in bytes

Let’s practice

From the Wikipedia statistics, we can calculate the number of visits per project. Unlike with a global aggregation, we want to obtain a key-value list: project code – total number of visits.

Let’s first read a file by lines and split each record:

sc.textFile("data/wikipedia-pagecounts/pagecounts-20141101-000000")
  .map(line -> line.split(" "))

We obtain a JavaRDD<String[]>.

We can transform this JavaRDD into a JavaPairRDD with the mapToPair() operation, which expects us to return objects of type Tuple2:

  .mapToPair(s -> new Tuple2<String, Long>(s[0], Long.parseLong(s[2])))

The JavaPairRDD class provides transformations that natively work on this key-value collection, such as reduceByKey() and sortByKey(), as well as operations that combine two JavaPairRDDs: join(), cogroup(), etc.
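
As an illustration of join(), here is a minimal, hypothetical sketch (the data and variable names are made up for the example, and an existing JavaSparkContext named sc is assumed). join() matches the tuples of two pair RDDs sharing the same key and produces tuples of the form (key, (value1, value2)):

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

JavaPairRDD<String, Long> visits = sc.parallelizePairs(Arrays.asList(
    new Tuple2<String, Long>("fr", 602583L),
    new Tuple2<String, Long>("en", 7209054L)));

JavaPairRDD<String, String> labels = sc.parallelizePairs(Arrays.asList(
    new Tuple2<String, String>("fr", "French Wikipedia"),
    new Tuple2<String, String>("en", "English Wikipedia")));

// Each matching key produces a tuple (key, (visits, label))
visits.join(labels)
      .foreach(t -> System.out.println(t._1 + " -> " + t._2));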

Here, we use the reduceByKey() function with a sum operation. The two values passed to the reduce function always belong to the same key, but the function itself does not have access to that key:

  .reduceByKey((x, y) -> x + y)

Finally, we can print all the tuples to the console. The key of a tuple is held in field _1, and its value in field _2:

  .foreach(t -> System.out.println(t._1 + " -> " + t._2))

The entire code is:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

SparkConf conf = new SparkConf()
    .setAppName("wikipedia-mapreduce-by-key")
    .setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile("data/wikipedia-pagecounts/pagecounts-20141101-000000")
  .map(line -> line.split(" "))
  .mapToPair(s -> new Tuple2<String, Long>(s[0], Long.parseLong(s[2])))
  .reduceByKey((x, y) -> x + y)
  .foreach(t -> System.out.println(t._1 + " -> " + t._2));

When executing this, we get a result similar to:

got.mw -> 14  
mo.d -> 1  
eo.q -> 38  
fr -> 602583  
ja.n -> 167  
mus -> 21  
xal -> 214  

The value 602583 for the French-language Wikipedia is thus the sum of the visits of all the lines marked "fr" in the file.

Sorting results by key

We notice that the results are not sorted. Indeed, for performance reasons, Apache Spark does not guarantee the order of the elements in an RDD: tuples are processed independently of each other.

We can sort the tuples by key with the sortByKey() method, which optionally takes a boolean to choose ascending (true, the default) or descending (false) order:

  .sortByKey()

The result is:

AR -> 195  
De -> 115  
EN -> 4  
En -> 10  
En.d -> 8  
FR -> 1  
It -> 2  
SQ.mw -> 11  
Simple -> 1  
aa -> 27  
aa.b -> 6  
aa.d -> 1  
aa.mw -> 11  
…

The sort is case-sensitive. To perform a case-insensitive sort, we need to provide a comparator. Unfortunately, we cannot use a comparator obtained from Comparator.comparing() (new in Java 8), because the returned comparator is not serializable:

// produces an exception:
// Task not serializable: java.io.NotSerializableException
.sortByKey(Comparator.comparing(String::toLowerCase))

We then have to use a comparator that implements the Serializable interface:

class LowerCaseStringComparator implements Comparator<String>, Serializable {
    @Override
    public int compare(String s1, String s2) {
        return s1.toLowerCase().compareTo(s2.toLowerCase());
    }
}

We then use the comparator this way:

Comparator<String> c = new LowerCaseStringComparator();
…
  .sortByKey(c)

We obtain what we wanted:
…
ang.q -> 15
ang.s -> 9
AR -> 195
ar -> 108324
ar.b -> 293
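
As an aside, with a Java 8 compiler a serializable comparator can also be written as a lambda cast to an intersection type. This is just a sketch of an alternative, not the approach used in the rest of this article:

import java.io.Serializable;
import java.util.Comparator;

// The intersection cast makes the lambda implement Serializable as well,
// so Spark can ship it to the executors
Comparator<String> c = (Comparator<String> & Serializable)
        ((s1, s2) -> s1.toLowerCase().compareTo(s2.toLowerCase()));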

Sorting results by value

The JavaPairRDD class offers a sortByKey() method, but there is no sortByValue() method. To sort by value, we therefore have to swap our tuples so that the values become the keys.

Since a JavaPairRDD does not require unique keys, duplicate values are not a problem once they become keys.

We swap the tuples with mapToPair():

.mapToPair(t -> new Tuple2<Long, String>(t._2, t._1))

We can then sort the RDD in descending order (highest values first) and keep the first 10 elements with the take() method:

.sortByKey(false).take(10)

Note that take() returns a Java collection (java.util.List), not an RDD. The forEach() method we call next is therefore the one from the Java collections API, not the RDD's foreach().

.forEach(t -> System.out.println(t._2 + " -> " + t._1));

The sorting code is:

.mapToPair(t -> new Tuple2<Long, String>(t._2, t._1))
.sortByKey(false)
.take(10)
.forEach(t -> System.out.println(t._2 + " -> " + t._1));

We then obtain the top 10 most visited Wikipedia sites for that hour:

meta.m -> 15393394  
meta.mw -> 12390990  
en -> 7209054  
en.mw -> 4405366  
es -> 1210216  
de -> 692501  
ja.mw -> 674700  
es.mw -> 666607  
ru -> 664970  
ja -> 637371  

Processing multiple files

It is possible to process multiple files thanks to wildcard support in the textFile() method parameter.
We can also tell Apache Spark to use 16 threads with setMaster("local[16]").

Thus, the top 10 calculation for November 1st (24 files) becomes:

SparkConf conf = new SparkConf()
    .setAppName("wikipedia-mapreduce-by-key")
    .setMaster("local[16]");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile("data/wikipedia-pagecounts/pagecounts-20141101-*")
  .map(line -> line.split(" "))
  .mapToPair(s -> new Tuple2<String, Long>(s[0], Long.parseLong(s[2])))
  .reduceByKey((x, y) -> x + y)
  .mapToPair(t -> new Tuple2<Long, String>(t._2, t._1))
  .sortByKey(false)
  .take(10)
  .forEach(t -> System.out.println(t._2 + " -> " + t._1));

We get this result:

meta.m -> 430508482  
meta.mw -> 313310187  
en -> 171157735  
en.mw -> 103491223  
ru -> 29955421  
ja -> 22201812  
ja.mw -> 22186094  
es -> 21424067  
de -> 21098513  
fr -> 17967662  

On a MacBook Pro (SSD, 2 GHz Core i7), this processing takes 40 seconds for 9 GB of data. Not bad!

Conclusion

We now have a good overview of the Apache Spark API and what it offers. Manipulating key-value tuples is one of the most frequently used parts of the framework.
In upcoming posts, we will deploy a Spark cluster and explore the streaming API.

This code is available on GitHub.

