In this article we will walk through the core architecture of a cluster, a Spark Application, and Spark's structured APIs using DataFrames and SQL.
Spark's Basic Architecture:
A single machine works perfectly well for spreadsheet-style work, but when we try to do heavy data processing on it, we find that one machine does not have enough power and resources to perform computations on huge amounts of information: the waiting time becomes too long. To solve this problem we need a cluster, a group of machines that pools the resources of many machines together, giving us the ability to use all of the cumulative resources as if they were a single computer. A group of machines alone is not enough, though; we also need a framework to coordinate work across them, and that is exactly what Spark does: it manages and coordinates the execution of tasks on data across a cluster of computers. The cluster of machines on which the tasks execute is managed by a cluster manager such as Spark's standalone cluster manager, YARN, or Mesos. In short, we submit Spark Applications to these cluster managers, which grant resources to our application so that we can complete our work.
Spark Applications:
A Spark Application consists of a driver process and a set of executor processes.
The driver process runs your main() function, sits on a node in the cluster, and is responsible for three things: maintaining information about the Spark Application; responding to a user's program or input; and analyzing, distributing, and scheduling work across the executors. The driver process is the heart of a Spark Application and maintains all relevant information during the lifetime of the application.
The executors are responsible for actually carrying out the work that the driver assigns them. Each executor is responsible for two things: executing code assigned to it by the driver, and reporting the state of the computation on that executor back to the driver node.
[Figure: The architecture of a Spark Application]
Note: The user can specify how many executors should fall on each node through configurations.
Spark's Language APIs
Scala: Spark is primarily written in Scala, making it Spark's default language.
Java: Even though Spark is written in Scala, you can write code in Java as well.
Python: Python supports nearly all constructs that Scala supports.
SQL: Spark supports a subset of the ANSI SQL 2003 standard.
R: Spark has two commonly used R libraries: one as a part of Spark core (SparkR) and another as an R community-driven package (sparklyr).
[Figure: Relationship between the SparkSession and Spark's Language APIs]
Spark has two fundamental sets of APIs: the low-level "unstructured" APIs, and the higher-level structured APIs.
In interactive mode, you implicitly create a SparkSession that manages the Spark Application. In other words, you control your Spark Application through a driver process called the SparkSession. There is a one-to-one correspondence between a SparkSession and a Spark Application.
In the Scala shell, inspecting the SparkSession should show something like the following:
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...
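Outside of the interactive shell, the SparkSession has to be built explicitly. Below is a minimal sketch for a standalone Scala program; the application name and the local master are illustrative assumptions, since in spark-shell the session is created for you and exposed as the variable spark:

```scala
import org.apache.spark.sql.SparkSession

// Build (or reuse) the one SparkSession for this application.
// appName and master("local[*]") are assumptions for local testing.
val spark = SparkSession.builder()
  .appName("spark-basics-demo")
  .master("local[*]")    // run locally, one task slot per core
  .getOrCreate()

println(spark.version)   // proves the session is alive
spark.stop()
```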
DataFrames: A DataFrame is the most common structured API and simply represents a table of data with rows and columns. The list that defines the columns and the types within those columns is called the schema. A Spark DataFrame can span thousands of computers. The reason for putting the data on more than one computer should be intuitive: either the data is too large to fit on one machine, or it would simply take too long to perform the computation on one machine. In R and Python, DataFrames (with some exceptions) exist on one machine rather than multiple machines.
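As a small illustration (the column names and rows are made up), a DataFrame with a schema of named, typed columns can be built from a local collection, assuming an active SparkSession named spark as provided by spark-shell:

```scala
// toDF needs the implicit encoders that spark-shell imports for you.
import spark.implicits._

val people = Seq(("Alice", 34), ("Bob", 28)).toDF("name", "age")

people.printSchema()  // name: string, age: int -- this is the schema
people.show()         // renders the rows and columns as a table
```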
Partitions: To allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions. A partition is a collection of rows that sit on one physical machine in your cluster. A DataFrame's partitions represent how the data is physically distributed across the cluster of machines during execution. If you have one partition, Spark will have a parallelism of only one, even if you have thousands of executors; likewise, if you have many partitions but only one executor, Spark will still have a parallelism of only one.
With DataFrames you don't manipulate partitions manually or individually.
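We can, however, inspect how many partitions a DataFrame has and ask Spark to redistribute the data. A sketch, again assuming the spark session from the shell:

```scala
// range produces a distributed Dataset of numbers.
val numbers = spark.range(0, 1000)
println(numbers.rdd.getNumPartitions) // how many chunks the data is in

// repartition does not modify `numbers`; it returns a NEW DataFrame
// spread across 8 partitions, because the original is immutable.
val spread = numbers.repartition(8)
println(spread.rdd.getNumPartitions)
```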
In Spark, the core data structures are immutable: they cannot be changed after they are created. To "change" a DataFrame, you instead instruct Spark how you would like to modify it to do what you want. These instructions are called transformations. Spark will not act on transformations until we call an action. Transformations are the core of how you express your business logic to Spark. They come in two types:
Narrow transformations: Each input partition contributes to only one output partition. With this kind of transformation, Spark automatically performs an operation called pipelining, meaning that if we specify multiple filters on a DataFrame, they will all be performed in memory.
Wide transformations: These are referred to as shuffles, whereby Spark exchanges partitions across the cluster. When a shuffle happens, Spark writes the results to disk rather than keeping everything in memory.
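The difference is visible in ordinary DataFrame code: filter is a narrow transformation, while groupBy triggers a shuffle. A sketch assuming the spark session and implicits from the shell:

```scala
import spark.implicits._

val nums = spark.range(0, 100)

// Narrow: each input partition is filtered on its own, and the two
// chained filters are pipelined together in memory.
val filtered = nums.filter($"id" % 2 === 0).filter($"id" > 10)

// Wide: rows with the same key must meet in one partition, so Spark
// shuffles data across the cluster (writing shuffle files to disk).
val grouped = nums.groupBy(($"id" % 10).as("bucket")).count()
```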
Lazy Evaluation :
It means that Spark will wait until the very last moment to execute the graph of computation instructions.
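This is easy to observe in the shell: building a transformation returns immediately without touching any data, and nothing runs until an action is called. A sketch assuming the spark session:

```scala
import spark.implicits._

// Returns instantly: Spark only records this step in its plan.
val evens = spark.range(0, 1000000).filter($"id" % 2 === 0)

// count() is an action, so the whole plan is optimized and executed here.
println(evens.count())  // 500000
```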
Actions :
Transformations allow us to build up our logical transformation plan. To trigger the computation, we run an action. An action instructs Spark to compute a result from a series of transformations.
There are three kinds of actions:
Actions to view data in the console.
Actions to collect data to native objects in the respective language.
Actions to write to output data sources.
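All three kinds can be seen on one small DataFrame; the output path below is a made-up example, assuming the spark session from the shell:

```scala
val df = spark.range(0, 10).toDF("n")

df.show(3)                // 1) view data in the console
val local = df.collect()  // 2) collect to native Scala objects (Array[Row])

// 3) write to an output data source; the path is an assumption
df.write.mode("overwrite").csv("/tmp/spark-demo-output")
```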
Spark UI
We can monitor the progress of a job through the Spark web UI, which is available on port 4040 of the driver node.
Spark includes the ability to read from and write to a variety of data sources. To read data, we use a DataFrameReader that is associated with our SparkSession.
Schema inference means that we want Spark to take a best guess at what the schema of our DataFrame should be.
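Putting the two together, here is a sketch of reading a CSV with schema inference. To keep it self-contained it first writes a tiny made-up file to a temporary path; in real use you would point spark.read at your own file:

```scala
import java.nio.file.Files

// Create a tiny CSV on disk so the example is runnable as-is.
val path = Files.createTempFile("demo", ".csv")
Files.write(path, "name,age\nAlice,34\nBob,28\n".getBytes)

// DataFrameReader is reached through spark.read. inferSchema makes
// Spark sample the file and guess column types; header treats the
// first line as column names.
val people = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv(path.toString)

people.printSchema()  // age is guessed as an integer type, not a string
```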
Note: By default, when we perform a shuffle, Spark outputs 200 shuffle partitions. Let us set this value to 5 to reduce the number of output partitions from the shuffle:
spark.conf.set("spark.sql.shuffle.partitions","5")
With Spark SQL, you can register any DataFrame as a table or view (or a temp table) and query it using pure SQL.
E.g.: DataFrame_Name.createOrReplaceTempView("Table_Name")
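Concretely, a short sketch with made-up data and view name, assuming the spark session and implicits from spark-shell:

```scala
import spark.implicits._

val people = Seq(("Alice", 34), ("Bob", 28)).toDF("name", "age")

// Register the DataFrame under a name that SQL can see...
people.createOrReplaceTempView("people")

// ...and query it with pure SQL; the result is again a DataFrame.
val adults = spark.sql("SELECT name FROM people WHERE age > 30")
adults.show()
```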
End-to-End Example
Scala code:
import org.apache.spark.sql.functions.desc // needed for desc outside the shell
val FData = spark.read.option("inferSchema","true").option("header","true").csv("filepath") // read a CSV file, inferring the schema
FData
.groupBy("Column_Name") // returns a RelationalGroupedDataset
.sum("count") // sum aggregation method
.withColumnRenamed("sum(count)", "New_Column_Name") // rename the aggregate column
.sort(desc("New_Column_Name")) // sort the data in descending order
.limit(5) // specify a limit
.explain() // prints the physical plan; call collect() instead to execute and return results



