Apache Kafka Foundation Course - Rebalance Listener


Welcome to Kafka tutorials at Learning Journal. In the previous session, we learned about synchronous and asynchronous commits. But both of those methods commit the latest offset. In this session, we will cover committing a particular offset and also learn about the rebalance listener. Let me give you an example.

Kafka Rebalance scenario

Suppose we have a situation where the poll method returns a lot of data, and it is going to take a reasonable amount of time to process all the records. If you take a long time to process your records, you run two types of risks.

  1. The first risk is a delay in the next poll, because you are busy processing data from the last call. If you don't poll for a long time, the group coordinator might assume that you are dead and trigger a rebalance activity. You don't want that to happen, right? You were not dead, you were computing.
  2. The second risk is also related to rebalancing. The coordinator may trigger a rebalance activity for some other reason while you are processing an extensive list of messages.

In both cases, a rebalance is triggered either because you didn't poll for a while or because something else went wrong. Your current partitions will be taken away from you and reassigned to some other consumer. In such a case, you would want to commit whatever you have already processed before the ownership of the partition is taken away from you. The new owner of the partition is then supposed to start consuming from the last committed offset.
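Incidentally, how long you can go without polling is configurable. The sketch below shows the consumer properties that control this liveness check in Kafka 0.10.1 and later clients; the values are illustrative, not tuning advice:

```java
import java.util.Properties;

public class ConsumerLivenessConfig {

    // Consumer properties that control rebalance-related timeouts.
    // The values below are illustrative, not recommendations.
    public static Properties livenessProps() {
        Properties props = new Properties();
        // Maximum gap allowed between two poll() calls before the
        // coordinator considers the consumer dead and rebalances.
        props.put("max.poll.interval.ms", "300000");
        // Heartbeat-based session timeout for the consumer process itself.
        props.put("session.timeout.ms", "10000");
        // Cap the records returned by one poll() to bound processing time.
        props.put("max.poll.records", "500");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(livenessProps().getProperty("max.poll.interval.ms"));
    }
}
```

Lowering max.poll.records is often the simplest way to keep each processing cycle short enough that you never hit these timeouts.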
How can you do this? Obviously, you will need to know at least two things.

  1. How to commit a particular offset?
    So that you can keep committing intermediate offsets instead of committing only the current offset at the end.
  2. How to know that a rebalance is triggered?
    So, you can commit whatever you already processed.

The synchronous and asynchronous commits that we learned earlier commit the latest offset returned by the last poll. We don't want that. I will show you an example that maintains a current offset for the processed records and commits that offset when a rebalance is triggered.
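Before we get to the Kafka code, the bookkeeping idea can be shown in isolation. This is a minimal sketch with plain strings and longs standing in for Kafka's TopicPartition and OffsetAndMetadata types, so it runs without a broker. Note that the offset you commit should be the offset of the next record to read, hence the + 1:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // Latest committable offset per "topic-partition" key.
    // In real code the key would be a TopicPartition and the value an
    // OffsetAndMetadata; plain types are used here for illustration.
    private final Map<String, Long> current = new HashMap<>();

    public void addOffset(String topic, int partition, long offset) {
        // The committed offset is the position of the NEXT record
        // to consume, hence offset + 1.
        current.put(topic + "-" + partition, offset + 1);
    }

    public Map<String, Long> getCurrentOffsets() {
        return current;
    }

    public static void main(String[] args) {
        OffsetTracker t = new OffsetTracker();
        t.addOffset("RandomProducerTopic", 0, 41L);
        t.addOffset("RandomProducerTopic", 0, 42L); // replaces the old entry
        t.addOffset("RandomProducerTopic", 1, 7L);
        System.out.println(t.getCurrentOffsets().get("RandomProducerTopic-0")); // 43
        System.out.println(t.getCurrentOffsets().size()); // 2
    }
}
```

In the real consumer, this map is what we will hand to commitSync whenever a rebalance is about to take our partitions away.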
The answer to the second question is simple. The Kafka API allows us to supply a ConsumerRebalanceListener. This listener offers two methods.

  1. onPartitionsRevoked
  2. onPartitionsAssigned

Kafka will call the onPartitionsRevoked method just before it takes away your partitions. So, this is where you can commit your current offsets.
Kafka will call the onPartitionsAssigned method right after the rebalancing is complete and before you start consuming records from the new partitions. In this example, we don't have anything to do there, but there are scenarios where you may want to use this method. We will discuss such a situation in the next session.

Kafka Random Producer

Let's look at the complete example code. I have created a new producer for this example.

                                
    import java.util.*;
    import org.apache.kafka.clients.producer.*;

    public class RandomProducer {

        public static void main(String[] args) throws InterruptedException {

            String topicName = "RandomProducerTopic";
            String msg;

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            Random rg = new Random();
            Calendar dt = Calendar.getInstance();
            dt.set(2016, 1, 1); // Calendar months are zero-based, so this is 1-Feb-2016
            try {
                while (true) {
                    for (int i = 0; i < 100; i++) {
                        msg = dt.get(Calendar.YEAR) + "-" +
                              dt.get(Calendar.MONTH) + "-" +
                              dt.get(Calendar.DATE) + "," +
                              rg.nextInt(1000);
                        producer.send(new ProducerRecord<String, String>(topicName, 0, "Key", msg)).get();
                        msg = dt.get(Calendar.YEAR) + "-" +
                              dt.get(Calendar.MONTH) + "-" +
                              dt.get(Calendar.DATE) + "," +
                              rg.nextInt(1000);
                        producer.send(new ProducerRecord<String, String>(topicName, 1, "Key", msg)).get();
                    }
                    dt.add(Calendar.DATE, 1);
                    System.out.println("Data Sent for " +
                               dt.get(Calendar.YEAR) + "-" +
                               dt.get(Calendar.MONTH) + "-" +
                               dt.get(Calendar.DATE));
                }
            }
            catch (Exception ex) {
                System.out.println("Interrupted");
            }
            finally {
                producer.close();
            }
        }
    }
                         

I am not going to explain the producer in detail because we have already learned almost everything about producers, and you already understand the code. This producer sends data to the topic named RandomProducerTopic. I have already created this topic with two partitions. If you look at the two send method calls, you will notice that we send the first message to partition zero and the next message to partition one. I am making these calls in an infinite loop, so the producer will keep sending data to both the partitions. This continuous flow of messages to both the partitions will help us simulate a rebalance and understand the behaviour of the consumers.

Kafka Rebalance Consumer

Now, let's look at the consumer. You already know how to create a consumer. The first step is to set up all the necessary properties. The second step is to instantiate a consumer object and subscribe to the topics. The final step is to poll for messages in a loop and process them. But in this example, we want to implement a rebalance listener.

                                
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.*;

    public class RandomConsumer {

        public static void main(String[] args) throws Exception {

            String topicName = "RandomProducerTopic";
            KafkaConsumer<String, String> consumer = null;

            String groupName = "RG";
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("group.id", groupName);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");

            consumer = new KafkaConsumer<>(props);
            RebalanceListner rebalanceListner = new RebalanceListner(consumer);

            consumer.subscribe(Arrays.asList(topicName), rebalanceListner);
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        /*System.out.println("Topic:" + record.topic() +
                            " Partition:" + record.partition() +
                            " Offset:" + record.offset() + " Value:" + record.value());*/
                        // Do some processing and save it to a database
                        rebalanceListner.addOffset(record.topic(), record.partition(), record.offset());
                    }
                    //consumer.commitSync(rebalanceListner.getCurrentOffsets());
                }
            }
            catch (Exception ex) {
                System.out.println("Exception.");
                ex.printStackTrace();
            }
            finally {
                consumer.close();
            }
        }
    }
                         

So, let's first understand the responsibilities of the listener. We want it to take care of two things.

  1. Maintain a list of offsets that are processed and ready to be committed.
  2. Commit the offsets when partitions are going away.

So, we want to maintain our own list of offsets instead of relying on the current offsets managed by Kafka. This list gives us complete freedom over what we want to commit. Right? We will look at the code for the listener in a minute, but let's first see how we use it in our consumer.
So, we instantiate a listener object. The listener needs access to the consumer object to execute a commit, and that's why we pass a consumer reference to the listener. Then the listener object is handed to Kafka in the subscribe method call. By doing this, we make sure that Kafka will invoke the listener's onPartitionsRevoked method.
So far so good. We have set up a rebalance listener, Kafka will invoke the listener before taking our partitions, and we will conveniently commit before we lose them.
The rest is simple. The poll method will fetch some records. We process them one by one in the for-loop. After processing each message, we tell our listener that this particular offset is ready to be committed. The listener will not commit them immediately. It will just maintain a list of latest offsets per topic per partition that it should commit.
Once you finish processing all the messages and you are ready to make the next poll, you should commit the offsets and reset the list. That's it for the consumer.

Kafka Rebalance Listener example

Let's look at the code for the listener.

                                
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.*;

    public class RebalanceListner implements ConsumerRebalanceListener {
        private KafkaConsumer consumer;
        private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

        public RebalanceListner(KafkaConsumer con) {
            this.consumer = con;
        }

        public void addOffset(String topic, int partition, long offset) {
            // Commit offset + 1: the committed offset is the position
            // of the next record that the new owner should read.
            currentOffsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1, "Commit"));
        }

        public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
            return currentOffsets;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Following Partitions Assigned ....");
            for (TopicPartition partition : partitions)
                System.out.println(partition.partition() + ",");
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Following Partitions Revoked ....");
            for (TopicPartition partition : partitions)
                System.out.println(partition.partition() + ",");

            System.out.println("Following Partitions committed ....");
            for (TopicPartition tp : currentOffsets.keySet())
                System.out.println(tp.partition());

            consumer.commitSync(currentOffsets);
            currentOffsets.clear();
        }
    }
                         

Your listener class must implement the ConsumerRebalanceListener interface. Then I have a private variable to store a reference to the consumer. The constructor sets this variable. Then I have another variable of type Map. This variable maintains the offsets. Topic name and partition number together form the key for the Map. It keeps only the latest offset for each topic and partition, and these offsets are ready to be committed. That's how the Map data structure works, right? You add an item on an existing key, and it replaces the old one. I hope you already understand that.
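The replacement behaviour can be demonstrated in a tiny stand-alone example. Plain String keys are used here in place of TopicPartition so it runs without the Kafka classes:

```java
import java.util.HashMap;
import java.util.Map;

public class MapReplaceDemo {

    // Builds a map, putting two offsets under the same key.
    public static Map<String, Long> latestOffsets() {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("RandomProducerTopic-0", 100L);
        // Same key: this put replaces the previous value,
        // so only the latest offset survives.
        offsets.put("RandomProducerTopic-0", 101L);
        return offsets;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = latestOffsets();
        System.out.println(offsets.get("RandomProducerTopic-0")); // 101
        System.out.println(offsets.size()); // 1
    }
}
```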
We have three methods to handle the map of offsets. Those are straightforward, and I don't think they need any explanation. The onPartitionsAssigned method prints the list of partitions that are assigned. We don't have anything else to do in that method.
The onPartitionsRevoked method will also print the list of partitions that are going away. Then, it will commit and reset the list. That's it. Now, it's time to execute the example and show you the effect of rebalancing.
The video demonstrates the process in detail. However, I have also listed the steps below.

  1. Create the Topic with two partitions.
  2. Start the producer in one terminal. The producer will make sure that the consumers will have some messages to read.
  3. Start a consumer in another terminal.
  4. You can observe the output messages. It shows that the consumer has got both the partitions, partition 0 and partition 1. This is the only consumer, so it should read both the partitions.
  5. Now, if you start another consumer, what should happen? The following activities will take place.
    1. A rebalance will be triggered. Because you have a new consumer and it should have some partition to read.
    2. Kafka will revoke all partitions from the first consumer because the list of consumers in a group is modified.
    3. Then, both consumers will get new partition assignment, and each of them should get one partition.
  6. So, let's start the second consumer in a new terminal.
    You should observe the output messages. You must notice that Kafka revoked both the partitions. But before losing them, we committed both the partitions. The rebalance listener has taken care of the commit. You should also observe that both the consumers have got new partition assignments.
  7. If you kill one of those consumers, what will happen? You know it. Right? Another rebalance activity will be triggered, and Kafka will assign both the partitions to the surviving consumer. You can try that yourself.

That's it for this session. Thank you for watching Learning Journal. Keep watching and keep growing.

