Welcome to the Apache Kafka tutorial at Learning Journal. In this session, we are going to explore the Kafka partitioner. We will
try to understand why the default partitioner is not always enough and when you might need a custom partitioner.
We will also look at a use case and write the code for a custom partitioner. I already explained that
the Kafka partitioner is responsible for deciding the partition number for each message. We also discussed
the behaviour of the default partitioner. Let me quickly recap the default partitioner.
The default partitioner follows these rules.
- If a producer specifies a partition number in the message record, use it.
- If the message doesn’t hard code a partition number, but it provides a key, choose a partition based on a hash value of the key.
- If no partition number or key is present, pick a partition in a round-robin fashion.
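To make the second rule concrete, here is a tiny sketch of key-based partition selection. Note the assumptions: Kafka's real default partitioner hashes the serialized key bytes with the murmur2 algorithm, while this sketch uses Java's String.hashCode() as a stand-in, and the class and method names are mine.

```java
public class HashRuleSketch {
    // Illustration only: Kafka's default partitioner hashes the serialized
    // key bytes with murmur2; String.hashCode() is used here as a stand-in.
    static int partitionForKey(String key, int numPartitions) {
        // Mask off the sign bit so the result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition.
        System.out.println(partitionForKey("sensor-42", 10));
        System.out.println(partitionForKey("sensor-42", 10));
    }
}
```

The key point is that the mapping is deterministic: repeating the call with the same key and the same partition count always yields the same partition.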
So, you can use the default partitioner in three scenarios.
- If you don't care which partition your data lands in, but you want the partitioner to distribute your data evenly, rely on the third rule of the default partitioner.
- If you already know which partition you want to send the data to, hard code the partition number and use the first rule of the default partitioner.
- If you want your data to be distributed based on a key, specify a key in your messages.
Partition Keys
But there is a catch with the key, and it comes from the way hashing works. Hashing guarantees that a key will always
give you the same hash number. But it doesn't ensure that two different keys will never give you
the same hash number.
So, for example, suppose you have three tables, and you want to send all rows from these three tables
to three different partitions. I mean, data from table A goes to partition 0, data from table
B goes to partition 1, and so on. One obvious idea is to send the table name as the key. But that
would be incorrect because table A and table B can produce the same partition number after hashing. I will show
you this happening in the example.
Since hashing doesn't guarantee a unique number, it is better that you manage the translation
of the table name to a partition number in your producer and hard code the partition number with
the message. Another alternative is to implement a custom partitioner and use it instead
of the default one. I will show you an example of a custom partitioner as well.
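Here is a small, self-contained sketch of that collision. The partitionForKey helper and the single-letter key strings are hypothetical, and String.hashCode() stands in for Kafka's real hash function; with three partitions, the distinct keys "A" and "D" land in the same partition.

```java
public class KeyCollisionDemo {
    // Stand-in for hash-based partition selection (Kafka really uses murmur2).
    static int partitionForKey(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Two different keys, yet the same partition out of three.
        System.out.println(partitionForKey("A", 3)); // 2
        System.out.println(partitionForKey("D", 3)); // 2
    }
}
```

So even though the keys differ, nothing stops their hashes from colliding modulo the partition count, which is exactly the problem with using table names as keys.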
That's the first precaution with the key.
There is another catch with the key. The partition number is the hash value of the
key modulo the total number of partitions of the topic. So, if you increase the number of partitions
for your topic, the default partitioner will start returning different numbers. That may be a problem
if you are relying on your key to achieve a specific partitioning.
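The effect of growing a topic is easy to see with the same kind of sketch. Again, partitionForKey and the key are illustrative stand-ins for the real murmur2-based logic:

```java
public class RepartitionShiftDemo {
    // Stand-in for hash-based partition selection.
    static int partitionForKey(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key maps to one partition with 4 partitions on the topic,
        // and to a different partition after growing the topic to 5.
        System.out.println(partitionForKey("A", 4)); // 1
        System.out.println(partitionForKey("A", 5)); // 0
    }
}
```

Nothing about the key changed; only the modulus did, and that alone is enough to reroute all existing keys.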
Given these two problems, I don't find a key to be of much use for achieving a desired custom partitioning.
If you want a specific type of partitioning, the only option is to create your own algorithm and
implement it in a custom partitioner.
Kafka Partitioner Example
Let's create an example use-case and implement a custom partitioner.
Assume we are collecting data from a bunch of sensors. All the sensors are sending data to a
single topic. I planned ten partitions for the topic. But I want three partitions dedicated to a
specific sensor named TSS and the remaining seven partitions for the rest of the sensors. How would you achieve
this?
You can solve this requirement, and any other type of partitioning need, by implementing a custom
partitioner.
Let me show you the solution code for this problem. The first thing is the producer. We created
several producers earlier, and I created a new one for this example. Let's look at the producer code.
The producer code is like the other producers that I have already explained. The only difference is in the configuration properties.
We are setting two new properties. The first one is the
partitioner.class property. Since we are not using the default partitioner, we set this property
to the class name of our custom partitioner. I will show you the code for this custom partitioner
class in a minute.
The next property,
speed.sensor.name, is not a Kafka configuration. It is a custom property. We are using it
to supply the name of the sensor that requires special treatment. I don't want to hardcode the string
TSS in my custom partitioner, and a custom configuration is the method of passing values to your partitioner.
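As a sketch, the producer configuration might be built like this. The bootstrap server address and the SensorPartitioner class name are assumptions for illustration; partitioner.class and the serializer settings are real Kafka properties, while speed.sensor.name is our custom one.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static Properties buildProps() {
        Properties props = new Properties();
        // Assumption: a broker running locally on the default port.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Point the producer at the custom partitioner class
        // ("SensorPartitioner" is an illustrative class name).
        props.put("partitioner.class", "SensorPartitioner");
        // Not a Kafka setting: a custom property read back inside configure().
        props.put("speed.sensor.name", "TSS");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println(props.getProperty("partitioner.class"));
        System.out.println(props.getProperty("speed.sensor.name"));
    }
}
```

These same properties would be handed to the KafkaProducer constructor; the custom property simply rides along in the configuration map.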
The rest of the code is straightforward. We send some messages for various sensors. Then we send some
messages for the TSS sensor.
Custom Kafka Partitioner
We need to create our class by implementing the Partitioner interface. Your custom partitioner class must implement three methods from the interface.
- configure
- partition
- close
Let’s look at the code.
The configure and close methods are like initialization and clean-up methods. They are called once, at the time of
instantiating your producer. You can initialize things in the
configure method and clean things up in the
close method. In our example, we don't have anything to clean up. However, we have something
to configure.
We want to find the sensor name that requires three partitions. My producer is sending that name
as a custom configuration. The configure method extracts the configuration value and stores
it in a private variable. I will use that variable later in the code.
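Putting the pieces together, a skeleton of such a class might look like this. This is a sketch, not the exact code from the video: it assumes the kafka-clients library is on the classpath, SensorPartitioner is an illustrative class name, and the partition body is left as a placeholder.

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SensorPartitioner implements Partitioner {

    private String speedSensorName;

    @Override
    public void configure(Map<String, ?> configs) {
        // Pick up the custom property that the producer passed in.
        speedSensorName = configs.get("speed.sensor.name").toString();
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Placeholder: the partitioning algorithm goes here.
        return 0;
    }

    @Override
    public void close() {
        // Nothing to clean up in this example.
    }
}
```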
The
partition method is the place where all the action happens. The producer calls this
method for each message and provides all the details with every call. The inputs to the method are the
topic name, key, value, and the cluster details. With all these input parameters, we have everything
required to calculate a partition number. All we need to do is return an integer
as the partition number. This method is where we implement our partitioning algorithm.
Let’s try to understand the algorithm that I have implemented. I am applying my algorithm in
four simple steps.
- The first step is to determine the number of partitions and reserve 30% of them for the TSS sensor. If I have ten partitions for the topic, this logic will reserve three partitions for TSS. The next question is, how do we get the number of partitions in the topic? We got a cluster object as an input, and its partitionsForTopic method gives us a list of all partitions. The size of that list is the number of partitions in the topic. Then we set SP to 30% of that number. So, if I have ten partitions, SP should be set to 3.
- The second step is to throw an exception if we don't get a message key. We need the key because the key tells us the sensor name. Without knowing the sensor name, we can't decide whether the message should go to one of the three reserved partitions or to the other bucket of seven partitions.
- The third step is to determine the partition number when the key equals TSS. In that case, we hash the message value and take it modulo 3 as the partition number. Taking the mod makes sure that we always get 0, 1, or 2.
- If the key is not TSS, we hash the key and take it modulo 7. The result will be somewhere between 0 and 6, so I add 3 to shift it past the three reserved partitions.
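The four steps above can be sketched as plain Java, outside the Kafka API, so the arithmetic is easy to check. The class and method names are mine, String.hashCode() stands in for the hash function, and the TSS literal is hardcoded here only for illustration (the real partitioner reads it from configuration).

```java
public class SensorPartitionMath {
    // Sketch of the four steps for a topic with numPartitions partitions.
    static int partition(String key, String value, int numPartitions) {
        // Step 1: reserve 30% of the partitions for the TSS sensor.
        int sp = (int) (numPartitions * 0.3);
        // Step 2: a key is mandatory; it tells us the sensor name.
        if (key == null)
            throw new IllegalArgumentException("message key is required");
        // Step 3: for TSS, hash the VALUE so messages spread over 0..sp-1.
        if (key.equals("TSS"))
            return (value.hashCode() & 0x7fffffff) % sp;
        // Step 4: otherwise hash the key into the remaining bucket,
        // then shift past the reserved partitions.
        return (key.hashCode() & 0x7fffffff) % (numPartitions - sp) + sp;
    }
}
```

With ten partitions, TSS messages always land in partitions 0 through 2, and every other sensor lands in partitions 3 through 9.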
That is all pure maths, and I hope you get it. You might be wondering why, in step 3, I am hashing the message value, but in
step 4, I am hashing the message key. Let me explain. In step 3, the key will be TSS every time. So, hashing
TSS would give me the same number every time, and all the TSS messages would go to the same partition.
But we want to distribute them over the first three partitions. So, I am hashing the message value to get
a different number every time.
In step 4, I should be hashing the message value again. However, instead of hashing the message value,
I am hashing the key. That's because I wanted to show you why you should be careful if
you want to use a key to achieve a specific partitioning. I wanted to demonstrate that different
keys can land in the same partition. We will observe that behaviour by executing this example.
Check out the video for the demonstration. The video shows you the compilation and execution process.
Thanks for watching Learning Journal. Keep learning and Keep growing.