Welcome back to Learning Journal. In the earlier video, we started our discussion on Spark SQL. I talked about the Spark SQL
command line tool. I also covered the CREATE DATABASE and CREATE TABLE statements. I showed you some
Hive queries, and you learned that if you already know Hive, you can use that knowledge with Spark SQL.
As I discussed in the earlier video, Spark offers many interfaces to execute your SQL statements.
- A traditional command line SQL tool
- Apache Zeppelin or other notebooks
- JDBC/ODBC over Spark SQL thrift server
- Programmatic SQL interface
But do you understand which one is appropriate for you? When should I use what? The answer to that question is straightforward. It depends on what you want to do.
When to use Spark SQL command line?
I prefer to place my DDLs into a script and execute it using the Spark SQL command line tool. SQL is quite powerful. We use DDLs
to create objects like databases, schemas, tables, and views. But in most cases, that DDL is
executed just once. It is a one-time activity for a new deployment. Hence, it makes more sense
to place DDLs in a script file that we can run in the target environment. And that's where I use the command line tool.
Here is a simple example. You can place the Spark DDL script below in a file.
-- File Name: myddl.sql
CREATE DATABASE IF NOT EXISTS mysparkdb LOCATION '/home/prashant/mysparkdb/';

CREATE TABLE IF NOT EXISTS mysparkdb.surveys(
  TIME_STAMP TIMESTAMP,
  AGE LONG,
  GENDER STRING,
  COUNTRY STRING,
  STATE STRING,
  SELF_EMPLOYED STRING,
  FAMILY_HISTORY STRING,
  TREATMENT STRING,
  WORK_INTERFERE STRING,
  NO_EMPLOYEES STRING,
  REMOTE_WORK STRING,
  TECH_COMPANY STRING,
  BENEFITS STRING,
  CARE_OPTIONS STRING,
  WELLNESS_PROGRAM STRING,
  SEEK_HELP STRING,
  ANONYMITY STRING,
  LEAVE STRING,
  MENTAL_HEALTH_CONSEQUENCE STRING,
  PHYS_HEALTH_CONSEQUENCE STRING,
  COWORKERS STRING,
  SUPERVISOR STRING,
  MENTAL_HEALTH_INTERVIEW STRING,
  PHYS_HEALTH_INTERVIEW STRING,
  MENTAL_VS_PHYSICAL STRING,
  OBS_CONSEQUENCE STRING,
  COMMENTS STRING)
USING CSV
OPTIONS (
  header='true',
  nullvalue='NA',
  timestampFormat="yyyy-MM-dd'T'HH:mm:ss",
  path='/home/prashant/survey.csv');

CREATE OR REPLACE VIEW mysparkdb.filtered_surveys AS
SELECT AGE, REMOTE_WORK, COUNT(*) AGE_COUNT
FROM mysparkdb.surveys
GROUP BY AGE, REMOTE_WORK;
Then you can move this script to your target environment and execute it from the command line.
spark-sql -f myddl.sql
The above script is a simple example to give you an idea of the process. In the above example, I am creating a database,
an external table and finally a view over the external table.
If you look at the DDL statement for creating the external table, you will notice that we are not using a CREATE EXTERNAL TABLE statement.
The CREATE EXTERNAL TABLE statement is HiveQL syntax. Since we wanted to create a Spark SQL external table, we used the Spark SQL syntax, and Spark SQL does not have a CREATE EXTERNAL TABLE statement. Creating an external table in Spark SQL is as simple as defining a path option or a location parameter.
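If you want to convince yourself that a path option really gives you an external table, here is a minimal Scala sketch you could paste into spark-shell in local mode. The table name demo_surveys, the two-column schema, and the sample rows are hypothetical stand-ins, not the survey table from this video. The sketch asks the catalog for the table type and then checks that dropping the table leaves the data file in place.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell, getOrCreate simply returns the session that already exists.
val spark = SparkSession.builder()
  .appName("external-table-check")
  .master("local[*]")
  .getOrCreate()

// Hypothetical sample data written to a temporary local directory.
val dataDir = Files.createTempDirectory("demo_surveys")
val dataFile = dataDir.resolve("part-0.csv")
Files.write(dataFile,
  "AGE,COUNTRY\n37,United States\n44,Canada\n".getBytes(StandardCharsets.UTF_8))

// No EXTERNAL keyword: the path option alone points the table at existing data.
spark.sql("DROP TABLE IF EXISTS demo_surveys")
spark.sql(s"""
  CREATE TABLE demo_surveys (AGE LONG, COUNTRY STRING)
  USING CSV
  OPTIONS (header='true', path='${dataDir.toUri}')
""")

// The catalog reports the table type as a string.
val tableType = spark.catalog.getTable("demo_surveys").tableType
val rowCount = spark.table("demo_surveys").count()
println(s"demo_surveys: type=$tableType, rows=$rowCount")

// Dropping an external table removes only the metadata, not the files.
spark.sql("DROP TABLE demo_surveys")
val fileSurvivedDrop = Files.exists(dataFile)
println(s"data file still present after DROP TABLE: $fileSurvivedDrop")
```

The last check is the practical reason to care about the distinction: with an external table, Spark manages only the metadata, and your files stay where you put them.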
When to use Spark SQL notebooks?
Great! The second part of SQL is the DML. I prefer using Apache Zeppelin or some other notebook while developing and
testing my SQL queries. Those SQL queries might become part of a view, and ultimately, they get a
place in the DDL script. I may also end up using some of the SQL in data transformation pipelines. No
matter where I use them, while developing, testing, and visualizing the outcome of my SQL, I use
a notebook interface. And that's where the notebooks excel. We use them for SQL development or exploratory analysis.
Let me give you a quick demo of Zeppelin. I am using Google Cloud Platform for all my demos as well as my other POCs, and I recommend the same to all my viewers. I have already covered the steps to create a multi-node Hadoop/Spark cluster in Google Cloud. However, we didn't configure Zeppelin earlier. Let's create a new three-node Hadoop/Spark cluster and configure Zeppelin as well.
How to configure Zeppelin
I have already covered the step-by-step process to create a Hadoop/Spark cluster in Google Cloud. I am not going to repeat the details here. Check out my Spark in Google Cloud video for the detailed procedure. In that video, we configured the Jupyter notebook using an initialization action. In this video, we will configure the Zeppelin notebook using a different initialization script. Use the following shell script.
The above script was developed by the GCP team, and it is available to all of us from a public Google Cloud bucket.
Great! Hit the create button and GCP will create a Spark cluster and integrate Zeppelin.
How to access Zeppelin in GCP
We installed and configured Zeppelin. It is available to us at the master node URL at port 8080. To connect to the Apache Zeppelin web interface, you will need to create an SSH tunnel and use a SOCKS 5 Proxy. I have explained this process earlier. Let me quickly show you the steps once again.
- Install Google Cloud SDK. You can download the Google Cloud SDK from Google Cloud website.
- Once downloaded, start the SDK installer on your local machine and follow the on-screen instructions.
- Once you have the SDK, you should be able to use gcloud and gsutil command line tools.
- Use the command below to create an SSH tunnel. Make sure that you specify the zone name and the master node name correctly.
gcloud compute ssh --zone=us-east1-c --ssh-flag="-D" --ssh-flag="10000" --ssh-flag="-N" "spark-03-m"
- The above command should launch a new window. Minimize all the windows.
- The next step is to start a new browser session that uses the SOCKS proxy through the tunnel. Start a new terminal. Start your browser using the command shown below.
"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" "http://spark-03-m:8080" --proxy-server="socks5://localhost:10000" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp/spark-03-m
- The above command should launch a new chrome browser session. You should be able to access the Zeppelin UI.
How to use Zeppelin
Once you have Zeppelin, you can create a new notebook. Give a name to your notebook. Choose your Interpreter. Apache Spark
is the default interpreter and that’s what we want.
Great! Now you can execute shell commands using the %sh binding. Let me give you a quick demo. I have this survey data file. Let me create a Google storage bucket. Good. Now I am going to upload the CSV data file into my bucket. Great! Now we are ready to play with this data. You can execute a shell command to see the list of files in your Google bucket.
%sh gsutil ls gs://pkp-gcp-bucket
Let me copy this file to an HDFS location. Use the command below.
%sh hadoop fs -cp gs://pkp-gcp-bucket/survey.csv /home/prashant/
Let me check the file.
%sh hadoop fs -ls /home/prashant/
Now, we are ready to execute some Spark SQL commands. Let’s create a database.
You can execute an SQL command using the %sql binding.
%sql CREATE DATABASE IF NOT EXISTS mysparkdb LOCATION '/home/prashant/mysparkdb/'
Let’s create an external table using the HDFS file that we copied in the earlier step.
%sql
CREATE TABLE IF NOT EXISTS mysparkdb.surveys(
  TIME_STAMP TIMESTAMP,
  AGE LONG,
  GENDER STRING,
  COUNTRY STRING,
  STATE STRING,
  SELF_EMPLOYED STRING,
  FAMILY_HISTORY STRING,
  TREATMENT STRING,
  WORK_INTERFERE STRING,
  NO_EMPLOYEES STRING,
  REMOTE_WORK STRING,
  TECH_COMPANY STRING,
  BENEFITS STRING,
  CARE_OPTIONS STRING,
  WELLNESS_PROGRAM STRING,
  SEEK_HELP STRING,
  ANONYMITY STRING,
  LEAVE STRING,
  MENTAL_HEALTH_CONSEQUENCE STRING,
  PHYS_HEALTH_CONSEQUENCE STRING,
  COWORKERS STRING,
  SUPERVISOR STRING,
  MENTAL_HEALTH_INTERVIEW STRING,
  PHYS_HEALTH_INTERVIEW STRING,
  MENTAL_VS_PHYSICAL STRING,
  OBS_CONSEQUENCE STRING,
  COMMENTS STRING)
USING CSV
OPTIONS (
  header='true',
  nullvalue='NA',
  timestampFormat="yyyy-MM-dd'T'HH:mm:ss",
  path='/home/prashant/survey.csv')
Now you might want to explore the data quickly. Let's look at the distribution of ages in this data set.
%sql select age, count(*) frequency from mysparkdb.surveys where age between 20 and 65 group by age;
You can look at the output as a table, or you can visualize it.
I hope you get a sense of the Spark notebooks. They are an excellent tool for data scientists as well as the developers. From one single interface, you can execute shell commands, SQL, Scala code and a variety of other things.
Apache Spark Thrift Server
The next method is the JDBC/ODBC connection. JDBC and ODBC connectors are the most straightforward. I hope you already
understand the anatomy of JDBC/ODBC. In a typical JDBC/ODBC setup, you need two things. On the database
side, you need a listener process. And that's where a new tool called the Spark Thrift server comes in.
Spark Thrift server is a service that allows JDBC and ODBC clients to run Spark SQL queries. So, if you want to connect to Spark SQL database using JDBC/ODBC, you need to make sure that the Thrift server is properly configured and running on your Spark Cluster.
That was the first thing. The next thing that you will need is a JDBC/ODBC driver. If you want to use JDBC, then you need a JDBC driver. If you want to use ODBC, then you need an ODBC driver. The driver is nothing but a set of APIs, and you need them on the client side. That's it. Once you have these two things properly installed and configured, you should be able to establish a JDBC or an ODBC connection. That leaves three more questions.
- When to use JDBC/ODBC connections?
- How to get JDBC/ODBC drivers?
- How to use JDBC/ODBC connections?
Let's talk about the first question.
When to use Spark JDBC/ODBC connections?
There are two scenarios when you might want to use this type of connection.
- You are creating a dashboard using some BI Visualization tool, and you want to pull some data from the Spark database. Tableau is one of the most popular BI tools in this category.
- You are developing a custom web-based application, and you want your App server to pull some data from Spark.
In both scenarios, you can directly connect to Spark SQL and pull the data over JDBC/ODBC connection.
How to get Spark JDBC/ODBC drivers?
Let's talk about the second question. The Thrift server is part of the Spark distribution. So, you don't have to buy it separately. It comes along with Spark. However, the drivers are not freely available. If you are using Tableau, you should get the driver from Tableau. Similarly, other BI vendors also supply their own versions of these drivers. If you are using a commercial version of Apache Spark, for example from Databricks, Hortonworks, or Cloudera, you should get these drivers from your Spark vendor.
How to use Spark JDBC/ODBC connections?
Now the third question: how to use these connections. I will give you a quick demo of a JDBC connection. However, let me raise
another important point.
A lot of people prefer not to use Spark JDBC/ODBC connections at all. I mean, most of the time, you don't even need these drivers. Why? Because instead of connecting to Spark, you would find a more efficient alternative. How? Let's try to understand that.
Apache Spark is more of a big data computation platform. It is designed to perform computations on a huge volume of data. However, Spark is not a good system for serving many concurrent users or meeting sub-second response time expectations. Your BI reports and your web-based applications are most likely to have a bunch of concurrent users. Most of the time, these users need a response back in seconds. Apache Spark is not good for that. So, we use Spark to perform the computations and push the results into a different system. That may be an efficient RDBMS, Cassandra, or maybe something else. Once you do that, there is no need for your BI tool or your application to connect to Spark.
That said, you still have the capability at your disposal. So, let's see a simple example.
I can’t show you the JDBC connection from Tableau due to licensing problems. However, we have the Beeline tool to test a JDBC connection. Beeline is a command line SQL tool that comes with a bundled JDBC driver for Spark, so we don’t need to install a driver. I will be using Beeline for this demo.
The first thing that we need is to configure and start the Thrift server. A Google Dataproc cluster comes with a preconfigured Thrift server. However, it is configured to connect to the Hive database instead of Spark SQL. Let me show you.
Start Beeline. Now, you can connect to the Thrift server using the following JDBC connection URL.
beeline
!connect jdbc:hive2://localhost:10000
It will ask you for the username and password. We haven’t configured any credentials, so you can simply press enter twice.
Great! It shows the message as connected to Apache Hive using Hive JDBC driver.
But we wanted to connect to Spark SQL. To fix this problem, I may have to install and configure Spark thrift server from scratch. To avoid all those unnecessary things, let me come back to my local mode spark VM and show you the JDBC connection.
We need to start the thrift server. You can start it using the following command.
# You can start the thrift server from your Spark Home sbin directory
spark/spark-2.2.0-bin-hadoop2.7/sbin/start-thriftserver.sh
Great! The spark thrift server is up and running. Let’s start beeline and try a JDBC connection at the port 10000.
Good. You can see the message as connected to Spark SQL using Hive JDBC driver. Now you can execute any Spark SQL query from beeline.
BI tools like Tableau use the same mechanism to connect to the Spark Thrift server.
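A custom application would do the same thing through the standard java.sql API. Here is a rough Scala sketch of what that could look like. The helper names thriftUrl and ageFrequencies are hypothetical, and the sketch assumes that the Hive JDBC driver jar is on the client classpath and that the Thrift server is listening on port 10000 without credentials, as in the Beeline demo.

```scala
import java.sql.DriverManager
import scala.collection.mutable.ListBuffer

// Hypothetical helper: builds the jdbc:hive2 URL format used in the Beeline demo,
// with a database name appended.
def thriftUrl(host: String, port: Int = 10000, database: String = "default"): String =
  s"jdbc:hive2://$host:$port/$database"

// Hypothetical helper: runs the age distribution query over JDBC and
// returns (age, frequency) pairs. Assumes the Hive JDBC driver is on the classpath.
def ageFrequencies(url: String, user: String = "", password: String = ""): List[(Long, Long)] = {
  val conn = DriverManager.getConnection(url, user, password)
  try {
    val stmt = conn.createStatement()
    try {
      val rs = stmt.executeQuery(
        "select age, count(*) as frequency from mysparkdb.surveys " +
        "where age between 20 and 65 group by age")
      val rows = ListBuffer.empty[(Long, Long)]
      while (rs.next()) rows += ((rs.getLong(1), rs.getLong(2)))
      rows.toList
    } finally stmt.close()
  } finally conn.close()
}

val localUrl = thriftUrl("localhost")
println(localUrl)

// With the Thrift server running, you would call:
// ageFrequencies(localUrl).foreach(println)
```

The query call is left commented out because it needs a live Thrift server. Everything else in the sketch is plain JDBC, so the same pattern applies from Java or any other JVM language.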
When to use Spark SQL Programmatic interface
The final and the most important option is the programmatic interface for Spark SQL. The programmatic interface is the most
powerful method, and it helps you to model most of your requirements. It gives you the combined power
of SQL and the Scala programming language. If you have worked with Oracle, you already know that PL/SQL
and SQL together are much more powerful than SQL alone.
With the programmatic method, we primarily interact with the Spark SQL database objects using a variety of APIs. The most common API is spark.sql. Let's quickly look at an example.
val df = spark.sql("""select age, count(*) from mysparkdb.surveys where age between 20 and 65 group by age""")
df.show
In this example, we pass the Spark SQL string to the spark.sql method. The sql method executes that statement and returns the result as a Spark data frame. The sql method allows you to run a DML as well as a DDL statement. That means you can query a table or a view. You can also create a new table or view. However, it is unlikely that you will be executing a create table statement using this method. You are more likely to create temporary views. Let’s see an example.
val df = spark.sql("""create temporary view age_count as select age, count(*) from mysparkdb.surveys where age between 20 and 65 group by age""")
df.show
The returned data frame df has nothing to show because we executed a DDL that doesn't return any data.
Your temporary view is ready to use. You can query that view in the next statement.
val df1 = spark.sql("""select * from age_count""")
df1.show
It is a common practice to build an intermediate view and then use it for the next step in your transformation job. So, the sql API allows you to execute Spark SQL and HiveQL from inside your Scala programs. There are a few other APIs as well. I have listed some of them for your reference.
- DataFrame.createOrReplaceTempView (viewName: String)
- SparkSession.table (tableName:String)
- DataFrameWriter.insertInto (tableName: String)
- DataFrameWriter.saveAsTable (tableName: String)
We have already seen the createOrReplaceTempView API. That API allows you to create a temporary view using a data frame.
The next one, SparkSession.table, gives you a data frame out of the given table. It is equivalent to a select * from that table.
The insertInto API pushes the data frame into an existing table.
The saveAsTable API writes a data frame to the Spark SQL database as a table. You can use it to create a table as well as load the data with a single API call.
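To see the four APIs working together, here is a small self-contained Scala sketch for local mode. The rows, the view name surveys_tmp, and the table name demo_age_count are hypothetical stand-ins so that you can run it without the survey file. The temporary warehouse directory is also my assumption, added only to keep the demo table away from your real data.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// The warehouse setting only takes effect when this code starts a new session.
val spark = SparkSession.builder()
  .appName("sql-api-tour")
  .master("local[*]")
  .config("spark.sql.warehouse.dir",
    Files.createTempDirectory("demo_warehouse").toUri.toString)
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the survey data.
val surveys = Seq((25L, "Yes"), (25L, "No"), (31L, "No"), (70L, "Yes"))
  .toDF("age", "remote_work")

// 1. createOrReplaceTempView: expose a data frame to SQL.
surveys.createOrReplaceTempView("surveys_tmp")
val ageCount = spark.sql(
  """select age, count(*) as frequency
    |from surveys_tmp
    |where age between 20 and 65
    |group by age""".stripMargin)

// 2. saveAsTable: create the table and load it in one call.
spark.sql("DROP TABLE IF EXISTS demo_age_count")
ageCount.write.format("parquet").saveAsTable("demo_age_count")

// 3. insertInto: add rows to the existing table. Columns are matched
//    by position, so the order must agree with the table definition.
Seq((40L, 3L)).toDF("age", "frequency").write.insertInto("demo_age_count")

// 4. table: read the table back as a data frame.
val loaded = spark.table("demo_age_count")
loaded.orderBy("age").show()
val totalRows = loaded.count()
```

One thing to keep in mind is that insertInto ignores column names and relies on column order, which is why the examples in this video read the source file using the schema of the target table.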
In the earlier video, we created a Parquet table using the CREATE TABLE statement. But we couldn't load the data into that table. Now you know two methods to load the data into that table. Here is the code to load the data from a CSV source to the Parquet table using the insertInto API.
val sch = spark.table("mysparkdb.spark_surveys").schema
val df = spark.read
  .format("csv")
  .schema(sch)
  .option("header","true")
  .option("mode","failfast")
  .load("/home/prashant/survey.csv")
spark.table("mysparkdb.spark_surveys").count
df.write.insertInto("mysparkdb.spark_surveys")
spark.table("mysparkdb.spark_surveys").count
Here is another alternative to loading the data from a CSV source to the Parquet table using the save as table API. In this example, we are using append mode to load the data into an existing table.
val sch = spark.table("mysparkdb.spark_surveys").schema
val df = spark.read
  .format("csv")
  .schema(sch)
  .option("header","true")
  .option("mode","failfast")
  .load("/home/prashant/survey.csv")
spark.table("mysparkdb.spark_surveys").count
df.write
  .mode("append")
  .saveAsTable("mysparkdb.spark_surveys")
spark.table("mysparkdb.spark_surveys").count
The above code assumes that the table already exists, and we don't want to overwrite the existing records. That is why we
are using the append mode. If you don't want to keep the existing records, you can truncate the old
data using the TRUNCATE TABLE SQL statement and use the append mode to load the fresh data into the same
table. You can also use the TRUNCATE TABLE SQL statement followed by the
insertInto API that I showed earlier. Both methods are essentially the same.
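Here is a minimal sketch of that truncate-and-reload pattern in Scala, meant for local mode. The table demo_reload and its rows are hypothetical, and the temporary warehouse directory is my assumption to keep the demo isolated. The row counts are captured at each step so that you can check the effect yourself.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("truncate-and-reload")
  .master("local[*]")
  .config("spark.sql.warehouse.dir",
    Files.createTempDirectory("demo_warehouse").toUri.toString)
  .getOrCreate()
import spark.implicits._

// Hypothetical existing table holding an old batch of records.
spark.sql("DROP TABLE IF EXISTS demo_reload")
Seq((25L, "old"), (31L, "old")).toDF("age", "batch")
  .write.format("parquet").saveAsTable("demo_reload")
val before = spark.table("demo_reload").count()

// Remove the old records but keep the table definition.
spark.sql("TRUNCATE TABLE demo_reload")
val afterTruncate = spark.table("demo_reload").count()

// Load the fresh batch into the now-empty table.
val fresh = Seq((40L, "new")).toDF("age", "batch")
fresh.write.insertInto("demo_reload")
// Alternative with the same effect here:
// fresh.write.mode("append").saveAsTable("demo_reload")
val afterReload = spark.table("demo_reload").count()

println(s"before=$before, afterTruncate=$afterTruncate, afterReload=$afterReload")
```

Note that the demo table is a managed table. As far as I know, Spark does not allow TRUNCATE TABLE on external tables, so this pattern fits tables whose data Spark manages for you.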
If you don't have the table and want to create the table as well as load the data as a single API call, you can do that as well using the saveAsTable API. Here is the code example.
val df = spark.read
  .format("csv")
  .option("header","true")
  .option("inferSchema","true")
  .option("nullValue","NA")
  .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
  .option("mode","failfast")
  .load("/home/prashant/survey.csv")
df.write
  .format("parquet")
  .saveAsTable("mysparkdb.new_surveys")
spark.table("mysparkdb.new_surveys").count
Great! Whether you are working in a traditional data warehouse environment or doing something in a modern data lake environment,
the data engineering process is one of the most time-consuming and complex things. Half of the project
time goes into extracting the data from a source system, validating it, and correcting and filtering the
data. Then another 25% of the time goes into a bunch of transformations that prepare and load
the data into a target table that is more suitable for data science and machine learning algorithms
to work with. Those things are not simple. You can't do all that using SQL. You will need a programmatic
interface and a scripting language to accomplish all those things. And that's where the Spark data
frame API is used, either in Scala or Python. However, there are a lot of things that are more convenient
to achieve using SQL. And that's where the ability to mix the data frame API and Spark SQL is amazingly powerful.
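Here is a small Scala sketch of that mix, assuming a few hypothetical rows in place of the survey file. The validation and the correction of the gender values use the data frame API, and the aggregation uses SQL over a temporary view. The view name cleaned_surveys and the cleanup rules are only illustrations.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lower, trim, when}

val spark = SparkSession.builder()
  .appName("mix-dataframe-and-sql")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical raw records with inconsistent values.
val raw = Seq((25L, " Male "), (31L, "female"), (29L, "M"), (-5L, "Female"))
  .toDF("age", "gender")

// Data frame API: filter invalid ages and standardize the gender column.
val normalized = lower(trim(col("gender")))
val cleaned = raw
  .filter(col("age").between(20, 65))
  .withColumn("gender",
    when(normalized.isin("male", "m"), "Male")
      .when(normalized.isin("female", "f"), "Female")
      .otherwise("Other"))

// Hand the cleaned data over to SQL through a temporary view.
cleaned.createOrReplaceTempView("cleaned_surveys")

// SQL: the aggregation reads more naturally as a query.
val summary = spark.sql(
  """select gender, count(*) as respondents
    |from cleaned_surveys
    |group by gender
    |order by gender""".stripMargin)
summary.show()

// Back to Scala: collect the small result as typed tuples.
val result = summary.as[(String, Long)].collect().toList
```

The temporary view is the bridge between the two styles. Each step uses whichever style expresses it more clearly, and the result is again a data frame that you can carry into the next step.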
To get a more realistic sense of this notion, you might want to see a bigger and more realistic example. I mean, working through all these small and simple examples is good for learning the core capabilities and understanding the minute details. However, to get the bigger picture and a sense of how these things work together in a real-life project, you need some end-to-end examples.
By the end of this training, I will include some micro-projects to cover a few realistic end-to-end examples.
Great! I talked about all four alternatives to interact with a Spark SQL database. I also talked about different scenarios to help you choose the right tool. In the next video, we will cover a few more aspects of Spark SQL.
Thank you for watching Learning Journal. Keep Learning and Keep Growing.