Parallel Processing in Apache Spark
We already learned about the application driver and the executors. We know that Apache Spark breaks our application into many smaller tasks and assigns them to executors. So Spark executes the application in parallel. But do you understand the internal mechanics? How does Spark break our code into a set of tasks and run them in parallel?
This article aims to answer that question.
Spark application flow
All that you are going to do in Apache Spark is read some data from a source and load it into Spark. You will then process the data and hold the intermediate results, and finally write the results back to a destination.
But in this process, you need a data structure to hold the data in Spark.
We have three alternatives to hold data in Spark.
- Data Frame
- Dataset
- RDD
Apache Spark 2.x recommends using the first two and avoiding RDDs. However, there is a critical fact to note about RDDs: both Data Frames and Datasets are ultimately compiled down to RDDs. So, under the hood, everything in Spark is an RDD. For that reason, I will start with RDDs and try to explain the mechanics of parallel processing.
Let's define the RDD. The name stands for Resilient Distributed Dataset. However, I can describe an RDD as below.
Spark RDD is a resilient, partitioned, distributed and immutable collection of data.
Let's quickly review this description.
- Collection of data - RDDs hold data and appear to be a Scala collection.
- Resilient - RDDs can recover from a failure, so they are fault tolerant.
- Partitioned - Spark breaks the RDD into smaller chunks of data. These pieces are called partitions.
- Distributed - Instead of keeping those partitions on a single machine, Spark spreads them across the cluster. So they are a distributed collection of data.
- Immutable - Once defined, you can't change an RDD. So a Spark RDD is a read-only data structure.
You can create an RDD using two methods.
- Load some data from a source.
- Create an RDD by transforming another RDD.
The code below shows an example RDD.
    // Load a text file from the current directory
    val flistRDD = sc.textFile("flist.txt")
    // Check the default number of partitions
    flistRDD.getNumPartitions
    // Reload with five partitions
    val flistRDD = sc.textFile("flist.txt", 5)
    // Count the number of elements in each partition
    flistRDD.foreachPartition(p => println("Items in partition: " + p.count(y => true)))
In the first line, we load some data from a file to create an RDD. When you create an RDD by loading some data from a source, Spark creates some default partitions. The second line displays the default number of partitions. If you want, you can override the defaults and create as many partitions as you want. The second parameter of the textFile API sets the minimum number of partitions. The last line iterates over all the partitions and counts the number of elements in each partition.
The above example shows that an RDD is a partitioned collection, and we can control the number of partitions.
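One caveat about the per-partition count above: foreachPartition runs on the executors, so on a real cluster the println output lands in the executor logs rather than on the driver console. A minimal sketch of one way to bring the counts back to the driver is shown below. It uses mapPartitionsWithIndex and collect from the RDD API. The object name PartitionSizes, the helper partitionSizes, the local[2] master, and the sample lines are assumptions made up for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionSizes {
  // Returns (partition index, element count) pairs, collected on the driver.
  def partitionSizes(sc: SparkContext, data: Seq[String], numPartitions: Int): Array[(Int, Int)] =
    sc.parallelize(data, numPartitions)
      .mapPartitionsWithIndex((index, items) => Iterator((index, items.size)))
      .collect()

  def main(args: Array[String]): Unit = {
    // Assumption: a local two-thread master, only for trying this out.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("partition-sizes"))
    try {
      val lines = (1 to 10).map(i => s"/etc/file$i") // made-up sample lines
      partitionSizes(sc, lines, 5).foreach { case (index, count) =>
        println(s"partition $index holds $count items")
      }
    } finally sc.stop()
  }
}
```

In spark-shell, where sc already exists, the same idea is a single expression on any RDD: call mapPartitionsWithIndex with a function that emits one (index, size) pair per partition, then collect.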
Spark RDDs offer two types of operations.
- Transformations
- Actions
Transformations create a new distributed dataset from an existing distributed dataset. So, they create a new RDD from an existing RDD.
Actions are mainly performed to send results back to the driver, and hence they produce a non-distributed dataset.
All transformations in Spark are lazy, and actions are strict. That means transformations don't compute their results until an action requires them.
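The lazy versus strict distinction can be felt without a cluster. The sketch below is only an analogy in plain Scala, not Spark code: a collection view plays the role of a lazy transformation, and sum plays the role of an action. The object name LazyDemo and the call counter are assumptions invented for this illustration.

```scala
object LazyDemo {
  // Returns (calls before the "action", calls after the "action", result).
  def run(lines: Seq[String]): (Int, Int, Int) = {
    var calls = 0
    // "Transformation": building the mapped view runs nothing yet.
    val lengths = lines.view.map { line => calls += 1; line.length }
    val before = calls
    // "Action": asking for a result forces the mapping function to run.
    val total = lengths.sum
    (before, calls, total)
  }

  def main(args: Array[String]): Unit = {
    val (before, after, total) = run(Seq("/etc/abrt", "/etc/fonts"))
    println(s"calls before: $before, calls after: $after, total length: $total")
  }
}
```

The mapping function is not called when the view is defined. It runs only once a result is demanded, which mirrors how Spark records transformations and executes them when an action arrives.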
Parallel Processing of RDD
Let me ask you a simple question. Given the above RDD, if I want to count the number of lines in that RDD, can we do it in parallel? No brainer. Right?
We already have five partitions. I will give one partition to each executor and ask them to count the lines in the given partition. Then I will take the counts back from these executors and sum them. Simple. Isn't it? That's what Spark does.
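The counting scheme just described can be modelled in a few lines of plain Scala. This is a sketch of the idea and not Spark's implementation: Futures on the global execution context stand in for executors, and the names ParallelCount, partition, and parallelCount are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelCount {
  // Split the data into at most numPartitions chunks of roughly equal size.
  def partition[A](data: Seq[A], numPartitions: Int): Seq[Seq[A]] = {
    val chunkSize = math.max(1, math.ceil(data.size.toDouble / numPartitions).toInt)
    data.grouped(chunkSize).toVector
  }

  // Count each chunk independently, then sum the partial counts "on the driver".
  def parallelCount[A](data: Seq[A], numPartitions: Int): Long = {
    val partialCounts = partition(data, numPartitions).map(p => Future(p.size.toLong))
    Await.result(Future.sequence(partialCounts), 10.seconds).sum
  }

  def main(args: Array[String]): Unit = {
    val lines = (1 to 23).map(i => s"line $i") // made-up data
    println(parallelCount(lines, 5))
  }
}
```

Each partial count needs only its own chunk, which is why the work can be spread out. Only the small per-partition numbers travel back to be summed.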
Counting is a simple operation, but Spark parallelizes more complex work in the same way. There are two main variables to control the degree of parallelism in Apache Spark.
- The number of partitions
- The number of executors
If you have ten partitions, you can achieve ten parallel processes at the most.
However, if you have just two executors,
all those ten partitions will be queued to those two executors.
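The arithmetic behind this can be written down as a tiny sketch. It follows the simplification used above, where each executor works on one partition at a time. In a real cluster an executor with several cores can run several tasks at once, so treat the second argument as the number of task slots. The names Parallelism, maxConcurrentTasks, and rounds are hypothetical.

```scala
object Parallelism {
  // At most one task per partition, and at most one running task per slot.
  def maxConcurrentTasks(partitions: Int, slots: Int): Int =
    math.min(partitions, slots)

  // Rough number of rounds needed if every task takes about the same time.
  def rounds(partitions: Int, slots: Int): Int =
    math.ceil(partitions.toDouble / slots).toInt

  def main(args: Array[String]): Unit = {
    // Ten partitions on two single-slot executors.
    println(maxConcurrentTasks(10, 2))
    println(rounds(10, 2))
  }
}
```

With ten partitions and two slots, only two tasks run at any moment and the rest wait their turn. Adding partitions beyond the available slots does not add parallelism by itself.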
Let's do something simple and take our understanding to the next level. Here is an example in Scala.
    val flistRDD = sc.textFile("/home/prashant/flist.txt", 5)
    val arrayRDD = flistRDD.map(x => x.split("/"))
    val kvRDD = arrayRDD.map(a => (a(1), 1))
    val fcountRDD = kvRDD.reduceByKey((x, y) => x + y)
    fcountRDD.collect()

    /* Sample entries from the data file
    /etc/abrt
    /etc/abrt/plugins
    /etc/abrt/plugins/CCpp.conf
    /etc/reader.conf
    /etc/fonts
    /etc/fonts/fonts.conf
    /etc/fonts/fonts.dtd
    /etc/fonts/conf.d
    */
The above example loads a data file. The file contains the list of directories and files on my local system. I have listed some sample entries above.
Line one loads a text file into an RDD. The file is quite small. If you keep it in HDFS, it may occupy only one or two HDFS blocks, so it is likely that you get one or two partitions by default. However, we want to make five partitions of this data file, and hence we set the second argument of the textFile API to 5.
Line two executes a map method on the first RDD and returns a new RDD. We already know that RDDs are immutable. So, we can't modify the first RDD. Instead, we take the first RDD, perform a map operation, and create a new RDD. The map operation splits each line on "/" into an array of path components. Hence, the new RDD is a collection of arrays.
The third line executes another map method over the arrayRDD. This time, we generate a tuple (key-value pair). I am taking the element at index 1 of the array as my key. Index 0 holds an empty string because every line starts with "/", so index 1 is the first directory name in the path. The value is a hardcoded numeric one.
What am I trying to do?
I am trying to count the number of entries for each unique directory listing in the file. That's why I am taking the directory name as a key and one as a value. Once I have kvRDD, I can easily count the number of files. All I have to do is to group all the values by the key and sum up the 1s.
That's what the fourth line is doing (reduceByKey). reduceByKey groups the values by key and combines them with the function we pass in, which here adds them up.
Finally, I collect all the data back from the executors to the driver.
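To check the logic of the four transformations without a cluster, here is a minimal sketch that applies the same steps to the sample entries using ordinary Scala collections. It is a local stand-in, not the Spark job itself: groupBy followed by a sum plays the role of reduceByKey, and the names DirCount, sample, and countByFirstDir are hypothetical.

```scala
object DirCount {
  // The sample entries listed with the example.
  val sample: Seq[String] = Seq(
    "/etc/abrt",
    "/etc/abrt/plugins",
    "/etc/abrt/plugins/CCpp.conf",
    "/etc/reader.conf",
    "/etc/fonts",
    "/etc/fonts/fonts.conf",
    "/etc/fonts/fonts.dtd",
    "/etc/fonts/conf.d"
  )

  def countByFirstDir(lines: Seq[String]): Map[String, Int] =
    lines
      .map(_.split("/"))  // "/etc/abrt" becomes Array("", "etc", "abrt")
      .map(a => (a(1), 1)) // key is the first directory name, value is 1
      .groupBy(_._1)       // gather the pairs that share a key
      .map { case (key, pairs) => (key, pairs.map(_._2).sum) } // sum the 1s

  def main(args: Array[String]): Unit =
    countByFirstDir(sample).foreach { case (dir, n) => println(s"$dir: $n") }
}
```

All eight sample entries start with /etc, so the result has a single key. A line consisting only of "/" would break the a(1) lookup, which this sketch does not guard against.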
I have executed the above example on a six-node Hadoop cluster. Apache Spark offers a UI to track a Spark application. The figure below shows the Spark UI for the above example.
We see one job, two stages, and ten tasks. Let's understand these three things.
Spark created one job for the collect action. We already know that Spark transformations are lazy. However, an action is strict. We created a single Spark action, and hence we see a single job.
Apache Spark has completed this Job in two Stages. It couldn't do it in a single Stage due to a shuffle activity caused by the reduceByKey transformation.
My initial RDD had five partitions. So I expected five tasks. Since the job went into two stages, we have ten tasks: five tasks for each stage.
The Spark DAG shows the whole process in a nice visualization. The DAG shows that Spark was able to complete the first three activities in a single stage. However, for the reduceByKey function, it took a new stage. The question is, why?
A detailed answer is explained in one of my Spark training videos. However, here is a short answer.
Whenever you do something that needs moving data, for example, a group by operation or a join operation, you will notice a new Stage. This new stage represents a Spark Shuffle activity.
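To make the stage boundary concrete, here is a plain Scala model of the two stages. It is a sketch under stated assumptions and not Spark's implementation: vectors stand in for partitions, and records are routed by the hash of their key, which is the general idea behind hash partitioning. The names ShuffleModel, stage1, shuffle, and stage2 are hypothetical.

```scala
object ShuffleModel {
  type Pairs = Vector[(String, Int)]

  // Stage 1: each partition is processed on its own; no record leaves its partition.
  def stage1(partitions: Vector[Vector[String]]): Vector[Pairs] =
    partitions.map(_.map(line => (line.split("/")(1), 1)))

  // Shuffle: send every record to the partition chosen by its key's hash,
  // so that all records with the same key end up together.
  def shuffle(partitions: Vector[Pairs], numPartitions: Int): Vector[Pairs] = {
    val byTarget = partitions.flatten.groupBy { case (key, _) =>
      Math.floorMod(key.hashCode, numPartitions)
    }
    Vector.tabulate(numPartitions)(i => byTarget.getOrElse(i, Vector.empty))
  }

  // Stage 2: with the keys co-located, each partition can reduce independently.
  def stage2(partitions: Vector[Pairs]): Vector[Map[String, Int]] =
    partitions.map(_.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) })

  def main(args: Array[String]): Unit = {
    val input = Vector( // made-up lines spread over three partitions
      Vector("/etc/abrt", "/usr/bin"),
      Vector("/etc/fonts", "/var/log"),
      Vector("/usr/lib", "/etc/reader.conf")
    )
    stage2(shuffle(stage1(input), 3)).zipWithIndex.foreach { case (counts, i) =>
      println(s"partition $i: $counts")
    }
  }
}
```

The two map steps need nothing outside their own partition, so they can share a stage. The per-key sum cannot finish until records with the same key have been brought together, and that movement of data is the shuffle that separates the stages.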