Apache Kafka Foundation Course - Creating Consumers


Hello and welcome to Kafka tutorials at Learning Journal. In our previous session, we covered Kafka consumer groups. In this session, we will look at the code for our first Kafka consumer and try to understand some of the details around it. We already created a Kafka consumer in an earlier tutorial, so let us take the same example for this session.

                                
    import java.util.*;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class SupplierConsumer {

        public static void main(String[] args) throws Exception {

            String topicName = "SupplierTopic";
            String groupName = "SupplierTopicGroup";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            props.put("group.id", groupName);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "SupplierDeserializer");

            // Type parameters match the key and value deserializers configured above.
            KafkaConsumer<String, Supplier> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));

            while (true) {
                // Wait up to 100 ms for records before returning.
                ConsumerRecords<String, Supplier> records = consumer.poll(100);
                for (ConsumerRecord<String, Supplier> record : records) {
                    System.out.println("Supplier id= " +
                            String.valueOf(record.value().getID()) +
                            " Supplier  Name = " + record.value().getName() +
                            " Supplier Start Date = " +
                            record.value().getStartDate().toString());
                }
            }
        }
    }
                            

Set Properties

Creating a Kafka consumer is very similar to creating a producer. We create a properties object and set three mandatory properties.

  1. bootstrap.servers
  2. key.deserializer
  3. value.deserializer

We already know that bootstrap.servers is a list of Kafka brokers, and we need this information to connect to the Kafka cluster.
In a producer, we used key and value serializers, but in a consumer, we need deserializers. If you are sending string messages, you can use the string deserializer. In this example, we are using a string deserializer for the key and a custom deserializer for the value.
The next property is group.id. In the previous session, we learned about consumer groups. You specify your consumer group name as the value of this property. You might be wondering whether we need to create a group first and then join it. No, you don't need to worry about creating a group, joining it, or knowing who the group coordinator and the group leader are. All of that is taken care of by the API. For us, it is as simple as specifying a group name. The group name is a string, so you can choose any string as the name of your group.
The group.id property is not mandatory, so you can skip it. But you should know that when you are not part of any group, you are starting an independent consumer, and your code will read all the data for the topic. Since you are not part of any group, there will be no sharing of work, and your consumer needs to read and process all the data alone.
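
As a rough illustration of the property handling described above, the sketch below builds the consumer configuration and checks that the three mandatory keys are present. The class `ConsumerProps` and its methods are hypothetical helpers written for this example; they are not part of the Kafka client library. Only the property keys and values come from the tutorial.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class ConsumerProps {

    // The three properties the tutorial lists as mandatory.
    static final String[] MANDATORY = {
        "bootstrap.servers", "key.deserializer", "value.deserializer"
    };

    // Builds the configuration; pass null as groupName to leave group.id unset.
    static Properties build(String servers, String groupName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "SupplierDeserializer");
        if (groupName != null) {
            props.put("group.id", groupName);
        }
        return props;
    }

    // Returns the mandatory keys that are absent from props.
    static List<String> missingMandatory(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : MANDATORY) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling `ConsumerProps.build("localhost:9092,localhost:9093", "SupplierTopicGroup")` gives a configuration equivalent to the one in the example, and checking `missingMandatory` before constructing the consumer is one way to fail early with a clear message.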

Subscribe to topic

Great, so once we have set up all the consumer properties, the next step is to create a KafkaConsumer object and subscribe to one or more topics.
The subscribe method takes a list of topics, so you can subscribe to multiple topics at a time. If you want to subscribe to many topics, you can also use a regular expression or wildcard in this method. Subscribing to a topic means you are informing the Kafka broker that you want to read data for these topics.
After subscribing, you want to fetch some records and process them. That's what the while loop is all about. The poll method will return some messages. You process them and then fetch some more. The parameter to the poll method is a timeout. If there is no data to poll, you don't want to be hanging there, so this value specifies how quickly you want the poll method to return, with or without data.
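
To get a feel for the regular expression subscription mentioned above, the sketch below previews which topic names a given pattern would select. It uses only `java.util.regex` and assumes that the whole topic name has to match the pattern. The class `TopicPatternPreview` and the topic names `SupplierAudit` and `OrderTopic` are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: previews which topic names a pattern would select.
class TopicPatternPreview {

    static List<String> matching(Pattern pattern, List<String> topicNames) {
        List<String> selected = new ArrayList<>();
        for (String name : topicNames) {
            // matches() requires the whole topic name to match the pattern.
            if (pattern.matcher(name).matches()) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("SupplierTopic", "SupplierAudit", "OrderTopic");
        // Prints [SupplierTopic, SupplierAudit]
        System.out.println(matching(Pattern.compile("Supplier.*"), topics));
    }
}
```

In a real consumer, the same `Pattern` object would be handed to the subscribe method instead of a list of names; check the overloads available in your client version before relying on this.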

The Poll Method

The poll method is pretty powerful and takes care of a lot of things. It handles all the coordination, partition rebalances, and heartbeats to the group coordinator, and gives you a clean and straightforward API. When you call poll for the first time from a consumer, it finds the group coordinator, joins the group, receives a partition assignment, and fetches some records from those partitions. Every time you call poll, it will send a heartbeat to the group coordinator. So, whatever you do in a consumer between polls should be quick and efficient. If you don't poll for a while, the coordinator may assume that the consumer is dead and trigger a partition rebalance. That's all about the poll method.
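
One simple way to notice that processing is taking too long between polls is to measure the gap yourself. The sketch below is a toy model of that idea: `PollGapMonitor` is a hypothetical class, and the limit passed to it is an arbitrary number chosen by the caller, not a value read from Kafka.

```java
// Hypothetical helper that tracks the time between successive poll calls.
class PollGapMonitor {

    private final long maxGapMillis;
    private long lastPollMillis = -1; // -1 means no poll has been recorded yet

    PollGapMonitor(long maxGapMillis) {
        this.maxGapMillis = maxGapMillis;
    }

    // Call right before each poll. Returns true when the time since the
    // previous poll exceeded the limit, meaning processing was too slow.
    boolean recordPoll(long nowMillis) {
        boolean tooSlow = lastPollMillis >= 0
                && (nowMillis - lastPollMillis) > maxGapMillis;
        lastPollMillis = nowMillis;
        return tooSlow;
    }
}
```

Inside the while loop, you could call `recordPoll(System.currentTimeMillis())` before each poll and log a warning whenever it returns true, which gives an early hint that record processing should be made faster or moved elsewhere.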


The infinite loop

You might be wondering about the infinite loop. Consumers are usually long-running processes, so having an infinite loop is perfectly fine. For some batch processing systems, you may not want this kind of infinite loop. Some requirements may need a consumer to wake up every few hours, process all the records collected during that interval, and sleep again for a few hours. Modelling such a requirement should be simple. You can use a scheduler to start your consumer, let's say every six hours, and you can exit the while loop and safely quit after processing all the records collected during the period. You should make sure to call the close method to clean up resources and let the coordinator know that you are leaving the group.
We already discussed in the earlier session that exiting a consumer will initiate a partition rebalance activity. That means the partitions that you were processing will go to some other consumer. There are certain clean-up activities to be performed before you exit. Those actions are necessary to make sure that no record is processed a second time and we don't end up with duplicate records. We will cover them in upcoming sessions.
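
The batch-style loop described above can be sketched without any Kafka dependency. In the example below, `RecordSource` is a hypothetical stand-in for the consumer, `InMemorySource` is a fake used only for demonstration, and "stop after a number of consecutive empty polls" is just one possible exit rule that this sketch assumes.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a consumer: poll() returns a batch, close() cleans up.
interface RecordSource {
    List<String> poll();
    void close();
}

// In-memory fake that hands out pre-built batches, then empty batches.
class InMemorySource implements RecordSource {
    private final Queue<List<String>> batches;
    boolean closed = false;

    InMemorySource(List<List<String>> batches) {
        this.batches = new ArrayDeque<>(batches);
    }

    public List<String> poll() {
        List<String> next = batches.poll();
        return next == null ? Collections.<String>emptyList() : next;
    }

    public void close() {
        closed = true;
    }
}

class BatchDrain {

    // Polls until maxEmptyPolls consecutive polls return nothing, then closes
    // the source. Returns the number of records processed.
    static int drain(RecordSource source, int maxEmptyPolls) {
        int processed = 0;
        int emptyPolls = 0;
        try {
            while (emptyPolls < maxEmptyPolls) {
                List<String> batch = source.poll();
                if (batch.isEmpty()) {
                    emptyPolls++;
                    continue;
                }
                emptyPolls = 0;             // data arrived, so reset the counter
                processed += batch.size();  // real record processing would go here
            }
        } finally {
            source.close();                 // always leave cleanly, even on error
        }
        return processed;
    }
}
```

The `finally` block mirrors the advice above: whichever way the loop ends, the close call still runs.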

Configuration properties

Before closing this video, I just want to highlight one more thing.
In all our examples, we have been creating a properties object in the code, but ideally, you should load it from a properties file. Keeping properties in a separate file is more flexible, and you will have all your configuration outside the code. So, let us change the code and load the property values from an external file.
A Java properties file is a simple text file that contains key-value pairs. The first thing that I need to do is create a file and move all the properties there. The content of the properties file for my example looks like this.

                                
    bootstrap.servers=localhost:9092,localhost:9093
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=SupplierDeserializer
    group.id=SupplierTopicGroup
                            

It is simple, right? We have a key and a value, one pair on each line.
So, we moved all our properties into this file.
Now it’s time to modify the code. I have changed the code, and it looks like this.


                                
    import java.util.*;
    import java.io.*;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
                                
    public class NewSupplierConsumer{	
                                
        public static void main(String[] args) throws Exception{	
                                
            String topicName = "SupplierTopic";
            String groupName = "SupplierTopicGroup";
            Properties props = new Properties();
            //props.put("bootstrap.servers", "localhost:9092,localhost:9093");
            //props.put("group.id", groupName);
            //props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //props.put("value.deserializer", "SupplierDeserializer");
                                
            InputStream input = null;
            KafkaConsumer<String, Supplier> consumer = null;	
                                
            try {
                input = new FileInputStream("SupplierConsumer.properties");
                props.load(input);
                consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList(topicName));                                    
                                
                while (true){
                    ConsumerRecords<String, Supplier> records = consumer.poll(100);
                    for (ConsumerRecord<String, Supplier> record : records){
                        System.out.println("Supplier id= " + String.valueOf(record.value().getID()) + 
                                " Supplier  Name = " + record.value().getName() + 
                                " Supplier Start Date = " + record.value().getStartDate().toString());
                    }
                }
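            }catch(Exception ex){
                ex.printStackTrace();
            }finally{
                // Either resource may still be null if an earlier step failed.
                if (input != null) input.close();
                if (consumer != null) consumer.close();
            }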
        }
    }                                               
                            

I changed the name to NewSupplierConsumer. You can see that I commented out some lines. Those lines are not required, as we are loading the properties from an external file.
We open the properties file and load all the key-value pairs into the properties object. Everything else is the same.
That’s it for this session. Thank you for watching Learning Journal. Keep learning and keep growing.
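
As a possible variation on the loading step, the sketch below uses try-with-resources so that the input is closed automatically. The class `PropertiesLoader` is a hypothetical helper, and it reads from a `StringReader` only so that the example runs without a file on disk.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration.
class PropertiesLoader {

    // Loads key=value pairs; the reader is closed automatically, even on failure.
    static Properties load(Reader source) throws IOException {
        Properties props = new Properties();
        try (Reader in = source) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        String content = "bootstrap.servers=localhost:9092,localhost:9093\n"
                + "group.id=SupplierTopicGroup\n";
        Properties props = load(new StringReader(content));
        // Prints SupplierTopicGroup
        System.out.println(props.getProperty("group.id"));
    }
}
```

To read the real file, you would pass something like `new java.io.FileReader("SupplierConsumer.properties")` instead of the `StringReader`.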

