Apache Kafka Foundation Course - Callback and Acks


Welcome to the Apache Kafka tutorial at Learning Journal. We have already created a simple producer and discussed how a message flows from the producer to the broker. In this session, I will discuss different approaches to implementing a Kafka producer. We will also learn about acknowledgments and the callback method for handling responses from brokers. So, let's get started.
There are three approaches to sending a message to Kafka.

Fire-and-forget Producer

Fire and forget is the simplest approach. In this method, we send a message to the broker and don't care whether it was successfully received. The example that we created earlier followed this approach.
You might be wondering whether this is the right approach, and where to use it.
Well, Kafka is a distributed system. It comes with built-in fault tolerance, which makes Kafka a highly available system. So most of the time, your message will reach the broker. We also know that the producer will automatically retry in case of a recoverable error. So, the probability of losing your messages is low.
There are many use cases where you are dealing with huge volumes and losing a small portion of records is not critical. For example, you may be counting hits on a video, or collecting a Twitter feed for sentiment analysis. In such use cases, even if you lose 2-3% of your tweets, it may not be a problem.
But it is important to understand that with the fire-and-forget approach, you may lose some messages. So, don't use this method when you can't afford to lose messages.

Synchronous Producer

In this approach, we send a message and wait until we get a response. In the case of success, we get a RecordMetadata object, and in the event of failure, we get an exception. Most of the time, we don't care about the success and the RecordMetadata that we receive. We only care about the exception because we want to log errors for later analysis and appropriate action. You can adopt this method if your messages are critical and you can't afford to lose anything.
But it is important to notice that the synchronous approach will slow you down. It will limit your throughput because you are waiting for every message to be acknowledged. You send one message and wait for success, then you send your next message and wait again. Each message takes some time to be delivered over the network. So, after every message, you wait for a network round trip, and the most interesting thing is that you may not be doing anything in case of success. You care only about failures: if a send fails, you may want to take some action.
We have already seen an example of the fire-and-forget approach. Let's look at an example of a synchronous send.

                                
    import java.util.*;
    import org.apache.kafka.clients.producer.*;

    public class SynchronousProducer {

        public static void main(String[] args) throws Exception {

            String topicName = "SynchronousProducerTopic";
            String key = "Key-1";
            String value = "Value-1";

            // Step 1: producer properties and the producer object.
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);

            // Step 2: the record to send.
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);

            try {
                // Step 3: send() returns a Future; get() blocks until success or failure.
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset());
                System.out.println("SynchronousProducer Completed with success.");
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("SynchronousProducer failed with an exception");
            } finally {
                // Always close the producer to free up resources.
                producer.close();
            }
        }
    }
                            

The code is almost the same as the one we looked at earlier. We are following the same three steps that I explained earlier.

  1. Create producer properties and instantiate producer object.
  2. Create producer record.
  3. And finally, hand over the message to the producer by making a call to the send method.

But this time, we want to get a response and handle an exception, so we wrap the call in try, catch, and finally blocks. If you look at the try block, we are still calling the producer.send method.
The send method returns a Java Future, and we call the get method on the Future object. The get method waits until we get success or an exception.
In the case of success, we get a RecordMetadata object. I am printing the partition number and offset from the RecordMetadata. But in your real application, you may not be doing anything for success.
In the case of an exception, we are just printing out the failure text and stack trace, but in your production code, you may want to log the message and exception details for later analysis. We have a finally block as well, where we close the producer object to free up resources.
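Because send returns a standard Java Future, the exception thrown by get is an ExecutionException that wraps the real cause. Here is a minimal sketch of unwrapping it for logging. It uses only the JDK: a CompletableFuture stands in for the Future returned by send, and the class name FutureFailureDemo, the helper describeOutcome, and the sample strings are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureFailureDemo {

    // Hypothetical helper: waits on a Future the same way the synchronous
    // producer waits on producer.send(record).get(), and reports the outcome.
    static String describeOutcome(Future<String> pending) {
        try {
            String result = pending.get();       // blocks until the Future is done
            return "success: " + result;
        } catch (ExecutionException e) {
            // get() wraps the real failure; getCause() returns it.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-ins for a successful and a failed send (assumed values).
        Future<String> ok = CompletableFuture.completedFuture("partition 0, offset 42");
        CompletableFuture<String> bad = new CompletableFuture<>();
        bad.completeExceptionally(new IllegalStateException("broker not available"));

        System.out.println(describeOutcome(ok));
        System.out.println(describeOutcome(bad));
    }
}
```

In a real producer you would probably log the cause together with the record's topic, key, and value, so that the failed message can be identified later.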
The fire-and-forget approach is at one extreme, and the synchronous approach is at the other. I mean, in one approach you don't care at all, and in the other you wait for every single message. So, there is a third approach which takes the middle path.

Asynchronous Producer

In this method, we send a message and provide a callback function to receive the acknowledgment. We don't wait for success or failure. The producer will call back our function with a RecordMetadata and an exception object. So, if you just care about exceptions, you simply look at the exception. If it is null, you don't do anything. If the exception is not null, then you know the send failed, so you can record the message details for later analysis.
Let's look at an example of the asynchronous approach.

                                
    import java.util.*;
    import org.apache.kafka.clients.producer.*;

    public class AsynchronousProducer {

        public static void main(String[] args) throws Exception {
            String topicName = "AsynchronousProducerTopic";
            String key = "Key1";
            String value = "Value-1";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);

            // The second argument is the callback; send() returns without waiting.
            producer.send(record, new MyProducerCallback());
            System.out.println("AsynchronousProducer call completed");

            // close() waits for pending sends, so the callback runs before we exit.
            producer.close();
        }
    }

    class MyProducerCallback implements Callback {

        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // A non-null exception means the send failed.
            if (e != null)
                System.out.println("AsynchronousProducer failed with an exception");
            else
                System.out.println("AsynchronousProducer call Success:");
        }
    }
                            

The code for the asynchronous producer is again the same as the fire-and-forget approach. The only difference is that we pass a second parameter to the send method. The second parameter is a callback object.
The next part of the code is the callback class. So, if you want to create a callback class, you must implement the Callback interface. Then you have just one method to override: the onCompletion method. The producer will invoke this method with an acknowledgment or an exception once the broker has responded or the send has failed. The rest of the code is simple. If the exception object is not null, we have a failure; otherwise, we have success.
In this approach, you keep sending messages as fast as you can without waiting for responses, and handle failures later as they come using a callback function.
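To make the "send now, handle failures later" idea concrete, here is a small simulation that uses only the JDK and no Kafka classes. Everything in it is a stand-in invented for illustration: fakeSend plays the role of the broker, it rejects any message whose text starts with "bad", and the class name SendStyleDemo and the 20 ms delay are arbitrary.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SendStyleDemo {

    // Hypothetical broker stand-in: responds after delayMs and fails any
    // message whose text starts with "bad".
    static CompletableFuture<String> fakeSend(ScheduledExecutorService io, String msg, long delayMs) {
        CompletableFuture<String> ack = new CompletableFuture<>();
        io.schedule(() -> {
            if (msg.startsWith("bad")) {
                ack.completeExceptionally(new RuntimeException("rejected " + msg));
            } else {
                ack.complete(msg);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
        return ack;
    }

    // Hands over every message without waiting; a callback records failures.
    static List<String> sendAllAsync(List<String> messages, long delayMs) {
        ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        List<String> failures = new CopyOnWriteArrayList<>();
        CompletableFuture<?>[] pending = messages.stream()
            .map(m -> fakeSend(io, m, delayMs)
                .whenComplete((ok, err) -> { if (err != null) failures.add(m); }))
            .toArray(CompletableFuture[]::new);
        // Similar in spirit to producer.close(): wait for outstanding sends.
        CompletableFuture.allOf(pending).handle((v, err) -> null).join();
        io.shutdown();
        return failures;
    }

    public static void main(String[] args) {
        List<String> failed = sendAllAsync(Arrays.asList("m1", "bad-2", "m3"), 20);
        System.out.println("failed messages: " + failed);
    }
}
```

All three messages are handed over before any response arrives, so in this simulation the total wait is roughly one delay rather than one delay per message, and only bad-2 ends up in the failure list.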

In Flight Messages

The asynchronous method appears to give you a throughput that is as good as the fire-and-forget approach. But there is a catch. There is a limit on in-flight messages. This limit is controlled by the configuration parameter max.in.flight.requests.per.connection. This parameter controls how many requests the producer can send to a broker on a single connection without receiving a response, and each request can carry a batch of messages. The default value is 5. You can increase this value, but there are other considerations. We will cover producer configurations in a separate session. Till then, just understand that asynchronous send gives you better throughput than synchronous send, but max.in.flight.requests.per.connection limits it.
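As a rough sketch of where this setting goes, the snippet below builds the same producer properties used in the examples of this session and adds the in-flight limit explicitly. It uses only java.util.Properties, so it runs without a Kafka cluster; the class name InFlightConfigDemo and the helper producerProps are made up for illustration.

```java
import java.util.Properties;

public class InFlightConfigDemo {

    // Hypothetical helper: the usual producer properties from this session,
    // plus an explicit in-flight limit chosen by the caller.
    static Properties producerProps(int maxInFlight) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.in.flight.requests.per.connection", String.valueOf(maxInFlight));
        return props;
    }

    public static void main(String[] args) {
        // 5 is the default mentioned above, so this only makes it explicit.
        Properties props = producerProps(5);
        System.out.println(props.getProperty("max.in.flight.requests.per.connection"));
    }
}
```

You would pass the resulting Properties object to the KafkaProducer constructor, exactly as in the synchronous and asynchronous examples.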
That's it for this session. In the next session, we will cover some more details of the Kafka producer APIs. See you again.
Thanks for watching Learning Journal.
Keep learning and Keep growing.

