RDD Operations
reduceByKey
The reduceByKey example works much better on a large dataset because Spark knows it can combine output with a common key on each partition before shuffling the data.
With reduceByKey, pairs on the same machine with the same key are combined (using the lambda function passed into reduceByKey) before the data is shuffled. The lambda function is then called again to reduce all the values from each partition down to one final result per key.
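A minimal sketch of these two steps, using plain Scala collections instead of a real SparkContext (the data and the partition split are made up for illustration; in Spark this would simply be sc.parallelize(data, 2).reduceByKey(_ + _)):

```scala
// Hypothetical data split across two "partitions"
val partitions = Seq(
  Seq(("a", 1), ("b", 2), ("a", 3)), // partition 0
  Seq(("a", 4), ("b", 5))            // partition 1
)
// Step 1: map-side combine -- reduce values per key within each partition
val combined = partitions.map(p =>
  p.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(_ + _)) })
// Step 2: "shuffle" the pre-combined pairs, then reduce again per key
val result = combined.flatten
  .groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(_ + _)) }
// result: Map(a -> 8, b -> 7)
```

Only the pre-combined pairs cross partition boundaries in step 2, which is exactly why reduceByKey shuffles far less data than groupByKey.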
groupByKey
When we call groupByKey, all the key-value pairs are shuffled around the cluster. This is a lot of unnecessary data being transferred over the network.
To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time - so if a single key has more key-value pairs than can fit in memory, an out of memory exception occurs. This will be more gracefully handled in a later release of Spark so the job can still proceed, but should still be avoided - when Spark needs to spill to disk, performance is severely impacted.
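For contrast, a plain-Scala sketch of what groupByKey produces (in Spark: rdd.groupByKey()). The data is hypothetical; the point is that every individual pair must cross the network before any grouping happens:

```scala
// Hypothetical key-value pairs; with groupByKey each one is shuffled as-is
val pairs = Seq(("a", 1), ("b", 2), ("a", 3), ("a", 4), ("b", 5))
// Group all values for each key, with no pre-aggregation
val grouped = pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
// grouped: Map(a -> List(1, 3, 4), b -> List(2, 5))
```

If the goal is an aggregate (a sum, a count, an average), prefer reduceByKey or combineByKey so the aggregation happens before the shuffle.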
combineByKey
Spark's combineByKey RDD transformation is very similar to the combiner in Hadoop MapReduce. In this post, we'll discuss a Spark combineByKey example in depth and try to understand the importance of this function in detail.
Spark's combineByKey takes the following three functions as arguments:
createCombiner
createCombiner function of combineByKey
This function is the first argument of combineByKey
It is the first aggregation step for each key
It is executed when a new key is found in a partition
Execution of this lambda function is local to a partition of a node, on each individual value
In our case, as we are calculating a percentage, we need a sum and a count as the aggregation, so our createCombiner function should initialize a tuple (sum, count). For the initial aggregation, it should be (value, 1).
This function is similar to the first argument (i.e. zeroVal) of the aggregateByKey transformation.
mergeValue
mergeValue function of combineByKey
The second function executes when the next value for an already-seen key is given to the combiner
It also executes locally on each partition of a node and combines all values
The arguments of this function are an accumulator and a new value
It combines the new value into the existing accumulator
In our case the mergeValue accumulator is a tuple (sum, count). Whenever we get a new value, the marks are added to the first element and the second element (i.e. the counter) is incremented by 1.
This function is similar to the second argument (i.e. seqOp) of the aggregateByKey transformation.
mergeCombiners
mergeCombiner function of combineByKey
The final function merges two accumulators (i.e. combiners) of a single key across partitions to generate the final expected result
Its arguments are two accumulators (i.e. combiners)
It merges the results of a single key from different partitions
This function is similar to the third argument (i.e. combOp) of the aggregateByKey transformation.
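For reference, the three arguments correspond to the (simplified) Scala signature of combineByKey on a pair RDD, where K and V are the key and value types of the source RDD and C is the combiner (result) type:

```scala
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]
```

Note that C may differ from V, which is what makes combineByKey more general than reduceByKey.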
Important Points
Apache Spark's combineByKey is a transformation operation, hence its evaluation is lazy
It is a wide operation, as it shuffles data in the last stage of aggregation and creates another RDD
It is recommended when you need to do further aggregation on grouped data
Use combineByKey when the return type differs from the source type (i.e. when you cannot use reduceByKey)
Example :
val studentRDD = sc.parallelize(Array(("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3)
//Defining createCombiner, mergeValue and mergeCombiner functions
def createCombiner = (tuple: (String, Int)) => (tuple._2.toDouble, 1)
//createCombiner creates the initial value (combiner) for a key's first encounter on a partition if one is not found --> (firstValueEncountered, 1). So, this is merely initializing a tuple with the first value and a key counter of 1.
def mergeValue = (accumulator: (Double, Int), element: (String, Int)) => (accumulator._1 + element._2, accumulator._2 + 1)
//mergeValue is triggered only if a combiner (tuple in our case) has already been created for the found key on this partition --> (existingTuple._1 + subSequentValue, existingTuple._2 + 1). This adds the existing tuple's value (in the first slot) with the newly encountered value and takes the existing tuple's counter (in the second slot) and increments it.
def mergeCombiner = (accumulator1: (Double, Int), accumulator2: (Double, Int)) => (accumulator1._1 + accumulator2._1, accumulator1._2 + accumulator2._2)
// mergeCombiner takes the combiners (tuples) created on each partition and merges them together --> (tupleFromPartition._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2). This is merely adding the values from each tuple together and the counters together into one tuple.
val combRDD = studentRDD.map(t => (t._1, (t._2, t._3))).combineByKey(createCombiner, mergeValue, mergeCombiner).map(e => (e._1, e._2._1/e._2._2))
//Check the Output
combRDD.collect foreach println
(Tina,76.5)
(Thomas,86.25)
(Jackeline,76.5)
(Joseph,82.5)
(Juan,64.0)
(Jimmy,77.0)
(Cory,65.0)
mapValues()
When we use map() with a Pair RDD, we get access to both the key and the value. There are times we might only be interested in accessing the value (and not the key). In those cases, we can use mapValues().
Eg:
val inputrdd = sc.parallelize(Seq(("maths", 50), ("maths", 60), ("english", 65)))
val mapped = inputrdd.mapValues(mark => (mark, 1))
mapped.collect()
//Output :
res3: Array[(String, (Int, Int))] = Array((maths,(50,1)), (maths,(60,1)), (english,(65,1)))
keys
The keys operation returns an RDD of just the keys.
rdd.keys
values
The values operation returns an RDD of just the values.
rdd.values
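Both can be sketched on a plain Scala collection (hypothetical data; in Spark, keys and values are defined without parentheses, i.e. rdd.keys and rdd.values):

```scala
// Hypothetical Pair RDD contents
val pairs = Seq(("math", 50), ("english", 65))
val keys = pairs.map(_._1)   // the keys:   Seq(math, english)
val values = pairs.map(_._2) // the values: Seq(50, 65)
```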
Fold
Fold is a very powerful operation in Spark which allows you to calculate many important values in O(n) time. If you are familiar with Scala collections, it will be like using the fold operation on a collection. Even if you have not used fold in Scala, this post will make you comfortable using it.
Syntax
def fold(zeroValue: T)(op: (T, T) => T): T
The above is a high-level view of the fold API. It involves the following three things:
T is the element type of the RDD
zeroValue is the initial accumulator of type T; the final accumulator is the return value of the fold operation
op is a function which is called for each element in the RDD with the previous accumulator
Let’s see some examples of fold
//Code
val itemPrice= sc.parallelize(List(("Soap",10.0),("Toaster",200.0),("Tshirt",400.0)))
val dummyItem = ("dummy", 0.0)
val maxPrice = itemPrice.fold(dummyItem)((acc, item) =>
  if (acc._2 < item._2) item else acc)
println("maximum price item "+maxPrice )
//Output
maximum price item (Tshirt,400.0)
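One caveat: Spark applies zeroValue once per partition and once more when merging the per-partition results, so it should be a neutral element for the operation (("dummy", 0.0) is neutral for this max-by-price, since every real price exceeds 0.0). A plain-Scala sketch of those semantics, summing over two hypothetical partitions:

```scala
// fold semantics sketch: the zero value is used once per "partition"
// and once again when merging the per-partition results
val parts = Seq(Seq(1.0, 2.0), Seq(3.0, 4.0)) // two hypothetical partitions
val zero = 0.0
val perPartition = parts.map(p => p.foldLeft(zero)(_ + _)) // Seq(3.0, 7.0)
val total = perPartition.foldLeft(zero)(_ + _)             // 10.0
```

With zero = 0.0 the extra applications are harmless; a non-neutral zero value (e.g. 1.0 here) would be counted once per partition and skew the result.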
FoldByKey
In Map/Reduce, the key plays the role of grouping values. We can use the foldByKey operation to aggregate values based on keys.
In this example, employees are grouped by department name. To find the maximum salary in each department, we can use the following code.
Let’s see some examples of foldByKey
//Code
val deptEmployees = sc.parallelize(List(("cs",("jack",1000.0)),("cs",("bron",1200.0)),("phy",("sam",2200.0)),("phy",("ronaldo",500.0)) ))
val maxByDept = deptEmployees.foldByKey(("dummy",0.0))((acc,element)=> if(acc._2 > element._2) acc else element)
println("maximum salaries in each dept " + maxByDept.collect().toList)
//Output
maximum salaries in each dept List((cs,(bron,1200.0)), (phy,(sam,2200.0)))
countByKey()
Through the countByKey operation, we can count the number of elements for each key.
rdd.countByKey()
Eg:
val rdd = sc.parallelize(Seq(("math", 55),("math", 56),("english", 57),("english", 58),("science", 59),("science", 54)))
val result1 = rdd.countByKey()
//Output:
result1: scala.collection.Map[String,Long] = Map(math -> 2, english -> 2, science -> 2)
collectAsMap()
The collectAsMap() operation collects the result as a map to provide easy lookup. Note that if the RDD contains duplicate keys, only one value is kept per key, as the output below shows for math and english.
rdd.collectAsMap()
Eg:
val rdd = sc.parallelize(Seq(("math",55),("math",56),("math",58),("english",57),("english",58),("science",54)))
val result = rdd.collectAsMap()
println(result)
//Output
result: scala.collection.Map[String,Int] = Map(math -> 58, science -> 54, english -> 58)
lookup(key)
It returns all values associated with the provided key.
rdd.lookup(key)
Eg:
val rdd = sc.parallelize(Seq(("math",55),("math",56),("math",58),("english",57),("english",58),("science",54)))
val result = rdd.lookup("math")
println(result)
//Output
result: Seq[Int] = WrappedArray(55, 56, 58)