Apache Kafka Foundation Course - Custom Partitioner


Welcome to the Apache Kafka tutorial at Learning Journal. In this session, we are going to explore the Kafka partitioner. We will try to understand why the default partitioner is not always enough and when you might need a custom partitioner. We will also look at a use case and write the code for a custom partitioner. I already explained that the Kafka partitioner is responsible for deciding the partition number for each message, and we discussed the behaviour of the default partitioner. Let me quickly recap the default partitioner.
The default partitioner follows these rules.

  1. If a producer specifies a partition number in the message record, use it.
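  2. If the message doesn’t specify a partition number but provides a key, choose a partition based on a hash of the key.
  3. If neither a partition number nor a key is present, pick a partition in a round-robin fashion. (Kafka 2.4 and later replace round-robin with a "sticky" strategy for keyless records, filling a batch for one partition before switching to another.)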

So, you can use the default partitioner in three scenarios.

  1. If you don't care which partition your data lands in, but you want the partitioner to distribute your data evenly, rely on the third rule of the default partitioner.
  2. If you already know which partition you want to send the data to, hard code it and rely on the first rule.
  3. If you want your data to be distributed based on your key, specify a key in your messages and rely on the second rule.
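The three rules can be modelled in a few lines of plain Java. This is a simplified sketch, not Kafka's implementation: the class name DefaultRulesSketch is hypothetical, and Arrays.hashCode stands in for the murmur2 hash that Kafka applies to the serialized key bytes, so the actual partition numbers will differ from what a real producer picks. Rule 3 is modelled as plain round-robin, as described above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the default partitioner's three rules (not Kafka's code).
// Assumption: Arrays.hashCode stands in for Kafka's murmur2 hash.
public class DefaultRulesSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    public int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        // Rule 1: a partition number set on the record wins.
        if (explicitPartition != null) {
            return explicitPartition;
        }
        // Rule 2: hash the key bytes, clear the sign bit, take it modulo the partition count.
        if (key != null) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }
        // Rule 3: no partition and no key, so cycle through the partitions.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        DefaultRulesSketch sketch = new DefaultRulesSketch();
        System.out.println(sketch.choosePartition(2, "any-key", 10)); // rule 1: prints 2
        // Rule 2: the same key always maps to the same partition (prints true).
        System.out.println(sketch.choosePartition(null, "TSS", 10) == sketch.choosePartition(null, "TSS", 10));
        // Rule 3: prints 0, 1, 2, 0 with three partitions.
        for (int i = 0; i < 4; i++) {
            System.out.println(sketch.choosePartition(null, null, 3));
        }
    }
}
```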

Partition Keys

But there is a catch with the key, and it comes from the way hashing works. Hashing guarantees that a key will always produce the same hash number, but it does not guarantee that two different keys will never produce the same hash number.
For example, suppose you have three tables and you want to send the rows from each table to its own partition: data from table A goes to partition 0, data from table B goes to partition 1, and so on. An obvious idea is to send the table name as the key. But that would be incorrect, because table A and table B can map to the same partition after hashing. I will show you this happening in the example.
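The collision can be reproduced without a Kafka cluster. This sketch is an illustration under an assumption: it uses Java's String.hashCode in place of Kafka's murmur2, and the class KeyCollisionDemo is hypothetical. It shows both ways two keys can end up in the same partition: two different strings with an identical hash, and two different hashes that become equal after the modulo step.

```java
// Two ways distinct keys share a partition.
// Assumption: String.hashCode stands in for Kafka's murmur2 hash.
public class KeyCollisionDemo {
    // Stand-in for the key rule: hash, clear the sign bit, take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Two different keys, one hash code: prints "2112 2112".
        System.out.println("Aa".hashCode() + " " + "BB".hashCode());
        // Equal hashes always share a partition, whatever the partition count.
        System.out.println(partitionFor("Aa", 3) + " " + partitionFor("BB", 3));
        // Different hashes can still collide once reduced modulo the partition count.
        System.out.println(partitionFor("TableA", 3) + " " + partitionFor("TableD", 3));
    }
}
```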
Since hashing doesn’t guarantee a unique number, it is better to manage the translation of the table name to a partition number in your producer and hard code the partition number in the message. Another alternative is to implement a custom partitioner and use it instead of the default one. I will show you an example of a custom partitioner as well.
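A minimal sketch of the first alternative, assuming the producer knows the full list of tables in advance. TablePartitionMap is a hypothetical helper, not part of the Kafka client; the commented-out send call shows where its result would go, using the ProducerRecord constructor that takes an explicit partition number.

```java
import java.util.Map;

// Hypothetical producer-side helper: an explicit table -> partition map,
// so no two tables can ever share a partition by accident.
public class TablePartitionMap {
    private final Map<String, Integer> partitionByTable;

    public TablePartitionMap(Map<String, Integer> partitionByTable) {
        this.partitionByTable = Map.copyOf(partitionByTable); // immutable copy
    }

    public int partitionFor(String table) {
        Integer partition = partitionByTable.get(table);
        if (partition == null) {
            throw new IllegalArgumentException("No partition assigned to table " + table);
        }
        return partition;
    }

    public static void main(String[] args) {
        TablePartitionMap map = new TablePartitionMap(Map.of("A", 0, "B", 1, "C", 2));
        System.out.println(map.partitionFor("B")); // prints 1
        // With the Kafka client, pass the result as the record's explicit partition:
        // producer.send(new ProducerRecord<>(topicName, map.partitionFor("B"), key, value));
    }
}
```

Failing loudly on an unknown table is a deliberate choice here: silently falling back to key hashing would bring the collision problem back.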
That's the first precaution with the key.
There is another catch with the key. The partition number is the hash value of the key modulo the total number of partitions in the topic. So, if you increase the number of partitions for your topic, the default partitioner will start mapping the same keys to different partitions. That may be a problem if you are relying on your key to achieve a specific partitioning.
Given these two problems, I don't find the key very useful for achieving a desired custom partitioning. If you want a specific type of partitioning, the best option is to create your own algorithm and implement it in a custom partitioner.
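A quick way to see this second catch is to compute the partition for the same keys before and after a topic grows. As before, String.hashCode stands in for Kafka's murmur2, and both PartitionCountChangeDemo and the sensor-N key names are hypothetical; only the modulo behaviour matters here.

```java
// Shows how key -> partition assignments shift when the partition count changes.
// Assumption: String.hashCode stands in for Kafka's murmur2 hash.
public class PartitionCountChangeDemo {
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Counts how many of the keys sensor-0 .. sensor-(keyCount-1) change partition.
    static int movedKeys(int before, int after, int keyCount) {
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "sensor-" + i;
            if (partitionFor(key, before) != partitionFor(key, after)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // "TSS".hashCode() is 83380: partition 4 of 6, but partition 0 of 10.
        System.out.println("TSS: " + partitionFor("TSS", 6) + " -> " + partitionFor("TSS", 10));
        System.out.println(movedKeys(6, 10, 100) + " of 100 keys changed partition");
    }
}
```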


Kafka Partitioner Example

Let's create an example use-case and implement a custom partitioner.
Assume we are collecting data from a bunch of sensors, and all the sensors send data to a single topic. I planned ten partitions for the topic, but I want three partitions dedicated to a specific sensor named TSS and the remaining seven partitions for the rest of the sensors. How would you achieve this?
You can meet this requirement, and any other partitioning need, by implementing a custom partitioner.
Let me show you the solution code for this problem. The first thing is a producer. We created several producers earlier. I created a new one for this example. Let’s look at the producer code.

                                
    import java.util.*;
    import org.apache.kafka.clients.producer.*;

    public class SensorProducer {

        public static void main(String[] args) throws Exception {

            String topicName = "SensorTopic";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Use our custom partitioner instead of the default one
            props.put("partitioner.class", "SensorPartitioner");
            // Custom (non-Kafka) property, read by SensorPartitioner.configure()
            props.put("speed.sensor.name", "TSS");

            Producer<String, String> producer = new KafkaProducer<>(props);

            // Ten messages from other sensors (keys SSP0 to SSP9)
            for (int i = 0; i < 10; i++)
                producer.send(new ProducerRecord<>(topicName, "SSP" + i, "500" + i));

            // Ten messages from the TSS sensor
            for (int i = 0; i < 10; i++)
                producer.send(new ProducerRecord<>(topicName, "TSS", "500" + i));

            producer.close();
            System.out.println("SensorProducer Completed.");
        }
    }

The producer code is like the other producers that I have already explained. The only difference is in the configuration properties: we are setting two new properties. The first one is the partitioner.class property. Since we are not using the default partitioner, we set this property to the class name of our custom partitioner (the fully qualified name if the class lives in a package). I will show you the code for this custom partitioner class in a minute.
The next property, speed.sensor.name, is not a Kafka configuration. It is a custom property that supplies the name of the sensor that requires special treatment. I don't want to hard code the string TSS in my custom partitioner, and a custom configuration property is a convenient way to pass values to your partitioner. The rest of the code is straightforward: we send ten messages from various sensors (keys SSP0 to SSP9), and then ten messages from the TSS sensor.


Custom Kafka Partitioner

We create our class by implementing the Partitioner interface. Your custom partitioner class must implement three methods from the interface.

  1. configure
  2. partition
  3. close

Let’s look at the code.

                                
    import java.util.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.utils.*;
    import org.apache.kafka.common.record.*;

    public class SensorPartitioner implements Partitioner {

        private String speedSensorName;

        @Override
        public void configure(Map<String, ?> configs) {
            // Read the custom property set in the producer configuration
            speedSensorName = configs.get("speed.sensor.name").toString();
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {

            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            // Reserve 30% of the partitions for the speed sensor (3 of 10), using integer arithmetic
            int sp = numPartitions * 3 / 10;
            int p = 0;

            if (sp == 0)
                throw new IllegalStateException("Topic needs at least 4 partitions");

            if ((keyBytes == null) || (!(key instanceof String)))
                throw new InvalidRecordException("All messages must have sensor name as key");

            if (((String) key).equals(speedSensorName)) {
                // Speed sensor: hash the value to spread messages over partitions 0 .. sp-1
                p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp;
            } else {
                // Other sensors: hash the key into partitions sp .. numPartitions-1
                p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - sp) + sp;
            }

            System.out.println("Key = " + (String) key + " Partition = " + p);
            return p;
        }

        @Override
        public void close() {}
    }

The configure and close methods are the initialisation and clean-up hooks. Kafka calls configure once, when the producer is instantiated, and calls close when the producer is closed. You can initialise things in the configure method and clean them up in the close method. In our example, we don't have anything to clean up, but we do have something to configure.
We want to find the sensor name that requires three partitions. My producer sends that name as a custom configuration property. The configure method extracts the value and stores it in a private variable, which I use later in the code.
The partition method is where all the action happens. The producer calls this method for every message and passes in all the details: the topic name, the key and its serialized bytes, the value and its serialized bytes, and the cluster metadata. With these inputs, we have everything required to calculate a partition number; all we need to do is return it as an integer. This method is where we implement our partitioning algorithm.
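The configure method calls toString() on whatever configs.get returns, so it fails with a NullPointerException if the producer forgets to set speed.sensor.name. A slightly more defensive version is sketched here against a plain Map, so it runs without the Kafka client; SensorConfigSketch and readSpeedSensorName are hypothetical names. Inside a real partitioner, Kafka's org.apache.kafka.common.config.ConfigException would be a natural alternative to IllegalArgumentException.

```java
import java.util.Map;

// Hypothetical, defensive version of the configure() logic, written against a
// plain Map so it runs without the Kafka client on the classpath.
public class SensorConfigSketch {
    static final String SPEED_SENSOR_CONFIG = "speed.sensor.name";

    static String readSpeedSensorName(Map<String, ?> configs) {
        Object value = configs.get(SPEED_SENSOR_CONFIG);
        if (value == null) {
            // Fail with a clear message instead of a NullPointerException.
            throw new IllegalArgumentException("Missing producer property " + SPEED_SENSOR_CONFIG);
        }
        return value.toString();
    }

    public static void main(String[] args) {
        System.out.println(readSpeedSensorName(Map.of(SPEED_SENSOR_CONFIG, "TSS"))); // prints TSS
    }
}
```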
Let’s try to understand the algorithm that I have implemented. I am applying my algorithm in four simple steps.


  1. The first step is to determine the number of partitions and reserve 30% of them for the TSS sensor. If the topic has ten partitions, this logic reserves three partitions for TSS. The next question is, how do we get the number of partitions in the topic?
    We get a Cluster object as an input, and its partitionsForTopic method gives us a list of all partitions for the topic. The size of that list is the number of partitions in the topic. We then set sp to 30% of the number of partitions, so with ten partitions, sp is 3.
  2. If we don't get a message key, throw an exception. We need the key because it tells us the sensor name. Without the sensor name, we can't decide whether the message should go to one of the three reserved partitions or to the other bucket of seven partitions.
  3. The next step is to determine the partition number. If the key is TSS, we hash the message value and take the hash modulo 3. The modulo guarantees that we always get 0, 1 or 2.
  4. If the key is not TSS, we hash the key and take it modulo 7. The result is between 0 and 6, so we add 3 to shift it into partitions 3 to 9.
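The four steps can be checked without a running cluster. What follows is a Kafka-free model of the same algorithm, under stated assumptions: SensorPartitionSketch is a hypothetical class, keys and values are plain Strings, and Arrays.hashCode replaces Utils.murmur2, so the exact partition numbers will differ from the real partitioner while the ranges (0 to 2 for TSS, 3 to 9 for everything else) stay the same.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Kafka-free model of the four-step SensorPartitioner algorithm.
// Assumption: Arrays.hashCode replaces Utils.murmur2, so partition numbers
// differ from the real partitioner, but the ranges are the same.
public class SensorPartitionSketch {
    // Non-negative hash of a string's UTF-8 bytes (sign bit cleared, like Utils.toPositive).
    static int positiveHash(String s) {
        return Arrays.hashCode(s.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff;
    }

    static int choosePartition(String key, String value, String speedSensorName, int numPartitions) {
        // Step 1: reserve 30% of the partitions, using integer arithmetic (10 -> 3).
        int sp = numPartitions * 3 / 10;
        if (sp == 0) {
            throw new IllegalStateException("Topic needs at least 4 partitions");
        }
        // Step 2: the key carries the sensor name, so it is mandatory.
        if (key == null) {
            throw new IllegalArgumentException("All messages must have sensor name as key");
        }
        if (key.equals(speedSensorName)) {
            // Step 3: hash the value so speed-sensor messages spread over 0 .. sp-1.
            return positiveHash(value) % sp;
        }
        // Step 4: hash the key into 0 .. (numPartitions - sp - 1), then shift it up by sp.
        return positiveHash(key) % (numPartitions - sp) + sp;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("TSS/500" + i + " -> " + choosePartition("TSS", "500" + i, "TSS", 10));
            System.out.println("SSP" + i + " -> " + choosePartition("SSP" + i, "500" + i, "TSS", 10));
        }
    }
}
```

The sketch computes the reserved share with integer arithmetic instead of multiplying by 0.3, which sidesteps floating-point rounding, and it rejects topics with fewer than four partitions, where the reserved share would be zero and the modulo would divide by zero.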

That is all simple arithmetic, and I hope it is clear. You might be wondering why I hash the message value in step 3 but the message key in step 4. Let me explain. In step 3, the key is always TSS. Hashing TSS would give the same number every time, and all the TSS messages would go to the same partition. But we want to distribute them across the first three partitions, so I hash the message value, which varies from message to message.
In step 4, I should be hashing the message value as well. However, I am hashing the key instead, because I want to show you why you should be careful when relying on a key to achieve a specific partitioning: different keys can land in the same partition. We will observe that behaviour when we execute this example. Check out the video for the demonstration; it shows the compilation and execution process.
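You can predict this behaviour before running the demo. The producer sends ten distinct keys, SSP0 to SSP9, into only seven partitions, so by the pigeonhole principle at least two keys must share a partition. This sketch groups those keys by partition using the step 4 formula; SharedPartitionDemo is a hypothetical class, and Arrays.hashCode again stands in for murmur2, so exactly which keys share a partition will differ from the video.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Groups the producer's non-TSS keys by the partition step 4 would choose.
// Assumption: Arrays.hashCode stands in for Utils.murmur2.
public class SharedPartitionDemo {
    static int otherSensorPartition(String key, int numPartitions, int sp) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff;
        return hash % (numPartitions - sp) + sp;
    }

    static Map<Integer, List<String>> groupByPartition(List<String> keys, int numPartitions, int sp) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            int partition = otherSensorPartition(key, numPartitions, sp);
            byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            keys.add("SSP" + i);
        }
        // Ten keys, seven partitions (3..9): at least one line lists two or more keys.
        groupByPartition(keys, 10, 3).forEach((partition, group) ->
            System.out.println("Partition " + partition + " <- " + group));
    }
}
```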
Thanks for watching Learning Journal. Keep learning and keep growing.

