Thursday, 20 June 2019

RDD creation in Apache Spark


There are three ways to create RDDs:
  1. Using a parallelized collection
  2. From external datasets
  3. From existing Apache Spark RDDs

1. Using a parallelized collection (parallelizing)
In the initial stage, RDDs are generally created from a parallelized collection, i.e. by taking an existing collection in the program and passing it to SparkContext's parallelize() method. This method is handy in the initial stage of learning Spark, since it quickly creates RDDs in the Spark shell and lets us perform operations on them. It is rarely used outside testing and prototyping, however, because it requires the entire dataset to fit on one machine.
Eg:

val data = spark.sparkContext.parallelize(Seq(("maths", 52), ("english", 75), ("science", 82), ("computer", 65), ("maths", 85)))

data.collect()


2. From external datasets
In Spark, a distributed dataset can be formed from any data source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, etc. Here, the data is loaded from an external dataset. To create a text-file RDD, we can use SparkContext's textFile method. It takes the URL of a file and reads it as a collection of lines. The URL can be a local path on the machine, or an hdfs://, s3n://, etc. path.
One point to note is that the file must be available at the same path on the worker nodes as on the local file system. We can either copy the file to all the worker nodes or use a network-mounted shared file system.
Eg:
//How many unique professions do we have in the data file?
val userFile = sc.textFile("/FileStore/tables/user.txt")
val fileData = userFile.map(line => line.split(","))
val fileDataOpr = fileData.map(recd => (recd(0), recd(1), recd(2), recd(3), recd(4)))
val dataMapLogic = fileDataOpr.map { case (id, age, sex, profession, pincode) => profession }
println(dataMapLogic.distinct.count())

3. From existing Apache Spark RDDs
A transformation turns one RDD into another RDD, so applying a transformation is the way to create an RDD from an already existing RDD.

RDD Persistence and Caching in Spark
Spark RDD persistence is an optimization technique that saves the result of an RDD evaluation. We can persist an RDD through the cache() and persist() methods. When we use the cache() method, the RDD is stored in memory, where it can be reused efficiently across parallel operations. The difference between cache() and persist() is that cache() always uses the default storage level MEMORY_ONLY, while persist() lets us choose among various storage levels. When the RDD is computed for the first time, it is kept in memory on the node. Spark's cache is fault tolerant: whenever any partition of an RDD is lost, it can be recomputed using the transformations that originally created it.
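As a small sketch of the two methods (assuming a local Spark session; in the Spark shell, sc already exists, and the RDD contents here are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// In spark-shell `sc` already exists; in a standalone app create it first
val sc = SparkSession.builder.appName("persist-demo").master("local[*]").getOrCreate().sparkContext

val nums = sc.parallelize(1 to 1000)

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
val cached = nums.cache()

// persist() lets us pick the storage level explicitly
val doubled = nums.map(_ * 2).persist(StorageLevel.MEMORY_AND_DISK)

doubled.count()   // the first action computes the RDD and materialises it
doubled.count()   // later actions reuse the persisted partitions
```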

Benefits of RDD Persistence in Spark
Some advantages of the RDD caching and persistence mechanism in Spark are:
Time efficient – repeated computations over the same RDD are avoided, which lessens execution time.
Cost efficient – fewer recomputations mean less cluster resource usage.

Storage levels of persisted RDDs:
Using persist() we can choose among various storage levels for persisted RDDs in Apache Spark.
1) MEMORY_ONLY
In this storage level, the RDD is stored as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed each time they are needed. The space used for storage is very high, the CPU computation time is low, and the data is stored only in memory; the disk is not used.

2) MEMORY_AND_DISK
In this level, the RDD is stored as deserialized Java objects in the JVM. Partitions that do not fit in memory are stored on disk and retrieved from disk whenever required. The space used for storage is high, the CPU computation time is medium, and both in-memory and on-disk storage are used.

3) MEMORY_ONLY_SER
This level stores the RDD as serialized Java objects (one byte array per partition). It is more space efficient than deserialized objects, especially with a fast serializer, but it adds CPU overhead. The storage space is low, the CPU computation time is high, and the data is stored only in memory; the disk is not used.

4) MEMORY_AND_DISK_SER
It is similar to MEMORY_ONLY_SER, but partitions that do not fit in memory are dropped to disk rather than recomputed each time they are needed. The space used for storage is low, the CPU computation time is high, and both in-memory and on-disk storage are used.

5) DISK_ONLY
In this storage level, the RDD is stored only on disk. The space used for storage is low, the CPU computation time is high, and only on-disk storage is used.

Unpersist RDD in Spark
Spark monitors the cache usage of each node automatically and drops old data partitions in LRU (least recently used) fashion: the partitions that have been used least recently are evicted from the cache first. We can also remove cached data manually using the RDD.unpersist() method.
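A quick sketch of manual eviction (assuming a local session; the RDD here is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val sc = SparkSession.builder.appName("unpersist-demo").master("local[*]").getOrCreate().sparkContext

val rdd = sc.parallelize(1 to 100).cache()
rdd.count()                              // materialises the cached partitions
println(rdd.getStorageLevel.useMemory)   // true: the RDD is held in memory

rdd.unpersist()                          // manually evict the cached partitions
println(rdd.getStorageLevel == StorageLevel.NONE)   // true: nothing is cached any more
```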

Features of Spark RDD
There are several advantages of using RDDs. Some of them are:
1. In-memory computation
The data inside an RDD can be kept in memory for as long as we want. Keeping the data in memory improves performance by an order of magnitude.
2. Lazy Evaluation
The data inside RDDs is not evaluated on the fly. Transformations are performed only after an action is triggered, which limits how much work Spark has to do.
3. Fault Tolerance
Upon the failure of a worker node, the lost partitions of an RDD can be recomputed from the original data using the lineage of operations. Thus, we can easily recover the lost data.
4. Immutability
RDDs are immutable in nature: once we create an RDD we cannot modify it, and any transformation creates a new RDD. We achieve consistency through immutability.
5. Persistence
We can keep frequently used RDDs in memory and retrieve them directly without going to disk, which speeds up execution. By explicitly persisting data with the persist() or cache() method, we can perform multiple operations on the same data.
6. Partitioning
An RDD partitions its records logically and distributes the data across the nodes of the cluster. The divisions are logical, for processing only; this is what provides parallelism.
7. Parallelism
RDDs process data in parallel across the cluster.
8. Location-Stickiness
RDDs can define placement preferences for computing their partitions, i.e. information about where the RDD's data resides. The DAGScheduler places tasks as close to the data as possible, which speeds up computation.
9. Coarse-grained Operations
We apply coarse-grained transformations to RDDs, meaning an operation applies to the whole dataset rather than to individual elements.
10. No limitation
We can have any number of RDDs; there is no fixed limit. The practical limit depends on the size of the disk and memory.

Limitations of RDD in Apache Spark
1. No input optimization engine
There is no provision in RDDs for automatic optimization: they cannot make use of Spark's advanced optimizers such as the Catalyst optimizer and the Tungsten execution engine, so each RDD has to be optimized manually. This limitation is overcome by Dataset and DataFrame, both of which use Catalyst to generate optimized logical and physical query plans.
2. Runtime type safety
With RDD-based workflows, many errors surface only at run time. Dataset provides compile-time type safety for building complex data workflows: if you try to add an element of the wrong type, you get a compile-time error. This helps detect mistakes early and makes your code safer.
3. Degrades when there is not enough memory
RDDs degrade when there is not enough memory to store them: partitions that overflow from RAM can be stored on disk, but reading them back is much slower than in-memory access. Increasing the sizes of RAM and disk can mitigate this issue.
4. Performance limitation: overhead of serialization and garbage collection
Since RDDs are in-memory JVM objects, they carry the overhead of garbage collection and Java serialization, which becomes expensive as data grows. Because the cost of garbage collection is proportional to the number of Java objects, using data structures with fewer objects, or persisting objects in serialized form, lowers the cost.
5. Handling structured data
RDDs do not provide a schema view of the data and have no special provision for handling structured data. Dataset and DataFrame do provide a schema view: they are distributed collections of data organized into named columns.
So, this was all about the limitations of RDD in Apache Spark.

RDD Lineage
RDDs are lazy in nature: a series of transformations is defined on an RDD without being evaluated immediately. When we create a new RDD from an existing Spark RDD, the new RDD carries a pointer to its parent RDD. In effect, all the dependencies between the RDDs are logged in a graph, rather than the actual data. This is what we call the lineage graph.
RDD lineage is nothing but the graph of all the parent RDDs of an RDD. We also call it the RDD operator graph or RDD dependency graph. It is the output of applying transformations and is what Spark uses to create the logical execution plan.
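The lineage graph can be inspected directly with the RDD's toDebugString method. A small sketch (assuming a local session; the transformations are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder.appName("lineage-demo").master("local[*]").getOrCreate().sparkContext

val base     = sc.parallelize(1 to 10)   // ParallelCollectionRDD
val doubled  = base.map(_ * 2)           // MapPartitionsRDD, parent: base
val filtered = doubled.filter(_ > 5)     // MapPartitionsRDD, parent: doubled

// toDebugString prints the chain of parent RDDs -- the lineage graph
println(filtered.toDebugString)
```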

Spark's Advantages


Spark In-memory Computing

In in-memory computation, the data is kept in random access memory (RAM) instead of slow disk drives and is processed in parallel. This lets us detect patterns and analyze large data faster. In-memory computing has become popular because the cost of memory has fallen, making in-memory processing economical for applications. The two main pillars of in-memory computation are:

  • RAM storage
  • Parallel distributed processing.

Keeping the data in memory improves performance by an order of magnitude. The main abstraction of Spark is the RDD, and RDDs are cached using the cache() or persist() method.
When we use the cache() method, the RDD is stored in memory; any data that does not fit in memory is either recomputed when needed or spilled to disk. Whenever we need the RDD again, it can be retrieved without going to disk, which reduces the overhead of disk storage.
The in-memory capability of Spark is well suited to machine learning and micro-batch processing.
The difference between cache() and persist() is that cache() uses the default storage level MEMORY_ONLY, while persist() lets us choose among various storage levels.
Spark Lazy Evaluation
Lazy evaluation in Spark means that execution does not start until an action is triggered. In Spark, laziness comes into the picture with transformations.

Transformations are lazy in nature: when we call an operation on an RDD, it does not execute immediately. Spark keeps a record of which operations have been called (through the DAG). We can think of a Spark RDD as a description of the data that we have built up through transformations. Since transformations are lazy, we can trigger execution at any time by calling an action on the data. Hence, with lazy evaluation, data is not loaded until it is necessary.
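A simple way to see the laziness (assuming a local session; the path is deliberately non-existent):

```scala
import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder.appName("lazy-demo").master("local[*]").getOrCreate().sparkContext

// textFile is a transformation, so this line succeeds even though
// the path does not exist -- nothing has been read yet
val missing = sc.textFile("/no/such/path")
val lengths = missing.map(_.length)   // still nothing runs

// Only an action forces evaluation; this is where the error would surface:
// lengths.count()   // throws: Input path does not exist
```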

Advantages of Lazy Evaluation in Spark Transformations
There are several benefits of lazy evaluation in Apache Spark:
1) Increases manageability
With lazy evaluation, users can organize their Apache Spark program into smaller operations, and Spark reduces the number of passes over the data by grouping operations.
2) Saves computation and increases speed
Lazy evaluation plays a key role in saving calculation overhead, since only the necessary values get computed. It saves round trips between the driver and the cluster, which speeds up the process.
3) Reduces complexity
The two main complexities of any operation are time and space. Using lazy evaluation we can reduce both: since not every operation is executed, time is saved, and it even lets us work with infinite data structures. The action is triggered only when the data is required, which reduces overhead.
4) Optimization
It enables optimization by reducing the number of queries.

Lazy evaluation enhances the power of Apache Spark by reducing the execution time of RDD operations. Spark maintains the lineage graph to remember the operations performed on an RDD; as a result, it optimizes performance and achieves fault tolerance.


Spark RDD Fault Tolerance

Spark operates on data in fault-tolerant file systems like HDFS or S3, so all RDDs generated from fault-tolerant data are themselves fault tolerant. But this does not hold for streaming/live data received over the network, which is the key reason Spark needs fault tolerance for this kind of data. The basic fault-tolerance semantics of Spark are:
Since an Apache Spark RDD is an immutable dataset, each RDD remembers the lineage of deterministic operations that were applied to a fault-tolerant input dataset to create it. If any partition of an RDD is lost due to a worker node failure, that partition can be recomputed from the original fault-tolerant dataset using the lineage of operations.
Assuming all RDD transformations are deterministic, the data in the final transformed RDD will always be the same irrespective of failures in the Spark cluster.
To achieve fault tolerance for received data, the data is replicated among multiple Spark executors on worker nodes in the cluster. This results in two types of data that need to be recovered in the event of a failure:
Data received and replicated – the data has been replicated to another node, so it can be retrieved from there after a failure.

Data received but buffered for replication – the data has not been replicated yet, so the only way to recover it after a fault is to retrieve it again from the source.

Failures can occur in worker as well as driver nodes.
Failure of a worker node – Worker (slave) nodes run the application code on the Spark cluster. Any worker node running an executor can fail, resulting in the loss of its in-memory data; if any receivers were running on the failed node, their buffered data is lost as well.
Failure of the driver node – If the driver node running the Spark Streaming application fails, the SparkContext is lost and all executors lose their in-memory data.
Apache Mesos can help make the Spark master fault tolerant by maintaining backup masters. Mesos is open-source software that sits between the application layer and the operating system and makes it easier to deploy and manage applications in large-scale clustered environments. Executors are relaunched automatically if they fail, and Spark Streaming performs parallel recovery by recomputing RDDs on the input data. Receivers are restarted by the workers when they fail.

Fault Tolerance with Receiver-based Sources
For input sources based on receivers, fault tolerance depends on both the failure scenario and the type of receiver. There are two types of receiver:
Reliable receiver – the receiver acknowledges the reliable source only after ensuring that the received data has been replicated. If the receiver fails, the source will not have received an acknowledgment for the buffered data, so when the receiver restarts, the source resends the data. Hence no data is lost due to the failure.
Unreliable receiver – the receiver does not send acknowledgments, so data can be lost due to a worker or driver failure.

If a worker node fails and the receiver is reliable, there is no data loss. But with an unreliable receiver, data that was received but not yet replicated can be lost.
Spark Streaming write ahead logs
If the driver node fails, all the data that was received and replicated in memory is lost, which affects the results of stateful transformations. To avoid this loss of data, Spark 1.2 introduced write ahead logs, which save received data to fault-tolerant storage: all received data is written to a write ahead log before it is processed by Spark Streaming.
Write ahead logs are a standard technique in databases and file systems to ensure the durability of data operations. They work by first writing the intention of the operation to a durable log, and only then applying the operation to the data. If the system fails in the middle of applying the operation, the lost data can be recovered by reading the log and reapplying the operations it recorded.
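For receiver-based streaming, write ahead logs are enabled through a configuration flag. A minimal sketch (the application name and checkpoint path are illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("wal-demo")
  // ask receivers to write all received data to the write ahead log
  // before it is processed (available since Spark 1.2)
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

// The log itself lives in the checkpoint directory, which must be on
// fault-tolerant storage, e.g.:
// streamingContext.checkpoint("hdfs:///checkpoints")
```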

DAG in Apache Spark

A DAG (Directed Acyclic Graph) in Apache Spark is a set of vertices and edges, where the vertices represent RDDs and the edges represent the operations to be applied to those RDDs. In a Spark DAG, every edge is directed from earlier to later in the sequence. When an action is called, the created DAG is submitted to the DAG Scheduler, which further splits the graph into stages of tasks.

The limitations of Hadoop MapReduce were a key motivation for introducing the DAG in Spark. Computation in MapReduce proceeds in three steps:
The data is read from HDFS.
Map and Reduce operations are applied.
The computed result is written back to HDFS.

Each MapReduce job is independent of the others, and Hadoop has no idea which MapReduce job will come next. For iterative computations it is often unnecessary to read and write the intermediate result between two MapReduce jobs, so stable storage (HDFS) and disk I/O are wasted.
In multi-step pipelines, each job is blocked from the beginning until the previous job completes. As a result, complex computations can take a long time even with small data volumes.
In Spark, by contrast, a DAG of consecutive computation stages is formed. This allows the execution plan to be optimized, e.g. to minimize shuffling data around.

How the DAG works in Spark:
The interpreter is the first layer: Spark uses a modified Scala interpreter to interpret the code.
Spark creates an operator graph as you enter your code in the Spark console.
When an action is called on a Spark RDD, Spark submits the operator graph to the DAG Scheduler.
The DAG Scheduler divides the operators into stages of tasks. A stage contains tasks based on the partitions of the input data. The DAG Scheduler also pipelines operators together; for example, consecutive map operators are scheduled in a single stage.
The stages are passed on to the Task Scheduler, which launches tasks through the cluster manager. The dependencies between stages are unknown to the Task Scheduler.
The workers execute the tasks on the slaves.
At a higher level, we can apply two types of RDD transformations: narrow transformations (e.g. map(), filter()) and wide transformations (e.g. reduceByKey()). Narrow transformations do not require shuffling data across partitions, so consecutive narrow transformations are grouped into a single stage, while wide transformations shuffle data. Hence, wide transformations result in stage boundaries.
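The stage boundary introduced by a wide transformation is visible in the lineage output. A sketch (assuming a local session; the data is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder.appName("stage-demo").master("local[*]").getOrCreate().sparkContext

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))

// map is narrow: no shuffle, it is pipelined into the same stage
val pairs = words.map(w => (w, 1))

// reduceByKey is wide: it shuffles, creating a stage boundary
val counts = pairs.reduceByKey(_ + _)

// The ShuffledRDD in the debug output marks the stage boundary
println(counts.toDebugString)
println(counts.collect().toMap)
```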

Achieve Fault Tolerance through DAG
An RDD is split into partitions, and at any point in time each node operates on a partition. The series of functions executes on a partition of the RDD, and these operations compose together into what the Spark execution engine views as a DAG (Directed Acyclic Graph).
Suppose a node crashes in the middle of an operation, say O3, which depends on operation O2, which in turn depends on O1. The cluster manager finds out that the node is dead and assigns another node to continue processing. This node operates on the affected partition of the RDD and replays the series of operations it has to execute (O1 -> O2 -> O3). Hence there is no data loss.
Advantages of the DAG in Spark:
A lost RDD can be recovered using the Directed Acyclic Graph.
MapReduce has just two steps, map and reduce, while a DAG can have multiple levels, which makes it more flexible for executing SQL-like queries.
The DAG helps achieve fault tolerance, so we can recover lost data.
It enables better global optimization than a system like Hadoop MapReduce.

Tuesday, 18 June 2019

Spark DataFrame


A Spark DataFrame is a distributed collection of data organized into named columns that provides operations to filter, group, or compute aggregates, and can be used with Spark SQL. DataFrames can be constructed from structured data files, existing RDDs, tables in Hive, or external databases.

We need DataFrames for:
Multiple programming languages
The best property of DataFrames in Spark is their support for multiple languages, which makes them easier to use for programmers from different programming backgrounds. DataFrames in Spark support R, Python, Scala, and Java.
Multiple data sources
DataFrames in Spark support a large variety of data sources. We shall discuss them one by one in the use cases covered later in this article.
Processing structured and semi-structured data
The core requirement for which DataFrames were introduced is to process big data with ease. DataFrames in Spark use a table format to store the data in a versatile way, along with the schema of the data they are dealing with.
Slicing and dicing the data
The DataFrame APIs support slicing and dicing the data: they can perform operations like select and filter on rows and columns. Statistical data is always prone to missing values, range violations, and irrelevant values; with DataFrames, the user can manage missing data explicitly.

Spark DataFrame Features
Scalability: DataFrames allow processing petabytes of data at once.
Flexibility: They support a broad array of data formats (CSV, Elasticsearch, Avro, etc.) and storage systems (HDFS, Hive tables, etc.).
Custom memory management: Data is stored off-heap in a binary format, which saves memory and avoids garbage collection; Java serialization is also avoided, since the schema is already known.
Optimized execution plans: The Spark Catalyst optimizer produces optimized query plans, which are then executed on RDDs.
  1. A DataFrame is a distributed collection of data organized into named columns. It is equivalent to a table in an RDBMS.
  2. It can deal with both structured and semi-structured data formats, for example Avro, CSV, Elasticsearch, and Cassandra. It also works with storage systems such as HDFS, Hive tables, and MySQL.
  3. Catalyst supports optimization and has general libraries to represent trees. DataFrame uses Catalyst tree transformation in four phases: a) analyzing the logical plan to resolve references, b) logical plan optimization, c) physical planning, d) code generation to compile parts of a query to Java bytecode.
  4. The DataFrame APIs are available in various programming languages, for example Java, Scala, Python, and R.
  5. It provides Hive compatibility: we can run unmodified Hive queries on an existing Hive warehouse.
  6. It can scale from kilobytes of data on a single laptop to petabytes on a large cluster.
  7. DataFrames provide easy integration with big data tools and frameworks via Spark Core.
Creating DataFrames in Apache Spark
The SparkSession class is the entry point to all the functionality of Spark. To create a basic SparkSession, just use SparkSession.builder(). Using a SparkSession, an application can create a DataFrame from an existing RDD, from a Hive table, or from Spark data sources; Spark SQL can operate on a variety of data sources through the DataFrame interface. Using a Spark SQL DataFrame we can also create a temporary view and run SQL queries over its data.
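A minimal sketch of these steps (local session; the view name and columns are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("df-demo").master("local[*]").getOrCreate()
import spark.implicits._

// DataFrame from a local collection (toDF comes from spark.implicits)
val df = Seq(("maths", 52), ("english", 75), ("science", 82)).toDF("subject", "marks")

// Register a temporary view and run SQL over it
df.createOrReplaceTempView("scores")
val good = spark.sql("SELECT subject FROM scores WHERE marks > 60")
good.show()
```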


Limitations of Spark DataFrames
Despite their many benefits, no technology exists without loopholes. Considerable limitations of Spark DataFrames are as follows:
  • The compiler cannot catch errors when the code refers to data attribute names; errors are detected only at run time, after the query plans have been created.
  • The API works best with Scala and is more limited in Java.
  • Domain objects cannot be regenerated from a DataFrame.

Tuesday, 4 June 2019

Paired RDD Operations


reduceByKey
reduceByKey works much better on a large dataset because Spark knows it can combine output that shares a common key on each partition before shuffling the data.
With reduceByKey, pairs on the same machine with the same key are combined (using the lambda function passed into reduceByKey) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition into one final result per key.


groupByKey
When we call groupByKey, all the key-value pairs are shuffled around. This is a lot of unnecessary data being transferred over the network.
To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when more data is shuffled onto a single executor machine than can fit in memory. However, it flushes the data to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. This may be handled more gracefully in a later release of Spark so the job can still proceed, but it should still be avoided: when Spark needs to spill to disk, performance is severely impacted.
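The two approaches give the same answer; the difference is how much data crosses the network. A sketch (assuming a local session; the marks data is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder.appName("agg-demo").master("local[*]").getOrCreate().sparkContext

val marks = sc.parallelize(Seq(("maths", 52), ("maths", 85), ("english", 75)), 2)

// reduceByKey combines values per key on each partition before the shuffle
val totalsReduce = marks.reduceByKey(_ + _)

// groupByKey ships every single pair across the network, then we sum
val totalsGroup = marks.groupByKey().mapValues(_.sum)

// Same result, very different network cost
println(totalsReduce.collect().toMap)
```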
combineByKey
Spark's combineByKey RDD transformation is very similar to a combiner in Hadoop MapReduce programming. Let's discuss a combineByKey example in depth and try to understand the importance of this function.
The combineByKey function takes the following three functions as arguments:

createCombiner
This function is the first argument of combineByKey.
It is the first aggregation step for each key and is executed when a new key is found in a partition.
Execution of this function is local to a partition of a node, on each individual value.
In our case, as we are calculating a percentage, we need the sum and the count as the aggregation, so our createCombiner function should initialize a tuple (sum, count). For the initial aggregation, it should be (value, 1).
This function is similar to the first argument (i.e. zeroVal) of the aggregateByKey transformation.
mergeValue
This second function executes when the next subsequent value is given to the combiner.
It also executes locally on each partition of a node and combines all values.
Its arguments are an accumulator and a new value; it combines the new value into the existing accumulator.
In our case, mergeValue has an accumulator tuple (sum, count): whenever we get a new value, the marks are added to the first element and the second element (i.e. the counter) is incremented by 1.
This function is similar to the second argument (i.e. seqOp) of the aggregateByKey transformation.
mergeCombiners
This final function merges two accumulators (i.e. combiners) of a single key across the partitions to generate the final expected result.
Its arguments are two accumulators (i.e. combiners); it merges the results of a single key from different partitions.
This function is similar to the third argument (i.e. combOp) of the aggregateByKey transformation.

Important Points
Apache Spark's combineByKey is a transformation operation, hence its evaluation is lazy.
It is a wider operation, as it shuffles data in the last stage of aggregation and creates another RDD.
It is recommended when you need to do further aggregation on grouped data.
Use combineByKey when the return type differs from the source type (i.e. when you cannot use reduceByKey).
Example :
val studentRDD = sc.parallelize(Array(("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), 
    ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3)

//Defining createCombiner, mergeValue and mergeCombiner functions
def createCombiner = (tuple: (String, Int)) => (tuple._2.toDouble, 1)
//createCombiner creates the initial value (combiner) for a key's first encounter on a partition if one is not found --> (firstValueEncountered, 1). So, this is merely initializing a tuple with the first value and a key counter of 1.
def mergeValue = (accumulator: (Double, Int), element: (String, Int)) => (accumulator._1 + element._2, accumulator._2 + 1) 
//mergeValue is triggered only if a combiner (tuple in our case) has already been created for the found key on this partition --> (existingTuple._1 + subSequentValue, existingTuple._2 + 1). This adds the existing tuple's value (in the first slot) with the newly encountered value and takes the existing tuple's counter (in the second slot) and increments it.
def mergeCombiner = (accumulator1: (Double, Int), accumulator2: (Double, Int)) => (accumulator1._1 + accumulator2._1, accumulator1._2 + accumulator2._2)
// mergeCombiner takes the combiners (tuples) created on each partition and merges them together --> (tupleFromPartition._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2). This is merely adding the values from each tuple together and the counters together into one tuple.
val combRDD = studentRDD.map(t => (t._1, (t._2, t._3))).combineByKey(createCombiner, mergeValue, mergeCombiner).map(e => (e._1, e._2._1/e._2._2))
//Check the Output
combRDD.collect foreach println
(Tina,76.5)
(Thomas,86.25)
(Jackeline,76.5)
(Joseph,82.5)
(Juan,64.0)
(Jimmy,77.0)
(Cory,65.0)

mapValues()
When we use map() with a pair RDD, we get access to both the key and the value. There are times when we are only interested in accessing the value (and not the key); in those cases, we can use mapValues().
Eg:
val inputrdd = sc.parallelize(Seq(("maths", 50), ("maths", 60), ("english", 65)))
val mapped = inputrdd.mapValues(mark => (mark, 1));
mapped.collect()
//Output :
res3: Array[(String, (Int, Int))] = Array((maths,(50,1)), (maths,(60,1)), (english,(65,1)))

keys
The keys operation returns an RDD of just the keys.
rdd.keys

values
The values operation returns an RDD of just the values.
rdd.values
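For example (assuming a local session; the pairs are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder.appName("kv-demo").master("local[*]").getOrCreate().sparkContext

val rdd = sc.parallelize(Seq(("maths", 52), ("english", 75)))
val ks = rdd.keys.collect()     // Array(maths, english)
val vs = rdd.values.collect()   // Array(52, 75)
```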

fold
fold is a very powerful operation in Spark which allows you to calculate many important values in O(n) time. If you are familiar with Scala collections, it is like using the fold operation on a collection. Even if you have not used fold in Scala, this section will make you comfortable using it.
Syntax
def fold(zeroValue: T)(op: (T, T) => T): T
The above is a high-level view of the fold API. It involves three things:
T is the element type of the RDD.
zeroValue is the initial accumulator of type T, and the fold operation returns a value of the same type.
op is a function which is called for each element in the RDD together with the previous accumulator.
Let’s see some examples of fold
//Code
val itemPrice= sc.parallelize(List(("Soap",10.0),("Toaster",200.0),("Tshirt",400.0)))
val dummyItem = ("dummy",0.0);
val maxPrice = itemPrice.fold(dummyItem)((acc,item) => { 
  if(acc._2 < item._2) item else acc})
println("maximum price item "+maxPrice )
//Output
maximum price item (Tshirt,400.0)

foldByKey
In Map/Reduce, the key plays the role of grouping values. We can use the foldByKey operation to aggregate values based on keys.
In this example, employees are grouped by department name. If you want to find the maximum salary in a given department, you can use the following code.
Let’s see some examples of foldByKey
//Code
val deptEmployees = sc.parallelize(List(("cs",("jack",1000.0)),("cs",("bron",1200.0)),("phy",("sam",2200.0)),("phy",("ronaldo",500.0)) ))
val maxByDept = deptEmployees.foldByKey(("dummy",0.0))((acc,element)=> if(acc._2 > element._2) acc else element)
println("maximum salaries in each dept" + maxByDept.collect().toList)
//Output
maximum salaries in each deptList((cs,(bron,1200.0)), (phy,(sam,2200.0)))

countByKey()
Through the countByKey operation, we can count the number of elements for each key.
rdd.countByKey()
Eg:
val rdd = sc.parallelize(Seq(("math",    55),("math",    56),("english", 57),("english", 58),("science", 59),("science", 54)))
val result1 = rdd.countByKey()
//Output:
result1: scala.collection.Map[String,Long] = Map(math -> 2, english -> 2, science -> 2)

collectAsMap()
The collectAsMap() operation collects the result as a map to provide easy lookup. Note that if a key has multiple values, the returned map keeps only one value per key.
rdd.collectAsMap()
Eg:
val rdd = sc.parallelize(Seq(("math",55),("math",56),("math",58),("english",57),("english",58),("science",54)))
val result = rdd.collectAsMap()
println(result)
//Output
result: scala.collection.Map[String,Int] = Map(math -> 58, science -> 54, english -> 58)

lookup(key)
It returns all values associated with the provided key.
rdd.lookup(key)
Eg:
val rdd = sc.parallelize(Seq(("math",55),("math",56),("math",58),("english",57),("english",58),("science",54)))
val result = rdd.lookup("math")
println(result)
//Output
result: Seq[Int] = WrappedArray(55, 56, 58)
