Apache Kafka Foundation Course - Schema Evolution - Part 2

Welcome to Kafka Tutorial at Learning Journal. In the previous session, we talked about the schema evolution problem. We learned Avro basics and created a producer and a consumer that use an Avro schema. In this session, we will install and configure the open source version of the Confluent platform and execute our producer and consumer. We will also modify our schema and create a new version of the prior schema. Then, we will create a new producer to send some messages based on the new schema.
This example will show us schema evolution in action. We will see the old producer, the new producer, and the old consumer working together in the same system. Finally, we will create a new consumer as well. This new consumer will be able to read both old and new types of messages without any exception. By the end of this session, we will have a system where both the old and new versions of the schema work with older and newer producers and consumers.
So, let’s start.

Confluent Open Source

The Confluent platform is, in fact, Apache Kafka packaged together with additional components. All these elements together with Apache Kafka make it a flexible and powerful streaming platform. There are many ways to download and install the Confluent platform. However, I am going to use RPM packages via Yum. You can follow the Confluent documentation to install the platform. For an installation demo, you can watch the video.
The Confluent platform version at the time of recording this video is 3.1.1, and it comes with Kafka 0.10.1. Both are the latest releases at the time of recording this tutorial. I am expecting the same APIs to be released as Kafka 1.0 in the near future.
Once you have installed the Confluent platform, jump over to the quick start guide and follow it to start all necessary services. We need to start ZooKeeper, the Kafka server, and the schema registry. I covered all of that in an earlier video on Apache Kafka installation and demo. We are doing similar things here, except that we start the schema registry as well. Once you have all the services running, you are ready to compile and execute our Avro producer and consumer.
To compile our producer and consumer, we need to include the Avro and kafka-avro-serializer dependencies. I have an SBT build file that contains these dependencies.

    name := "AvroTest"

    // The Confluent serializer is published in Confluent's own Maven repository
    resolvers += "confluent" at "http://packages.confluent.io/maven/"

    libraryDependencies ++= Seq(
        "org.apache.avro" % "avro" % "1.8.1",
        "io.confluent" % "kafka-avro-serializer" % "3.1.1",
        // The kafka-clients version is blank in the original listing; set it to match your Kafka installation
        "org.apache.kafka" % "kafka-clients" % ""
            exclude("javax.jms", "jms")
            exclude("com.sun.jdmk", "jmxtools")
            exclude("com.sun.jmx", "jmxri")
            exclude("org.slf4j", "slf4j-simple")
    )

With the help of the above build file, you can compile and execute the Avro producer and consumer. The video contains a demo of the same.

Schema Evolution in Kafka

So far, we have learned how to use an Avro schema in our producers and consumers. But the whole point of using Avro is to support evolving schemas. So, let's change our schema. Here is the new version of my schema.
File name: ClickRecordV2.avsc

    {"type": "record",
        "name": "ClickRecord",
        "fields": [
            {"name": "session_id", "type": "string"},
            {"name": "browser", "type": ["string", "null"]},
            {"name": "campaign", "type": ["string", "null"]},
            {"name": "channel", "type": "string"},
            {"name": "entry_url", "type": ["string", "null"], "default": "None"},
            {"name": "ip", "type": ["string", "null"]},
            {"name": "language", "type": ["string", "null"], "default": "None"},
            {"name": "os", "type": ["string", "null"], "default": "None"}
        ]
    }

Note that the schema name and type are the same as earlier. However, I changed the record structure. The first change is to remove the referrer field, and the second change is to add three new attributes. I have made these changes in the schema, but let me warn you that you are not free to evolve your schema in a random fashion. The Avro specification defines rules for compatibility, and you should refer to it for the details of those guidelines.
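
To make the compatibility idea concrete, here is a minimal sketch of how record fields are matched when the reader's schema differs from the writer's. It is a toy model built on plain Java maps, not the Avro library: the class name RecordResolutionSketch and the method resolve are hypothetical, and I am assuming that the older schema declares a default of "None" for referrer. Under the Avro rules, a field that only the writer has is ignored, a field that only the reader has is filled from the reader's default, and a reader field with no default and no value from the writer is an error.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class RecordResolutionSketch {

    // Resolve a decoded writer record against the reader's field list.
    // readerFields maps each reader field name to its default (empty = no default).
    static Map<String, Object> resolve(Map<String, Object> writerRecord,
                                       Map<String, Optional<Object>> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Object>> field : readerFields.entrySet()) {
            String name = field.getKey();
            if (writerRecord.containsKey(name)) {
                // Field exists on both sides: take the writer's value
                result.put(name, writerRecord.get(name));
            } else if (field.getValue().isPresent()) {
                // Field exists only in the reader: fall back to the reader's default
                result.put(name, field.getValue().get());
            } else {
                throw new IllegalStateException(
                        "Reader field '" + name + "' is missing from the writer and has no default");
            }
        }
        // Fields that exist only in the writer are never copied, so they are ignored
        return result;
    }

    public static void main(String[] args) {
        // A message written with the new schema (sample values, subset of fields)
        Map<String, Object> newMessage = new LinkedHashMap<>();
        newMessage.put("session_id", "10001");
        newMessage.put("channel", "HomePage");
        newMessage.put("language", "Spanish");

        // The old reader still knows referrer (assumed default "None") but not language
        Map<String, Optional<Object>> oldReader = new LinkedHashMap<>();
        oldReader.put("session_id", Optional.empty());
        oldReader.put("channel", Optional.empty());
        oldReader.put("referrer", Optional.of("None"));

        System.out.println(resolve(newMessage, oldReader));
    }
}
```

Running it prints {session_id=10001, channel=HomePage, referrer=None}: the old reader never sees language and gets the default for referrer. This is why new fields and removed fields need defaults if old and new readers are to keep working.
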
So, we modified our schema. We need to generate code for this new schema. You can do that using the command below, where the trailing dot is the output directory for the generated source. We performed this activity in the earlier lesson.

    java -jar avro-tools-1.8.1.jar compile schema ClickRecordV2.avsc .

Now, we need to create a new producer and send some messages in the new format.

Kafka Producer

So, here is my code for the new producer.

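    import java.util.*;
    import org.apache.kafka.clients.producer.*;
    public class ClickRecordProducerV2 {
        public static void main(String[] args) throws Exception {
            String topicName = "AvroClicks";
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://localhost:8081");
            Producer<String, ClickRecord> producer = new KafkaProducer<>(props);
            ClickRecord cr = new ClickRecord();   // class generated from ClickRecordV2.avsc
            try {
                // Sample values for illustration; the original listing omits the setter calls
                cr.setSessionId("10001");
                cr.setChannel("HomePage");
                cr.setIp("192.168.0.1");
                cr.setLanguage("Spanish");
                // get() blocks until the broker acknowledges the record
                producer.send(new ProducerRecord<String, ClickRecord>(topicName, cr.getSessionId().toString(), cr)).get();
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }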

The code is the same as earlier, but this time, I am creating a ClickRecord object using the code generated from the new schema. Then I set some fields and finally send the record to the Kafka broker. This producer will send a message in the new format.
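
Once this producer has run, one way to confirm that a second schema version exists is to ask the schema registry for the versions registered under the topic's value subject. The sketch below assumes the default subject naming of `<topic>-value` and the registry address used in the producer; the class name SchemaVersionsCheck and its helper methods are hypothetical names of my own, and the code uses only the JDK's HTTP classes.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

public class SchemaVersionsCheck {

    // Assumes the default subject naming: "<topic>-value" for the message value schema
    static String versionsUrl(String registryUrl, String topic) {
        return registryUrl + "/subjects/" + topic + "-value/versions";
    }

    // Performs a plain HTTP GET and returns the response body as text
    static String httpGet(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = versionsUrl("http://localhost:8081", "AvroClicks");
        try {
            // The registry answers with a JSON array of version numbers
            System.out.println(url + " -> " + httpGet(url));
        } catch (IOException e) {
            System.out.println("Schema registry not reachable at " + url + ": " + e.getMessage());
        }
    }
}
```

If both the old and the new producer have sent at least one message, the response should list more than one version for the subject.
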

Testing Schema Evolution in Kafka

Now, it's time to open three terminals. I will execute a consumer in one terminal. That will be the old consumer. Then, I will start the new producer in another terminal and see if the old consumer can read the message sent by the new producer.
The video shows that the old consumer can read a message that came with an evolved schema.
You can start the old producer in the third terminal and check that your consumer is still able to read the old messages as well. The video demo shows that use case working.
So, we have seen that using Avro and the schema registry, we can quickly build a system where producers and consumers write and read messages using an evolving schema, and different versions of messages can co-exist in the same system.
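
Part of what lets different versions co-exist on one topic is that each message carries a reference to the schema it was written with. As I understand the Confluent serializer's framing, the value starts with a magic byte of 0, followed by a 4-byte big-endian schema ID, followed by the Avro binary payload. The sketch below only illustrates that framing with the JDK's ByteBuffer; the class name WireFormatSketch, the helper methods, and the schema IDs 1 and 2 are hypothetical, and the payload bytes are placeholders rather than real Avro data.

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;

    // Build a framed value: [magic byte][4-byte schema ID][payload]
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(5 + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)      // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    // Read the schema ID back from a framed value
    static int schemaIdOf(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        if (buffer.remaining() < 5 || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Value is not in the expected framing");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        byte[] placeholderPayload = {0x0A, 0x31, 0x30};   // stand-in bytes, not real Avro
        byte[] oldMessage = frame(1, placeholderPayload); // hypothetical ID of the old schema
        byte[] newMessage = frame(2, placeholderPayload); // hypothetical ID of the new schema

        System.out.println("old message -> schema id " + schemaIdOf(oldMessage));
        System.out.println("new message -> schema id " + schemaIdOf(newMessage));
    }
}
```

A consumer's deserializer can use that ID to fetch the writer's schema from the registry and then resolve it against its own schema, which is how one consumer handles old and new messages from the same topic.
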
I have written example code for the new consumer as well. I am not going to explain it because it is essentially the same as the old consumer. The only difference is that it uses the source code generated from the new schema. I leave it for you to try and test yourself. The code is listed below.

    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    public class ClickRecordConsumerV2 {
        public static void main(String[] args) throws Exception {
            String topicName = "AvroClicks";
            String groupName = "RG";
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("group.id", groupName);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            props.put("schema.registry.url", "http://localhost:8081");
            // Deserialize into the generated ClickRecord class instead of a generic record
            props.put("specific.avro.reader", "true");
            KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));
            try {
                while (true) {
                    ConsumerRecords<String, ClickRecord> records = consumer.poll(100);
                    for (ConsumerRecord<String, ClickRecord> record : records) {
                        System.out.println("Session id=" + record.value().getSessionId()
                                        + " Channel=" + record.value().getChannel()
                                        + " Entry URL=" + record.value().getEntryUrl()
                                        + " Language=" + record.value().getLanguage());
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }

That's it for this session. Thank you for watching Learning Journal. Keep learning and keep growing.
