Sunday, 2 June 2019

Spark RDDs

Spark RDD
RDD stands for Resilient Distributed Dataset. It is a read-only, partitioned collection of records and the fundamental data structure of Spark. It allows a programmer to perform in-memory computations on large clusters in a fault-tolerant manner, which speeds up tasks. Follow this link to learn Spark RDD in great detail.

The RDD API has been part of Spark since the 1.0 release.

An RDD is a distributed collection of data elements spread across many machines in the cluster. RDDs are a set of Java or Scala objects representing data. They can easily and efficiently process data that is structured as well as unstructured. But unlike DataFrames and Datasets, RDDs do not infer the schema of the ingested data, so the user has to specify it. An RDD contains a collection of records that are partitioned. The basic unit of parallelism in an RDD is called a partition. Each partition is one logical division of data, which is immutable and created through some transformation on existing partitions. Immutability helps to achieve consistency in computations. We can move from an RDD to a DataFrame (if the RDD is in tabular format) with the toDF() method, or do the reverse with the .rdd method. Learn various RDD Transformations and Actions APIs with examples.
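A minimal sketch of the two conversions mentioned above, assuming a SparkSession named `spark` running in local mode; the column names are illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("RddDfDemo").master("local[*]").getOrCreate()
import spark.implicits._

// A tabular RDD of (name, age) tuples
val rdd = spark.sparkContext.parallelize(Seq(("alice", 30), ("bob", 25)))

// RDD -> DataFrame via toDF(); the column names here are illustrative
val df = rdd.toDF("name", "age")

// DataFrame -> RDD via .rdd; this yields an RDD[Row]
val rowRdd = df.rdd
rowRdd.foreach(println)
```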

RDDs provide a familiar object-oriented programming style with compile-time type safety.

No inbuilt optimization engine is available for RDDs. When working with structured data, RDDs cannot take advantage of Spark's advanced optimizers, such as the Catalyst optimizer and the Tungsten execution engine. Developers must optimize each RDD on the basis of its attributes.

Efficiency decreases when serialization is performed individually on Java and Scala objects, which takes a lot of time.

Spark evaluates RDDs lazily. They do not compute their results right away. Instead, they just remember the transformations applied to some base dataset. Spark computes transformations only when an action needs a result to be sent to the driver program.
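A small sketch of this laziness, assuming a SparkSession named `spark` as in the examples below: the map() call only records the transformation, and no job runs until the action is invoked.

```scala
val nums = spark.sparkContext.parallelize(1 to 1000)

// Lazy: nothing is computed here, Spark only records the transformation
val squares = nums.map(x => x.toLong * x)

// Action: the computation actually runs now, and the result goes to the driver
val total = squares.reduce(_ + _)
println(total)
```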


Usage of RDDs
  • Use RDDs when you want low-level transformations and actions on your dataset.
  • Use RDDs when you do not need the high-level abstractions (schema and columnar access) that DataFrames and Datasets provide.
There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
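A brief sketch of both creation paths, assuming a SparkSession named `spark`; the HDFS path is hypothetical:

```scala
// 1. Parallelizing an existing collection in the driver program
val fromCollection = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))

// 2. Referencing a dataset in external storage; this path is hypothetical
val fromFile = spark.sparkContext.textFile("hdfs:///data/input.txt")
```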

Spark RDD Operations
Apache Spark RDD operations come in two types: Transformations and Actions. A transformation is a function that produces a new RDD from existing RDDs, while an action is performed when we want to work with the actual dataset. When an action is triggered, it returns a result to the driver instead of forming a new RDD, unlike a transformation.
Apache Spark RDD supports two types of operations-
  • Transformations
  • Actions

RDD Transformation

A Spark transformation is a function that produces a new RDD from existing RDDs. It takes an RDD as input and produces one or more RDDs as output. Each time we apply a transformation, it creates a new RDD; the input RDDs cannot be changed, since RDDs are immutable in nature.

Applying transformations builds an RDD lineage, containing all the parent RDDs of the final RDD(s). RDD lineage is also known as the RDD operator graph or RDD dependency graph. It is a logical execution plan, i.e., a Directed Acyclic Graph (DAG) of all the parent RDDs of an RDD.
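One way to inspect a lineage is RDD.toDebugString, which prints the dependency graph. A minimal sketch, assuming a SparkSession named `spark`:

```scala
val base = spark.sparkContext.parallelize(1 to 10)
val derived = base.map(_ * 2).filter(_ > 5)

// Prints the chain of parent RDDs that 'derived' was built from
println(derived.toDebugString)
```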

Transformations are lazy in nature, i.e., they execute only when we call an action. They are not executed immediately. The two most basic transformations are map() and filter().

After a transformation, the resultant RDD is always different from its parent RDD. It can be smaller (e.g. filter(), distinct(), sample()), bigger (e.g. flatMap(), union(), cartesian()) or the same size (e.g. map()).



There are two types of transformations:
Narrow transformation – In a narrow transformation, all the elements required to compute the records in a single partition live in a single partition of the parent RDD. A limited subset of partitions is used to calculate the result.
map
flatMap
mapPartitions
filter
sample
union

Wide transformation – In a wide transformation, all the elements required to compute the records in a single partition may live in many partitions of the parent RDD.
intersection
distinct
reduceByKey
groupByKey
join
cartesian
repartition
coalesce
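To illustrate the difference, the sketch below (assuming a SparkSession named `spark`) applies a narrow map() and a wide reduceByKey(); the lineage of the result shows the shuffle introduced by the wide transformation.

```scala
val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

// Narrow: each output partition depends on exactly one input partition
val scaled = pairs.map { case (k, v) => (k, v * 10) }

// Wide: equal keys must be brought together, which forces a shuffle
val summed = scaled.reduceByKey(_ + _)

// The lineage contains a ShuffledRDD stage for the wide transformation
println(summed.toDebugString)
```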

RDD Operations:
1. map(func)
map() converts an RDD of size 'n' into another RDD of size 'n'. The input and output sizes of the RDD will be the same. To put it another way, one element in the input gets mapped to exactly one element in the output.


So, for example, let's say I have an array, [1,2,3,4], and I want to increment each element by 10. The input size and output size are the same, so we can use map.
Required : [1,2,3,4] -> [11,12,13,14]
Spark code : myRdd.map(x => x + 10)
So, that is what the map function does. While using map, you can be sure that the size of input and output will remain the same, so even if you put a hundred map functions in series, the output and the input will have the same number of elements.

2. flatMap(func)
flatMap does a similar job: transforming one collection into another, or in Spark terms, one RDD into another RDD. But there is no condition that the output size has to equal the input size. To put it another way, one element in the input can be mapped to zero or more elements in the output.

Also, the output of flatMap is flattened. Although the function in flatMap returns a list of element(s) for each input element, the output of flatMap is an RDD with all the elements flattened into a single list.
Let’s see this with an example.
Say you have a text file as follows
Hello World
Who are you
Now, if you run a flatMap on the lines RDD,
val words = linesRDD.flatMap(x => x.split(" "))
the value in the words RDD would be ["Hello", "World", "Who", "are", "you"].

3. filter(func)
Spark RDD filter() function returns a new RDD, containing only the elements that meet a predicate. It is a narrow operation because it does not shuffle data from one partition to many partitions.
Filter() example:
val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value=="spark")
println(mapFile.count())
Note – In the above code, the flatMap function maps each line into words, filter keeps only the word "spark", and count() then counts those occurrences in mapFile.


4. mapPartitions(func)
mapPartitions() can be used as an alternative to map() and foreach(). mapPartitions() is called once for each partition, unlike map() and foreach(), which are called for each element in the RDD. The main advantage is that we can do initialization on a per-partition basis instead of a per-element basis (as done by map() and foreach()). Consider the case of initializing a database: if we use map() or foreach(), the number of initializations equals the number of elements in the RDD, whereas with mapPartitions() it equals the number of partitions. We get an Iterator as the argument to mapPartitions(), through which we can iterate over all the elements in a partition.
val path = "/Users/umeshp6655/data/wordcount.txt"
sc.textFile(path).mapPartitions(lines => {
  // Using Scala APIs to process each partition
  lines.flatMap(_.split(" ")).map((_, 1))
}).reduceByKey((total, agg) => total + agg).take(100).foreach(println)
5. mapPartitionsWithIndex()
It is like mapPartitions(), but in addition it provides func with an integer value representing the index of the partition, and the function is applied partition by partition, index-wise.
Example:
val rdd1 = sc.parallelize(List("yellow","red","blue","cyan","black"),3)
val mapped = rdd1.mapPartitionsWithIndex {
  // 'index' represents the partition number
  // 'iterator' iterates through all elements in the partition
  (index, iterator) => {
    println("Called in Partition -> " + index)
    val myList = iterator.toList
    // In a normal use case, we would do the
    // initialization (e.g. initializing a database)
    // before iterating through each element
    myList.map(x => x + " -> " + index).iterator
  }
}
mapped.collect()
//Output
Called in Partition -> 1
Called in Partition -> 2
Called in Partition -> 0
res7: Array[String] = Array(yellow -> 0, red -> 1, blue -> 1, cyan -> 2, black -> 2)

6. union(dataset)

With the union() function, we get the elements of both RDDs in a new RDD. The key rule of this function is that the two RDDs should be of the same type.

Union() example:
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(17,"sep",2015)))
val rdd3 = spark.sparkContext.parallelize(Seq((6,"dec",2011),(16,"may",2015)))
val rddUnion = rdd1.union(rdd2).union(rdd3)
rddUnion.foreach(println)



7. intersection(other-dataset)
With the intersection() function, we get only the common elements of both RDDs in a new RDD. The key rule of this function is that the two RDDs should be of the same type.

Intersection() example:
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(1,"jan",2016)))
val common = rdd1.intersection(rdd2)
common.foreach(println)

8. distinct()
It returns a new dataset that contains the distinct elements of the source dataset. It is helpful for removing duplicate data.
Distinct() example:
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014),(3,"nov",2014)))
val result = rdd1.distinct()
println(result.collect().mkString(", "))

9. groupByKey()
When we use groupByKey() on a dataset of (K, V) pairs, the data is shuffled according to the key value K into another RDD. In this transformation, lots of unnecessary data gets transferred over the network.
groupByKey() example:
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)

10. reduceByKey(func, [numTasks])
When we use reduceByKey() on a dataset of (K, V) pairs, pairs on the same machine with the same key are combined before the data is shuffled.
reduceByKey() example:
val words = Array("one","two","two","four","five","six","six","eight","nine","ten")
val data = spark.sparkContext.parallelize(words).map(w => (w,1)).reduceByKey(_+_)
data.foreach(println)

11. sortByKey()
When we apply the sortByKey() function on a dataset of (K, V) pairs, the data is sorted according to the key K in another RDD.
sortByKey() example:
val data = spark.sparkContext.parallelize(Seq(("maths",52), ("english",75), ("science",82), ("computer",65), ("maths",85)))
val sorted = data.sortByKey()
sorted.foreach(println)

12. join()
Join is a database term; it combines fields from two tables using common values. The join() operation in Spark is defined on pair-wise RDDs. Pair-wise RDDs are RDDs in which each element is a tuple, where the first element is the key and the second element is the value.
The benefit of using keyed data is that we can combine records. The join() operation combines two datasets on the basis of the key.
Join() example:
val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 =spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))

13. coalesce()
To avoid full shuffling of data, we use the coalesce() function. coalesce() reuses existing partitions so that less data is shuffled, letting us cut the number of partitions. Suppose we have data in four partitions and we want only two; the data of the extra partitions will be moved onto the partitions we keep.
Coalesce() example:
val rdd1 = spark.sparkContext.parallelize(Array("jan","feb","mar","april","may","jun"),3)
val result = rdd1.coalesce(2)
result.foreach(println)
Note – coalesce() decreases the number of partitions of the source RDD to the numPartitions defined in the coalesce argument.
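Since repartition() appears in the wide-transformation list above but has no example, the sketch below (assuming a SparkSession named `spark`) contrasts it with coalesce():

```scala
val rdd = spark.sparkContext.parallelize(1 to 12, 4)
println(rdd.getNumPartitions)                 // 4

// coalesce() only decreases the partition count and avoids a full shuffle
println(rdd.coalesce(2).getNumPartitions)     // 2

// repartition() can also increase the count, but it triggers a full shuffle
println(rdd.repartition(6).getNumPartitions)  // 6
```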

RDD Action

Transformations create RDDs from each other, but when we want to work with the actual dataset, an action is performed. When an action is triggered, no new RDD is formed, unlike with a transformation. Thus, actions are Spark RDD operations that give non-RDD values. The results of actions are stored in the driver or in an external storage system. An action brings the laziness of RDDs into motion.
An action is one of the ways of sending data from the executors to the driver. Executors are agents responsible for executing a task, while the driver is a JVM process that coordinates workers and the execution of the task. Some of the actions of Spark are:

1. count()
The count() action returns the number of elements in the RDD.
Count() example:
val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value=="spark")
println(mapFile.count())
Note – In the above code, flatMap() maps each line into words, filter keeps only the word "spark", and count() then counts those occurrences in mapFile.

2. collect()
The collect() action is the most common and simplest operation; it returns the entire content of the RDD to the driver program. A typical application of collect() is unit testing, where the entire RDD is expected to fit in memory, making it easy to compare the result of the RDD with the expected result. The constraint of collect() is that all the data must fit in one machine, since it is copied to the driver.
Collect() example:
val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 =spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))
Note – The join() transformation in the above code joins the two RDDs on the basis of the same key (letter). After that, the collect() action returns all the elements of the dataset as an array.

3. take(n)
The take(n) action returns the first n elements of the RDD. It tries to cut down the number of partitions it accesses, so it represents a biased collection; we cannot presume the order of the elements.
Take() example:
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
val twoRec = group.take(2)
twoRec.foreach(println)
Note – The take(2) action returns an array with the first two elements of the dataset, as defined by the take argument.

4. top()
If ordering is present in our RDD, then we can extract the top elements using top(). The top() action uses the default ordering of the data.
Top() example:
val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.map(line => (line,line.length))
val res = mapFile.top(3)
res.foreach(println)
Note – map() operation will map each line with its length. And top(3) will return 3 records from mapFile with default ordering.

5. countByValue()
countByValue() returns the number of times each element occurs in the RDD.
For example, if an RDD has values {1, 2, 2, 3, 4, 5, 5, 6}, then rdd.countByValue() will give the result {(1,1), (2,2), (3,1), (4,1), (5,2), (6,1)}.
countByValue() example:
val data = spark.read.textFile("spark_test.txt").rdd
val result= data.map(line => (line,line.length)).countByValue()
result.foreach(println)
Note – The countByValue() action returns a map from each unique value in the RDD to its count.

6. reduce()
The reduce() function takes two elements as input from the RDD and produces an output of the same type as the input elements. The simplest form of such a function is addition: we can add the elements of an RDD, or count the number of words. It accepts a commutative and associative operation as an argument.
Reduce() example:
val rdd1 = spark.sparkContext.parallelize(List(20,32,45,62,8,5))
val sum = rdd1.reduce(_+_)
println(sum)
Note – The reduce() action in above code will add the elements of the source RDD.

7. fold()
The signature of fold() is like that of reduce(), except that it additionally takes a "zero value" as input, which is used for the initial call on each partition. The condition on the zero value is that it should be the identity element of the operation. The key difference between fold() and reduce() is that reduce() throws an exception for an empty collection, while fold() is defined for an empty collection.
For example, zero is the identity for addition and one is the identity for multiplication. The return type of fold() is the same as that of the elements of the RDD we are operating on.
For example, rdd.fold(0)((x, y) => x + y).
Fold() example:
val rdd1 = spark.sparkContext.parallelize(List(("maths", 80),("science", 90)))
val additionalMarks = ("extra", 4)
val sum = rdd1.fold(additionalMarks) { (acc, marks) =>
  val add = acc._2 + marks._2
  ("total", add)
}
println(sum)
Note – In the above code, additionalMarks is the zero value. Because the zero value is applied once per partition and again when merging partition results, it can be counted more than once; strictly, fold() expects the zero value to be the identity of the operation.

8. aggregate()
aggregate() gives us the flexibility to return a data type different from the input type. It takes two functions to get the final result: one combines an element from the RDD with the accumulator, and the other combines two accumulators. Hence, in aggregate(), we supply the initial zero value of the type we want to return.
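A minimal sketch (assuming a SparkSession named `spark`): computing the average of an RDD[Int] by aggregating into a (sum, count) pair, a return type different from the element type.

```scala
val nums = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5))

val (sum, count) = nums.aggregate((0, 0))(
  // seqOp: fold one element into the per-partition accumulator
  (acc, x) => (acc._1 + x, acc._2 + 1),
  // combOp: merge the accumulators of two partitions
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
println(sum.toDouble / count)  // 3.0
```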

9. foreach()
When we want to apply an operation to each element of an RDD without returning a value to the driver, the foreach() function is useful; for example, inserting a record into a database.
Foreach() example:
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)
Note – The foreach() action runs a function (println) on each element of the dataset group.
