Apache Spark Foundation Course - Spark Database and Tables


Welcome back to Learning Journal. SQL is one of the key skills for data engineers and data scientists. Apache Hive deserves the credit for bringing SQL into the Big Data toolset, and it still exists in many production systems. However, we see a growing trend of migrating Hive workloads to Spark SQL. Spark SQL is the most popular and prominent feature of Apache Spark, and that's the topic for this video. So, let's start.
The moment we talk about SQL, a bunch of things start flashing in our minds. Here is the list.

  1. Database
  2. Schema and Datatypes
  3. Tables
  4. Views
  5. SQL functions
  6. User-defined functions
  7. Metadata
  8. Execution plans & opportunities for optimization
  9. SQL Clients and connectivity
  10. DDL and DML constructs

You want something like a database that allows you to organize your tables and views. Right? Then you want to know the supported schema structure and the datatypes.
What SQL functions does it support? Does it allow you to create new functions?
Where is the metadata stored, and how can you access it?
What opportunities do I have for optimization?
And finally, you want to know about the clients. Does it support a SQL client? How do I connect from a remote machine? What about notebooks? How do I connect and pull data from Spark to my BI tools? Does it support JDBC and ODBC?
I mean, the moment you call something SQL compliant, we start expecting all these things because these are the most basic and obvious features.
DDL and DML syntax is the last thing. We already understand that SQL comes in different flavours. They largely comply with standards, but every database has its own SQL dialect, and Spark SQL is no different. Spark implements a subset of the SQL:2003 standard, and hence, not every SQL construct and function that you might know is available in Spark, but you have more than enough SQL support. In addition to ANSI SQL syntax, Spark SQL also supports a large chunk of HiveQL. We will cover all these things with appropriate examples.


Spark SQL Clients

Let's start with a list of supported clients. Apache Spark allows you to execute SQL using a variety of methods. The most widely used methods are listed here.

  1. A traditional command line SQL tool
  2. Programmatic SQL interface
  3. JDBC/ODBC over Spark SQL thrift server
  4. Apache Zeppelin or other notebooks

The easiest way to use Spark SQL is from the command line. Let's try it.
The tool is spark-sql. This command line tool is not very popular among Spark developers. You cannot install and use it from a remote machine. However, it is still a good tool to test your Spark queries and execute your SQL scripts from the command line.
It throws a lot of debug messages for each SQL statement. However, you can start it in silent mode to avoid unnecessary debug messages.

                                
    spark-sql -S                                             
                         

Spark SQL database

Let's try some examples. The first thing that I want to do is to create a database. Spark SQL comes with a default database. If you do not specify a database, you are referring to the default database. We don't want to do that, so let's create a new database. You can create a database using the following code.

                                
    CREATE DATABASE mysparkdb
    LOCATION '/home/prashant/mysparkdb/';                                   
                         

Simple. Isn't it? Every Spark database has a default location. If the specified path does not already exist, this command will try to create a directory with the given path. When you drop the database, Spark will delete that directory.
If you already have a database, you can describe it.

                                
    DESCRIBE DATABASE mysparkdb;                                   
                         

The describe command shows you the current location of the database. If you create the database without specifying a location, Spark will create the database directory at a default location. You can get your default location using the following command.

                                
    SET spark.sql.warehouse.dir;                                   
                         

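If you want to change the default location, you can set spark.sql.warehouse.dir permanently in the Spark configuration files or supply it when you start the session. Depending on your Spark version, you may also be able to change it at session level using the SET command, but newer releases treat it as a static setting and reject the change in a running session.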
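If you build your own session in a Scala application, one place to supply this setting is the session builder. Here is a minimal sketch, assuming a local master and a temporary directory that I picked only for illustration. The object name WarehouseDirExample is hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object WarehouseDirExample {
  // Starts a session with a custom warehouse directory and returns the
  // value that the session reports for spark.sql.warehouse.dir.
  def run(): String = {
    // Assumption: a throwaway temp directory stands in for your real location.
    val warehouse = Files.createTempDirectory("my-spark-warehouse")

    val spark = SparkSession.builder()
      .appName("warehouse-dir-example")
      .master("local[*]") // assumption: local mode, for illustration only
      .config("spark.sql.warehouse.dir", warehouse.toUri.toString)
      .getOrCreate()

    try spark.conf.get("spark.sql.warehouse.dir")
    finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The setting takes effect only when a new session is created. If a session already exists, getOrCreate returns it with its existing warehouse location.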
I am using a multi-node Hadoop/Spark cluster in Google Cloud. My default filesystem is HDFS. So, the create database statement will look for the specified directory location in HDFS. If the directory does not exist, Spark SQL will create a directory for this database in HDFS. Let's check it.

                                
    hadoop fs -ls /home/prashant/                                   
                         

If you want to create your database in a Google Cloud Storage bucket, all you need to do is specify a fully qualified Google storage location.

                                
    CREATE DATABASE gs_sparkdb
    LOCATION 'gs://pkp-gcp-bucket/gs_sparkdb/';                                     
                         

Similarly, if you are using an AWS EMR cluster, you can create your database in an S3 bucket. Like Google and Amazon, every cloud provider offers an integrated HDFS-compatible storage solution. If you are using a cloud environment, you are most likely to use that cloud storage instead of HDFS. There are many reasons to use cloud storage. It is cheaper, reliable, atomic, and version controlled, and you get the freedom to scale your cluster up or down depending upon your dynamic compute requirements.
Great! Now if I create a table in this database, Spark SQL will create a subdirectory for the table and place the data files in that subdirectory. That's what the database means for Apache Spark. It is just a namespace and a directory location.
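You can see the same thing through the programmatic SQL interface. The following sketch assumes a local session and a temporary directory; the names demo_loc_db and DatabaseLocationExample are made up for this example. It creates a database with an explicit location and reads that location back from the catalog.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object DatabaseLocationExample {
  // Creates a database at an explicit location and returns the
  // location that the catalog reports for it.
  def run(): String = {
    // Assumption: temp directories replace the HDFS or bucket paths used in the text.
    val warehouse = Files.createTempDirectory("warehouse")
    val dbDir = Files.createTempDirectory("demo-db-dir")

    val spark = SparkSession.builder()
      .appName("database-location-example")
      .master("local[*]") // assumption: local mode, for illustration only
      .config("spark.sql.warehouse.dir", warehouse.toUri.toString)
      .getOrCreate()

    try {
      spark.sql(s"CREATE DATABASE IF NOT EXISTS demo_loc_db LOCATION '${dbDir.toUri}'")

      // Same information that DESCRIBE DATABASE shows in the spark-sql tool.
      spark.sql("DESCRIBE DATABASE demo_loc_db").show(truncate = false)

      // The catalog API exposes the database directory as locationUri.
      spark.catalog.getDatabase("demo_loc_db").locationUri
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```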

Spark SQL Tables

Let's create a table. I cannot cover the syntax for all DDL and DML statements in this video. So, I was looking for some good Spark SQL reference documentation, and unfortunately, I found just one at Databricks. It is not fully comprehensive, but that's what we have.
You can refer to the documentation for the syntax. However, I want to cover the CREATE TABLE syntax here, and the reason is particularly important. I want you to understand the correlation between the Spark SQL syntax and the data frame APIs. I will come to that point and explain the correlation, but for now, let's assume that you have some data in a CSV file and you want to create a table and load that data into the table.
If you already know Hive, you might have done it using the following HiveQL commands.

                                
    CREATE TABLE IF NOT EXISTS mysparkdb.hive_surveys(
        TIME_STAMP TIMESTAMP, 
        AGE LONG, 
        GENDER STRING, 
        COUNTRY STRING, 
        STATE STRING, 
        SELF_EMPLOYED STRING, 
        FAMILY_HISTORY STRING, 
        TREATMENT STRING, 
        WORK_INTERFERE STRING, 
        NO_EMPLOYEES STRING, 
        REMOTE_WORK STRING, 
        TECH_COMPANY STRING, 
        BENEFITS STRING, 
        CARE_OPTIONS STRING, 
        WELLNESS_PROGRAM STRING, 
        SEEK_HELP STRING, 
        ANONYMITY STRING, 
        LEAVE STRING, 
        MENTAL_HEALTH_CONSEQUENCE STRING, 
        PHYS_HEALTH_CONSEQUENCE STRING, 
        COWORKERS STRING, 
        SUPERVISOR STRING, 
        MENTAL_HEALTH_INTERVIEW STRING, 
        PHYS_HEALTH_INTERVIEW STRING, 
        MENTAL_VS_PHYSICAL STRING, 
        OBS_CONSEQUENCE STRING, 
        COMMENTS STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
    STORED AS TEXTFILE;                                     
                         

Once you have a table, you might want to load data into the table as shown below.

                                
    LOAD DATA INPATH '/home/prashant/spark-data/csv/surveys.csv' 
    INTO TABLE mysparkdb.hive_surveys;                                      
                         

Since Spark SQL also supports the majority of HiveQL, you can easily execute these HiveQL statements in Spark SQL. Let's do that.
Now you can easily query that table.

                                
    select * from mysparkdb.hive_surveys limit 5;                                   
                         

You can describe the table and check some details about it. There are two important things to notice here: the type of the table and the provider.
It is a managed table, and it is a Hive compatible table because we used Hive syntax to create the table. If you are using HiveQL syntax to create a table, Spark will default to Hive SerDes. Hive SerDes might not be optimized to use Spark specific serialization features, and hence they might perform slower than Spark's native serialization. So, we don't recommend the use of HiveQL for creating tables.
Spark's own create table statement is slightly different from HiveQL. Here is the equivalent Spark SQL code.

                                
    CREATE TABLE IF NOT EXISTS mysparkdb.spark_surveys(
        TIME_STAMP TIMESTAMP, 
        AGE LONG, 
        GENDER STRING, 
        COUNTRY STRING, 
        STATE STRING, 
        SELF_EMPLOYED STRING, 
        FAMILY_HISTORY STRING, 
        TREATMENT STRING, 
        WORK_INTERFERE STRING, 
        NO_EMPLOYEES STRING, 
        REMOTE_WORK STRING, 
        TECH_COMPANY STRING, 
        BENEFITS STRING, 
        CARE_OPTIONS STRING, 
        WELLNESS_PROGRAM STRING, 
        SEEK_HELP STRING, 
        ANONYMITY STRING, 
        LEAVE STRING, 
        MENTAL_HEALTH_CONSEQUENCE STRING, 
        PHYS_HEALTH_CONSEQUENCE STRING, 
        COWORKERS STRING, 
        SUPERVISOR STRING, 
        MENTAL_HEALTH_INTERVIEW STRING, 
        PHYS_HEALTH_INTERVIEW STRING, 
        MENTAL_VS_PHYSICAL STRING, 
        OBS_CONSEQUENCE STRING, 
        COMMENTS STRING)
    USING CSV  
    OPTIONS ( header='false',
              nullvalue='NA',
              timestampFormat="yyyy-MM-dd'T'HH:mm:ss");                                       
                         

Did you notice the difference? Instead of ROW FORMAT and STORED AS, we are writing the USING keyword. If you don't know HiveQL, don't even worry about that.


Spark API vs Spark SQL

Great! Now it is time to show you the correlation between Spark data frame APIs and the Spark SQL syntax. Do you still recall the data frame reader API? Let me show you.
Here is the code that we used to read the data from a CSV source.

                                
    val df = spark.read
                  .format("csv")
                  .option("header","true")
                  .option("inferSchema","true")
                  .option("nullValue","NA")
                  .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
                  .option("mode","failfast")
                  .option("path","/home/prashant/spark-data/csv/surveys.csv")
                  .load()                                     
                         

Does it look like the CREATE TABLE statement? In both methods, we tell the file format and then provide a bunch of options. Both methods must know the mechanism to read the file, and hence all the options for a CSV file type that we learned earlier are valid for CREATE TABLE as well.
There are two more things that we specified to the data frame reader.

  1. Schema
  2. Data file location

We are specifying the schema in the CREATE TABLE statement. So that's taken care of. Right?
If you want, you can specify the data file location as well. How do you do that?
We already learned that earlier. Use the option to specify a path. Like this.

                                
    CREATE TABLE IF NOT EXISTS mysparkdb.external_spark_surveys(
        TIME_STAMP TIMESTAMP, 
        AGE LONG, 
        GENDER STRING, 
        COUNTRY STRING, 
        STATE STRING, 
        SELF_EMPLOYED STRING, 
        FAMILY_HISTORY STRING, 
        TREATMENT STRING, 
        WORK_INTERFERE STRING, 
        NO_EMPLOYEES STRING, 
        REMOTE_WORK STRING, 
        TECH_COMPANY STRING, 
        BENEFITS STRING, 
        CARE_OPTIONS STRING, 
        WELLNESS_PROGRAM STRING, 
        SEEK_HELP STRING, 
        ANONYMITY STRING, 
        LEAVE STRING, 
        MENTAL_HEALTH_CONSEQUENCE STRING, 
        PHYS_HEALTH_CONSEQUENCE STRING, 
        COWORKERS STRING, 
        SUPERVISOR STRING, 
        MENTAL_HEALTH_INTERVIEW STRING, 
        PHYS_HEALTH_INTERVIEW STRING, 
        MENTAL_VS_PHYSICAL STRING, 
        OBS_CONSEQUENCE STRING, 
        COMMENTS STRING)
    USING CSV  
    OPTIONS (header='false',
             nullvalue='NA',
             timestampFormat="yyyy-MM-dd'T'HH:mm:ss",
             path='/home/prashant/spark-data/csv/surveys.csv');                                      
                         

There is another shortcut. Specify the location parameter.


                                
    DROP TABLE IF EXISTS mysparkdb.external_spark_surveys;
    CREATE TABLE IF NOT EXISTS mysparkdb.external_spark_surveys(
        TIME_STAMP TIMESTAMP, 
        AGE LONG, 
        GENDER STRING, 
        COUNTRY STRING, 
        STATE STRING, 
        SELF_EMPLOYED STRING, 
        FAMILY_HISTORY STRING, 
        TREATMENT STRING, 
        WORK_INTERFERE STRING, 
        NO_EMPLOYEES STRING, 
        REMOTE_WORK STRING, 
        TECH_COMPANY STRING, 
        BENEFITS STRING, 
        CARE_OPTIONS STRING, 
        WELLNESS_PROGRAM STRING, 
        SEEK_HELP STRING, 
        ANONYMITY STRING, 
        LEAVE STRING, 
        MENTAL_HEALTH_CONSEQUENCE STRING, 
        PHYS_HEALTH_CONSEQUENCE STRING, 
        COWORKERS STRING, 
        SUPERVISOR STRING, 
        MENTAL_HEALTH_INTERVIEW STRING, 
        PHYS_HEALTH_INTERVIEW STRING, 
        MENTAL_VS_PHYSICAL STRING, 
        OBS_CONSEQUENCE STRING, 
        COMMENTS STRING)
    USING CSV   
    OPTIONS (header='false',
             nullvalue='NA',
             timestampFormat="yyyy-MM-dd'T'HH:mm:ss")
    LOCATION '/home/prashant/spark-data/csv/surveys.csv';                                       
                         

However, there is a catch here. We don't want our table to refer to this CSV file from that location. We want our table to store data inside the database directory that we created earlier. Right?
Let me formalize this idea.

Spark Managed vs Unmanaged tables

Spark SQL supports two types of tables.

  1. Managed Tables
  2. Unmanaged tables or external tables.

Spark stores a managed table inside the database directory location. If you drop a managed table, Spark will delete the data file as well as the table subdirectory. And that is fair because that's what you wanted to do when you issued a drop table statement. Right?
Unmanaged tables are external tables. That means their data resides somewhere outside the database directory. If you drop an unmanaged table, Spark will delete the metadata entry for that table, and after that drop table statement, you won't be able to access that table using Spark SQL. However, the data file for that unmanaged table still resides at the original location. Spark leaves that file as it was.
Most of the time, if you are creating a database and then creating a table in that database, you would be using managed tables. That approach is simple and clean. That's what we have been doing with all other database systems.
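The drop behaviour for an unmanaged table can be checked with a small experiment. This is only a sketch under my own assumptions: a local session, temporary directories, and made-up names (demo_drop_db, external_people, people.csv). It creates an external table over an existing CSV file, drops the table, and then looks at both the catalog and the file.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object DropExternalTableExample {
  // Returns (table still registered, data file still on disk) after DROP TABLE.
  def run(): (Boolean, Boolean) = {
    // Assumption: temp directories and a tiny CSV file, used only for this sketch.
    val warehouse = Files.createTempDirectory("warehouse")
    val dataDir = Files.createTempDirectory("external-data")
    val dataFile = dataDir.resolve("people.csv")
    Files.write(dataFile, "1,Ann\n2,Bob\n".getBytes("UTF-8"))

    val spark = SparkSession.builder()
      .appName("drop-external-table-example")
      .master("local[*]") // assumption: local mode, for illustration only
      .config("spark.sql.warehouse.dir", warehouse.toUri.toString)
      .getOrCreate()

    try {
      spark.sql("CREATE DATABASE IF NOT EXISTS demo_drop_db")

      // A location is given, so Spark treats the table as external.
      spark.sql(
        s"""CREATE TABLE demo_drop_db.external_people (id LONG, name STRING)
           |USING CSV
           |LOCATION '${dataDir.toUri}'""".stripMargin)

      spark.sql("DROP TABLE demo_drop_db.external_people")

      // The metadata entry is gone, but the file should be untouched.
      (spark.catalog.tableExists("demo_drop_db", "external_people"), Files.exists(dataFile))
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (registered, filePresent) = run()
    println(s"table registered: $registered, data file present: $filePresent")
  }
}
```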
So, what is the purpose of those external tables?


Why do we need External Tables?

Suppose you have some data that resides in some other filesystem location or maybe in some other storage system. It may be in a JDBC database, in Cassandra, or maybe in MongoDB. That data is stored, maintained and managed by a different system or a different team. You don't own it, but you want to make it available to your Spark database application and your application users in the same manner as your other managed tables. You don't want to make a copy of it; you want to refer to the same data as if it were a locally managed table. How would you do it? With an unmanaged table, which some people call an external table. That is when you would be using an external table in Apache Spark.
Great! Let's come back to our original discussion: the create table statement. If you specify the path option or a location parameter, Spark will make it an external table. Let's try both options and check out the difference.
So, the first statement should create an external table because we specified the path option.
And the second statement should create a managed table because we do not specify a file location.
Describe the first table.

                                
    describe table extended mysparkdb.external_spark_surveys;                                   
                         

Good! It is an external table.
Check the second table.

                                
    describe table extended mysparkdb.spark_surveys;                                   
                         

This one is a managed table. Right?
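If you prefer to check the table type from code rather than reading the describe output, the catalog API reports it as well. Here is a hedged sketch, again assuming a local session and temporary directories. The names demo_types_db, managed_t and external_t are hypothetical stand-ins for the survey tables.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object TableTypeExample {
  // Creates one table without a location and one with a location,
  // then returns a map of table name -> table type from the catalog.
  def run(): Map[String, String] = {
    // Assumption: temp directories replace the paths used in the text.
    val warehouse = Files.createTempDirectory("warehouse")
    val dataDir = Files.createTempDirectory("external-data")

    val spark = SparkSession.builder()
      .appName("table-type-example")
      .master("local[*]") // assumption: local mode, for illustration only
      .config("spark.sql.warehouse.dir", warehouse.toUri.toString)
      .getOrCreate()

    try {
      spark.sql("CREATE DATABASE IF NOT EXISTS demo_types_db")

      // No path option and no location: the data goes under the database directory.
      spark.sql("CREATE TABLE demo_types_db.managed_t (id LONG, name STRING) USING PARQUET")

      // Location given: Spark only records where the data lives.
      spark.sql(
        s"""CREATE TABLE demo_types_db.external_t (id LONG, name STRING)
           |USING CSV
           |LOCATION '${dataDir.toUri}'""".stripMargin)

      spark.catalog.listTables("demo_types_db")
        .collect()
        .map(t => t.name -> t.tableType)
        .toMap
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    run().foreach { case (name, tableType) => println(s"$name -> $tableType") }
}
```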
There is no need to load the data into an external table because it refers to the data file from its original location and the file already contains the data. But what about the managed table?
My managed table does not contain any data yet. How do you load the data into a managed table?
The LOAD DATA statement that we used earlier is only available for tables that you created using Hive format. You cannot use LOAD DATA on a Spark SQL table. That sounds like a problem. Doesn't it?
How do we load data into a Spark SQL managed table? I will come back to this question in the next video. But before I conclude the first part of Spark SQL, let me highlight the main takeaway from this session.

Spark SQL Create Table

Spark SQL internally uses the data frame API, and hence, all the data sources that we learned in the earlier video, including Avro, Parquet, JDBC, and Cassandra, are available to you through Spark SQL.
In this example, I have some data in a CSV file. I want to create a managed table and load that data from the CSV file into the managed table in Spark. Since a CSV file is not an efficient way to store data, I would want to create my managed table using Avro or Parquet. We already learned the Parquet data source. So, let's use that knowledge to create a Parquet table, and we will load the data into this table from the CSV source.
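Here is a CREATE TABLE statement to create a Parquet table. We already created mysparkdb.spark_surveys as a CSV table earlier, so the code drops that table first. Otherwise, IF NOT EXISTS would silently keep the old CSV table.



    DROP TABLE IF EXISTS mysparkdb.spark_surveys;
    CREATE TABLE IF NOT EXISTS mysparkdb.spark_surveys(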
        TIME_STAMP TIMESTAMP, 
        AGE LONG, 
        GENDER STRING, 
        COUNTRY STRING, 
        STATE STRING, 
        SELF_EMPLOYED STRING, 
        FAMILY_HISTORY STRING, 
        TREATMENT STRING, 
        WORK_INTERFERE STRING, 
        NO_EMPLOYEES STRING, 
        REMOTE_WORK STRING, 
        TECH_COMPANY STRING, 
        BENEFITS STRING, 
        CARE_OPTIONS STRING, 
        WELLNESS_PROGRAM STRING, 
        SEEK_HELP STRING, 
        ANONYMITY STRING, 
        LEAVE STRING, 
        MENTAL_HEALTH_CONSEQUENCE STRING, 
        PHYS_HEALTH_CONSEQUENCE STRING, 
        COWORKERS STRING, 
        SUPERVISOR STRING, 
        MENTAL_HEALTH_INTERVIEW STRING, 
        PHYS_HEALTH_INTERVIEW STRING, 
        MENTAL_VS_PHYSICAL STRING, 
        OBS_CONSEQUENCE STRING, 
        COMMENTS STRING)
    USING PARQUET;                                      
                         

I don't think you need an explanation for this statement.
Great! That's it for this session. In the next session, we will load the CSV data into this table and learn a few more things about Spark SQL.
Thank you very much for watching Learning Journal. Keep learning and keep growing.

