Apache Spark Foundation Course - Cassandra Connector


Welcome back to Learning Journal. As promised earlier, in this video, I am going to talk about Apache Spark and Apache Cassandra connectivity. But before that, you might want to understand the use cases that require connecting Spark and Cassandra.

Why Apache Cassandra

Let's start with understanding the main strengths of Apache Cassandra.

  1. Apache Cassandra is an open source, distributed, NoSQL database.
  2. Apache Cassandra offers linear scalability, high availability, fault tolerance, and high speed read/write performance. The read/write speed over a huge volume is the most distinguishing feature of Cassandra as a database.
  3. Despite being a NoSQL database, it supports a query language (CQL) that is similar to SQL. That makes Cassandra easy to learn.
  4. Cassandra offers replication across multiple datacentres and cloud availability zones. That's another powerful feature. That means you can serve your global customer requests from their nearest datacentre.
  5. The Cassandra data model is more suitable for large denormalized tables that we design to support a group of known queries. That means, if you have a requirement to create huge tables, I mean tables storing hundreds of gigabytes or even terabytes, and then you want to fire a set of known queries on them, Cassandra is a great choice. However, there is a downside to the Cassandra data model. You don't get relational features and ACID transactions. However, you can solve many big data requirements without those relational features.

When to use Apache Cassandra

In summary, Cassandra is a good choice for two categories of use cases.

  1. Big data OLTP systems -
    These are systems that require high-speed write performance and do not need relational features and ACID-compliant transactions.
  2. Serving layer of Big data platforms -
    These are the systems that require a big data reporting layer for a known set of queries that we can serve using denormalized tables.

So, if you have any of these Big data use cases, you might need to implement read/write to Cassandra database.
How to connect Apache Spark with Cassandra? That's what I am going to demonstrate. I will need a Cassandra cluster for that demonstration.

How to Install Cassandra Cluster in GCP

Let's install a three-node Cassandra cluster in Google Cloud. Follow these steps.


  1. Start your GCP console.
  2. Go to cloud launcher.
  3. Search for Cassandra.
  4. You might see a bunch of options, but I prefer to use the one that is marked as Google Click to deploy.
  5. Your Cassandra usage is free, but Google will charge you for the CPU, memory, and disk space. But that's fine. I am using my free credits.
  6. Click the launch button.
  7. You might want to change the zone to the same zone that you are using for your Spark machine. I use the same zone to reduce network latency. Different zones are in different datacentres, and any communication between zones will experience unnecessary network latency.
  8. You might also want to choose a smaller machine type and reduce the default disk space. I don't need that many resources for my demo.
  9. Finally, click the deploy button. That's it. GCP will provision your three node Cassandra cluster in less than a minute.
  10. You can SSH to one of the Cassandra nodes from the deployment manager page. Alternatively, you can jump to your compute engine instance page and SSH to one of the Cassandra nodes.

Great! We are now ready to use Cassandra.
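As an optional sanity check (not part of the deployment steps above), you can run nodetool on any Cassandra node to confirm that all three nodes have joined the ring:

```shell
# Run on any Cassandra node; all three nodes should report status "UN" (Up/Normal).
nodetool status
```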

Prepare Cassandra to receive data from Spark

Before we start connecting to Cassandra, I want to clarify two things about Cassandra database.

  1. Cassandra has the concept of a Keyspace, which is similar to a database in relational databases. A Keyspace is a level where we define data partitioning and replication strategy. So, I don't recommend creating a Keyspace on the fly.
  2. Cassandra data model design is a conscious decision that completely depends upon the queries that you want to execute on that table. So, I don't recommend creating Cassandra tables on the fly. Most of the time, you will create a Cassandra table and populate the data from Spark.

Great!

  1. Connect to Cassandra using the cqlsh client.

    cqlsh

  2. Execute a CQL statement to test if the Cassandra cluster is up and running.

    SELECT cluster_name, listen_address FROM system.local;

  3. Create a Keyspace using the below CQL command.

    CREATE KEYSPACE sparkdb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};

  4. Create a Cassandra table.

    CREATE TABLE sparkdb.survey_results(gender text, sum_yes int, sum_no int, PRIMARY KEY (gender));

  5. Test the table.

    SELECT * FROM sparkdb.survey_results;

Great! Now we are ready to jump to the Apache Spark machine, connect to Cassandra, and load some data into this table.

How to write Spark data frame to Cassandra table

Start the Spark shell and add the Cassandra connector package dependency to your classpath. I am using the latest connector as of this writing. The latest version of Spark uses Scala 2.11, and hence I am using the connector built for Scala 2.11.

                                
    spark-shell --packages datastax:spark-cassandra-connector:2.0.1-s_2.11                                 
                            

The next step is to create a data frame that holds some data. Then we can write that data frame to a Cassandra table. Let's create a data frame. Use following code to create a data frame.

                                
    //Read CSV into Data Frame
    val df = spark.read
                 .format("csv")
                 .option("header","true")
                 .option("inferSchema","true")
                 .option("nullValue","NA")
                 .option("timestampFormat","yyyy-MM-dd'T'HH:mm:ss")
                 .option("mode","failfast")
                 .load("/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv")
                                    
    df.createOrReplaceTempView("survey_tbl")
                                    
    val dfout=spark.sql("""select gender, sum(yes) sum_yes, sum(no) sum_no
                            from (select case when lower(trim(gender)) in ('male','m','male-ish','maile','mal','male (cis)',
                                                                            'make','male ','man','msle','mail','malr','cis man',
                                                                            'cis male') then 'Male' 
                                              when lower(trim(gender)) in ('cis female','f','female','woman','femake','female ',
                                                                            'cis-female/femme','female (cis)','femail') then 'Female'
                                              else 'Transgender' 
                                          end as gender,
                                          case when treatment = 'Yes' then 1 else 0 end as yes,
                                          case when treatment = 'No' then 1 else 0 end as no
                            from survey_tbl) 
                            where gender != 'Transgender'
                            group by gender""")                                   
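If you want to look at the CASE expression above in isolation, here is a plain-Scala sketch of the same gender-normalization logic. The helper name is mine, and the category lists are copied from the query; trimming and lowercasing happen before the lookup, exactly as lower(trim(gender)) does in the SQL.

```scala
// Hypothetical helper mirroring the CASE expression in the query above.
object GenderNormalizer {
  // Raw survey values that map to "Male" (copied from the SQL IN list).
  private val male = Set("male", "m", "male-ish", "maile", "mal", "male (cis)",
    "make", "man", "msle", "mail", "malr", "cis man", "cis male")
  // Raw survey values that map to "Female" (copied from the SQL IN list).
  private val female = Set("cis female", "f", "female", "woman", "femake",
    "cis-female/femme", "female (cis)", "femail")

  def normalize(raw: String): String = {
    val g = raw.trim.toLowerCase   // same as lower(trim(gender))
    if (male.contains(g)) "Male"
    else if (female.contains(g)) "Female"
    else "Transgender"
  }
}
```

Expressing it as a plain function like this also makes the categorization easy to unit-test outside Spark.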
                            

You already understand this code. Right? We used it in the earlier videos. If you look at the final data frame, its schema matches the schema of the target Cassandra table. That's a critical requirement. You might face a lot of issues if the two schemas do not match.
You can use the below code to save the data frame to Cassandra. The code is similar to what we have been using in this tutorial. Specify the format as Cassandra. You can use the overwrite mode, but then you also have to confirm the truncation. You must specify your Cassandra host, port, Keyspace, and the target table name. That's it. Execute the save method, and your data moves to Cassandra.

                                
    dfout.write
         .format("org.apache.spark.sql.cassandra")
         .mode("overwrite")
         .option("confirm.truncate","true")
         .option("spark.cassandra.connection.host","10.142.0.3")
         .option("spark.cassandra.connection.port","9042")
         .option("keyspace","sparkdb")
         .option("table","survey_results")
         .save()                                         
                            

You can go back to your CQL client and verify the data load.

How to configure Spark Cassandra connection parameters

Specifying the Cassandra host and port for every table write operation is not a good thing to do. Instead, you can specify these settings at the Spark session level. Use the below code.

                                
    import org.apache.spark.sql.cassandra._
    spark.setCassandraConf(Map( "spark.cassandra.connection.host" -> "10.142.0.3", 
                                "spark.cassandra.connection.port" -> "9042"))  
                            

Once you set these settings at the session level, you can save the data frame without these settings. You can test it using the following code.

                                
    dfout.write
         .format("org.apache.spark.sql.cassandra")
         .mode("overwrite")
         .option("confirm.truncate","true")
         .option("keyspace","sparkdb")
         .option("table","survey_results")
         .save()                                         
                            

In fact, you can also pass these configurations from the command line using the --conf option. There is no need to hardcode the connection parameters in your program.
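For example, a launch command along these lines would set the connection parameters for the whole session (the host IP shown is from this demo setup; use your own node's address):

```shell
spark-shell --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 \
            --conf spark.cassandra.connection.host=10.142.0.3 \
            --conf spark.cassandra.connection.port=9042
```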

How to read Cassandra table in Spark

Reading from a Cassandra table is again as simple as reading from any other data source. You can use the following code to test the read operation.

                                
    val df_read = spark.read
                       .format("org.apache.spark.sql.cassandra")
                       .option("spark.cassandra.connection.host","10.142.0.3")
                       .option("spark.cassandra.connection.port","9042")
                       .option("keyspace","sparkdb")
                       .option("table","survey_results")
                       .load()
    df_read.show                                            
                            

Great! That completes the Cassandra connector demonstration.
However, Cassandra is a powerful database. It offers many features and various other configuration options. It is not possible to cover all of them here. However, you can refer to the Cassandra Connector documentation. It is freely available on GitHub.

  1. Main Spark Cassandra connector page.
  2. List of general Cassandra connector configurations.
  3. List of data frame configurations.

Other than standard data frame APIs, the Cassandra connector offers a bunch of other shortcut APIs and utility methods. If you are interested, you can find those as well in the documentation.
Great! Thank you very much for watching Learning Journal. Keep learning and keep growing.

