Apache Spark Foundation Course - Dataframe Transformations


In the earlier video, we started our discussion on Spark Data frames. In this video, we will deep dive further and try to understand some internals of Apache Spark data frames. I am assuming that you have already watched the Architecture video. We covered some RDD basics in that lesson. You might find that half of this video is reiterating the RDD concepts. And that happens because a data frame ultimately compiles down to an RDD. However, we will go ahead of those notions and also cover things that are specific to data frames. So, Let's start.

We already learned a method to load data from a source. What do you want to do next?
If you have a business requirement, you might want to do some data processing and generate an output to meet your requirement. Let me give you a simple requirement. We already loaded mental health survey dataset. Can you generate following bar chart using that dataset?

Spark Dataframe Example Graph and Table
Fig.1-Spark Dataframe Example Graph and Table.

Well, we don't want to get into the visualization so let's reduce the requirement to an output dataset. The table represents the final output that we want to achieve. I created the Bar Chart in MS Excel using the above table.
Does it appear to be an easy requirement? Well, It is indeed an easy example. We are going to solve this problem, and while we develop a solution, you will also discover some important data frame concepts.
We already learned that RDDs are immutable. Once loaded, you cannot modify them. However, you can perform following types of operations on RDDs.

  1. Transformations
  2. Actions

Spark data frames carry the same legacy from RDDs. So, Spark data frames are also immutable. But you can perform transformations on them to generate new data frames. You can also perform actions on a Spark data frames. If you check the documentation, you will see two categories of transformations.

  1. Typed Transformations
  2. Untyped Transformations

Both of these are available to data frames. The untyped transformations might return you a dataset. But you can convert a dataset to a data frame using the toDF function. However, you don't need to worry too much about it because Spark can take care of that automatically.
Transformations are the basic building blocks for Spark developers. They are the key tools for the developers because we use transformations to express our business logic.
Let's come back to our example. What do we want to do? If you check the dataset . You will see a field for treatment. This field records the response to the following question.
Have you taken a treatment for your mental illness?
Some people responded as Yes and others as No. All we need to do is to count these Yes and No. Then group them by the gender. That's it. You will get the output table. We can meet this requirement by applying a set of transformations. Let's try that.
The first activity is to load the data into a DataFrame. You can use below code to load the data.

                                
    val df = spark.read
                  .format("csv")
                  .option("header","true")
                  .option("inferSchema","true") 
                  .option("nullValue","NA")
                  .option("timestampFormat","yyyy-MM-dd'T'HH:mm​:ss")
                  .option("mode","failfast")
                  .option("path","/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv")
                  .load()                                          
                            

There are many fields in this dataset. However, for my requirement, I am only concerned with two fields.

  1. Gender
  2. Treatment

So, let me apply a select transformation to select only two fields.

                                
     val df1 = df.select( $"Gender",$"treatment")
               df1.show                                                 
                         

So the df1 is a new data frame that contains only two fields. But I need to count Yes and Nos. So I might need to separate them into two different columns. Once I have all the Yes in one column and all the Nos in another column, I can easily count them.
So, let's apply another transformation.

                                
    val df2 = df1.select($"Gender",
                (when($"treatment" === "Yes", 1).otherwise(0)).alias("All-Yes"),
                (when($"treatment" === "No", 1).otherwise(0)).alias("All-Nos")
                         )                                                 
                         

This time, I am taking df1 and applying another select transformation. I take the gender field as it was earlier. I am not applying any changes to that field. However, I am transforming the treatment field. When the treatment is Yes, I take it as a numeric one. Otherwise, I take it as a numeric zero. Now, I can simply take a sum of All-Yes column to get the total number of Yes. Right? You might want to do it differently.
For example, When the treatment is Yes, take it as Yes otherwise take it as Null. Then you can apply a count function on that column.
However, I prefer to take it as a numeric value and apply the sum function instead of a count function. That helps me to avoid unnecessary dealing with nulls. You might also wonder that why do we apply two transformations for this. I mean, We could have done it in a single step.
Let me show you.

                                
    val df2 = df.select($"Gender",
                       (when($"treatment" === "Yes", 1).otherwise(0)).alias("All-Yes"),
                       (when($"treatment" === "No", 1).otherwise(0)).alias("All-Nos")
                        )                                                 
                         

Instead of applying this transformation over df2, we could have applied it directly to df.
I mean, we could have avoided the first transformation and done it in a single step. You might argue that if we can avoid one transformation, and do it as a single step, Spark might perform better. Right? But that's not the truth in this case. There is no difference in performance. At least not in this example.
Why?
Let's try to understand that.
We already learned in RDDs that the transformations are lazy. They don't execute until we fire an action. Right? Spark implements data frames to be lazy because that design gives them at least two clear benefits.

  1. Spark engine can act as a compiler.
  2. They get an opportunity to apply necessary optimizations.

In our example, we applied two transformations. We haven't executed any action yet. So, Spark has executed nothing yet on the cluster. You might see a load operation on Spark UI. But I am talking about those two select transformations. None of them are executed yet.
Let's apply an action.

                                
        df2.collect                                             
                         

Now you will see one job in Spark UI.
If you jump to the SQL tab in the Spark UI and click on the collect job, you will get a lot of details.
You will see four plans.

  1. Parsed Logical Plan
  2. Analyzed Logical Plan
  3. Optimized Logical Plan
  4. and Finally the Physical Plan

When we execute an action, Spark takes the user code. In our case, It takes those two select transformations. It will then parse the user code and generate a parsed logical plan.
The second step is to analyze the initial plan and resolve the column names and their data types. The output of the second step is an analyzed logical plan. Apache Spark maintains a catalog of all the tables and data frame information. The analyzer makes a call to the catalog and resolves the initial plan. The analyzed plan clearly shows the column names and the datatypes.
The analyzed logical plan goes to an optimizer. As of now, the optimizer mainly performs two types of optimizations.

  1. Pipelining
  2. Predicate pushdown

Pipelining is as simple as combining multiple transformations together. We created two transformations. Both were the select operations. Spark realizes that it can combine them together into a single transformation. So, it simply does that.
You can cross check it by looking at the optimized plan. The pipelining optimization doesn't only apply to a select transformation. Spark will look for all such opportunities and apply the pipelining where ever it is applicable.
The other type of optimization is the predicate pushdown. That simply means pushing down the filter conditions to the early stage instead of applying it at the end.
The optimized logical plan goes to the Spark compiler that generates a bunch of physical execution plans. The physical execution plan is nothing but a series of RDD transformations. The Spark engine generates multiple physical plans based on various considerations. Those considerations might be a different approach to perform a join operation. It may be the physical attributes of the underlying data file. It may be something else. However, Spark settles down to a single physical plan that it evaluates to be the best among others. Finally, The best plan goes for the execution on the cluster.
Great. Let's come back to our example. We loaded data. We applied one transformation. Then we applied another transformation. But we haven't reached the desired output. I think if group by the gender and compute a sum over the second and third column, we should get the desired output.
Let's try that.


                                
    val df3 = df2.groupBy("Gender")
                 .agg( sum($"All-Yes"),sum($"All-Nos"))                                     
                         

Now we apply a group by on gender. Then aggregate to calculate the sum of the other two columns. All we are doing here is nothing but simply chaining the data frame APIs. The syntax might look weird in the beginning, but you will be comfortable with this style in few days. All the methods that I am using in this example are available in dataset documentation with easy to follow examples.
If you check the output for the above transformation, you will realize that the Gender field is not very well coded. We have a data quality problem. There are many ways to deal with that issue. You might want to use a tricky regular expression and translate each value to one of the genders. However, we have a small dataset, and I think the safest method is to make a list of all unique values and handle them using a match case expression. I can quickly create a Scala function to handle it.
Here is the code. Well, this is a plain scala code.

                                
    def parseGender(g: String) = {  
                g.toLowerCase match {
                    case "male" | "m" | "male-ish" | "maile" |
                         "mal" | "male (cis)" | "make" | "male " |
                         "man" | "msle" | "mail" | "malr" |
                         "cis man" | "cis male" => "Male"
                    case "cis female" | "f" | "female" |
                         "woman" |  "femake" | "female " |
                         "cis-female/femme" | "female (cis)" |
                         "femail" => "Female"
                    case _ => "Transgender"
                }
    }                                             
                         

We want this function to be available to all the executors. you can do that by registering this function as a UDF. Registering a UDF is as simple as passing the function to the UDF function.

                                
    val parseGenderUDF = udf( parseGender _ )                                             
                         

Spark will serialize the function on the driver and transfer it over the network to all executor processes. So, now we can use the parseGenderUDF in our data frames.
Let's create another transformation to fix our data quality problem.

                                
    val df3 = df2.select((parseGenderUDF($"Gender")).alias("Gender"),
                            $"All-Yes",
                            $"All-Nos"
                        )                                             
                         

I am using the data frame df2 and applying another select transformation. This time, we apply the parseGenderUDF to the gender field. We also take All-Yes and All-Nos fields that we created earlier. Now, we can do a group by on df3.

                                
    val df4 = df3.groupBy("Gender").agg( sum($"All-Yes"),sum($"All-Nos"))                                           
                         

Great. You can check the output using below statement.

                                
        df4.show                                           
                         

You shoud get the desired output.
We did this in several steps. You might have lost it in between. So, Let me list down all the code at once.

                                
    spark.conf.set("spark.sql.shuffle.partitions", 2)
    val df = spark.read
                  .format("csv")
                  .option("header","true")
                  .option("inferSchema","true") 
                  .option("nullValue","NA")
                  .option("timestampFormat","yyyy-MM-dd'T'HH:mm?:ss")
                  .option("mode","failfast")
                  .option("path","/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv")
                  .load()
    val df1 = df.select( $"Gender",$"treatment")
    val df2 = df.select($"Gender",
                         (when($"treatment" === "Yes", 1).otherwise(0)).alias("All-Yes"),
                         (when($"treatment" === "No", 1).otherwise(0)).alias("All-Nos")
                       )
    def parseGender(g: String) = {  
      g.toLowerCase match {
        case "male" | "m" | "male-ish" | "maile" |
             "mal" | "male (cis)" | "make" | "male " |
             "man" | "msle" | "mail" | "malr" |
             "cis man" | "cis male" => "Male"
        case "cis female" | "f" | "female" |
             "woman" |  "femake" | "female " |
             "cis-female/femme" | "female (cis)" |
             "femail" => "Female"
        case _ => "Transgender"
       }
       }
    val parseGenderUDF = udf(parseGender _)
    val df3 = df2.select((parseGenderUDF($"Gender")).alias("Gender"),
                          $"All-Yes",
                          $"All-Nos"
                        )
    val df4 = df3.groupBy("Gender").agg( sum($"All-Yes"),sum($"All-Nos"))
    val df5 = df4.filter($"Gender" =!= "Transgender")
    df5.collect                                           
                         

There are few more things covered in the video. So don't miss the video.



You will also like:


Scala Variable length arguments

How do you create a variable length argument in Scala? Why would you need it?

Learning Journal

Local Functions

How do you implement private methods in a functional programming language.

Learning Journal

Referential Transparency

Referential Transparency is an easy method to verify the purity of a function.

Learning Journal

Statements and Expressions

Statements and Expressions in Scala. How are they different?

Learning Journal

Lazy Evaluations

Evaluate the expression now vs evaluate it for the first use. Strict vs Lazy?

Learning Journal