Why Delta Lake for Spark

In this first part, I will talk about the following question: why do we need something like Delta Lake?
As we discuss the answer, you will also learn the following things.

  1. Some of the critical features that are missing from Apache Spark.
  2. The problems these gaps cause for a typical data lake implementation.

These problems also form the basis for a typical Apache Spark job interview discussion. So, make sure you sail through to the end of the video. And don't forget to subscribe and set notifications for more exciting stuff.
Great! Let's start.

Atomicity in Spark Writer APIs

We all know that Apache Spark is not ACID compliant. And I guess you also understand what ACID means. However, let me refresh it quickly and talk about what it means for Apache Spark.
The A in ACID stands for Atomicity. Basically, it means either all or nothing. So, when you are using the Spark DataFrame writer API, it should either write the full data or nothing. Let's quickly look at the Spark documentation.

Fig.1- SQL Data Sources load save functions

What does it say? It is important to realize that these save modes do not utilize any locking and are not atomic. That's a big problem. Isn't it?
That means a failed job might leave an incomplete file and ultimately corrupt your data.
Let's read the documentation further. Additionally, when performing an overwrite, the data will be deleted before writing out the new data. Oops! That's even more dangerous. A failed job may remove the old data and leave behind incomplete new data.
Well, the situation looks scary, but it is not that bad in reality. Why? Because the Spark DataFrame writer API internally relies on a job-level commit protocol to ensure some degree of atomicity, i.e., all or nothing. Let me show you this with an example.
The following job will create a new csv file with 100 rows.

If you execute it and check the results in the test-1 directory, you will see one file with 100 rows. Right?
Now, let's start the second job to append another set of 100 records, but this job fails in the middle of the write operation.
Here is the code.

Execute it and check the results again in the test-1 directory. Do you see any impact? Nothing. Right? You still have one file with the same 100 rows.
What if the append completed successfully? Let's change the code a little and execute it again.

Execute it and check the results again in the test-1 directory. You have got another file with 100 records. So, now you have 200 records in this directory. It seems like Spark write is atomic because we saw all or nothing working here.
Why did this happen? Because, by default, Spark uses Hadoop's FileOutputCommitter version 1 algorithm to implement job commit and abort. And as a result of this algorithm, we get some degree of atomicity. Many Spark teams won't even notice this behavior until they face a job failure caused by a runtime exception.
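If you cannot spin up a cluster, the effect of this commit protocol can be imitated with plain Python. The sketch below is a simulation, not Spark code, and all file and function names in it are mine: output is staged in a temporary directory and published into test-1 only on job commit, which is roughly what the version 1 committer does.

```python
import csv, os, shutil, tempfile

def append_with_commit(target_dir, rows, fail_midway=False):
    """Imitate Hadoop's FileOutputCommitter v1: write to a staging
    directory first, then publish the finished file into target_dir
    on job commit. Nothing appears in target_dir before the commit."""
    os.makedirs(target_dir, exist_ok=True)
    staging = tempfile.mkdtemp(prefix="_temporary-")
    part = os.path.join(staging, "part-00000.csv")
    with open(part, "w", newline="") as f:
        writer = csv.writer(f)
        for i, row in enumerate(rows):
            if fail_midway and i == 50:
                # the job dies here; staged data is abandoned and the
                # target directory is never touched
                raise RuntimeError("job failed during the write")
            writer.writerow(row)
    # job commit: move the completed file into the target directory
    n = len(os.listdir(target_dir))
    shutil.move(part, os.path.join(target_dir, f"part-{n:05d}.csv"))
    shutil.rmtree(staging, ignore_errors=True)

shutil.rmtree("test-1", ignore_errors=True)              # start clean
rows = [[i, f"name-{i}"] for i in range(100)]

append_with_commit("test-1", rows)                       # job 1: 1 file, 100 rows
try:
    append_with_commit("test-1", rows, fail_midway=True) # job 2: fails mid-write
except RuntimeError:
    pass                                                 # test-1 is untouched

append_with_commit("test-1", rows)                       # job 3: 2nd file lands

total = 0
for name in os.listdir("test-1"):
    with open(os.path.join("test-1", name)) as f:
        total += sum(1 for _ in f)
print(len(os.listdir("test-1")), total)                  # 2 200
```

The failed job leaves nothing behind in test-1, which is exactly the all-or-nothing behavior we observed above.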
However, the FileOutputCommitter version 1 algorithm is slow and profoundly impacts write performance. If atomicity is not critical, you also have the option to use FileOutputCommitter version 2. The version 2 algorithm is much faster but does not guarantee all or nothing.
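For reference, the committer version is controlled through a standard Hadoop configuration key. Here is a PySpark sketch of how you might request version 2 (this needs a working Spark installation; the app name is just illustrative):

```python
from pyspark.sql import SparkSession

# Request the v2 commit algorithm through the standard Hadoop config key.
# v2 commits task output directly to the destination: faster job commit,
# but no all-or-nothing guarantee if the job fails part-way.
spark = (SparkSession.builder
         .appName("v2-committer-demo")       # illustrative app name
         .config("spark.hadoop.mapreduce.fileoutputcommitter"
                 ".algorithm.version", "2")
         .getOrCreate())
```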
When you are using cloud storage such as Amazon S3, the FileOutputCommitter version 1 performs much slower than it does on HDFS. For that reason, Amazon EMR came up with the EMRFS S3-optimized committer. You can get more details about it here.
Hortonworks also came up with multiple committers for Amazon S3. More information on this is here.
Databricks also offered its own solution in the name of DBIO. More details on this are available here.
So, the point is straightforward. The Spark DataFrame writer APIs are not atomic, but they behave like an atomic operation for appends. However, the default implementation comes with a performance overhead, especially when using cloud storage instead of HDFS. So, if you are using cloud storage, check out the job commit algorithms offered by your cloud provider.

Data Consistency Problem in Spark

The next item in ACID is Consistency. For a data processing system, consistency means a lot. However, at the most basic level, consistency ensures that the data is always in a valid state. With that definition, now think about the overwrite mode of the Spark writer API. It deletes the old file and then creates a new one. These two operations are not transactional. That means there is a window in between when the data does not exist. If your overwrite operation fails during that window, you lose all your data. Do you want to test it?
Let's do it. I am going to create another test-2.csv file with a hundred rows in it.

Go and check the outcome in the test-2 directory. You will have a file with 100 rows.
Now, let's execute a failing program that tries to overwrite the existing file but fails in the middle.

You should get an exception. Go back and check the test-2 directory. Nothing would be there. So, you lost the old data and didn't even get the new data. This is a typical atomicity problem with the Spark overwrite operation. And this problem also breaks data consistency. What happens if your overwrite operation does not raise an exception and completes successfully? Will you still have the consistency problem?
Yes, because there is a time between the delete and write operations when your data is not in a valid state. And that's how the Spark API lacks consistency.
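You can reproduce this data loss without Spark at all. The following plain-Python simulation (the names are illustrative) imitates overwrite's delete-then-write sequence and fails in between, just like the demo above:

```python
import os, shutil

def naive_overwrite(target_dir, rows, fail_after_delete=False):
    """Imitate Spark's overwrite mode: delete the old data first, then
    write the new data -- two separate, non-transactional steps."""
    shutil.rmtree(target_dir, ignore_errors=True)   # step 1: old data is gone
    if fail_after_delete:
        # the job dies between the two steps: old data already deleted,
        # new data never written
        raise RuntimeError("simulated failure between delete and write")
    os.makedirs(target_dir)
    with open(os.path.join(target_dir, "part-00000.csv"), "w") as f:
        f.writelines(f"{r}\n" for r in rows)        # step 2: new data appears

naive_overwrite("test-2", range(100))               # initial load: 100 rows
try:
    naive_overwrite("test-2", range(100), fail_after_delete=True)
except RuntimeError:
    pass

print(os.path.exists("test-2"))   # False -- old and new data both lost
```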

Isolation and Durability in Spark

The next item in ACID is isolation. What does it mean? Here is the definition.
An operation in process and not yet committed must remain isolated from any other activities. What it means is simple: while you are writing a data set, another concurrent read/write on the same data set should not be impacted by your write operation. A typical database would offer different isolation levels, such as read committed and serializable. Do we have this feature in Spark?
No. Apache Spark does not have a strict notion of a commit. I mean, there are task-level commits and, finally, a job-level commit that Spark implements. But this implementation is broken due to the lack of atomicity in write operations. And hence, Spark does not offer isolation levels, even at the level of an individual API call.
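The missing isolation is easy to picture with plain files. In this simulation (again, not Spark code; the names are mine), a reader lists the output directory while a two-file write is still in flight and happily consumes uncommitted, partial data:

```python
import os, shutil

shutil.rmtree("test-3", ignore_errors=True)   # start clean
os.makedirs("test-3")

def write_part(name, rows):
    """One output file of a multi-file write job."""
    with open(os.path.join("test-3", name), "w") as f:
        f.writelines(f"{r}\n" for r in rows)

write_part("part-00000.csv", range(50))       # writer job: first file lands

# A concurrent reader scans the directory *now*, mid-write. With no notion
# of a committed snapshot, it reads whatever files happen to exist.
seen = 0
for p in os.listdir("test-3"):
    with open(os.path.join("test-3", p)) as f:
        seen += sum(1 for _ in f)

write_part("part-00001.csv", range(50, 100))  # writer job: second file lands

print(seen)   # 50 -- a dirty read of a half-finished data set
```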
Finally, Durability. What it says is this: committed data is saved by the system such that, even in the event of a failure and system restart, the data is available in its correct state.
Well, durability is offered by the storage layer, and we know HDFS and S3 are great at this. However, when Spark doesn't correctly implement the commit, all the durability features offered by the storage go for a toss.
So, what we concluded is this: Apache Spark does not offer ACID properties. Nothing new. We already knew it. But what I am trying to do is show you some implications with simple examples. We cannot write a program that would never raise a runtime exception. Sooner or later, you will face one. And when that happens, you will be left with corrupt or lost data. Even if your program does not raise a runtime exception, you cannot reliably read a dataset that a different process is concurrently overwriting. And that's a considerable limitation placed on us by Apache Spark.
In my experience, I couldn't find an efficient and reliable solution for this problem except for some workarounds. In fact, even the Spark creators could not find a suitable solution, and hence, they could not offer updates and deletes in Spark. If this ACID problem is fixed, you get Update, Delete, and Merge statements in Apache Spark. And that is what Delta Lake is all about.
There are a few other common problems.

  1. Schema Enforcement
  2. Too many small files in real-time scenarios
  3. Mulish data skipping using partitioning

Schema Enforcement Problem in Spark

Let me first explain the schema enforcement problem.
Let's assume you have a batch processing job. It reads some data from an input location, performs a transformation, and writes it to the output location. And it repeats the same for the next batch of input data. That's what a typical batch process looks like. Right?

Fig.2- Consume-Transform-Produce Pipeline in Spark

Let's imitate the scenario using below code.

Check out the video for a working demo of this code. We execute the same code twice with two batches of data. The first batch works well, but in the second batch, we get an exception.
The demo shows what happens when your schema changes in an upcoming batch of the same data. If this happens, you end up with the complex problem of investigating what caused the current job to fail.
Schema-on-read is a great thing for implementing schema evolution, but it causes a big mess when we do not validate whether the tool supports that evolution or not. Apache Spark does not support evolving an Int column to a Double column. Who will do this validation? If I have to hand-code this thing myself, it's a big job. And believe me, I am not going to do this at all.
There are workarounds to this problem, but handling data issues is much easier when you stop the job before corrupt data lands next to useful data. Once you mix them, you have dropped yourself into hell. Delta Lake takes care of this problem as well.
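To make the fail-fast idea concrete, here is a sketch of what even a minimal hand-coded schema check involves (plain Python; the field names and types are made up for illustration):

```python
EXPECTED_SCHEMA = {"id": int, "amount": float, "country": str}  # made-up schema

def validate_batch(records, expected=EXPECTED_SCHEMA):
    """Fail fast if a batch does not match the expected schema, instead
    of letting mismatched data land next to good data."""
    for i, rec in enumerate(records):
        if set(rec) != set(expected):
            raise ValueError(
                f"record {i}: fields {sorted(rec)} != {sorted(expected)}")
        for name, typ in expected.items():
            if not isinstance(rec[name], typ):
                raise ValueError(
                    f"record {i}: column '{name}' is "
                    f"{type(rec[name]).__name__}, expected {typ.__name__}")

batch1 = [{"id": 1, "amount": 9.5, "country": "IN"}]
batch2 = [{"id": "2", "amount": 3.0, "country": "US"}]  # id became a string

validate_batch(batch1)                # first batch passes
try:
    validate_batch(batch2)            # second batch is rejected up front
except ValueError as err:
    print("rejected:", err)
```

And this covers only flat records with exact types; widening conversions, nested fields, and nullability make the hand-rolled version grow quickly, which is exactly the point above.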

Small File Problem in Spark

The next problem is the small file problem. Let's assume that you have a data ingestion job. This job triggers every hour and brings data to the data lake and saves it to some location. Looks like a usual thing. Right? But there are some minute details of this setup.

Fig.3- Ingestion Jobs creating small files

The ingestion job will append a new batch of data to the target location every hour. Right? Every append is at least one new file. Have you ever seen how big the file is? Yes, you are right. It depends upon how much data we collect in that one hour. It could be 100 GB, 1 GB, or even just a few MBs. You may have hundreds of such ingestion jobs in a large system, and all of them are creating hundreds of small files every hour. Do you see the problem? Too many small files are neither handled efficiently by the storage systems nor processed efficiently by Spark. Why?
Well, the Spark API would internally need to query the storage system to get the list of files for a particular table. A large number of files slows down that listing operation. Spark would also need to open each of these files and close them after reading. As a result, your jobs spend a lot of time opening and closing files rather than reading data. Small files also compress poorly. And if you created a Hive table or Spark table, you are storing metadata about the table, which grows significantly with a large number of files. That will again slow down your job. So, a large number of small files is a real problem in big data systems, and there is no doubt about it.

Now think about a real-time, or even a near-real-time, scenario. Your ingestion job that triggers every hour is not a good fit for real-time requirements. You need to trigger it every minute. Even a minute could be too long for your use case. Right? But now, when you change the ingestion frequency to one minute, what would be the file size? A few KBs. Right? And that would rapidly compound your small file problem.
And that is why we need something like Kafka to fix this problem. Bring data to Kafka and run your real-time pipelines directly from Kafka.
But you want to dump your data into the data lake as well for hourly batch processing. So, you can create a separate hourly sink job to push data to the lake. But you still have small files created every hour. How do you fix it?
A compaction job. Right? Read all the small files and write them as one large file. Obvious solution. Isn't it? But what are you going to do about the lack of ACID properties? I mean, we often see this problem. Your data processing job should wait for the compaction job to complete. Why? Because compaction means writing large files and then deleting the small files, and that pair of operations is not transactional. So, it brings your system into an inconsistent state. You cannot even perform a reliable read during the compaction. Isn't it? This one is a real challenge that we often deal with, and Delta Lake offers a solution for this one as well. We will learn about that in the next video.
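To see why compaction and ACID collide, here is a plain-Python simulation of a naive compaction job (the names are illustrative). The big write and the small-file deletes are separate steps, and there is no transaction around them:

```python
import glob, os, shutil

shutil.rmtree("ingest", ignore_errors=True)           # start clean
os.makedirs("ingest")
for h in range(10):                                   # 10 tiny hourly files
    with open(f"ingest/part-{h:05d}.csv", "w") as f:
        f.write(f"row-from-hour-{h}\n")

def compact(target_dir):
    """Naive compaction: write one big file, then delete the small ones.
    Between those two steps readers see every row twice, and a crash
    mid-way leaves the directory permanently inconsistent."""
    small = sorted(glob.glob(os.path.join(target_dir, "part-*.csv")))
    with open(os.path.join(target_dir, "compacted-00000.csv"), "w") as out:
        for path in small:
            with open(path) as f:
                out.write(f.read())                   # step 1: big file appears
    for path in small:
        os.remove(path)                               # step 2: small files vanish

compact("ingest")
print(len(os.listdir("ingest")))   # 1
```

A reader that lists the directory between step 1 and step 2 double-counts every row, which is why other jobs must wait for the compaction to finish.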

Partition in Apache Spark

Finally, the last item that I want to talk about is the mulish data skipping using partitioning. Some of the common questions in Spark interviews are related to performance optimization. How do you tune a Spark job's performance? Have you found a silver bullet? I haven't.
One of the common approaches is to apply effective data skipping. What does it mean?
When you have a filter condition in your query, Spark is going to read all the data, apply the filter, and throw away records that do not qualify. The question is, can we avoid reading some or all of those records that we are going to throw away? Databases implement this using secondary indexes, but we do not have that facility in Spark. Right?
We often handle it by creating partitions on commonly used columns. As a result, Spark will only read a limited number of partitions, and the other partitions are skipped. And that's what we term data skipping or partition pruning. But we all know the partitioning approach is often not very useful. Why?
Because the partitioning works well with two types of columns.

  1. Chronological columns such as a date.
  2. Low cardinality columns such as country-code, or a combination of the columns such as country-code plus state-code.

But when the cardinality of your partition column is high, we end up creating too many partitions and, ultimately, a lot of small files. And this triggers the performance problems caused by small files.
Even with low cardinality partition columns, we will be reading all the data files in the qualified partitions. We do not have any kind of indexing, or at least statistics, to skip some of the files within the eligible partitions. Databricks Delta Lake tries to address this problem as well.
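Partition pruning itself is easy to picture. This plain-Python sketch (illustrative paths, not Spark internals) lays out data in hive-style country= directories and skips whole directories that cannot match the filter, while still reading every file inside a qualifying partition:

```python
import os, shutil

shutil.rmtree("events", ignore_errors=True)           # start clean
data = {"IN": ["a", "b"], "US": ["c"], "UK": ["d", "e"]}
for country, country_rows in data.items():            # hive-style layout:
    d = f"events/country={country}"                   # one dir per value
    os.makedirs(d)
    with open(os.path.join(d, "part-00000.csv"), "w") as f:
        f.writelines(r + "\n" for r in country_rows)

def read_events(country_filter=None):
    """Skip whole partition directories that cannot match the filter.
    Inside a qualifying partition, every file is still read in full."""
    rows, files_opened = [], 0
    for part in os.listdir("events"):
        country = part.split("=", 1)[1]
        if country_filter and country != country_filter:
            continue                                  # partition pruned
        files_opened += 1
        with open(f"events/{part}/part-00000.csv") as f:
            rows.extend(line.strip() for line in f)
    return rows, files_opened

rows, opened = read_events("IN")
print(opened, sorted(rows))        # 1 ['a', 'b'] -- two partitions skipped
```

Note that within the country=IN directory, every file is opened regardless of what it contains; there are no per-file statistics to prune further, which is the gap described above.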
Great! So now we understand the main problems that Delta Lake is there to address. And by solving these problems, it also provides some additional capabilities out of the box, such as time travel and audit logs. We will learn more about Delta Lake in the next video and try to understand how it addresses all these problems. See you in the next video.
Keep Learning and Keep growing.


By Prashant Pandey
