Apache Kafka Foundation Course - Schema Evolution - Part 1


Hello and welcome to Kafka tutorials at Learning Journal. We have covered almost every concept associated with Kafka producers and consumers. We learned the Java APIs for Kafka and created several examples to put them to use. In this session, we will cover a suitable method to handle schema evolution in Apache Kafka.
Schema evolution is all about dealing with changes in your message records over time. Let's assume that you are collecting clickstream data, and your original schema for each click is something like this.

  1. Session id - An identifier for the session
  2. Browser - An identifier for the browser
  3. Campaign - A custom identifier for a running campaign
  4. Channel - A custom identifier for the section of the site
  5. Referrer - The first-hit referrer (e.g., facebook.com)
  6. IP - The visitor's IP address (assigned by their ISP)

So, you have producers that are sending this data to Kafka, and you have consumers that are reading this data from Kafka and doing some analysis. We have already seen this kind of example in one of my earlier videos. If you missed that, go back and watch the session on custom serializers and deserializers. I am sure you can deliver a solution to this requirement by implementing a custom serializer and deserializer.
You had this system in place for a few months, and later you decided to upgrade your schema to something like this.

  1. Session id - An identifier for the session
  2. Browser - An identifier for the browser
  3. Campaign - A custom identifier for a running campaign
  4. Channel - A custom identifier for the section of the site
  5. Entry URL - The first-hit referrer URL
  6. IP - The visitor's IP address (assigned by their ISP)
  7. Language - An identifier for the language
  8. OS ID - An identifier for the operating system

The problem starts now. If you are changing your schema, you need to create new producers because you want to send some new fields. Right? But I have two more questions.


  1. Do I need to change all current producers?
    I mean, I am fine with creating new producers to include the additional fields. But I don't want the system to break if I don't upgrade all of them.
  2. Do I need to change my existing consumers?
    Again, I am happy to create some new consumers to work with the newly added fields. But my current consumers are doing fine, and they have nothing to do with the new attributes. I don't want to change them.

So, I don't want to change my current producers and consumers because that would be too much work. So, what do I want? In fact, I want to support both the old and the new schema simultaneously. Can I support both versions of the schema?
In a standard case, if you change your schema, you have to change everything: your producers, consumers, serializer, and deserializer. The problem doesn't end there. After making these changes, you can't read old messages, because you changed the code, and any attempt to read old messages using the new code will raise an exception.
In summary, I need to have a combination of old and new producers as well as a mix of old and new consumers. Kafka should be able to store both types of messages on the same topic. Consumers should be able to read both types of messages without any error.
That is a typical schema evolution problem. How do you handle this schema evolution problem in Apache Kafka?
But do you know how we take care of it in other systems?
The industry solution to handling schema evolution is to include schema information with the data. So, when someone writes data, they write both the schema and the data. And when someone wants to read that data, they first read the schema and then read the data based on the schema. If we follow this approach, we can keep changing the schema as frequently as required without worrying about changing our code, because we always read the schema before reading a message.
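The reading order described above can be sketched with a toy example: write the schema in front of the data, and always read the schema back first. This is only an illustration of the idea; real serialization systems such as Avro use far more compact binary encodings.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Toy "schema-first" message layout: schema travels with the data,
// and every reader consumes the schema before interpreting the payload.
public class SchemaFirstDemo {

    // Write the schema before the data, each as a length-prefixed string.
    static byte[] encode(String schema, String data) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF(schema);   // schema goes first
        out.writeUTF(data);     // payload follows
        return buf.toByteArray();
    }

    // A reader always reads the schema first, then the data.
    static String[] decode(byte[] message) throws Exception {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
        String schema = in.readUTF();
        String data = in.readUTF();
        return new String[]{schema, data};
    }

    public static void main(String[] args) throws Exception {
        byte[] msg = encode("{\"fields\":[\"session_id\"]}", "10001");
        String[] parts = decode(msg);
        System.out.println("schema=" + parts[0] + " data=" + parts[1]);
    }
}
```

Because the schema is always read before the data, a reader never has to hard-code any assumptions about the message layout.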
There are pre-built and reusable serialization systems that help us simplify the whole process of translating messages according to schemas and embedding schema information in the message record. Avro is one of them. It is the most popular serialization system for Hadoop and its ecosystem.

Apache Avro for Kafka

Kafka follows the same approach and uses Avro to handle the schema evolution problem. Let me give you a brief introduction to Avro, and then we will create an example to see how all of this works in Kafka.
But before that, you may need to download the Avro tools jar because we will be using it in the upcoming example. If the direct URL is not working for any reason, you can go to the Avro releases page and follow the download link from there.
Ok, we need to understand how Avro works. Then we will explore how to implement it in Kafka. Avro is a data serialization system, and it offers you four things.

  1. Allows you to define a schema for your data.
  2. Generates code for your schema. (Optional)
  3. Provides APIs to serialize your data according to the schema and embed the schema information in the data.
  4. Provides APIs to extract the schema information and deserialize your data based on the schema.

That's it. That's all a serialization system can give you. Let's see how it works. We will follow the four steps listed above.
If you have some data, and you want to serialize it, the first thing is to define a schema for your data. Avro schemas are defined using JSON. So, let's define a schema for our clickstream record.
So, here is my schema file.
File name:- ClickRecordV1.avsc

                                
    { "type": "record",
        "name": "ClickRecord",
        "fields": [
            {"name": "session_id", "type": "string"},
            {"name": "browser", "type": ["string", "null"]},
            {"name": "campaign", "type": ["string", "null"]},
            {"name": "channel", "type": "string"},
            {"name": "referrer", "type": ["string", "null"], "default": "None"},
            {"name": "ip", "type": ["string", "null"]}
        ]
    }                                               
                         

The name of my schema is ClickRecord, and it has six fields. The first field name is session_id, and its data type is string. The session id and channel are mandatory. However, the browser, campaign, and IP address can have a null value. The referrer has a default value of "None".
It is a simple schema. However, Avro supports many other complex data types. For more details, you can refer to the Avro specification.
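Looking ahead, when we evolve to the second schema described earlier, every newly added field must carry a default value so that old messages, which do not contain those fields, can still be read. A possible V2 schema might look like the sketch below (the exact field names are my assumption based on the list above; we will build the real one when we modify the schema):

```json
{ "type": "record",
    "name": "ClickRecord",
    "fields": [
        {"name": "session_id", "type": "string"},
        {"name": "browser", "type": ["string", "null"]},
        {"name": "campaign", "type": ["string", "null"]},
        {"name": "channel", "type": "string"},
        {"name": "entry_url", "type": ["string", "null"], "default": "None"},
        {"name": "ip", "type": ["string", "null"]},
        {"name": "language", "type": ["string", "null"], "default": "None"},
        {"name": "os_id", "type": ["string", "null"], "default": "None"}
    ]
}
```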
Let's come back to the second step: generate code for your schema. This step is simple. We will use the Avro tools jar to generate the code. Place your Avro schema file and the Avro tools jar in the same directory and execute the command below.

                                
    java -jar avro-tools-1.8.1.jar compile schema ClickRecordV1.avsc .
                         

To execute the Avro tool, we give it the compile schema command, then the schema file name, and finally the destination directory (the dot stands for the current directory). Hit enter, and done. This command generates a Java source file in the destination directory (under a folder structure matching the schema's namespace, if one is defined). That's your source code generated by the Avro tool. If you quickly look at the generated Java code, you should realize that it creates a Java class for ClickRecord with get and set methods for each field. That's it. We finished step 2.
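To give you an idea of what comes out, here is a hand-written simplification of the generated class. The real generated class is much longer: it extends Avro's SpecificRecordBase and embeds the schema itself. Only the getter/setter surface is sketched here.

```java
// Simplified sketch of the class avro-tools generates for ClickRecord.
// Avro-generated classes use CharSequence for string fields; only three
// of the six fields are shown to keep the sketch short.
public class ClickRecord {
    private CharSequence sessionId;
    private CharSequence channel;
    private CharSequence ip;

    public CharSequence getSessionId() { return sessionId; }
    public void setSessionId(CharSequence value) { this.sessionId = value; }

    public CharSequence getChannel() { return channel; }
    public void setChannel(CharSequence value) { this.channel = value; }

    public CharSequence getIp() { return ip; }
    public void setIp(CharSequence value) { this.ip = value; }

    public static void main(String[] args) {
        ClickRecord cr = new ClickRecord();
        cr.setSessionId("10001");
        cr.setChannel("HomePage");
        System.out.println(cr.getSessionId() + " " + cr.getChannel());
    }
}
```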
Let's come back to steps 3 and 4. Avro provides APIs for serializing your data. So, you can use the ClickRecord class generated in step 2 to create your data objects. Then you can use Avro APIs, as stated in step 3, to serialize them and send them to a Kafka broker. That's what a Kafka producer and a serializer do. So, at the producer end, I have to create a ClickRecord object and use an Avro serializer.
Similarly, on the receiving end, I can use Avro APIs, as stated in step 4, to deserialize those messages back into ClickRecord objects. That's what a deserializer does. So, at the receiver end, I have to use an Avro deserializer that extracts the schema information from the message and uses Avro APIs to deserialize my data back into a ClickRecord object.
The point I want to make is that everything ultimately goes to an Avro serializer and an Avro deserializer. Producers and consumers only use the generated class to create data objects. The serializer and deserializer take care of the rest. So, let me redefine these four steps in terms of a Kafka implementation.

  1. Define an Avro Schema for your message record.
  2. Generate a source code for your Avro Schema.
  3. Create a producer and use KafkaAvroSerializer.
  4. Create a consumer and use KafkaAvroDeserializer.

Kafka Avro Producer

The whole process of sending Kafka messages is quite straightforward. KafkaAvroSerializer and KafkaAvroDeserializer take all the complications away from the developer. You don't have to learn and use Avro APIs to serialize and deserialize your data. All of that is already done and bundled into the kafka-avro-serializer module developed by the Confluent team. Impressive, isn't it? Now, let's look at the code for a producer.

                                    
    import java.util.*;
    import org.apache.kafka.clients.producer.*;
    public class AvroProducer {
                                            
        public static void main(String[] args) throws Exception{
                                            
            String topicName = "AvroClicks";
            String msg;
                                            
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");        
            props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://localhost:8081");
                                            
            Producer<String, ClickRecord> producer = new KafkaProducer<>(props);
            ClickRecord cr = new ClickRecord();
            try{
                cr.setSessionId("10001");
                cr.setChannel("HomePage");
                cr.setIp("192.168.0.1");
                                            
                producer.send(new ProducerRecord<String, ClickRecord>(topicName,cr.getSessionId().toString(),cr)).get();
                                            
                System.out.println("Complete");
            }
            catch(Exception ex){
                ex.printStackTrace(System.out);
            }
            finally{
                producer.close();
            }
                                            
        }
    }                                                   
                             

The code looks familiar, right? We have a topic AvroClicks, and we want to send our ClickRecords to this topic. Like every other producer, we create the three mandatory properties and set appropriate values. Our message keys are going to be strings, so we configure the key serializer class as StringSerializer. Our message values are going to follow the Avro schema, so we set the value serializer class as KafkaAvroSerializer. The Confluent team developed this class, and it takes care of all the serialization activity. It saves us from learning Avro APIs and takes care of all the complexities of the serialization work.
The next property is the schema registry URL. This one is new. The schema registry is a new component developed by the Confluent team. You might be wondering about the purpose of this tool.

Kafka Schema Registry

So far, we have learned that KafkaAvroSerializer takes care of all the serialization work at the producer end, and KafkaAvroDeserializer takes care of all the deserialization work at the consumer end. But how do they communicate with each other about the schema? The deserializer must know the schema; without it, it can't deserialize the raw bytes back into an object. That's where the schema registry is useful. KafkaAvroSerializer stores the schema details in the schema registry and includes an ID for the schema in the message record. When KafkaAvroDeserializer receives a message, it takes the schema ID from the message and gets the schema details from the registry. Once we have the schema details and the message bytes, it is simple to deserialize them. That's why we use the schema registry.
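To make this concrete, Confluent's serializer writes a small header in front of the Avro payload: a magic byte (0) followed by the 4-byte schema ID, with the Avro-encoded data after it. Here is a minimal sketch of reading that header; the payload handling is omitted, and in real code KafkaAvroDeserializer fetches the schema for this ID from the registry.

```java
import java.nio.ByteBuffer;

// Sketch: parse the wire-format header that KafkaAvroSerializer prepends
// to every message: [magic byte 0][4-byte schema ID][Avro payload].
public class WireFormatDemo {

    static int readSchemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();             // must be 0 in the current format
        if (magic != 0) throw new IllegalArgumentException("Unknown magic byte");
        return buf.getInt();                // ID of the schema in the registry
    }

    public static void main(String[] args) {
        // Build a fake message: magic byte 0, schema ID 42, then payload bytes.
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 3);
        buf.put((byte) 0).putInt(42).put(new byte[]{1, 2, 3});
        int id = readSchemaId(buf.array());
        System.out.println("Schema ID = " + id);  // prints "Schema ID = 42"
    }
}
```

Because only the small integer ID travels with each message, the schema itself is stored once in the registry instead of being repeated in every record.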
Let's come back to our producer code.
After setting up all the properties, everything else is straightforward. We create a ClickRecord object, set values for the fields, and send it to the Kafka broker.
I hope you remember that we generated the source code for the ClickRecord class using the Avro tools jar. Now, we are using it in our producer. That's all we do in a producer.

Kafka Avro Consumer

The consumer code is also familiar.

                                    
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;                                            
                                            
    public class AvroConsumer{
                                            
        public static void main(String[] args) throws Exception{
                                            
            String topicName = "AvroClicks";                                            
            String groupName = "RG";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("group.id", groupName);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            props.put("schema.registry.url", "http://localhost:8081");
            props.put("specific.avro.reader", "true");
                                            
            KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));
            try{
                while (true){
                    ConsumerRecords<String, ClickRecord> records = consumer.poll(100);
                    for (ConsumerRecord<String, ClickRecord> record : records){
                        System.out.println("Session id=" + record.value().getSessionId()
                                        + " Channel=" + record.value().getChannel()
                                        + " Referrer=" + record.value().getReferrer());
                    }
                }
            }
            catch(Exception ex){
                ex.printStackTrace();
            }
            finally{
                consumer.close();
            }
        }
    }
                             

We set up the properties, including the schema registry URL, and set the value deserializer to KafkaAvroDeserializer. We have one additional property: specific.avro.reader=true. If we are using an Avro schema and generating code for the schema, this property is mandatory. In fact, there is another, generic method to use an Avro schema without the code-generation tool, but I am not covering that part in this video. So, if you are generating code for your Avro schema, set this property to true.
After setting all the properties, we create a consumer instance, subscribe to the topic, and start polling and processing messages. KafkaAvroDeserializer takes care of all the pain of deserializing the messages and converting them back into ClickRecord objects.
We are ready to compile and execute the producer and consumer and see all of this working.

Confluent Platform for Kafka

But this example depends on the Confluent Platform. The schema registry and the Kafka Avro serializer module are part of the Confluent Platform. To execute this example, we need to download and install the open source version of the Confluent Platform.
This session is getting too long, so I am splitting it into two parts.
In this first part, we covered the notion of schema evolution and looked at Avro as a solution to the problem of schema evolution. We created a schema and generated code for the schema using the Avro tools jar. Then we learned how a producer and a consumer would use the Avro schema. We talked about the schema registry and its purpose.
In the next session, we will download and install the Confluent Platform. We will then compile and execute our code.
But the whole point of using Avro is to see how to handle changes in the schema. So, we will modify our schema and learn how we can support both the old and the new schema in a system where both old and new producers and consumers are alive.
Thank you for watching Learning Journal. Keep learning and keep growing.

