Why Delta Lake for Spark
In this first part, I will talk about the following question: why do we need something like Delta Lake? As we discuss the answer, you will also learn the following things.
- Some of the critical features that are missing from Apache Spark.
- What kinds of problems they cause for a typical data lake implementation.
These problems also form the basis for a typical Apache Spark job interview discussion. So, make sure you stay with me until the end of the video. And don't forget to subscribe and set notifications for more exciting stuff.
Great! Let's start.
Atomicity in Spark Writer APIs
We all know that Apache Spark is not ACID compliant. And I guess you also understand what ACID means. However, let me refresh it quickly and talk about what it means for Apache Spark.
The A in ACID stands for Atomicity. Basically, it means either all or nothing. So, when you are using the Spark DataFrame writer API, it should either write the full data or nothing.
Let's quickly look at the Spark documentation.
What does it say? It is important to realize that these save modes do not utilize any
locking and are not atomic. That's a big problem. Isn't it?
That means a failed job might leave an incomplete file and ultimately corrupt your data.
Let's read the documentation further. Additionally, when performing an overwrite, the data will be deleted before writing out the new data. Oops! That's even more dangerous. A failing job may remove the old data and never finish writing the new data.
Well, the situation looks scary, but it is not that bad in reality. Why? Because the Spark DataFrame writer API internally relies on a job-level commit protocol to ensure some degree of atomicity, i.e., all or nothing. Let me show you this with an example.
The following job will create a new csv file with 100 rows.
If you execute it and check the results in the test-1 directory, you will see one file with 100 rows. Right?
Now, let's start the second job to append another set of 100 records, but this job fails
in the middle of the write operation.
Here is the code.
Execute it and check the results again in the test-1 directory. Do you see any impact? Nothing. Right? You still have one file with the same 100 rows.
What if the append completed successfully? Let's change the code a little and execute it
again.
Execute it and check the results again in the test-1 directory. You have got another file
with 100 records. So, now you have 200 records in this directory. It seems like Spark write
is atomic because we saw all or nothing working here.
Why did this happen? Because, by default, Spark uses Hadoop's FileOutputCommitter version 1 algorithm to implement job commit and abort. And as a result of this algorithm, we get some degree of atomicity. Many Spark teams won't even notice this problem until they face a job failure caused by a runtime exception.
However, the FileOutputCommitter version 1 algorithm is slow and profoundly impacts write performance. If atomicity is not critical, you also have the option to use FileOutputCommitter version 2. The version 2 algorithm is much faster but does not guarantee all or nothing.
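To make the idea concrete, here is a minimal pure-Python sketch of the version 1 protocol. This is a simplified model, not Hadoop's actual code: tasks write into a hidden _temporary directory, and only a successful job commit renames the finished file into the target.

```python
import os
import shutil
import uuid

def run_job(target, rows, fail_midway=False):
    """Write rows under target, committing them only if the whole job succeeds."""
    os.makedirs(target, exist_ok=True)
    tmp = os.path.join(target, "_temporary")
    os.makedirs(tmp, exist_ok=True)
    staged = os.path.join(tmp, "part-00000")
    try:
        with open(staged, "w") as f:
            for i, row in enumerate(rows):
                if fail_midway and i == len(rows) // 2:
                    raise RuntimeError("simulated failure during write")
                f.write(f"{row}\n")
        # Job commit: a rename promotes the finished file into the target.
        os.rename(staged, os.path.join(target, f"part-{uuid.uuid4().hex}.csv"))
    finally:
        # Abort/cleanup: a half-written file never leaves the staging area.
        shutil.rmtree(tmp, ignore_errors=True)

def committed_rows(target):
    total = 0
    for name in os.listdir(target):
        if name.startswith("part-"):
            with open(os.path.join(target, name)) as f:
                total += sum(1 for _ in f)
    return total
```

A run with fail_midway=True raises, but committed_rows stays exactly as it was: that is the "some degree of atomicity" described above. Version 2, by contrast, promotes each task's output as soon as the task finishes, which saves renames at job commit but can leave partial output behind when the job dies between tasks.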
When you are using cloud storage such as Amazon S3, FileOutputCommitter version 1 performs much slower than it does on HDFS. For that reason, Amazon EMR came up with the EMRFS S3-optimized committer. You can get more details about it here.
Hortonworks also came up with multiple committers for Amazon S3. More information on
this is here.
Databricks also offered its own solution in the name of DBIO. More details on this are available here.
So, the point is straightforward. Spark DataFrame writer APIs are not atomic, but they behave like an atomic operation for appends. However, the default implementation comes with a performance overhead, especially when using cloud storage instead of HDFS. So, if you are using cloud storage, check out the job commit algorithm offered by your cloud provider.
Data Consistency Problem in Spark
The next item in ACID is Consistency. For a data processing system, consistency means a lot. However, at the most basic level, consistency ensures that the data is always in a valid state. With that definition, now think about the overwrite mode of the Spark writer API. It deletes the old file and then creates a new one. These two operations are not transactional. That means, in between, there is a time when the data does not exist. If your overwrite operation fails, you are going to lose all your data. Do you want to test it?
Let's do it. I am going to create another CSV file with a hundred rows in it, this time under the test-2 directory.
Go and check the outcome in the test-2 directory. You will have a file with 100 rows.
Now, let's execute a failing program that tries to overwrite the existing file but fails
in the middle.
You should get an exception. Go back and check the test-2 directory. Nothing would be there. So, you lost the old data and didn't even get the new data. This is a typical atomicity problem with the Spark overwrite operation. And this problem also breaks data consistency. What happens if your overwrite operation does not raise an exception and completes successfully? Will you still have the consistency problem?
Yes, because there is a time in between the delete and the write when your data is not in a valid state. And that's how the Spark API lacks consistency.
Isolation and Durability in Spark
The next item in ACID is isolation. What does it mean? Here is the definition.
An operation in progress and not yet committed must remain isolated from any other activities. What it means is simple. When you are writing a data set, another concurrent read/write on the same data set should not be impacted by your write operation. A typical database would offer different isolation levels, such as read committed and serializable. Do we have this feature in Spark?
No. Apache Spark does not have a strict notion of a commit. I mean, there are task-level commits and, finally, a job-level commit that Spark implements. But this implementation is broken due to the lack of atomicity in write operations. And hence, Spark does not offer isolation levels even at an individual API level.
Finally, Durability. What it says is this. Committed data is saved by the system such that, even in the event of a failure and system restart, the data is available in its correct state.
Well, durability is offered by the storage layer, and we know HDFS and S3 are great at this. However, when Spark doesn't correctly implement the commit, all the durability features offered by the storage go for a toss.
So, what we concluded is this. Apache Spark does not offer ACID properties. Nothing new.
We already knew it. But what I am trying to do is to show you some implications with simple
examples. We cannot write a program which would never ever raise a runtime exception. Sooner
or later, you will face it. And if that happens, you will be left with corrupt or lost data.
Even if your program does not raise a runtime exception, you cannot reliably read a dataset which a different process is concurrently overwriting. And that's a considerable limitation put on us by Apache Spark.
In my experience, I couldn't find an efficient and reliable solution for this problem
except for some workarounds. In fact, even Spark creators could not find a suitable solution
for this problem, and hence, they could not offer updates and deletes in Spark. If this ACID
thing can be fixed, you will get Update, Delete, and Merge statements in Apache Spark. And
that is what Delta Lake is all about.
There are a few other common problems.
- Schema Enforcement
- Too many small files in real-time scenarios
- Mulish data skipping using partitioning
Schema Enforcement Problem in Spark
Let me first explain the schema enforcement problem.
Let's assume you have a batch processing job. It reads some data from an input location, performs a transformation, and writes it to the output location. And it repeats the same for the next batch of input data. That's what typical batch processing looks like. Right?
Let's imitate the scenario using the code below.
Check out the video for a working demo of this code. We execute the same code twice with two batches of data. The first batch works well, but in the second batch, we get an exception.
The demo shows what happens when your schema changes in an upcoming batch of the same data. If this happens, you will end up in a complex problem of investigating what caused the exception in the current failing job.
Schema-on-read is a great way to implement schema evolution, but it causes a big mess when we do not validate whether the tool supports that evolution. Apache Spark does not support evolving an Int column to a Double column. Who will do this validation? If I have to hand-code this thing myself, it's a big job. And believe me, I am not going to do this at all.
There are workarounds to this problem, but handling data issues is much easier when you stop the job before mixing corrupt data with good data. Once you mix them, you have dropped yourself into hell. Delta Lake takes care of this problem as well.
Small File Problem in Spark
The next problem is the small file problem. Let's assume that you have a data ingestion job. This job triggers every hour and brings data to the data lake and saves it to some location. Looks like a usual thing. Right? But there are some minute details of this setup.
The ingestion job will append a new batch of data to the target location every hour. Right?
Every append is at least one new file. Have you ever checked how big the file is? Yes, you are right. It depends upon how much data we collect in that one hour. It could be 100 GB, 1 GB, or even just a few MBs. You may have hundreds of such ingestion jobs in a large system, and all of them are creating hundreds of small files every hour. Do you see the problem?
Too many small files are neither handled efficiently by the storage systems nor processed efficiently by Spark. Why?
Well, the Spark API would internally need to query the storage system to get the list of files for a particular table. A large number of files would slow down the listing operation. Spark would also need to open all of these files and close them after reading them. This will result in your jobs spending a lot of time opening and closing files rather than reading data. Small files also compress poorly, so you lose much of the benefit of compression. If you created a Hive table or a Spark table, you are also storing metadata about the table, which grows significantly for a large number of files. That will again slow down your job. So, a large number of small files is a real problem in big data systems, and there is no doubt about it.
Now think about a real-time or even a near-real-time scenario. Your ingestion job that triggers every hour is not a good fit for real-time requirements. You need to trigger it every minute. Even a minute could be too long for your use case. Right? But now, when you change the ingestion frequency to one minute, what would be the file size? A few KBs. Right? And that would rapidly compound your small file problem.
And that is why we need something like Kafka to fix this problem. Bring data to Kafka and run your real-time pipelines directly from Kafka.
But you also want to dump your data into the data lake for hourly batch processing. So, you can create a separate hourly sink job to push data to the lake. But you still have small files created every hour. How do you fix it?
A compaction job. Right? Read all the small files and write them out as one large file. An obvious solution. Isn't it? But what are you going to do about the lack of ACID properties? I mean, we often see this problem. Your data processing job should wait for the compaction job to complete. Why? Because compaction means writing large files and deleting the small ones, and that sequence is not transactional. So, it brings your system into an inconsistent state. You cannot even perform a reliable read operation during the compaction. Isn't it? This one is a real challenge that we often deal with, and Delta Lake offers a solution for this one as well. We will learn about that in the next video.
Partition in Apache Spark
Finally, the last item that I want to talk about is the mulish data skipping using
partitioning. Some of the common questions in Spark interviews are related to performance
optimization. How do you tune a Spark Job performance? Have you found a silver bullet? I
haven't.
One of the common approaches is to apply effective data skipping. What does it mean?
When you have a filter condition in your query, Spark is going to read all the data, apply the filter, and throw away the records that do not satisfy the filter condition. The question is, can we avoid reading some or all of those records that we are going to throw away? Databases implement this using secondary indexes, but we do not have that facility in Spark. Right?
We often handle it by creating partitions on commonly used columns. As a result of this, Spark will only read a limited number of partitions, and the other partitions are skipped. And that's what we term data skipping or partition pruning. But we all know that the partitioning approach is often not very useful. Why?
Because the partitioning works well with two types of columns.
- Chronological columns such as a date.
- Low cardinality columns such as country-code, or a combination of the columns such as country-code plus state-code.
But when the cardinality of your partition column is high, we end up creating too many partitions and, ultimately, a lot of small files. And this would trigger the performance problems caused by small files.
Even with the low cardinality partition columns, we will be reading all the data files
in qualified partitions. We do not have a kind of indexing or at least statistics to skip
some of the files in the eligible partitions. Databricks Delta Lake tries to address this
problem as well.
Great! So now we understand the main problems that Delta Lake is there to address. And by solving these problems, it also provides some additional capabilities out of the box, such as time travel and audit logs. We will learn more about Delta Lake in the next video and try to understand how it addresses all these problems. See you in the next video.
Keep learning and keep growing.