Hello and welcome back to Learning Journal. I have already talked about loading data from a CSV source file. In this video, we will extend the same idea and explore some other commonly used data sources. We will not only load data but also explore the process of writing data to a variety of file formats. We will primarily work with five different types of data files: Parquet, JSON, ORC, XML, and AVRO.
In this video, we will restrict ourselves to working with data files. However, this video will give you the core concepts of working with a source system. Once you have these core concepts, you will be able to work with a variety of other third-party data sources. We will also explore a couple of third-party sources in the next video to help you extend your learning to virtually any other Spark source and destination.
Great! Let's start.
The first thing to understand is the general structure of the read and write APIs. To read data from a source, you need a reader. That is where we have a DataFrameReader interface. Similarly, to write data, you need a writer. That is where we have a DataFrameWriter interface. These two interfaces are a kind of Spark standard for working with any source system. There are a few exceptions to this rule, each for an appropriate reason. However, if the source system can offer you a clean row-column structure, you should get these two interfaces. There is one crucial point to note here. I am not talking about reading something which doesn't qualify to have a schema. If the data is structured or semi-structured, where you can define a row-column schema, you should be using a DataFrameReader and a DataFrameWriter. Every standard Spark 2.x connector follows this convention. Whether you want to connect to Cassandra, Redshift, Elasticsearch, or even Salesforce, they all offer you a DataFrameReader and a DataFrameWriter.
When you don't have a schema, or the data source provider doesn't offer a Spark 2.x connector, you will fall back to RDDs, and we will talk about reading RDDs in a later video.
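Here is a minimal sketch of where these two interfaces come from. It assumes a local SparkSession and a tiny made-up data frame (the app name and the id column are only placeholders). In the Spark shell, the spark variable already exists, so you would skip the builder line.

```scala
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row, SparkSession}

// Assumption: a local session for experimentation; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("reader-writer-sketch").getOrCreate()

// spark.read hands back the reader that every structured source goes through.
val reader: DataFrameReader = spark.read

// Any data frame exposes the matching writer through its write method.
val df: DataFrame = spark.range(3).toDF("id")
val writer: DataFrameWriter[Row] = df.write
```

Everything else in this lesson is a chain of method calls on one of these two objects.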
How to read from Spark data sources
We have already used DataFrameReader for CSV. So, you already know the general structure. For a typical reader, there are four methods: format, option, schema, and load.
We have already seen these in action for CSV. The format method takes a string to define the data source. For the file formats in this video, it could be one of the following values.
- parquet
- json
- orc
- com.databricks.spark.xml
- com.databricks.spark.avro
The first three formats are part of the Spark Core packages. However, the other two are still separate, and hence if you want to use them, you must manage the dependency. Similarly, for other connectors, you should check out their respective documentation.
The next one is the option method. We have already used options for CSV. You can pass a key-value pair to the option method. And you can add as many option methods as you want. Right? The number of valid options and the allowed key-value pairs depend on the connector. We have seen a long list of CSV options, and similarly, you can get a list of supported options for the first three items (Parquet, JSON, ORC) in the Spark documentation. For other connectors, you need to check their respective documentation.
The next method is the schema method. We already learned that we could infer the schema from the source system or we can use this method. There are two alternatives to specify a custom schema. We have already seen the StructType while working with CSV. You also have another simple choice: specify a DDL-like string.
spark.read.schema("a INT, b STRING, c DOUBLE")
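To make the two alternatives concrete, here is a small sketch that builds the same three-column schema both ways. The column names a, b, and c come from the line above. I am using StructType.fromDDL only to show what the string resolves to, and I am assuming Spark 2.3 or later, where the string form is available.

```scala
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

// Alternative 1: build the schema programmatically with StructType.
val structSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType),
  StructField("c", DoubleType)
))

// Alternative 2: describe the same columns with a DDL-like string.
val ddlSchema = StructType.fromDDL("a INT, b STRING, c DOUBLE")

// Print both so you can compare the column names and types.
println(structSchema.simpleString)
println(ddlSchema.simpleString)
```

The string form is shorter to type. The StructType form is handy when you build the schema from code, for example from a list of column names.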
Finally, you have the load method. In case of a file-based data source, you can pass a directory location or a file name to the load method. For other sources like JDBC, there is no file. So you would call it without any argument.
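Here is a minimal sketch that chains all four reader methods together. The file content, the column names, and the temporary path are made up for illustration, and I am assuming a local session and Spark 2.3 or later for the DDL-like schema string.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("reader-chain-sketch").getOrCreate()

// Hypothetical sample data written to a temporary CSV file.
val csvPath = Files.createTempFile("people", ".csv")
Files.write(csvPath, "id,name,score\n1,Alice,3.5\n2,Bob,4.0\n".getBytes("UTF-8"))

val people = spark.read
  .format("csv")                                // which data source to use
  .option("header", "true")                     // connector-specific key-value pair
  .schema("id INT, name STRING, score DOUBLE")  // custom schema instead of inferring
  .load(csvPath.toString)                       // a file name or a directory location

people.printSchema()
people.show()
```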
How to handle malformed records
So far so good. But what happens if you get a malformed record? That's a natural condition. When you are reading hundreds of thousands of rows, some of them may not conform to the schema. Those are incorrect records. What do you want to do with those?
Spark DataFrameReader allows you to set an option called mode. We have seen that as well for the CSV example. You can set one of the three values: permissive, dropMalformed, or failFast.
The default value is permissive. If you set the mode to permissive and encounter a malformed record, the DataFrameReader will set all the column values to null and push the entire row into a string column called _corrupt_record. This corrupt record column allows you to dump those records in a separate file. You can investigate and fix those records at a later stage. The dropMalformed option will quietly drop the malformed records. You won't even notice that some rows were not accurate.
And the failFast option raises an exception.
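Here is a small sketch of the three modes in action. I am using a made-up JSON file with one truncated line, and the helper name readWithMode is only for illustration. Note one assumption: when you supply your own schema, I add the _corrupt_record column to it so that the permissive mode has a place to put the raw text.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.SparkSession

// Assumption: a local session; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("malformed-sketch").getOrCreate()

// Hypothetical input: three JSON lines, the second one is missing its closing brace.
val jsonPath = Files.createTempFile("survey", ".json")
val lines = Seq(
  """{"id": 1, "name": "Alice"}""",
  """{"id": 2, "name": "Bob"""",
  """{"id": 3, "name": "Carol"}"""
)
Files.write(jsonPath, lines.mkString("\n").getBytes("UTF-8"))

// Hypothetical helper: same reader chain, only the mode and schema change.
def readWithMode(mode: String, schema: String) = spark.read
  .format("json")
  .schema(schema)
  .option("mode", mode)
  .load(jsonPath.toString)

// permissive: every row is kept; the bad line lands in _corrupt_record.
val permissiveRows = readWithMode("permissive", "id INT, name STRING, _corrupt_record STRING").collect()
val corruptRows = permissiveRows.filter(row => row.getAs[String]("_corrupt_record") != null)

// dropMalformed: the bad line silently disappears.
val droppedRows = readWithMode("dropMalformed", "id INT, name STRING").collect()

// failFast: the action fails as soon as the bad line is parsed.
val failFastResult = Try(readWithMode("failFast", "id INT, name STRING").collect())
```

I am collecting the rows instead of calling count, because a plain count may not parse every column and can hide the malformed line.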
Great! I just want to make the last point about the readers.
While working with CSV, we have used the standard DataFrameReader methods, and we also used a simple csv method. The csv method is a shortcut method on the DataFrameReader. While learning about other connectors, you might also find some other shortcut methods for those connectors. You can use them if you want to do that. However, I recommend that you follow the standard technique. That makes your code more readable for a large team.
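Here is a quick sketch that puts the shortcut and the standard form side by side. The sample file and its columns are made up, and I am assuming a local session.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("shortcut-sketch").getOrCreate()

// Hypothetical sample data written to a temporary CSV file.
val csvPath = Files.createTempFile("survey", ".csv")
Files.write(csvPath, "id,country\n1,US\n2,IN\n".getBytes("UTF-8"))

// Shortcut method: csv(...) sets the format and loads in one call.
val viaShortcut = spark.read.option("header", "true").csv(csvPath.toString)

// Standard form: format(...) followed by load(...).
val viaStandard = spark.read.format("csv").option("header", "true").load(csvPath.toString)

viaShortcut.show()
viaStandard.show()
```

Both give you the same data frame. The standard form looks the same for every connector, and that is why I prefer it.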
How to write Spark data frames
Let's quickly cover the
DataFrameWriter and then we will look at some examples.
The concept of writing data back to the source system is fundamentally the same as reading data. You need a writer, and hence we have a DataFrameWriter interface. For a typical writer, there are four methods: format, option, mode, and save.
The format method and the option method are like the
DataFrameReader. You need to check the documentation for the available options.
The save method is like a load method. It takes the output pathname and dumps the data at the specified location.
How to handle the file already exists condition
However, while working with a file-based destination, you may encounter a file already exists situation. To handle that scenario, we have a mode method that takes one of the following options.
- append
- overwrite
- errorIfExists
- ignore
All the values are obvious. The third option will raise an exception, and the last option will keep quiet. I mean, if the file already exists, Spark just skips the write without raising any error.
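Here is a minimal sketch that walks through the four save modes against the same output directory. The three-row data frame, the temporary directory, and the rowCount helper are made up for illustration. I am assuming a local session and Spark 2.3 or later, where the errorIfExists spelling is accepted.

```scala
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.SparkSession

// Assumption: a local session; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("save-mode-sketch").getOrCreate()

val df = spark.range(3).toDF("id")
// Hypothetical output location that does not exist yet.
val out = Files.createTempDirectory("save-modes").resolve("parquet-data").toString

// Hypothetical helper: count the rows currently stored at the output location.
def rowCount(): Long = spark.read.format("parquet").load(out).count()

// First write: nothing exists yet, so errorIfExists succeeds.
df.write.format("parquet").mode("errorIfExists").save(out)
val afterFirstWrite = rowCount()

// append adds the new rows next to the existing ones.
df.write.format("parquet").mode("append").save(out)
val afterAppend = rowCount()

// ignore notices the existing data and quietly does nothing.
df.write.format("parquet").mode("ignore").save(out)
val afterIgnore = rowCount()

// errorIfExists now fails because the location already has data.
val secondErrorIfExists = Try(df.write.format("parquet").mode("errorIfExists").save(out))

// overwrite replaces whatever was there.
df.write.format("parquet").mode("overwrite").save(out)
val afterOverwrite = rowCount()
```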
Additional Spark Data frame write options
If you are working with a file-based destination system, you also have three additional methods: partitionBy, bucketBy, and sortBy. The partitionBy method allows you to partition your output files. And the bucketBy method allows you to create a finite number of data buckets. Both methods follow the Hive partitioning and bucketing principles. I hope you are already familiar with the partition and bucket concepts. If not, write a comment below, and I will share some relevant content.
These extra methods may not be available with other connectors, so you need to check their respective documentation.
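Here is a small sketch of partitionBy on a file-based destination. The data frame, the country column, and the temporary directory are made up for illustration, and I am assuming a local session. I am leaving bucketBy out of the code because, as far as I know, Spark supports it only together with saveAsTable and not with a plain save.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("partition-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data with a low-cardinality column to partition on.
val survey = Seq((1, "US"), (2, "IN"), (3, "US")).toDF("id", "country")
val out = Files.createTempDirectory("partitioned").resolve("parquet-data").toString

survey.write
  .format("parquet")
  .partitionBy("country")   // one sub-directory per distinct country value
  .mode("overwrite")
  .save(out)

// List the Hive-style partition directories that Spark created.
val partitionDirs = new File(out).listFiles().filter(_.isDirectory).map(_.getName).sorted
partitionDirs.foreach(println)
```

Each directory name follows the column=value pattern, and that is the Hive partitioning layout I mentioned.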
Spark Data Source Examples
Great! That's all about theory. All this knowledge is useless until you develop some skills around it. And you can convert your knowledge into skills by writing some code. So, let's create a simple example and write some code.
I am going to do the following things.
- Read CSV -> Write Parquet
- Read Parquet -> Write JSON
- Read JSON -> Write ORC
- Read ORC -> Write XML
- Read XML -> Write AVRO
- Read AVRO -> Write CSV
By doing these simple exercises, we will be able to learn all the file formats that I talked about in this lesson. So, let's start.
Working with CSV in Apache Spark
Here is the code to read a CSV and write into a Parquet format.
//Read CSV into Data Frame
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NA")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/survey.csv")

//Write Data Frame to Parquet
df.write
  .format("parquet")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/parquet-data/")
I have already explained the CSV reading in an earlier video. So, let's focus on the writing part. We start by calling the write method on a data frame. The write method returns a DataFrameWriter object. And then, everything else is simple. We call the necessary DataFrameWriter methods. I hope you are already familiar with the Parquet file format and its advantages. Since Parquet is a well-defined file format, we don't have as many options as we had in CSV. In fact, Parquet is the default file format for Apache Spark data frames. If you don't specify the format, the DataFrameWriter will assume it to be Parquet. We are setting the mode as overwrite. So, if you already have some data, you will lose that, and Spark will refresh the directory with the new file.
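Here is a quick sketch to check the default format. It writes a made-up three-row data frame without calling the format method and then looks at the file names. I am assuming a local session where nobody has changed the spark.sql.sources.default setting.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session with default settings; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("default-format-sketch").getOrCreate()

val df = spark.range(3).toDF("id")
// Hypothetical output location.
val out = Files.createTempDirectory("default-format").resolve("data").toString

// No format(...) call here, so the writer falls back to the default source.
df.write.mode("overwrite").save(out)

// The data files carry the parquet extension.
val dataFiles = new File(out).listFiles().map(_.getName).filter(_.startsWith("part-"))
dataFiles.foreach(println)

// The reader uses the same default when format(...) is omitted.
val readBack = spark.read.load(out)
```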
Working with JSON in Apache Spark
Let's move on to the next example. This time, we read parquet and write JSON.
//Read Parquet into Data Frame
val df = spark.read
  .format("parquet")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/parquet-data/")

//Write Data Frame to JSON
df.write
  .format("json")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/json-data/")
The code is straightforward. In fact, it follows the same structure as the earlier example. Let me execute the code. Here is my JSON file. You can pick one record and compare it with your original CSV. In these examples, we simply read data in one format and write it back in another format. We are not applying any transformations. However, in your real project, you will read data from a source, apply a series of transformations, and then save it back in the same or a new format. And you can do that easily as long as you have a data frame as your final result. We are also doing the same thing, saving a data frame. Right?
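Here is a minimal sketch of that read, transform, and write flow. The columns (id, treatment, age), the sample rows, and the temporary directories are made up for illustration and are not the real survey data. I am assuming a local session.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

// Assumption: a local session; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("pipeline-sketch").getOrCreate()
import spark.implicits._

// Hypothetical locations for the source and the destination.
val base = Files.createTempDirectory("pipeline")
val parquetDir = base.resolve("parquet-data").toString
val jsonDir = base.resolve("json-data").toString

// Prepare a small Parquet source so the sketch is self-contained.
Seq((1, "Yes", 37), (2, "No", 44), (3, "Yes", 29)).toDF("id", "treatment", "age")
  .write.format("parquet").mode("overwrite").save(parquetDir)

// Read from the source.
val source = spark.read.format("parquet").load(parquetDir)

// Apply a couple of transformations; the result is still a data frame.
val transformed = source
  .filter($"treatment" === "Yes")
  .withColumn("age_group", when($"age" < 30, "under-30").otherwise("30-plus"))

// Save the final data frame in a new format.
transformed.write.format("json").mode("overwrite").save(jsonDir)

val result = spark.read.format("json").load(jsonDir)
result.show()
```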
Working with ORC in Apache Spark
Here is an example to read a JSON and write it back to an ORC format. There is nothing new for me to explain here.
//Read JSON into Data Frame
val df = spark.read
  .format("json")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/json-data/")

//Write Data Frame to ORC
df.write
  .format("orc")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/orc-data/")
Working with XML in Apache Spark
The next example is to read from ORC and write it to XML. The XML connector is not part of the Spark distribution. It is an open-source connector that Databricks has created and certified. Similarly, our next example will use AVRO. That's again a Databricks-certified connector but not available as part of the Spark distribution. You can get the documentation and other details about these connectors on GitHub.
To use these connectors in the Spark shell, you must include their dependencies when you start it. The method is already given in their respective documentation. Start the Spark shell using the following command.
spark-shell --packages com.databricks:spark-xml_2.11:0.4.1,com.databricks:spark-avro_2.11:4.0.0
This command specifies the group id, artifact id, and the version of each package. That's it. The Spark dependency manager will pull the required JARs from the Maven repository and include them in your classpath.
//Read ORC into Data Frame
val df = spark.read
  .format("orc")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/orc-data/")

//Write Data Frame to XML
df.write
  .format("com.databricks.spark.xml")
  .option("rootTag", "survey")
  .option("rowTag", "survey-row")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/xml-data/")
Working with AVRO in Apache Spark
The code example below reads an XML file into an Apache Spark data frame and writes it back to an AVRO file.
//Read XML into Data Frame
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "survey-row")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/xml-data/")

//Write Data Frame to AVRO
df.write
  .format("com.databricks.spark.avro")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/avro-data/")
The code example below reads an AVRO file into an Apache Spark data frame and writes it back to a CSV file.
//Read AVRO into Data Frame
val df = spark.read
  .format("com.databricks.spark.avro")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/avro-data/")

//Write Data Frame to CSV
df.write
  .format("csv")
  .option("header", "true")
  .option("nullValue", "NA")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/csv-data/")
Thank you for watching Learning Journal. Keep learning and keep growing.