Parallel Processing in Apache Spark

We already learned about the application driver and the executors. We know that Apache Spark breaks our application into many smaller tasks and assign them to executors. So Spark executes the application in parallel. But do you understand the internal mechanics? How does the Spark breaks our code into a set of task and run it in parallel?
This article aims to answer the above question.

Spark application flow

All that you are going to do in Apache Spark is to read some data from a source and load it into Spark. You will then process the data and hold the intermediate results, and finally write the results back to a destination. But in this process, you need a data structure to hold the data in Spark.
We have three alternatives to hold data in Spark.

  1. Data Frame
  2. Dataset
  3. RDD

Apache Spark 2.x recommends to use the first two and avoid using RDDs. However, there is a critical fact to note about RDDs. Data Frames and Datasets, both of them are ultimately compiled down to an RDD. So, under the hood, everything in Spark is an RDD. And for that reason, I will start with RDDs and try to explain the mechanics of parallel processing.


Spark RDD

Let's define the RDD. The name stands for Resilient Drstributed Dataset. However, I can describe a RDD as below.

Spark RDD is a resilient, partitioned, distributed and immutable collection of data.

Let's quickly review this description.

  1. Collection of data - RDDs hold data and appears to be a Scala Collection.
  2. Resilient - RDDs can recover from a failure, so they are fault tolerant.
  3. Partitioned - Spark breaks the RDD into smaller chunks of data. These pieces are called partitions.
  4. Distributed - Instead of keeping those partitions on a single machine, Spark spreads them across the cluster. So they are a distributed collection of data.
  5. Immutable - Once defined, you can't change a RDD. So Spark RDD is a read-only data structure.

You can create a RDD using two methods.

  1. Load some data from a source.
  2. Create a RDD by transforming another RDD.

The code below shows an example RDD.

                                        
    //load a text file from current directory
    val flistRDD = sc.textFile("flist.txt")
    //Check number of defaults partitions
    flistRDD.getNumPartitions
    //Reload with five partitions
    val flistRDD = sc.textFile("flist.txt", 5)
    //Count the number of elements in each partition
    flistRDD.foreachPartition(p => println("Items in partition-" + p.count(y=>true)) ) 
          
                                    

In the first line, we load some data from a file to create a RDD. When you create a RDD by loading some data from a source, Spark creates some default partitions. The second line displays the default number of partitions. If you want, you can override the defaults and create as many partitions as you want. The second parameter in textFile API is the number of partitions. The last line iterates to all partitions and counts the number of elements for each partition.
The above example shows that a RDD is a partitioned collection, and we can control the number of partitions.
Spark RDDs offer two types of operations.

  1. Transformations
  2. Actions

The transformation operations create a new distributed dataset from an existing distributed dataset. So, they create a new RDD from an existing RDD.
The Actions are mainly performed to send results back to the driver, and hence they produce a non distributed dataset.
All transformations in Spark are lazy and the actions are strict. That means, they don't compute results until an action requires them to provide results.

Parallel Processing of RDD

Let me ask you a simple question. Given the above RDD, If I want to count the number of lines in that RDD, Can we do it in parallel? No brainer. Right?
We already have five partitions. I will give one partition to each executor and ask them to count the lines in the given partition. Then I will take the counts back from these executors and sum it. Simple. Isn't it? That's what the Spark does.
Calculating count is a simple thing. However, the mechanism of parallelism in Spark is the same. There are two main variables to control the degree of parallelism in Apache Spark.

  1. The number of partitions
  2. The number of executors

If you have ten partitions, you can achieve ten parallel processes at the most. However, if you have just two executors, all those ten partitions will be queued to those two executors.
Let's do something little simple and take our understanding to the next level. Here is an example in Scala.


                                        
    val flistRDD = sc.textFile("/home/prashant/flist.txt", 5)
    val arrayRDD = flistRDD.map(x=> x.split("/"))
    val kvRDD = arrayRDD.map(a => (a(1),1))
    val fcountRDD= kvRDD.reduceByKey((x,y)=> x+y)
    fcountRDD.collect()       

    /* Sample entries from the data file
    /etc/abrt
    /etc/abrt/plugins
    /etc/abrt/plugins/CCpp.conf
    /etc/reader.conf
    /etc/fonts
    /etc/fonts/fonts.conf
    /etc/fonts/fonts.dtd
    /etc/fonts/conf.d
    */    
                                    

The above example loads a data file. The file contains the list of directories and files in my local system. I have listed some sample entries above.
Line one loads a text file into an RDD. The file is quite small. If you keep it in HDFS, it may have one or two blocks in HDFS, So it is likely that you get one or two partitions by default. However, we want to make five partitions of this data file and hence we set the second argument of the textFile API to 5.
The line two executes a map method on the first RDD and returns a new RDD. We already know that RDDs are immutable. So, we can't modify the first RDD. Instead, we take the first RDD, perform a map operation and create a new RDD. The map operation splits each line into an array of words. Hence, the new RDD is a collection of Arrays.
The third line executes another map method over the arrayRDD. This time, we generate a tuple (key-value pair). I am taking the first element of the array as my key. The value is a hardcoded numeric one.
What am I trying to do?
I am trying to count the number of entries for each unique directory listing in the file. That's why I am taking the directory name as a key and one as a value. Once I have kvRDD, I can easily count the number of files. All I have to do is to group all the values by the key and sum up the 1s.
That's what the fourth line is doing (ReduceByKey). The ReduceByKey means, group by key and sum the values.
Finally, I collect all the data back from the executors to the driver.
I have executed the above example on a six-node Hadoop cluster. Apache Spark offers a UI to track a Spark application. The below figure shows the Spark UI for the above example.

Spark UI to track a job
Fig.2- Spark UI shows the details of a Job.

We see one job. Two stages and ten task. Let's understand these three things.


Spark Job

Spark created one job for the collect action. We already know that Spark transformations are lazy. However, an action is strict. We created a single Spark action, and hence we see a single job.

Stages

Apache Spark has completed this Job in two Stages. It couldn't do it in a single Stage due to a shuffle activity caused by the reduceByKey transformation.

Tasks

My initial RDD had five partitions. So I expected five task. Since the job went into two stages, we have ten tasks. Five task for each stage.

DAG

The Spark DAG shows the whole process in a nice visualization. The DAG shows that Spark was able to complete first three activities in a single stage. However, for the ReduceByKey function, It took a new Stage. The question is Why?
A detailed answer is explained in one of my Spark training videos however here is a short answer.
Whenever you do something that needs moving data, for example, a group by operation or a join operation, you will notice a new Stage. This new stage represents a Spark Shuffle activity.

Read More

Spark Introduction | Spark Internals | Parallel Processing in Apache Spark

By Prashant Pandey -


You will also like:


Functional Programming

What is Functional Programming and why it is important?

Learning Journal

Pure Functions

What are pure functions and side effects. Start learning functional programming.

Learning Journal

Statements and Expressions

Statements and Expressions in Scala. How are they different?

Learning Journal

Scala named arguments

Learn about named arguments and default values in Scala functions with examples.

Learning Journal

Anonymous Functions

Learn Scala Anonymous Functions with suitable examples.

Learning Journal