Apache Spark Foundation Course - SQL over Dataframes


In the earlier videos, we started our discussion on Spark data frames. By now, you must have realized that all you need to learn is how to model your business requirements using Spark transformations. Once you learn to write the transformations that meet your business requirements, you have almost completed the Apache Spark foundation course.
You can continue learning Spark internals, tuning, optimizations, and other things like streaming and machine learning. However, modelling your business requirements as a series of transformations is the most critical part of Spark development.
It is like learning SQL. Once you know SQL, you can claim to be a database developer. Similarly, once you master the transformations, you can claim to be a Spark developer. Moreover, if you know SQL, you are already a good Spark developer. That is the topic of this video.
In this video, we will augment our Data Frame knowledge with our SQL skills. So, let’s start.

Amazing SQL

You have already seen some transformation code earlier. We used the read API to load the data from a CSV file.

                                
    val df = spark.read
    .format("csv")
    .option("header","true")
    .option("inferSchema","true") 
    .option("nullValue","NA")
    .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
    .option("mode","failfast")
    .load("/home/prashant/spark-data/survey.csv")
                                    
    //Then we applied a select transformation and a filter condition.
    val sel = df.select("Timestamp", "Age","remote_work","leave").filter("Age > 30")                                             
                        

If you are a database developer, you will recognize the above transformation as an SQL query.

                                
    select timestamp, age,remote_work,leave
    from survey_tbl
    where age > 30;                                                             
                        

To make this SQL work, all you need is a table and an SQL execution engine. The good news is that Spark offers you both of these things.
How?
We will see that in a minute. Before that, let's look at the other things that we did in the earlier video. We wanted to create a bar chart, and for that purpose, we did several things.

  1. Created a user-defined function.
  2. Applied the UDF in a select transformation.
  3. Applied a filter to exclude the Transgender category.
  4. Applied a group by gender.
  5. Calculated the sum of Yes and No.

Assuming you are good at writing SQL, if I allowed you to do all of that in SQL, you would be able to do it as a single SQL statement.

                                
    select gender, sum(all_yes), sum(all_nos) 
    from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal',
                                                   'male (cis)','make','male','man','msle',
                                                   'mail', 'malr','cis man', 'cis male') 
                      then 'Male' 
                      when lower(trim(gender)) in ('cis female','f','female','woman',
                                                 'femake','female ','cis-female/femme',
                                                 'female (cis)','femail') 
                      then 'Female'
                      else 'Transgender' 
                 end as gender,
                 case when treatment == 'Yes' then 1 else 0 end as all_yes,
                 case when treatment == 'No'  then 1 else 0 end as all_nos
          from survey_tbl)
          where gender != 'Transgender'
          group by gender                                      
                        

I hope you already see the point I want to make. SQL is an excellent tool for many transformation requirements. Most of us are already skilled and comfortable with SQL, and for that reason, Apache Spark allows us to use SQL over a data frame.


Spark SQL requires Schema

Before we execute the above SQL in Spark, let's talk a little about the schema. A schema is nothing more than a definition of the column names and their data types. In our earlier example, we allowed the API to infer the schema. In general, there are two approaches to handling a schema.

  1. Let the data source define the schema, and we infer it from the source.
  2. Define a schema explicitly in your program and read the data using your schema definition.

When your source system offers a well-defined schema, schema inference is a reasonable choice. However, it is a good idea to define your schema manually while working with untyped sources such as CSV and JSON.
In our current example, we are loading data from a CSV file. So, the recommendation is to define the schema explicitly instead of using the inferSchema option.
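To see the difference between the two approaches, here is a minimal sketch that reads the same few CSV lines twice: once with inferSchema and once with an explicit schema. It assumes a local SparkSession (in spark-shell, getOrCreate() simply returns the existing `spark`), and the three sample records are hypothetical lines shaped like the survey file. Inference picks a type by scanning the values, so a small Age column comes back as IntegerType, while the explicit schema lets us choose LongType up front.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Local session for experimentation; in spark-shell this returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("schema-demo").getOrCreate()
import spark.implicits._

// Hypothetical CSV content: a header line plus two records.
val csvLines = Seq(
  "Timestamp,Age,Gender",
  "2014-08-27T11:29:31,37,Female",
  "2014-08-27T11:29:37,44,M"
).toDS()

// 1. Inference: Spark scans the values and guesses a type for each column.
val inferred = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvLines)

// 2. Explicit schema: we choose the names and types ourselves.
val explicitSchema = StructType(Array(
  StructField("timestamp", TimestampType, true),
  StructField("age", LongType, true),
  StructField("gender", StringType, true)
))
val explicit = spark.read
  .option("header", "true")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .schema(explicitSchema)
  .csv(csvLines)

inferred.printSchema()
explicit.printSchema()
```

With an explicit schema, the header line is still skipped, but the column names come from the schema rather than from the file.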

Spark Types to define Schema

In my earlier video, I said that Spark is a programming language in itself. The Spark type system is the main reason behind that statement. Apache Spark maintains its own type information, and data frames were designed to use Spark types.
What does that mean?
That means data frames do not use Scala types or Python types. No matter which language you are using for your code, the Spark data frame API always uses Spark types. I believe that was a design decision to bring SQL over data frames across all the languages.
You can find the list of Spark types in the org.apache.spark.sql.types package.
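A quick way to see this is to build a data frame from ordinary Scala values and look at its schema. The sketch below assumes a local SparkSession and uses hypothetical sample rows. The tuples hold a Scala Int, String, and Double, but the schema reports the corresponding Spark types from org.apache.spark.sql.types.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("spark-types-demo").getOrCreate()
import spark.implicits._

// Plain Scala values: an Int, a String, and a Double per row (hypothetical data).
val people = Seq((1, "Alice", 3.5), (2, "Bob", 4.0)).toDF("id", "name", "score")

// The schema is expressed in Spark types, not in the Scala types we started with.
people.schema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))
```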

How to define a Spark Schema

We are all set with the theoretical fundamentals. Let's do something practical. The below code defines a schema for the survey data set.

                                
    //You can create a Schema for survey data set using below code
    import org.apache.spark.sql.types._
    val surveySchema = StructType(Array(StructField("timestamp",TimestampType,true), 
                                        StructField("age",LongType,true), 
                                        StructField("gender",StringType,true), 
                                        StructField("country",StringType,true), 
                                        StructField("state",StringType,true), 
                                        StructField("self_employed",StringType,true), 
                                        StructField("family_history",StringType,true), 
                                        StructField("treatment",StringType,true), 
                                        StructField("work_interfere",StringType,true), 
                                        StructField("no_employees",StringType,true), 
                                        StructField("remote_work",StringType,true), 
                                        StructField("tech_company",StringType,true), 
                                        StructField("benefits",StringType,true), 
                                        StructField("care_options",StringType,true), 
                                        StructField("wellness_program",StringType,true), 
                                        StructField("seek_help",StringType,true), 
                                        StructField("anonymity",StringType,true), 
                                        StructField("leave",StringType,true), 
                                        StructField("mental_health_consequence",StringType,true),
                                        StructField("phys_health_consequence",StringType,true), 
                                        StructField("coworkers",StringType,true), 
                                        StructField("supervisor",StringType,true), 
                                        StructField("mental_health_interview",StringType,true),
                                        StructField("phys_health_interview",StringType,true),
                                        StructField("mental_vs_physical",StringType,true), 
                                        StructField("obs_consequence",StringType,true), 
                                        StructField("comments",StringType,true))
                                )

    //You can load the data using above schema
    val df = spark.read
                 .format("csv")
                 .schema(surveySchema)
                 .option("header","true")
                 .option("nullValue","NA")
                 .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
                 .option("mode","failfast")
                 .load("/home/prashant/spark-data/survey.csv")                                         
                        

Excellent. So, a Spark data frame schema is a StructType that contains a set of StructFields. Each StructField defines a column. StructField is a serializable Scala case class.
The StructField constructor can take four values.

  1. The name of the column.
  2. The data type of the column.
  3. A boolean that tells if the field is nullable. This parameter defaults to true.
  4. You can also supply some metadata for each column. The metadata is nothing but a map of key-value pairs. The default value is empty.

StructType is also a class, and it holds an array of StructFields. If you are using Python, both of these classes exist in PySpark as well. However, the Python StructType holds a list of StructFields, whereas the Scala StructType holds an array of StructFields.
You can use the above code to create a schema and load your data using an explicit schema. Once loaded, you should have a data frame.
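The four StructField constructor arguments listed earlier can be seen in a small sketch. It only needs the Spark types package, with no session or data. The "comment" metadata key is a hypothetical example; metadata accepts any keys you choose. The sketch also shows the StructType add() builder, which is an alternative to passing an Array of fields.

```scala
import org.apache.spark.sql.types._

// Only name and type are required; nullable defaults to true, metadata to empty.
val nameField = StructField("name", StringType)

// All four arguments supplied explicitly, including metadata
// (the "comment" key is a hypothetical example).
val ageMeta = new MetadataBuilder().putString("comment", "Age in years").build()
val ageField = StructField("age", LongType, nullable = true, metadata = ageMeta)

val schema = StructType(Array(nameField, ageField))

// The add() builder produces the same column names and types (without the metadata).
val built = new StructType()
  .add("name", StringType)
  .add("age", LongType, nullable = true)

schema.printTreeString()
```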

Spark Temporary View

Apache Spark allows you to create a temporary view using a data frame. It is just like a view in a database. Once you have a view, you can execute SQL on that view. Spark offers four data frame methods to create a view.

  1. createGlobalTempView
  2. createOrReplaceGlobalTempView
  3. createOrReplaceTempView
  4. createTempView
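The names also encode a second difference: the create* methods expect a new name, while the createOrReplace* methods overwrite an existing view of the same name. A minimal sketch, assuming a local SparkSession, hypothetical sample rows, and a hypothetical view name demo_view:

```scala
import scala.util.Try
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("view-methods-demo").getOrCreate()
import spark.implicits._

// Start clean so the sketch can be re-run in the same session.
spark.catalog.dropTempView("demo_view")

val df = Seq(("Male", "Yes"), ("Female", "No")).toDF("gender", "treatment")

// createTempView registers a new name; a second call with the same name fails.
df.createTempView("demo_view")
val secondCreate = Try(df.createTempView("demo_view"))

// createOrReplaceTempView replaces the existing view of the same name.
val replaced = Try(df.filter($"gender" === "Male").createOrReplaceTempView("demo_view"))

// The view now points at the filtered data frame.
val rowsAfterReplace = spark.sql("select * from demo_view").count()
```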

As you can guess by just looking at the method names, there are two types of temporary views: a temporary view and a global temporary view. We can also refer to them as a local temporary view and a global temporary view. Let's try to understand the difference.

Global vs Local Temp view

A local temporary view is visible only to the current Spark session. A global temporary view, however, is visible to all sessions within the current Spark application.
Wait a minute. Do you mean a SparkSession and a Spark Application are two different things?
Yes. We normally start a Spark application by creating a Spark session. To a beginner, it appears that a Spark application can have only a single session. However, that is not true. You can have multiple sessions in a single Spark application. The Spark session internally creates a SparkContext. A SparkContext represents the connection to a Spark cluster. It also keeps track of all the RDDs, cached data, and configurations.
You cannot have more than one SparkContext in a single JVM. That means one instance of an application can have only one connection to the cluster, and hence a single Spark context. However, your application can create multiple Spark sessions, and all of those sessions will share the same context.
In your standard applications, you may not need to create multiple Spark sessions. However, if you are developing an application that needs to support multiple interactive users, you might want to create one Spark session for each user session. Ideally, we would create a separate connection to the Spark cluster for each user in this use case, but Spark does not yet support multiple contexts in one JVM. The documentation says that this restriction may be removed in a future release.
So, coming back to local temporary views, they are only visible to the current session. However, global temporary views are visible across the spark sessions within the same application.
In all this discussion, one thing is crystal clear: neither kind of view is visible to other applications. Whether you create a global or a local temporary view, it is always local to your application, and it lives only as long as your application is alive.
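The session rules above can be checked in a few lines. This is a sketch, assuming a local SparkSession; newSession() creates a second session in the same application, and the view names and sample rows are hypothetical. Both sessions share one SparkContext, the local temporary view resolves only in the session that created it, and the global temporary view is reachable from the other session through the global_temp database.

```scala
import scala.util.Try
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("sessions-demo").getOrCreate()
import spark.implicits._

// A second session in the same application, sharing the same SparkContext.
val otherSession = spark.newSession()
val sameContext = otherSession.sparkContext eq spark.sparkContext

val df = Seq((1, "a"), (2, "b")).toDF("id", "label")
df.createOrReplaceTempView("session_local_view")

// The local temp view resolves in the session that created it...
val visibleHere = Try(spark.table("session_local_view")).isSuccess
// ...but not in another session of the same application.
val visibleThere = Try(otherSession.table("session_local_view")).isSuccess

// A global temp view is shared across sessions, qualified with global_temp.
df.createOrReplaceGlobalTempView("app_wide_view")
val globalCountInOther = otherSession.sql("select * from global_temp.app_wide_view").count()
```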
Great, since I am not going to create multiple sessions, let me create a local temporary view.

                                
    df.createOrReplaceTempView("survey_tbl")                                                
                        

The method takes the name of the view as an argument. The above statement should have created a temporary table, or view. Where can you find it?
Well, a temporary view is maintained by the Spark session. So, let's check the Spark session.

                                
    spark.catalog.listTables().show()
                        

The Spark session offers you a catalog. A catalog is an interface that allows you to create, drop, alter, or query the underlying databases, tables, and functions. I recommend that you at least go through the documentation for the Catalog interface. The above statement uses the listTables method of the catalog. If you check its output, you can see that the view we created is a temporary table that doesn't belong to any database.
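Because listTables() returns a Dataset of catalog Table entries, you can also inspect the result in code instead of only printing it. A small sketch, assuming a local SparkSession and a hypothetical view name catalog_demo_view:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("catalog-demo").getOrCreate()
import spark.implicits._

Seq((1, "a")).toDF("id", "label").createOrReplaceTempView("catalog_demo_view")

// listTables() returns a Dataset[Table]; collect it to inspect the entries locally.
val entry = spark.catalog.listTables().collect().find(_.name == "catalog_demo_view")

entry.foreach(t => println(s"${t.name} temporary=${t.isTemporary} type=${t.tableType}"))
```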
Let's create a global temporary table and see if we can list that as well.

                                
    df.createOrReplaceGlobalTempView("survey_gtbl")                                                
                        

We used the appropriate method to create a global temporary view on our data frame, and we named the view survey_gtbl. If you call the catalog listTables method once again, you won't see that global view. There is a reason for that: a global temporary view belongs to a system database called global_temp. So, if you want to access the global temp view, you must look into the global_temp database, which means the method call should also specify the database name.

                                
    spark.catalog.listTables("global_temp").show()
                        

The output of the above statement should list your global temp view. Once you have registered the temp view, executing your SQL statement is a simple thing.

                                
    val result = spark.sql("""select timestamp, age, remote_work, leave
    from survey_tbl
    where age > 30""")
    result.show()
                        

Execute SQL on the Spark session, and you get a data frame in return. So, if you think SQL is a simpler way to solve your problem than a lengthy chain of data frame API calls, you are free to use SQL. And you do not pay a performance penalty: SQL queries and data frame transformations are both compiled by the same Catalyst optimizer, so SQL runs as fast as the equivalent data frame transformation.
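A minimal sketch of that equivalence, assuming a local SparkSession and a hypothetical four-column stand-in for the survey data registered as survey_demo (a different name, so it does not replace your survey_tbl). The same query is written once in SQL and once as a data frame chain. Both return the same rows, and explain() lets you compare the plans Catalyst produces for each.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("sql-vs-api").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the survey data.
val survey = Seq(
  ("2014-08-27T11:29:31", 37L, "Yes", "Somewhat easy"),
  ("2014-08-27T11:29:37", 29L, "No", "Don't know"),
  ("2014-08-27T11:29:44", 44L, "No", "Very easy")
).toDF("timestamp", "age", "remote_work", "leave")
survey.createOrReplaceTempView("survey_demo")

// The same query twice: once as SQL, once as a data frame API chain.
val viaSql = spark.sql("select timestamp, age, remote_work, leave from survey_demo where age > 30")
val viaApi = survey.select("timestamp", "age", "remote_work", "leave").filter(col("age") > 30)

// Both are optimized by Catalyst; print the physical plans to compare them.
viaSql.explain()
viaApi.explain()
```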


                                
    spark.sql("""select gender, sum(yes), sum(no) 
            from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal',
                                                           'male (cis)','make','male ','man','msle',
                                                           'mail','malr','cis man','cis male') 
                              then 'Male' 
                              when lower(trim(gender)) in ('cis female','f','female','woman',
                                                           'femake','female ','cis-female/femme',
                                                           'female (cis)','femail') 
                              then 'Female'
                              else 'Transgender' 
                              end as gender,
                              case when treatment == 'Yes' then 1 else 0 end as yes,
                              case when treatment == 'No' then 1 else 0 end as no
                  from survey_tbl) 
         where gender != 'Transgender'
         group by gender""").show()
                                                                                   
                        

So, instead of using a UDF and then a confusing chain of API calls, we can use a single SQL statement to achieve what we did in the previous video. You can use API chains or SQL; both deliver the same performance. The choice is yours.
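If you want to check the CASE logic before running it on the full file, you can run the same query shape on a handful of rows. This sketch assumes a local SparkSession, hypothetical (gender, treatment) pairs registered as gender_demo, and a deliberately shortened list of spellings. It also names the aggregate columns, which makes the result easier to read back.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("gender-sql-demo").getOrCreate()
import spark.implicits._

// Hypothetical responses with messy free-text gender values.
val responses = Seq(
  ("M", "Yes"), ("Male ", "No"), ("female", "No"),
  ("Woman", "Yes"), ("f", "Yes"), ("A little about you", "Yes")
).toDF("gender", "treatment")
responses.createOrReplaceTempView("gender_demo")

// Same structure as the query above, with a shortened list of spellings.
val counts = spark.sql("""
  select gender, sum(all_yes) as yes_count, sum(all_nos) as no_count
  from (select case when lower(trim(gender)) in ('male', 'm', 'man') then 'Male'
                    when lower(trim(gender)) in ('female', 'f', 'woman') then 'Female'
                    else 'Transgender'
               end as gender,
               case when treatment = 'Yes' then 1 else 0 end as all_yes,
               case when treatment = 'No' then 1 else 0 end as all_nos
        from gender_demo) t
  where gender != 'Transgender'
  group by gender""")

counts.show()

// sum() over integer columns returns a long, hence getLong.
val byGender = counts.collect().map(r => r.getString(0) -> (r.getLong(1), r.getLong(2))).toMap
```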

