Apache Spark Foundation Course - Spark JDBC Data Sources and Sinks


Welcome back to Learning Journal. In the previous lesson, I talked about file-based data sources. I also promised to cover some more Spark data sources. To keep my promise, I intend to demonstrate three more Spark data sources.

  1. JDBC
  2. Cassandra
  3. MongoDB

Covering all of them in a single video is not possible. So, I planned to break them into three videos. In this video, I will talk about Spark JDBC connector. So, let's start.

Why do we need Spark JDBC Connector?

Spark JDBC connector is one of the most valuable connectors for two reasons.

  1. You can connect to a variety of databases. Some of the most popular options are Oracle, SQL Server, MySQL, and PostgreSQL. In fact, you can connect to any database that offers SQL and supports JDBC connectivity.
  2. The second, and more important, reason is that many real-world use cases require you to connect to an RDBMS from Spark.

Let's try to understand one of the use cases.

Spark JDBC Connector Use Case
Fig.1-Spark JDBC Connector Use Case


Assume that you are a media and information company that publishes content over various channels. Your customers are viewing that material, and you are collecting necessary data. You have millions of viewers, and hence you get a massive amount of data. It may be a couple of terabytes per month. Right? You ingest that data to your data lake and want to aggregate and summarize it. Then you want to build a set of dashboards for different stakeholders in your organization. The dashboarding application gives them an opportunity to slice and dice the results and gain some insight into what is performing well, and which themes are trending. Right?
However, you soon realize that the dashboard should be an independent application for two reasons.


  1. The output of your aggregation and summarization is sufficient to power that application.
  2. It also needs a bunch of other features: user management, login credentials, user preferences, saving and downloading reports, sharing reports and collaborating with colleagues, and many other things.

This kind of application makes more sense with an RDBMS as a backend. So, you decided to add an RDBMS. Now, it's apparent that you need to connect to the RDBMS from Apache Spark and save your aggregation output there. That's it. That's what makes the Spark JDBC connector a critical thing. Isn't it?

Spark JDBC source and sink demo

I hope you understand that you might want to read something into Spark from an RDBMS, and you might also need to write something back to a relational database. How do you do it? That's what I am going to demonstrate. I will use PostgreSQL for this demo. However, it should work with any JDBC-compliant database with some minor changes.

Creating a PostgreSQL Server in Cloud

I need a Postgres database for my demo. So, let's start a PostgreSQL VM in Google cloud.

  1. Start your GCP console.
  2. Go to cloud launcher.
  3. Search for PostgreSQL. You might see multiple options. Choose the one that is labelled as Google Click to Deploy.
  4. Your PostgreSQL usage is free, but you might incur some cost for the CPU and the memory.
  5. Click on the launch button. You might want to change the zone to the same zone as you have for your Spark VM.
  6. You might also want to change the machine type to a smaller configuration.
  7. Finally, click the deploy button. That’s it. GCP will launch your VM in less than a minute.
  8. You can SSH to your PostgreSQL server from the deployments page. Alternatively, you can come to your compute engine page and SSH to the VM.
  9. Once you complete your testing, you can delete your VM and the deployment.

Configure PostgreSQL server

Before I can use this PostgreSQL system, I need to set up a few things.

  1. Create a PostgreSQL user
  2. Create a database
  3. Test the PostgreSQL server connection from a remote machine

Let's do that. The first step is to create a user. Use the following commands to create a user for yourself.

                                    
    sudo su postgres
    createuser --pwprompt --interactive prashant
    exit                                        
                                

The first command changes the current OS user to the PostgreSQL server admin user. The second command is a PostgreSQL server utility that creates a new user. The first option prompts for a password for the new user. The second option asks some other necessary questions. The last argument is my username.
Finally, exit back to the regular user.
The next step is to create a database. Use the below command.

                                        
    createdb sparkDB                                        
                                    

Great! Now we need to test the connection from a remote machine. I want to make sure that I can connect to the PostgreSQL DB from a different VM. However, I need to install a PostgreSQL client tool for this testing. So, let's download and install a client tool.

  1. Go to www.postgresql.org
  2. Click downloads
  3. Select your OS Family
  4. Choose the latest PostgreSQL version
  5. Pick up the architecture
  6. Copy and execute the yum install command. That command will configure your repository.
  7. Finally, install the client using the next yum command.

Great! Now we are ready to test the PostgreSQL remote connection. Use below command to connect to the remote Postgres database.

                                            
    psql --host 10.128.0.4 -U prashant --dbname sparkDB --password
                                        

You might want to change the IP address to your PostgreSQL server VM's IP address. You can get the IP address of your PostgreSQL server using the below command.

                                                
    hostname -I                                       
                                            

Oops. The remote connection is not working. To fix this problem, you need to modify the PostgreSQL hba file. Let's do that.

                                                
    # Login to your Postgres server machine as the Postgres admin user.
    sudo su postgres
    # Connect to the database and check the hba file location.
    psql
    show hba_file;
    \q
    # Open the hba file and add the below line at the end.
    vi /etc/postgresql/10/main/pg_hba.conf
    host    all     all             0.0.0.0/0          md5
    # Exit the admin user.
    exit
    # Restart the PostgreSQL server.
    sudo /etc/init.d/postgresql restart
    # Go back to the remote machine and test your connection.
    psql --host 10.128.0.4 -U prashant --dbname sparkDB --password
                                            

Great! It worked.
All of this is not at all required if you already have a PostgreSQL database server that allows connections from a remote machine. I had to do all this for the demo. However, all this exercise has nothing to do with Spark JDBC connectivity. All you need is the following information.

  1. Your PostgreSQL server IP - 10.128.0.4
  2. Database listener TCP/IP port - 5432
  3. Database name - sparkDB

If you already have these three things, you can directly start your Spark shell and test your JDBC example.
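With these three details in hand, the connection string follows the standard PostgreSQL JDBC pattern jdbc:postgresql://&lt;host&gt;:&lt;port&gt;/&lt;database&gt;. As a quick sketch using my demo's values (substitute your own):

    // Standard PostgreSQL JDBC URL: jdbc:postgresql://<host>:<port>/<database>
    val jdbcUrl = "jdbc:postgresql://10.128.0.4:5432/sparkDB"

This is the same URL string you will see in the read and write examples below.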

How to Install Spark JDBC Connector

To use the Spark JDBC connector, you need to download the JDBC connector jar and include it in your driver and executor classpath. It sounds difficult; however, it is quite simple to achieve. Use the below command to start the Spark shell.

                                                    
    spark-shell --packages org.postgresql:postgresql:9.4.1207                                      
                                                

We need a JDBC driver package, and hence I am adding it to my Spark classpath using the --packages option. I am going to connect to PostgreSQL, and hence I am adding the PostgreSQL JDBC package. If you want to use Oracle or any other database, you should include the appropriate JDBC package for the respective database. As you already know, the --packages option will automatically pull the dependencies from the Maven repository and include them in your classpath.


Spark JDBC Sink Example

Great! We are ready to write data from Spark over a JDBC connection. Use the following code.

                                                        
    //Read CSV into Data Frame
    val df = spark.read
                  .format("csv")
                  .option("header","true")
                  .option("inferSchema","true") 
                  .option("nullValue","NA")
                  .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
                  .option("mode","failfast")
                  .load("/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv")
                                                                
    df.createOrReplaceTempView("survey_tbl")
                                                                
    val dfout = spark.sql("""select gender, sum(yes) sum_yes, sum(no) sum_no 
                            from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal','male (cis)',
                                                                           'make','male ','man','msle','mail','malr','cis man',
                                                                           'cis male') then 'Male' 
                                              when lower(trim(gender)) in ('cis female','f','female','woman','femake','female ',
                                                                           'cis-female/femme','female (cis)','femail') then 'Female'
                                              else 'Transgender' 
                                         end as gender,
                                         case when treatment == 'Yes' then 1 else 0 end as yes,
                                         case when treatment == 'No' then 1 else 0 end as no
                                   from survey_tbl) 
                                   where gender != 'Transgender'
                                   group by gender""")                                         
                                                    

You already understand this code. We used the same in an earlier video. The first part reads data from a CSV source. Then we register that data frame as a temporary view. Then I perform some aggregation and create another data set using an SQL statement. My output is available as a data frame.
Let me show you the content of this data frame.

                                                        
    dfout.show
    +------+-------+------+
    |gender|sum_yes|sum_no|
    +------+-------+------+
    |Female|    170|    77|
    |  Male|    450|   541|
    +------+-------+------+                                     
                                                    

Nice! Isn't it? This data frame is what I want to save into my RDBMS. Here is the code to write it to a JDBC database.

                                                            
    //Write Data Frame to JDBC
    dfout.write
      .format("jdbc")
      .mode("overwrite")
      .option("driver", "org.postgresql.Driver")
      .option("url", "jdbc:postgresql://10.128.0.4:5432/sparkDB")
      .option("dbtable", "survey_results")
      .option("user", "prashant")
      .option("password","pandey")
      .save()                                     
                                                        

The write API structure is the same as we used for other file formats in the earlier video. Let me explain it quickly. We define the format as JDBC. Then we set five different options: the driver, the JDBC URL, the database table name, and the credentials (user and password). That's it. Execute the save method, and it goes to your database. You can go back to your PostgreSQL client tool and verify it.

                                                                
    psql --host 10.128.0.4 -U prashant --dbname sparkDB --password
    \d+ survey_results
    select * from survey_results;                                   
                                                            

Amazing! Isn't it? It creates the table and loads the data. However, I don't prefer this automatic create-table approach. I prefer to create the table directly in the database using a database create table statement. That gives me more control over the database design.
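If you prefer the manual approach, a create table statement like the one below would do the job. This is only a sketch: the column names come from the data frame shown earlier, but the data types are my assumptions, and you should pick the types that fit your own design.

    -- Column types are illustrative assumptions, not what Spark would generate.
    create table survey_results (
        gender  varchar(20),
        sum_yes bigint,
        sum_no  bigint
    );

With the table created upfront, you can then use the truncate-and-overwrite approach described next to reload it without losing your design.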

How to truncate and overwrite from Spark JDBC

However, you might be wondering: if the table already exists in the database, how will we truncate and write the data into the same table? If you already have a table in the database, you can use the overwrite mode with the truncate option.

                                                            
    //Write Data Frame to JDBC
    dfout.write
      .format("jdbc")
      .mode("overwrite")
      .option("truncate", "true")
      .option("driver", "org.postgresql.Driver")
      .option("url", "jdbc:postgresql://10.128.0.4:5432/sparkDB")
      .option("dbtable", "survey_results")
      .option("user", "prashant")
      .option("password","pandey")
      .save()                                     
                                                        

To use truncate on an existing table, all you need to do is set another option, truncate, as true. That’s it. The API takes care of truncating the existing table and inserting all the data into the same table. The truncate-and-insert approach is reasonable because it keeps the database table structure intact along with all indexes.
Great! That's all about writing to PostgreSQL databases.

Spark JDBC Source Example

Reading from a database is as simple as reading from a file. The below example reads a full table.

                                                                    
    //spark-shell --packages org.postgresql:postgresql:9.4.1207
    val pgDF_table = spark.read
                          .format("jdbc")
                          .option("driver", "org.postgresql.Driver")
                          .option("url", "jdbc:postgresql://10.128.0.4:5432/sparkDB")
                          .option("dbtable", "survey_results")
                          .option("user", "prashant")
                          .option("password","pandey")
                          .load()
    pgDF_table.show                                         
                                                                

How to use SQL in Spark JDBC

It is not necessary to read a full table. You might want to read only part of the data using an SQL statement. I have an example for that as well.

                                                                    
    //spark-shell --packages org.postgresql:postgresql:9.4.1207
    val pgDF_table = spark.read
                          .format("jdbc")
                          .option("driver", "org.postgresql.Driver")
                          .option("url", "jdbc:postgresql://10.128.0.4:5432/sparkDB")
                          .option("dbtable", "(select * from survey_results limit 1) as survey_results")
                          .option("user", "prashant")
                          .option("password","pandey")
                          .load()
    pgDF_table.show                                         
                                                                

Where to get more details

We used some of the conventional options for the JDBC connector. You can get the list of all available JDBC options in the Spark documentation. I recommend that you go through these options at least once. We also have a JDBC shortcut API method. Check that out as well. The JDBC shortcut API offers some more flexibility to partition your data for a read operation.
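For example, here is a rough sketch of a partitioned read using the jdbc shortcut method on the DataFrameReader. The partition settings (the column, the bounds, and the number of partitions) are illustrative assumptions for my small demo table; Spark splits the range of the given numeric column into that many parallel queries.

    import java.util.Properties

    val props = new Properties()
    props.setProperty("user", "prashant")
    props.setProperty("password", "pandey")
    props.setProperty("driver", "org.postgresql.Driver")

    // Spark fires numPartitions parallel queries, each covering a slice
    // of sum_yes values between the lower and upper bounds.
    val pgDF_part = spark.read.jdbc(
        "jdbc:postgresql://10.128.0.4:5432/sparkDB",
        "survey_results",
        "sum_yes",   // partition column (must be numeric)
        0L,          // lowerBound (assumed for this demo)
        1000L,       // upperBound (assumed for this demo)
        4,           // numPartitions
        props)
    pgDF_part.show

The same partitioning behaviour is also available through the option-based API using the partitionColumn, lowerBound, upperBound, and numPartitions options.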
Great! Thank you very much for watching Learning Journal. Keep learning and keep growing.

