Apache Spark Foundation Course - Dataframe Basics


We had an introduction to Apache Spark and also learned Spark Architecture in the earlier videos. Now it is time to start getting into Spark application development. It is often convenient to break things into separate areas and narrow down our study focus. For that purpose, I classify Apache Spark programming into the following areas.

  1. Batch Processing
  2. Interactive Query
  3. Streaming
  4. Machine Learning
  5. Graph processing

In this video and some of the upcoming videos, we will try to restrict ourselves to the batch processing in Apache Spark.
We also learned that every Spark application does three things.

  1. Load
  2. Process
  3. Write

The first thing is to load the data from a source system. There are hundreds of possible source systems; even if we leave out the streaming sources, the number is still quite large. However, all of them come under the scope of loading data into Apache Spark. We cannot cover all of them, but we will pick up some fundamental sources and add separate content for other sources over time.
The final part is to write your outcome to a destination system. Again, there are hundreds of options for a destination system. We will cover some essential destination types and add a few more at later stages.
In the middle, we have the data processing. It might be simple aggregation, grouping, and sorting, or something more complicated. Whatever you do with the loaded data, it comes under the umbrella of processing the data. However, in contrast to the large number of sources and destinations, we have just three options to hold the data in Apache Spark.

  1. Spark Dataframe
  2. Spark DataSet
  3. Spark RDD

It may be the raw data that you loaded from a source system, intermediate data, or the final outcome. Everything resides in one of these three structures.

Spark Dataframes

A Spark Data Frame is a distributed collection of structured data. Since it holds structured data, you can think of it as a database table with a schema attached to it. Well, that is a lot to understand, so let's take an example. To create a Data Frame, we need to load some data. Let me download a sample data file from kaggle.com.
If you have a Kaggle account, you can also download the same data file as I am using for this video. Let's take a quick look at the data file.

                                        
    "Timestamp","Age","Gender","Country","state","self_employed","family_history","treatment","work_interfere","no_employees","remote_work","tech_company","benefits","care_options","wellness_program","seek_help","anonymity","leave","mental_health_consequence","phys_health_consequence","coworkers","supervisor","mental_health_interview","phys_health_interview","mental_vs_physical","obs_consequence","comments"
    2014-08-27 11:29:31,37,"Female","United States","IL",NA,"No","Yes","Often","6-25","No","Yes","Yes","Not sure","No","Yes","Yes","Somewhat easy","No","No","Some of them","Yes","No","Maybe","Yes","No",NA
    2014-08-27 11:29:37,44,"M","United States","IN",NA,"No","No","Rarely","More than 1000","No","No","Don't know","No","Don't know","Don't know","Don't know","Don't know","Maybe","No","No","No","No","No","Don't know","No",NA
    2014-08-27 11:29:44,32,"Male","Canada",NA,NA,"No","No","Rarely","6-25","No","Yes","No","No","No","No","Don't know","Somewhat difficult","No","No","Yes","Yes","Yes","Yes","No","No",NA
    2014-08-27 11:29:46,31,"Male","United Kingdom",NA,NA,"Yes","Yes","Often","26-100","No","Yes","No","Yes","No","No","No","Somewhat difficult","Yes","Yes","Some of them","No","Maybe","Maybe","No","Yes",NA
    2014-08-27 11:30:22,31,"Male","United States","TX",NA,"No","No","Never","100-500","Yes","Yes","Yes","No","Don't know","Don't know","Don't know","Don't know","No","No","Some of them","Yes","Yes","Yes","Don't know","No",NA
    2014-08-27 11:31:22,33,"Male","United States","TN",NA,"Yes","No","Sometimes","6-25","No","Yes","Yes","Not sure","No","Don't know","Don't know","Don't know","No","No","Yes","Yes","No","Maybe","Don't know","No",NA
    2014-08-27 11:31:50,35,"Female","United States","MI",NA,"Yes","Yes","Sometimes","1-5","Yes","Yes","No","No","No","No","No","Somewhat difficult","Maybe","Maybe","Some of them","No","No","No","Don't know","No",NA
    2014-08-27 11:32:05,39,"M","Canada",NA,NA,"No","No","Never","1-5","Yes","Yes","No","Yes","No","No","Yes","Don't know","No","No","No","No","No","No","No","No",NA
    2014-08-27 11:32:39,42,"Female","United States","IL",NA,"Yes","Yes","Sometimes","100-500","No","Yes","Yes","Yes","No","No","No","Very difficult","Maybe","No","Yes","Yes","No","Maybe","No","No",NA
                                    
                            

I notice the following things about the data.

  1. It is a CSV file.
  2. The first row is a header that contains the column names.
  3. String values are surrounded by a pair of double quotes.
  4. Null values are marked as NA.
  5. The timestamp is in the format YYYY-MM-DD HH24:MI:SS, for example 2014-08-27 11:29:31.
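Before involving Spark, it can help to see these five observations applied by hand. The following is a minimal plain-Python sketch (standard library only, not Spark) that works on a trimmed copy of the first five columns of three lines from the sample. It takes the column names from the header row, lets the csv module strip the double quotes, maps the literal NA to None, and parses the timestamp. The helper name parse_rows is hypothetical, and the strptime pattern %Y-%m-%d %H:%M:%S is the Python counterpart of the Spark pattern yyyy-MM-dd HH:mm:ss.

```python
import csv
import io
from datetime import datetime

# Trimmed copy of three lines from the sample file (first five columns only).
SAMPLE = '''"Timestamp","Age","Gender","Country","state"
2014-08-27 11:29:31,37,"Female","United States","IL"
2014-08-27 11:29:44,32,"Male","Canada",NA
'''


def parse_rows(text, null_value="NA", ts_format="%Y-%m-%d %H:%M:%S"):
    """Hypothetical helper: header -> column names, NA -> None, typed Timestamp and Age."""
    reader = csv.reader(io.StringIO(text))  # csv removes the surrounding double quotes
    header = next(reader)                   # observation 2: the first row holds column names
    rows = []
    for fields in reader:
        # observation 4: the literal NA marks a null value
        values = [None if f == null_value else f for f in fields]
        row = dict(zip(header, values))
        if row["Timestamp"] is not None:    # observation 5: parse the timestamp format
            row["Timestamp"] = datetime.strptime(row["Timestamp"], ts_format)
        if row["Age"] is not None:
            row["Age"] = int(row["Age"])
        rows.append(row)
    return header, rows


header, rows = parse_rows(SAMPLE)
print(header)
print(rows[1])
```

Spark's CSV reader does the same kind of work for us once we describe these observations through its options.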

These observations are enough to load the data correctly. Now, all we need is an API call to load this data into Apache Spark. However, Apache Spark is continuously evolving, and many of its APIs are still in the experimental stage. So, I recommend that you refer to the documentation for the version that you plan to use. Let me take this opportunity to introduce you to the Apache Spark API documentation.
Go to the Apache Spark home page, navigate to the latest release documentation, choose API Docs, and then choose Scala.
If you prefer Python, you can jump to the Python documentation instead.
The good news is that the Data Frame APIs are nearly identical in Scala and Python.


Spark Session

Once you open the Spark documentation, you might wonder where to start. A Spark application starts with a Spark Session, so search for SparkSession in the search bar.
A Spark Session is the entry point to programming Spark with the Data Frame API. We will be using an interactive client, and the Spark shell creates a Spark Session upfront and makes it available as the variable spark. I will come back to the Spark Session builder when we build and compile our first Spark application. The most critical Spark Session API is the read method. It returns a DataFrameReader, so let's jump to the DataFrameReader.

Spark DataFrameReader

A DataFrameReader offers many APIs. One of them, the csv method, is specifically designed to read CSV files. It takes a file path and returns a Data Frame. The csv method could be the most convenient and straightforward way to load CSV files into a Data Frame. It also allows you to specify many options. We use these options to tell the API about our source data file so that it can interpret the file correctly. Take the header option, for example. If you set header to true, the API knows that the first line in the CSV file is a header. The header is not a data row, so the API uses it for the column names instead of loading it as data.
The following other options are immediately relevant for our example.

1. quote -> The quote option sets the character used to enclose string values. Quoting your string values is critical if a field contains a comma. The default value is the double quote character, so we can rely on the default for our example.

2. inferSchema -> The inferSchema option automatically infers the data type of each column. If we set it to true, the API makes an extra pass over the data to infer the schema. If we leave it false (the default), every column is read as a string unless we specify a schema explicitly.

3. nullValue -> The nullValue option defines the string that represents a null. In our file, that string is NA.

4. timestampFormat -> This option declares the timestamp format used in your CSV file, written as a Java-style date-time pattern such as yyyy-MM-dd HH:mm:ss.

5. mode -> This one is crucial. It defines how to deal with corrupt records. There are three supported modes:
PERMISSIVE (the default), DROPMALFORMED, and FAILFAST.
The first two allow you to continue loading even if some rows are corrupt: PERMISSIVE keeps a corrupt row and fills the values it cannot parse with nulls, while DROPMALFORMED drops the row. FAILFAST throws an exception as soon as it meets a corrupt record. We will use FAILFAST in our example because we do not want to proceed in case of data errors.
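The three modes are easier to compare side by side. Below is a simplified plain-Python model, not Spark's implementation, of how each mode could treat a value that cannot be converted to its column type. The two-column schema, the records, and the names convert, load, and MalformedRecordError are all made up for illustration. The model also ignores Spark details such as the columnNameOfCorruptRecord option.

```python
# Simplified model of the three CSV parse modes; the schema and records are made up.
SCHEMA = [("Age", int), ("Gender", str)]
VALID_MODES = {"PERMISSIVE", "DROPMALFORMED", "FAILFAST"}


class MalformedRecordError(Exception):
    """Stands in for the exception Spark raises in FAILFAST mode."""


def convert(fields, mode):
    """Return a typed row dict, or None if the row should be dropped."""
    row = {}
    for (name, cast), raw in zip(SCHEMA, fields):
        try:
            row[name] = cast(raw)
        except ValueError:
            if mode == "FAILFAST":
                raise MalformedRecordError(f"cannot parse {name}={raw!r}")
            if mode == "DROPMALFORMED":
                return None           # discard the whole row
            row[name] = None          # PERMISSIVE: keep the row, null the bad value
    return row


def load(records, mode="PERMISSIVE"):
    mode = mode.upper()  # Spark also treats the mode name case-insensitively
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode {mode!r}")  # this model simply rejects unknown modes
    typed = [convert(fields, mode) for fields in records]
    return [row for row in typed if row is not None]


records = [["37", "Female"], ["forty", "Male"], ["32", "Male"]]
print(load(records, "permissive"))
print(load(records, "dropmalformed"))
```

Calling load(records, "failfast") raises MalformedRecordError on the second record. This mirrors why FAILFAST is the safer choice when bad data should stop the job instead of quietly turning into nulls or missing rows.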
We can use the following code to load the data from the CSV file.

Spark Example to load a csv file in Scala

                                
    // Load the survey CSV; timestamps in the file look like 2014-08-27 11:29:31
    val df = spark.read.options(Map("header" -> "true",
                                    "inferSchema" -> "true",
                                    "nullValue" -> "NA",
                                    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
                                    "mode" -> "failfast")
                               ).csv("/home/prashant/spark-data/survey.csv")
                            

Spark Example to load a csv file in Python

                                
    # Load the survey CSV; timestamps in the file look like 2014-08-27 11:29:31
    df = spark.read.options(header="true",
                            inferSchema="true",
                            nullValue="NA",
                            timestampFormat="yyyy-MM-dd HH:mm:ss",
                            mode="failfast").csv("/home/prashant/spark-data/survey.csv")
                            

Let me explain the code.
spark is the variable name for the Spark Session object. The read method is an API defined on the Spark Session object. It returns an instance of DataFrameReader, and hence we call the options API on the DataFrameReader. The options API takes all the options we want to specify as key-value pairs: a Map in Scala and keyword arguments in Python. Finally, we call the csv method and provide the file location.
I am using Spark in local mode, and hence I am giving a local file path. If you are running it on a Hadoop cluster, you must move your file to HDFS and specify the HDFS file location. We will implement the next example on a Hadoop cluster, and I will show you the details of loading data from an HDFS location as well as from a Google Cloud Storage bucket.
Great. Copy the code. Go to Spark Shell. Activate the paste mode (type :paste). Paste the code. Press enter. Then press Control+D to close the paste mode. The value df is your Data Frame.
The csv method is a convenient way to load a CSV file. However, we also have a more generic API structure for reading data. Here is an equivalent API call.

Spark API to load a csv file in Scala

                                
    val df = spark.read
                  .format("csv")
                  .option("header","true")
                  .option("inferSchema","true") 
                  .option("nullValue","NA")
                  .option("timestampFormat","yyyy-MM-dd HH:mm:ss")
                  .option("mode","failfast")
                  .option("path","/home/prashant/spark-data/survey.csv")
                  .load()                                        
                            

Spark API to load a csv file in Python

                                
    df = spark.read \
              .format("csv") \
              .option("header","true") \
              .option("inferSchema","true") \
              .option("nullValue","NA") \
              .option("timestampFormat","yyyy-MM-dd HH:mm:ss") \
              .option("mode","failfast") \
              .option("path","/home/prashant/spark-data/survey.csv") \
              .load()                                      
                            

The above API structure is more generic and supports many different sources. It is often a good idea to use one consistent API structure across your code instead of using one method for CSV and another for JSON.
If you compare it with the earlier API chain, you will notice the following differences.

  1. We use the format method to specify the data source type. Many formats are available, and the community keeps adding more. Some of my favorites are CSV, JSON, Parquet, JDBC, Kafka, and Cassandra (Kafka and Cassandra need separate connector packages).
  2. In the earlier API call, we created a Map of all the options and passed them at once. You can do that in this API call as well. However, I prefer to supply each option individually. It is just a personal choice.
  3. You might notice that I provided the file location as an option. However, the load method can also take the path as an argument. Which one to use is again a personal preference.
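To illustrate why one format/option/load chain scales to many sources, here is a toy Python class that only mimics the shape of that chain on top of two standard-library parsers. The class name ToyReader, its text option, and its behaviour are hypothetical. It is not Spark's DataFrameReader, and only the method names format, option, options, and load mirror the real API.

```python
import csv
import io
import json


class ToyReader:
    """Hypothetical stand-in for a reader: collect settings, then parse on load()."""

    def __init__(self):
        self._format = None
        self._options = {}

    def format(self, source):
        self._format = source
        return self  # returning self is what makes the calls chainable

    def option(self, key, value):
        self._options[key] = value
        return self

    def options(self, **opts):
        self._options.update(opts)  # same effect as several option() calls
        return self

    def load(self, text=None):
        # The input comes from the argument or from a "text" option, much as
        # Spark's load() accepts a path argument or a "path" option.
        text = text if text is not None else self._options["text"]
        if self._format == "csv":
            rows = list(csv.reader(io.StringIO(text)))
            if self._options.get("header", "false") == "true":
                names, rows = rows[0], rows[1:]
                return [dict(zip(names, r)) for r in rows]
            return rows
        if self._format == "json":
            # One JSON object per line, the layout Spark's json source expects by default.
            return [json.loads(line) for line in text.splitlines() if line.strip()]
        raise ValueError(f"unsupported format {self._format!r}")


csv_rows = ToyReader().format("csv").option("header", "true").load('"Age","Gender"\n37,"Female"\n')
json_rows = ToyReader().format("json").load('{"Age": 44, "Gender": "M"}\n')
print(csv_rows)
print(json_rows)
```

Each setter stores a value and returns the reader, and only load does the actual work. That is why the source type becomes just another setting rather than a different method per format.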

We learned two Spark Classes.

  1. SparkSession
  2. DataFrameReader

I recommend spending some time reading the documentation for these two classes.

Where is DataFrame documentation?

Surprisingly, there is no Scala class for Data Frame. You will find a DataFrame class in the Python documentation, but in Scala, DataFrame is not a class. A Scala Data Frame is simply a Dataset of Row objects, declared as a type alias:

    type DataFrame = Dataset[Row]

So, we have to look into the Dataset class. All the methods available on a Dataset are also available on Data Frames. That is a whole lot of methods, but don't worry, because a handful of them cover about 80% of typical requirements, and I will cover the most critical ones.

Final point on Spark DataFrames

A Spark Data Frame is a distributed collection of structured data. Since it holds structured data, you can think of it as a database table with a schema attached to it.
I said that a Data Frame is a distributed collection. Let's try to understand that.
I have already covered some basics of RDDs in an earlier video. You already know that an RDD is a partitioned collection. A Data Frame is an abstraction on top of an RDD; you can think of it as a simplified and optimized RDD for structured data. Since an RDD is broken down into partitions and a Data Frame is an abstraction over an RDD, a Data Frame is also partitioned. You can verify this in your terminal using the code below.


                                
    df.rdd.getNumPartitions                                  
                            

In this code, I got the underlying RDD and printed its number of partitions. I can create a new Data Frame by repartitioning the existing one.

                                    
    // repartition already returns a Data Frame, so no toDF call is needed
    val df5 = df.repartition(5)
                                

Now, we can again check the number of partitions in the underlying RDD.

                                    
    df5.rdd.getNumPartitions                                 
                                

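A partitioned collection is easy to picture with a toy model. The plain-Python sketch below deals the sample's Age values round-robin into a given number of lists, which is roughly how repartition(n) spreads rows across partitions. It is only an approximation: Spark's real shuffle adds details, such as where each input partition starts dealing, that this sketch leaves out. The helper name repartition here is hypothetical, not a Spark API.

```python
def repartition(rows, num_partitions):
    """Hypothetical helper: deal rows round-robin into num_partitions lists."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


ages = [37, 44, 32, 31, 31, 33, 35, 39, 42]  # Age column of the nine sample rows
parts = repartition(ages, 5)
print(len(parts))               # 5, the analogue of df5.rdd.getNumPartitions
print([len(p) for p in parts])  # [2, 2, 2, 2, 1]
```

No row is lost or duplicated; the same collection is simply split into five pieces that Spark could process in parallel.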
So, a Data Frame is a partitioned collection, just like an RDD; in fact, under the hood, it has an RDD. The next thing I mentioned about Data Frames is that they hold structured data and have a schema. You can see this using the command below.

                                    
    df.select("Timestamp", "Age","remote_work","leave").filter("Age > 30").show                                 
                                

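In plain terms, the call above is a projection (keep four columns) followed by a row predicate (keep rows where Age is greater than 30). The sketch below expresses the same idea in plain Python on two rows copied from the sample plus one made-up row with Age 29, since every sample row is older than 30 and the filter would otherwise drop nothing. The select and where helpers are hypothetical analogies that run entirely outside Spark.

```python
# Two rows copied from the sample (selected columns only) plus one made-up row.
rows = [
    {"Timestamp": "2014-08-27 11:29:31", "Age": 37, "remote_work": "No", "leave": "Somewhat easy"},
    {"Timestamp": "2014-08-27 11:29:37", "Age": 44, "remote_work": "No", "leave": "Don't know"},
    {"Timestamp": "made-up", "Age": 29, "remote_work": "Yes", "leave": "Don't know"},  # made up
]


def select(rows, *columns):
    """Projection: keep only the named columns, like df.select(...)."""
    return [{c: r[c] for c in columns} for r in rows]


def where(rows, predicate):
    """Row filter, like df.filter(...); here the predicate is a Python function, not a SQL string."""
    return [r for r in rows if predicate(r)]


result = where(select(rows, "Timestamp", "Age", "remote_work", "leave"), lambda r: r["Age"] > 30)
for r in result:
    print(r)
```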
Does it look like a select statement on a database table? A Data Frame follows a row-column structure like a database table. You can pick the desired columns and apply filters to the rows. Every column has a name and a data type attached to it. You can print the Data Frame schema using the command below.

                                    
    df5.printSchema                                 
                                

It shows the list of all the columns. Every column has a name and a data type attached to it, plus a nullable flag, which is a constraint. Great. In this session, I gave you a formal introduction to Spark Data Frames, and we learned the primary method to load data into a Data Frame. In the next video, we will dive deeper into Data Frames.

