Hello and welcome to Kafka tutorials at Learning Journal. We have learned almost every concept associated with Kafka producers
and consumers. We learned Java APIs for Kafka and created several examples to put them to use. In
this session, we will cover a suitable method to handle schema evolution in Apache Kafka.
Schema evolution is all about dealing with changes to your message records over time. Let's assume
that you are collecting clickstream data and your original schema for each click is something like this.
- Session id - An identifier for the session
- Browser - An identifier for the browser
- Campaign - A custom identifier for a running campaign
- Channel - A custom identifier for the section of the site
- Referrer - The first-hit referrer (e.g., facebook.com)
- IP - An IP address from your ISP
So, you have producers that are sending this data to Kafka, and you have consumers that are reading this data from Kafka
and doing some analysis. We have already seen this kind of example in one of my earlier videos. If
you missed that, go back and watch the session on custom serializers and deserializers. I am sure you can deliver
a solution to this requirement by implementing a custom serializer and a deserializer.
You had this system in place for a few months, and later you decided to upgrade your schema to
something like this.
- Session id - An identifier for the session
- Browser - An identifier for the browser
- Campaign - A custom identifier for a running campaign
- Channel - A custom identifier for the section of the site
- Entry URL - The first-hit referrer URL
- IP - An IP address from your ISP
- Language - An identifier for the language
- OS ID - An identifier for the operating system
The problem starts now. If you are changing your schema, you need to create new producers to send the new fields. Right? But I have two more questions.
- Do I need to change all current producers?
  I mean, I am fine with creating new producers to include the additional fields. But I don't want the system to break if I am not upgrading all of them.
- Do I need to change my existing consumers?
  Again, I am happy to create some new consumers to work with the newly added fields. But my current consumers are doing fine, and they have nothing to do with the new attributes. I don't want to change them.
So, I don't want to change my current producers and consumers because that would be too much work. So, what do I want?
In fact, I want to support both old and new schemas simultaneously. Can I support both versions of
the schema?
In a standard case, if you change your schema, you have to change everything: your producers,
consumers, serializer, and deserializer. The problem doesn't end there. After making these changes,
you can't read old messages, because any attempt to read old messages using
the new code will raise an exception.
In summary, I need to have a combination of old and new producers as well as a mix of old and
new consumers. Kafka should be able to store both types of messages on the same topic. Consumers
should be able to read both types of messages without any error.
That is a typical schema evolution problem. How do you handle this schema evolution problem
in Apache Kafka?
But do you know how other systems take care of it?
The industry solution to handling schema evolution is to include schema information with the
data. So, when someone writes data, they write both the schema and the data. And when someone wants to
read that data, they first read the schema and then read the data based on the schema. If we follow this
approach, we can keep changing the schema as frequently as required without worrying about changing our code,
because we always read the schema before reading a message.
There are pre-built and reusable serialization systems to help us and simplify the whole process
of translating messages according to schemas and embedding schema information in the message record.
Avro is one of them. It is the most popular serialization system for Hadoop and its ecosystem.
Apache Avro for Kafka
Kafka follows the same approach and uses Avro to handle the schema evolution problem. Let me give you a brief introduction to
Avro, and then we will create an example to see how all of this works in Kafka.
But, before that, you may need to download Avro tools because we will be using them in the upcoming example. If the direct URL is not working for any reason, you can go to the Avro release page and follow the download link from there.
Ok, we need to understand how Avro works. Then we will explore how to implement it in Kafka.
Avro is a data serialization system, and it offers you four things.
- It allows you to define a schema for your data.
- It generates code for your schema. (Optional)
- It provides APIs to serialize your data according to the schema and embed schema information in the data.
- It provides APIs to extract the schema information and deserialize your data based on the schema.
That's it. That's all a serialization system can give you. Let's see how it works. We will follow the four steps listed above.
If you have some data, and you want to serialize it, the first thing is to define a schema for
your data. Avro schemas are defined using JSON. So, let's define a schema for our clickstream record.
So, here is my schema file.
File name: ClickRecordV1.avsc
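The schema file itself is not reproduced in the text, so here is a sketch consistent with the description that follows; the namespace is an assumption.

```json
{
  "type": "record",
  "name": "ClickRecord",
  "namespace": "com.example.clickstream",
  "fields": [
    {"name": "session_id", "type": "string"},
    {"name": "browser", "type": ["null", "string"], "default": null},
    {"name": "campaign", "type": ["null", "string"], "default": null},
    {"name": "channel", "type": "string"},
    {"name": "referrer", "type": "string", "default": "None"},
    {"name": "ip", "type": ["null", "string"], "default": null}
  ]
}
```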
The name of my schema is ClickRecord, and there are six fields. The first field name is session_id, and its data type is string.
Session id and channel are mandatory. However, browser, campaign, and IP address can have a null
value. The referrer has a default value of "None".
It is a simple schema. However, Avro supports many other complex data types. For more details,
you can refer to the Avro specification.
Let's come back to the second step: generate code for your schema. This step is simple. We will
use the Avro tools jar to generate the code. Place your Avro schema file and the Avro tools jar in the same directory
and execute the tool using the command below.
To execute the Avro tool, give it the command to compile a schema, then the schema file name and the destination directory. Hit
enter, and done. This command will generate a Java file in the same directory. That's your source code
generated by the Avro tool. If you quickly look at the generated Java code, you should realize that it
creates a Java class for ClickRecord with get and set methods for each field. That's it. We finished step 2.
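The invocation looks like this; the avro-tools version number is an assumption, so use whichever version you downloaded.

```shell
# compile schema <schema file> <destination directory>
java -jar avro-tools-1.11.3.jar compile schema ClickRecordV1.avsc .
```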
Let's come back to steps 3 and 4. Avro provides APIs for serializing your data. So, you can use
the ClickRecord class generated in step 2 to create your data objects. And then you can use Avro APIs as stated in step 3 to serialize them
and send them to a Kafka broker. That's what a Kafka producer and a serializer do. So, at the producer
end, I have to create a ClickRecord object and use an Avro serializer.
Similarly, on the receiving end, I can use Avro APIs as stated in step 4 to deserialize those
messages back into ClickRecord objects. That's what a deserializer does. So, at the receiver end, I have to
use an Avro deserializer that extracts the schema information from the message and uses Avro APIs to deserialize
my data back into a ClickRecord object.
The point I want to make is that everything ultimately goes to an AvroSerializer and an
AvroDeserializer. Producers and consumers will only use the generated class to create data
objects. The serializer and deserializer take care of the rest. So, let me redefine these four steps in terms of the
Kafka implementation.
- Define an Avro schema for your message record.
- Generate source code for your Avro schema.
- Create a producer and use KafkaAvroSerializer.
- Create a consumer and use KafkaAvroDeserializer.
Kafka Avro Producer
The whole process of sending Kafka messages is quite straightforward. KafkaAvroSerializer and KafkaAvroDeserializer take all the complications away from a developer. You don't have to learn and use Avro APIs to serialize and deserialize your data. All of that is already done and bundled into the Kafka Avro Serializer module developed by the Confluent team. Impressive, isn't it? Now, let's look at the code for a producer.
The code looks familiar, right? We have a topic AvroClicks, and we want to send our
ClickRecords to this topic. Like every other producer, we create the three mandatory properties
and set appropriate values. Our message keys are going to be strings, so we configure the key serializer
class as StringSerializer. Our message values are going to follow the Avro schema, so we set
the value serializer class as KafkaAvroSerializer. The Confluent team developed this class, and it takes care of all the
serialization activity. It protects us from learning Avro APIs and takes care of all the complexities
of the serialization work.
The next property is the schema registry URL. This one is new. The schema registry is a new component developed by the
Confluent team. You might be wondering about the purpose of this tool.
Kafka Schema Registry
So far, we learned that KafkaAvroSerializer takes care of all the serialization work at the producer end and
KafkaAvroDeserializer takes care of all the deserialization work at the consumer end.
But how do they communicate with each other about the schema? The deserializer should know the schema.
Without knowing the schema, it can't deserialize the raw bytes back into an object. That's where the
schema registry is useful. The KafkaAvroSerializer stores the schema details in the schema registry and includes an ID of the schema in the message record. When
KafkaAvroDeserializer receives a message, it takes the schema ID from the message and gets the schema
details from the registry. Once it has the schema details and the message bytes, it is simple to deserialize them. That's where we use the schema registry.
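If you are curious, you can inspect what the serializer has registered by querying the schema registry's REST API directly. A sketch, assuming the registry is running at its default address, localhost:8081, and the topic is AvroClicks:

```shell
# List the subjects that have registered schemas
# (value schemas are registered under "<topic-name>-value")
curl http://localhost:8081/subjects

# Fetch the latest schema registered for the AvroClicks topic values
curl http://localhost:8081/subjects/AvroClicks-value/versions/latest
```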
Let's come back to our producer code.
After setting up all the properties, everything else is straightforward. We create a
ClickRecord object, set values for the fields, and send it to the Kafka broker.
I hope you remember that we generated the source code for the ClickRecord class using the Avro tool. Now, we are using it in our producer. That's all we do
in a producer.
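The producer configuration described above can be sketched as follows. The broker and registry addresses are assumptions, and the serializer class names are given as plain strings so the sketch compiles without the Kafka and Confluent jars on the classpath.

```java
import java.util.Properties;

// Sketch of the Avro producer configuration discussed above.
public class AvroProducerConfigSketch {

    static Properties buildProperties() {
        Properties props = new Properties();
        // The three mandatory producer properties.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Where KafkaAvroSerializer registers schemas and stores their IDs.
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProperties().getProperty("value.serializer"));
    }
}
```

In a real producer, you would pass these properties to `new KafkaProducer<>(props)` and call `send()` with a ProducerRecord carrying your ClickRecord object as the value.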
Kafka Avro Consumer
The consumer code is also familiar.
We set up the properties, including the schema registry URL, and set the value deserializer as
KafkaAvroDeserializer. We have one additional property:
specific.avro.reader=true. If we are using an Avro schema and code generation for the schema,
this property is mandatory. In fact, there is another generic method to use an Avro schema without using
the Avro code generation tool, but I am not covering that part in this video. So, if you are generating
code for your Avro schema, set this property to true.
After setting all the properties, we create a consumer instance, subscribe to the topic, and
start polling and processing messages.
KafkaAvroDeserializer takes care of all the pain of deserializing messages and converting
them back into ClickRecord objects.
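The consumer configuration can be sketched the same way. The addresses and the group id are assumptions, and class names are plain strings so the sketch compiles without the Kafka and Confluent jars.

```java
import java.util.Properties;

// Sketch of the Avro consumer configuration discussed above.
public class AvroConsumerConfigSketch {

    static Properties buildProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "ClickAnalyzer"); // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Where KafkaAvroDeserializer looks up schemas by the embedded ID.
        props.put("schema.registry.url", "http://localhost:8081");
        // Mandatory when using classes generated from the Avro schema;
        // without it the deserializer returns generic records instead
        // of ClickRecord objects.
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProperties().getProperty("specific.avro.reader"));
    }
}
```

A real consumer would pass these properties to `new KafkaConsumer<>(props)`, subscribe to the AvroClicks topic, and poll in a loop.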
We are ready to compile and execute the producer and consumer and see all of this working.
Confluent Platform for Kafka
But this example depends on the Confluent platform. The schema registry and the Kafka Avro serializer module are part of
the Confluent platform. To execute this example, we need to download and install the open source version
of the Confluent platform.
This session is getting too long, so I am splitting it into two parts.
In this first part, we covered the notion of schema evolution and looked at Avro as a solution
to the problem. We created a schema and generated code for the schema using the Avro
tool. Then we learned how a producer and a consumer would use the Avro schema. We talked about the schema registry
and its purpose.
In the next session, we will download and install the Confluent platform. We will then compile and
execute our code.
But the whole point of using Avro is to see how to handle changes in the schema. So, we will
modify our schema and learn how we can support both old and new schemas in a system where both old
and new producers and consumers are alive.
Thank you for watching Learning Journal. Keep learning and keep growing.