Apache Kafka Foundation Course - Offset Management


Welcome to Kafka tutorials at Learning Journal. In our previous session, we created our first consumer and covered some basics of the poll method. In this Kafka tutorial, we will cover some internals of offset management in Apache Kafka. I will explain the current offset and the committed offset. I will also include an example to show synchronous and asynchronous commit.
Let me first define the offset. The offset is a position within a partition for the next message to be sent to a consumer. Kafka maintains two types of offsets.

  1. Current offset
  2. Committed offset

Current Offset

Let me first explain the current offset. When we call the poll method, Kafka sends some messages to us. Let us assume we have 100 records in the partition. The initial position of the current offset is 0. We make our first call and receive 20 messages. Now Kafka moves the current offset to 20. When we make our next request, it sends some more messages starting from 20 and again moves the current offset forward. The offset is a simple integer that Kafka uses to maintain the current position of a consumer. That's it. The current offset is a pointer to the last record that Kafka has already sent to a consumer in the most recent poll. So, the consumer doesn't get the same record twice, thanks to the current offset.
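To make the pointer behaviour concrete, here is a tiny plain-Java simulation (no Kafka involved) of a partition with 100 records being drained in batches; the names and batch sizes are just illustrative:

```java
public class CurrentOffsetDemo {
    static final int TOTAL_RECORDS = 100;
    static int currentOffset = 0; // Kafka's in-memory position for this consumer

    // Simulated poll: returns up to batchSize records and advances the current offset.
    static int poll(int batchSize) {
        int fetched = Math.min(batchSize, TOTAL_RECORDS - currentOffset);
        currentOffset += fetched; // the next poll starts here, so no record is sent twice
        return fetched;
    }

    public static void main(String[] args) {
        System.out.println(poll(20));      // 20 records delivered, offset moves 0 -> 20
        System.out.println(currentOffset); // 20
        System.out.println(poll(20));      // next 20 records, offset moves 20 -> 40
        System.out.println(currentOffset); // 40
    }
}
```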

Committed Offset

Now let us come to the committed offset. This offset is the position up to which a consumer has confirmed successful processing. What does that mean? After receiving a list of messages, we want to process them. This processing may be as simple as storing them in HDFS. Once we are sure that we have successfully processed a record, we may want to commit the offset. So, the committed offset is a pointer to the last record that a consumer has successfully processed. (Strictly speaking, Kafka stores the committed offset as the offset of the next record to consume, i.e., last processed + 1, but the no-argument commit methods take care of that for you.) For example, the consumer received 20 records. It is processing them one by one, and after processing each record, it commits the offset. We will see a code example of this in a while.
So, in summary:

  1. Current offset -> Sent records -> Used to avoid resending the same records to the same consumer.
  2. Committed offset -> Processed records -> Used to avoid resending the same records to a new consumer in the event of a partition rebalance.

The committed offset is critical in the case of a partition rebalance. When, during a rebalance, a partition is assigned to a new consumer, that consumer should ask a question: where do I start? What has the previous owner already processed? The answer to both questions is the committed offset.


How to commit an offset?

Now, since we understand both the offsets maintained by Kafka, the next question is, How to commit an offset?
There are two ways to do it.

  1. Auto commit
  2. Manual commit

The commit has a significant impact on the client application, so we need to choose an appropriate method based on our use case. Let us look at the auto-commit approach.

Auto Commit

Auto-commit is the easiest method. You can control this feature by setting two properties.

  1. enable.auto.commit
  2. auto.commit.interval.ms

The first property is true by default, so auto-commit is enabled out of the box. You can turn it off by setting enable.auto.commit to false. The second property defines the interval between commits; its default value is five seconds. So, in the default configuration, every call to the poll method checks whether it is time to commit. If more than five seconds have passed since the last commit, the consumer will commit the latest offset. In effect, Kafka commits your current offset every five seconds.
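These two properties go into the consumer configuration like any other. A minimal sketch, assuming a local broker and placeholder topic/group names:

```java
import java.util.Properties;

public class AutoCommitConfig {

    // Builds a consumer configuration with auto-commit settings made explicit.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "SupplierTopicGroup");      // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Auto-commit is on by default; set explicitly here for clarity.
        props.put("enable.auto.commit", "true");
        // Default interval is 5000 ms; lowering it narrows (but does not close)
        // the window for duplicate processing discussed below.
        props.put("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("enable.auto.commit"));
    }
}
```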
Auto-commit is a convenient option, but it may cause duplicate processing of records. Let us understand that with an example.
You have some messages in the partition, and you make your first poll request. You receive 10 messages, so the consumer advances the current offset to 10. You take four seconds to process these ten messages and make a new call. Since five seconds have not yet passed, the consumer does not commit the offset. You receive another set of records, and for some reason a rebalance is triggered at this moment. The first ten records are already processed, but nothing is committed yet. The rebalance is triggered, and the partition goes to a different consumer. Since there is no committed offset, the new owner of the partition must start reading from the beginning and process the first ten records again.
You might be thinking of reducing the auto-commit interval to, say, four seconds. Lowering the interval shrinks the window for duplicates, but it cannot eliminate repeat processing entirely.


Manual Commit

The solution to this particular problem is a manual commit. So, we can turn auto-commit off and commit manually after processing the records. There are two approaches to manual commit.

  1. Commit sync
  2. Commit async

I hope you already understand the difference between synchronous and asynchronous. A synchronous commit is a straightforward and reliable method, but it is blocking: it blocks your call until the commit operation completes, and it retries automatically on recoverable errors.
An asynchronous commit sends the request and continues. The drawback is that commitAsync does not retry. But there is a valid reason for that behaviour. Let's understand it with an example.
Assume you are trying to commit offset 75. It fails for some recoverable reason, and a retry is due a few seconds later. Since this is an asynchronous call, you initiate another commit without knowing that the previous one is still pending, this time for offset 100. Commit 100 succeeds while commit 75 is waiting for its retry. What do you want to do now? Obviously, you don't want 75 committed after 100; that would move the committed position backwards and could cause duplicate processing after a rebalance. So asynchronous commit was designed not to retry. In practice this is not an issue, because if one commit fails for a recoverable reason, a later, higher-offset commit will usually succeed.
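The ordering problem can be sketched without Kafka at all. Below is a hypothetical retry guard of the kind you could implement yourself in a commitAsync callback (the names and the guard itself are illustrative, not part of the Kafka API): a failed commit is only worth retrying if no higher offset has been committed since.

```java
public class AsyncCommitOrdering {
    static long highestCommitted = -1; // highest offset successfully committed so far

    // Record a successful commit (would be called from a commit callback).
    static void onCommitSuccess(long offset) {
        if (offset > highestCommitted) highestCommitted = offset;
    }

    // Decide whether a failed commit is still worth retrying.
    static boolean shouldRetry(long failedOffset) {
        // Retrying 75 after 100 has succeeded would move the position backwards,
        // so a superseded commit is simply dropped.
        return failedOffset > highestCommitted;
    }

    public static void main(String[] args) {
        // Commit 75 fails, then commit 100 succeeds:
        onCommitSuccess(100);
        System.out.println(shouldRetry(75)); // false: superseded, dropping it is safe
    }
}
```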
Now we understand automatic and manual commits. It’s time to write some code and see how to implement it.

Manual commit example

In this example, we will use asynchronous commit. But in the case of an error, we want to make sure that we commit before we close and exit. So, we will use a synchronous commit before closing the consumer.

                                
    import java.util.*;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class ManualConsumer {

        public static void main(String[] args) throws Exception {

            String topicName = "SupplierTopic";
            String groupName = "SupplierTopicGroup";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("group.id", groupName);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "SupplierDeserializer");
            // Turn auto-commit off; we commit manually below.
            props.put("enable.auto.commit", "false");

            KafkaConsumer<String, Supplier> consumer = null;

            try {
                consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList(topicName));

                while (true) {
                    ConsumerRecords<String, Supplier> records = consumer.poll(100);
                    for (ConsumerRecord<String, Supplier> record : records) {
                        System.out.println("Supplier id = " + String.valueOf(record.value().getID()) +
                            " Supplier Name = " + record.value().getName() +
                            " Supplier Start Date = " + record.value().getStartDate().toString());
                    }
                    // Non-blocking commit of the current offset before the next poll.
                    consumer.commitAsync();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (consumer != null) {
                    try {
                        // Blocking commit: ensure the last offset is committed before closing.
                        consumer.commitSync();
                    } finally {
                        consumer.close();
                    }
                }
            }
        }
    }
                            

The code is straightforward, and we have seen most of it earlier. There is nothing new except two lines: the asynchronous commit inside the loop and the synchronous commit in the finally block. In this example, I am manually committing my current offset before pulling the next set of records.
You may be wondering whether this solves the problem completely.
I mean, suppose I get 100 records in a poll. After processing all 100 records, I commit my current offset.

  1. What if a rebalance occurs after processing 50 records?
  2. What if an exception occurs after processing 50 records?

I leave these two questions for you to think about. Post your answer as a comment or start a discussion on these two issues.
Let me give you a hint.
You can fix both of the above problems if you know how to commit a particular offset instead of committing the current offset. Offset management and handling rebalances gracefully are the most critical parts of implementing a correct Kafka consumer.
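As a small head start: the consumer API lets you pass an explicit offset map to commitSync or commitAsync instead of committing everything returned by the last poll. The detail that is easy to get wrong is that the offset you commit must be the position of the next record to read, i.e., last processed offset + 1. Here is a plain-Java sketch of just that bookkeeping, with a hypothetical partition name; the commented-out call shows where the real API would come in:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetBookkeeping {
    // Tracks, per partition, the offset that should be committed:
    // always lastProcessedOffset + 1 (the next record to read).
    static Map<String, Long> toCommit = new HashMap<>();

    static void markProcessed(String partition, long recordOffset) {
        toCommit.put(partition, recordOffset + 1);
    }

    public static void main(String[] args) {
        // Suppose we processed records 0..49 of a hypothetical partition
        // "SupplierTopic-0" and then a rebalance or exception strikes.
        for (long offset = 0; offset < 50; offset++) {
            markProcessed("SupplierTopic-0", offset);
        }
        // With the real consumer API you would now call something like:
        //   consumer.commitSync(Collections.singletonMap(
        //       new TopicPartition("SupplierTopic", 0), new OffsetAndMetadata(50)));
        System.out.println(toCommit.get("SupplierTopic-0")); // 50
    }
}
```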
In the next session, we will see a more involved example and learn how to commit an appropriate offset and handle a rebalance more gracefully.
That's it for this session. Thank you for watching Learning Journal. Keep learning and keep growing.


