Apache Kafka Foundation Course - Custom Serializer


Welcome to the Kafka tutorial at Learning Journal. In this lesson, we will discuss custom serializers.
We already know that we need an appropriate serializer for our keys and values. In all our earlier examples, we were sending strings, and hence we have been using string serializers. Kafka also provides some other serializers, for example, int, double and long. But these serializers, together with the string serializer, don't cover most of the use cases.

Why Custom Serializer

If you are coming from a database background, you can think of a topic as a table and each message sent to the topic as a record. Those records are not always just a single string or a number. Normally, we have multiple columns in a record. So, when working with Kafka, we need to be able to send a record of multiple columns.
Similarly, if you are coming from an object-oriented programming background, you can view a Kafka message as an object. Normally, these objects will have multiple fields and methods. We should be able to send these objects to Kafka as messages. Sending simple strings to Kafka may fulfil some requirements. But in more complex scenarios, you may need to send custom objects, for example, a supplier object or an invoice object. If you want to send such custom objects or a row-like structure, you need to implement a custom serializer and a deserializer.
But let me tell you that there are better options. The best practice is to use a generic serializer instead of creating a custom serializer and deserializer. There are many generic serialization formats, such as Avro and Protocol Buffers. But to understand how serializers work, we will look at one example. Most of the time, you will be using generic serializers like Avro. We will leave Avro for some other day and focus on a custom serializer in this session.

Kafka Serializer Example

To understand the idea of a serializer and a deserializer, we need to create an example. In this example, we will do the following things.

  1. Create a Supplier class. We will serialize the supplier class and send the supplier object as a message to Kafka.
  2. Create a Kafka producer. This producer will send supplier object as a Kafka record. Earlier we were sending strings, but in this example, we are going to push an object instead of a simple string.
  3. Create a serializer to convert a supplier object into a byte array.
  4. Create a deserializer to convert a byte array back into a supplier object. Kafka doesn't know how to serialize and deserialize our object, and so we must create a serializer and a deserializer.
  5. Create a consumer. Finally, we will create a consumer that will read supplier objects from Kafka and just print the details on the console.

We will execute our example and observe all this working together. Let's start.


Create a Supplier Class

The first thing is the supplier class. Let's look at the code.

                                
    import java.util.Date;

    public class Supplier {
        private int supplierId;
        private String supplierName;
        private Date supplierStartDate;

        public Supplier(int id, String name, Date dt) {
            this.supplierId = id;
            this.supplierName = name;
            this.supplierStartDate = dt;
        }

        public int getID() {
            return supplierId;
        }

        public String getName() {
            return supplierName;
        }

        public Date getStartDate() {
            return supplierStartDate;
        }
    }
                            

The supplier class defines three variables.

  1. Supplier id
  2. Supplier name
  3. Supplier start date

We also have one constructor and three methods. We will use this supplier class to instantiate a supplier object and send it as a Kafka message. This code is very simple. The constructor takes three parameters and initializes the three variables. The three methods return the value of the corresponding variable.


Create a Kafka Serializer

The next thing is a serializer class. We already know that there is a string serializer, and it is a good idea to copy the existing source code and modify it according to your requirements. So, I took the code from the Kafka source repository and modified it for my example. Let's look at the modified code.

                                
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.errors.SerializationException;
    import java.util.Map;
    import java.nio.ByteBuffer;

    public class SupplierSerializer implements Serializer<Supplier> {
        private String encoding = "UTF8";

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to configure
        }

        @Override
        public byte[] serialize(String topic, Supplier data) {
            int sizeOfName;
            int sizeOfDate;
            byte[] serializedName;
            byte[] serializedDate;

            try {
                if (data == null)
                    return null;

                serializedName = data.getName().getBytes(encoding);
                sizeOfName = serializedName.length;
                serializedDate = data.getStartDate().toString().getBytes(encoding);
                sizeOfDate = serializedDate.length;

                // 4 bytes for the id, plus a 4-byte size prefix before each
                // variable-length field
                ByteBuffer buf = ByteBuffer.allocate(4 + 4 + sizeOfName + 4 + sizeOfDate);
                buf.putInt(data.getID());
                buf.putInt(sizeOfName);
                buf.put(serializedName);
                buf.putInt(sizeOfDate);
                buf.put(serializedDate);

                return buf.array();

            } catch (Exception e) {
                throw new SerializationException("Error when serializing Supplier to byte[]", e);
            }
        }

        @Override
        public void close() {
            // nothing to do
        }
    }
                            

The name of my class is SupplierSerializer. It implements the Serializer interface with the generic type set to Supplier. This interface is defined in Kafka's common serialization package. As per this interface, we need to override three methods.


  1. configure
  2. serialize
  3. close

If you remember the previous session where we implemented a custom partitioner, you can recall this pattern. We had a similar interface for the partitioner. So, we already know that configure and close are for initialization and clean-up, and the Kafka producer will call these methods only once. It will call configure when we instantiate the producer and close when we close the producer. But in our example, we have nothing to do in the configure and close methods, so we leave them empty.
The main action happens in the serialize method. The code is straightforward. If the data is null, we return null because we have nothing to serialize.
We simply convert the supplier name and the supplier start date into UTF-8 bytes. Then we allocate a byte buffer and encode everything into it. Since we will need to know the lengths of the supplier name and supplier date strings at the time of deserialization, we also encode their sizes into the byte buffer. Finally, we return the buffer's backing array. That's it. That's what serialization means: convert your object into bytes, and that's what we have done in the above example.
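The length-prefix framing in the serialize method is the crux of this serializer. To see it in isolation, here is a small, self-contained sketch (the class and method names here are my own, not part of the lesson's code) that encodes one fixed-size field and one variable-length field and then decodes them back in the same order:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LengthPrefixDemo {

    // Encode an int id and a string name into one byte array.
    static byte[] encode(int id, String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + nameBytes.length);
        buf.putInt(id);                 // fixed-size field: no prefix needed
        buf.putInt(nameBytes.length);   // size prefix for the variable-length field
        buf.put(nameBytes);
        return buf.array();
    }

    // Decode in exactly the order the fields were written.
    static String decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int id = buf.getInt();
        int sizeOfName = buf.getInt();  // the prefix tells us how many bytes to read
        byte[] nameBytes = new byte[sizeOfName];
        buf.get(nameBytes);
        return id + ":" + new String(nameBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = encode(101, "Xyz Pvt Ltd.");
        System.out.println(decode(wire)); // prints 101:Xyz Pvt Ltd.
    }
}
```

The same pattern scales to any number of fields, as long as the reader consumes them in the order the writer produced them.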

Create a Kafka Deserializer

Next part is a deserializer. Once you understand the serializer, the deserializer is simple. Let's look at the code for deserializer.

                                
    import java.nio.ByteBuffer;
    import java.util.Date;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import java.util.Map;

    public class SupplierDeserializer implements Deserializer<Supplier> {
        private String encoding = "UTF8";

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to configure
        }

        @Override
        public Supplier deserialize(String topic, byte[] data) {

            try {
                if (data == null) {
                    System.out.println("Null received at deserialize");
                    return null;
                }

                ByteBuffer buf = ByteBuffer.wrap(data);
                int id = buf.getInt();

                int sizeOfName = buf.getInt();
                byte[] nameBytes = new byte[sizeOfName];
                buf.get(nameBytes);
                String deserializedName = new String(nameBytes, encoding);

                int sizeOfDate = buf.getInt();
                byte[] dateBytes = new byte[sizeOfDate];
                buf.get(dateBytes);
                String dateString = new String(dateBytes, encoding);

                // Matches the output of Date.toString(); when parsing, "Z" also
                // accepts general time zone names.
                DateFormat df = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy");

                return new Supplier(id, deserializedName, df.parse(dateString));

            } catch (Exception e) {
                throw new SerializationException("Error when deserializing byte[] to Supplier", e);
            }
        }

        @Override
        public void close() {
            // nothing to do
        }
    }
                            

We are doing the opposite of what we did in the serializer. We deserialize every field, create a new supplier object, and return it. That's all. That's what deserialization means: take a byte array and convert it into an object.
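One subtle point is the date field: the serializer sends the output of Date.toString(), and the deserializer parses that string back with a matching SimpleDateFormat pattern. Here is a standalone sketch of that parse (the sample string is hypothetical, and it uses an explicit RFC 822 offset plus an English locale so the result does not depend on the JVM's time zone or language settings):

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class DateParseDemo {
    public static void main(String[] args) throws Exception {
        // Same pattern as the deserializer; "Z" parses RFC 822 offsets such as +0000.
        DateFormat df = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy", Locale.ENGLISH);

        // A sample string in the shape Date.toString() produces (offset form used
        // here instead of a zone name to keep the example deterministic).
        Date d = df.parse("Fri Apr 01 00:00:00 +0000 2016");

        // 2016-04-01T00:00:00Z as milliseconds since the epoch
        System.out.println(d.getTime()); // prints 1459468800000
    }
}
```

Date.toString() always uses English day and month names, so pinning the locale of the parsing format is a cheap safeguard on JVMs with a non-English default locale.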

Problems with Kafka Custom Serializer/Deserializer

But the problem with this approach is managing future changes in the schema. Suppose you implement this serializer and deserializer, and your system is functional for a few months. Then you get a requirement to add another field to the supplier object. If you modify your Supplier class, you must change your serializer and deserializer, and maybe the producer and consumer as well. But the problem doesn't end there. After making the new changes, you can't read your older messages, because you changed the format and modified your code to read only the new format. That's where generic serializers like Avro are helpful. We will cover that part in some other session. For now, let's continue our discussion on this example.
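A common mitigation, which the lesson's code does not implement, is to reserve the first byte of every message for a format version, so the deserializer can branch on it and keep reading records written in the old layout. A minimal sketch of the idea (the class and constant names here are hypothetical):

```java
import java.nio.ByteBuffer;

public class VersionedPayloadDemo {
    static final byte FORMAT_V1 = 1;

    // Writer: the version byte goes first, before any real fields.
    static byte[] encodeV1(int supplierId) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4);
        buf.put(FORMAT_V1);
        buf.putInt(supplierId);
        return buf.array();
    }

    // Reader: inspect the version byte, then parse the rest accordingly.
    // A v2 format could be added as another branch without breaking v1 records.
    static int decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte version = buf.get();
        if (version == FORMAT_V1) {
            return buf.getInt();
        }
        throw new IllegalArgumentException("Unknown format version: " + version);
    }

    public static void main(String[] args) {
        System.out.println(decode(encodeV1(101))); // prints 101
    }
}
```

This buys you backward compatibility, but every new version still means hand-written branching; schema-aware formats like Avro handle this evolution for you, which is why they are the recommended approach.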

Kafka Producer

We have completed the supplier class, the serializer, and the deserializer. Now, we need a producer to send messages and a consumer to receive them. There is nothing new about the producer; we have already created several producers earlier.

                                
    import java.util.*;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import org.apache.kafka.clients.producer.*;

    public class SupplierProducer {

        public static void main(String[] args) throws Exception {

            String topicName = "SupplierTopic";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "SupplierSerializer");

            Producer<String, Supplier> producer = new KafkaProducer<>(props);

            DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
            Supplier sp1 = new Supplier(101, "Xyz Pvt Ltd.", df.parse("2016-04-01"));
            Supplier sp2 = new Supplier(102, "Abc Pvt Ltd.", df.parse("2012-01-01"));

            producer.send(new ProducerRecord<String, Supplier>(topicName, "SUP", sp1)).get();
            producer.send(new ProducerRecord<String, Supplier>(topicName, "SUP", sp2)).get();

            System.out.println("SupplierProducer Completed.");
            producer.close();
        }
    }
                            

The producer code in this example is almost the same as in earlier examples. I changed the value serializer class name, and I changed the producer's generic type parameters. Finally, we are sending two messages using a synchronous send.


Kafka Consumer

Now we need a consumer. We can't use the console consumer because we need our custom deserializer to interpret the message records. Let's look at our consumer code shown below.

                                
    import java.util.*;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class SupplierConsumer {

        public static void main(String[] args) throws Exception {

            String topicName = "SupplierTopic";
            String groupName = "SupplierTopicGroup";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("group.id", groupName);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "SupplierDeserializer");

            KafkaConsumer<String, Supplier> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));

            while (true) {
                ConsumerRecords<String, Supplier> records = consumer.poll(100);
                for (ConsumerRecord<String, Supplier> record : records) {
                    System.out.println("Supplier id = " + record.value().getID()
                            + " Supplier Name = " + record.value().getName()
                            + " Supplier Start Date = " + record.value().getStartDate().toString());
                }
            }
        }
    }
                            

This is the first consumer that we are creating in this tutorial. Understanding consumers will take another set of sessions, so I am not going to explain the above consumer code in this lesson. That's a separate topic covered in later lessons. But just to give you a glimpse: we implemented a loop that processes each message the consumer receives from the Kafka broker and displays all three supplier fields on the console. So, it will show you all the suppliers received on the consumer side.
If you want to execute the code and observe the outcome, follow the video tutorial. The video demonstrates the compilation and execution process using SBT. If you want to use SBT, you will need a build file. You can use the content below in your build file to compile this example.

                                
    name := "KafkaTest"

    libraryDependencies ++= Seq(
      "org.apache.kafka" % "kafka-clients" % "0.10.1.0"
        exclude("javax.jms", "jms")
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri")
        exclude("org.slf4j", "slf4j-simple")
    )
                            

That's it for this session. We will cover some more details on the Kafka producer in the next session. See you again.
Thanks for watching Learning Journal. Keep learning and keep growing.


