Apache Kafka Foundation Course - Exactly Once Processing


Welcome to Kafka tutorials at Learning Journal. We have learned quite a lot about Kafka subscribers. In this session, we will study how to take full control of your Kafka subscriber. So, let's start.
In our earlier sessions, we learned about two things that are managed by Kafka.

  1. Automatic group management & Partition assignment.
  2. Offset and consumer positions control.

Let me quickly recap both things. We already learned that to become a member of a group, you just need to provide a group name and subscribe to the topics. That's it. Kafka will make you a member of a consumer group. But have you thought about it? Why do you want to become a member of a group? What are the benefits of creating a consumer group? You already know the answer. Right? It gives the following advantages.

  1. Allows you to process a topic in parallel.
  2. Automatically manages partition assignment.
  3. Detects entry/exit/failure of a consumer and performs partition rebalancing.

Correct? So, most of the time, you want all these features. But there is a downside of automatic partition assignment. You don't have control over the partition assignment. I mean, you can't decide which partitions will be assigned to which consumer. Kafka takes this decision. So, if you have built a solution with a custom partitioning strategy and you want to make use of the custom partitioning, this automatic partition assignment will create problems for you. To understand it in a better way, I must take you back to my custom partitioning example.

Problems with Kafka Consumer Groups

You remember, we used a custom partitioner to make sure that all the data for the TSS sensor lands in the first three partitions. You have that system in place for collecting data. Now you have a real-time statistics calculation requirement for the TSS sensor. So, you need to create a consumer that reads only TSS data and calculates the required statistics. Making sense? Pause for a minute and think about it. Can you solve this problem?
One way is to read all the data, discard everything except TSS, and then calculate your stats. But you don't want to do unnecessary work. After all, it's a real-time system and every millisecond counts. You might be thinking that you should have created a separate topic for TSS. Then it would have been easy. Right? You could subscribe to the TSS topic and the rest is simple. But this system was designed this way for a reason, and now you don't have the luxury of redesigning it.
You can find a solution easily if you have the flexibility to create a consumer and assign these three partitions to yourself. That's it. Your consumer will read these three partitions and the rest will be simple.
I think this example makes the point that in certain use cases, you may need the flexibility to take control of the partition assignment instead of relying on Kafka to assign partitions automatically. I will show you a relevant code example in a minute.
But let me come back to offset management and consumer positioning as well. We already know that Kafka maintains a current offset and a committed offset for each consumer. We have also seen an example of a rebalance listener where we take partial control over the offset. I am not sure if you noticed, but we still have a problem in that implementation.
Let me take you back to that example and explain the problem.


                                
    //Code from random consumer example
    while (true){
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records){
            /*System.out.println("Topic:"+ record.topic() + 
                " Partition:" + record.partition() + 
                " Offset:" + record.offset() + 
                " Value:"+ record.value());*/
            //Step - 1
            // Do some processing and save it to Database
            rebalanceListner.addOffset(record.topic(), record.partition(), record.offset());
        }
        //Step - 2
        //consumer.commitSync(rebalanceListner.getCurrentOffsets());
    }
                            

So, this is the consumer that we created. Just look at the for-loop. The first step is to process the record and save the result in the database. Then, in the next step, we keep the offset because we finished processing that message, and we will commit the offset later. Right? But those two steps are two operations, and they are not atomic. It is possible that we save the record in the database and the consumer crashes before committing the offset. After a restart, the consumer resumes from the last committed offset and processes that record again.
So, the rebalance listener will allow us to perform cleanup and commit before the partition goes away, but it can't help us keep the processed record and the committed offset in sync. There is no other way to achieve it except making SaveToDatabase and commit offset a single atomic transaction.
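To make the failure mode concrete, here is a small in-memory simulation. It does not use Kafka or a database, and the class and method names are made up for this illustration. It treats "save to database" and "commit offset" as two separate steps and lets the consumer crash between them. After the restart, the consumer resumes from the last committed offset and saves the same record a second time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the consumer above; this is not Kafka API.
class NonAtomicCommitDemo {
    // Simulated topic partition: the index in the list is the offset.
    static final List<String> PARTITION = List.of("r0", "r1", "r2", "r3");

    final List<String> database = new ArrayList<>(); // processed results
    long committedOffset = 0;                        // where a restarted consumer resumes

    // Processes records starting at the committed offset. If crashAfterSaving
    // matches an offset, the "consumer" dies after saving that record but
    // before committing its offset. Pass -1 for a run without a crash.
    void run(long crashAfterSaving) {
        for (long offset = committedOffset; offset < PARTITION.size(); offset++) {
            database.add(PARTITION.get((int) offset)); // Step 1: save to database
            if (offset == crashAfterSaving) {
                return;                                // crash: Step 2 never happens
            }
            committedOffset = offset + 1;              // Step 2: commit the offset
        }
    }

    // Runs once with a crash at offset 1, then restarts and runs to the end.
    static List<String> simulate() {
        NonAtomicCommitDemo consumer = new NonAtomicCommitDemo();
        consumer.run(1);   // saves r0 and r1, but commits only r0
        consumer.run(-1);  // restart: resumes at offset 1 and saves r1 again
        return consumer.database;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints [r0, r1, r1, r2, r3]
    }
}
```

The duplicate `r1` in the output is exactly the record that was saved but whose offset was never committed.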
Great, now we understand that we may have use cases where we need to take full control of two things.

  1. Partition assignment - That means, we don't want Kafka to assign partitions to different consumers. We want to take the control and assign desired partitions to ourselves.
  2. Committed offset management - That means, we don't want Kafka to store the offset. We want to maintain the committed offset somewhere outside of Kafka. This approach allows us to develop transactional systems.

So, how to do that? Let's create an example.


Exactly once processing Example

So, I will create a consumer that will assign three TSS partitions to itself. Then, it will start reading all messages from those three partitions and insert each message into a database table. It will not commit the offset back to Kafka. Instead, it will update the current offset into another database table. The insert and update statements will be part of a single transaction, so either both will complete, or both will fail.
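The idea can be sketched without a real database. The following in-memory model (all names are hypothetical and chosen only for this illustration) prepares the inserted row and the new offset together and applies both only on commit. A crash before the commit leaves neither change visible, so a restart resumes from the stored offset without duplicates or gaps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of "insert row + update offset" as one transaction.
class AtomicOffsetStoreDemo {
    // Simulated topic partition: the index in the list is the offset.
    static final List<String> PARTITION = List.of("r0", "r1", "r2", "r3");

    final List<String> dataTable = new ArrayList<>(); // stands in for the data table
    long offsetTable = 0;                             // stands in for the offsets table

    // Saves one record and the next offset as a unit. When crashBeforeCommit is
    // true, nothing is applied, which models a transaction that never committed.
    void saveAndCommit(long offset, boolean crashBeforeCommit) {
        String stagedRow = PARTITION.get((int) offset);
        long stagedOffset = offset + 1;
        if (crashBeforeCommit) {
            return;                 // both staged changes are discarded together
        }
        dataTable.add(stagedRow);   // commit: both changes become visible together
        offsetTable = stagedOffset;
    }

    // Reads from the stored offset; pass -1 for a run without a crash.
    void run(long crashAtOffset) {
        for (long offset = offsetTable; offset < PARTITION.size(); offset++) {
            boolean crash = (offset == crashAtOffset);
            saveAndCommit(offset, crash);
            if (crash) {
                return;
            }
        }
    }

    // Crashes while handling offset 1, then restarts and runs to the end.
    static AtomicOffsetStoreDemo simulate() {
        AtomicOffsetStoreDemo consumer = new AtomicOffsetStoreDemo();
        consumer.run(1);   // commits r0, crashes while handling r1
        consumer.run(-1);  // restart: resumes at stored offset 1
        return consumer;
    }

    public static void main(String[] args) {
        AtomicOffsetStoreDemo consumer = simulate();
        System.out.println(consumer.dataTable + " offset=" + consumer.offsetTable);
        // prints [r0, r1, r2, r3] offset=4
    }
}
```

Because the row and the offset move together, every record ends up in the table exactly once, even with the crash in the middle.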
I will use MySQL as the database for this demo. If you don't have MySQL installed and configured, you can follow the instructions listed below.

  1. Install MySQL server using the command below. The yum command will download and install MySQL server.
                                            
        yum install mysql-server                                               
                                        
  2. Once Installation is over, start the MySQL server.
                                            
        service mysqld start                                               
                                        
  3. Now that the MySQL service is up, let's make it secure. The command below will help you set up a password for the MySQL root account.
                                            
        mysql_secure_installation                                              
                                        
  4. Now we can start the MySQL command line tool.

        mysql -u root -p

  5. Now we need to do the following things.
    1. Create a database.
    2. Create a table to insert TSS sensor data.
    3. Create a table to update TSS offsets.
    4. Insert 3 rows, one for each TSS partition, for the initial offsets.
  6. I created a simple SQL script to do all of this. The script is simple, and I assume it doesn't require any explanation. Let's execute the script.

        source tss.sql

  7. At this stage, you should have the table for sensor data but no data inside the table. The consumer will insert the data, so we want an empty table.
  8. You should have the offsets table with initial values set to zero. So, the consumer should start reading from the beginning and keep updating the offsets as it reads.

The code for the script file is shown below.


                                
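    create database test;
    use test;
    create table tss_data(skey varchar(50), svalue varchar(50));
    -- partition is a reserved word in newer MySQL versions, so it is quoted with backticks.
    -- offset is bigint because Kafka offsets are 64-bit values.
    create table tss_offsets(topic_name varchar(50), `partition` int, offset bigint);
    -- The topic name must match topicName in the consumer code.
    insert into tss_offsets values('SensorTopic',0,0);
    insert into tss_offsets values('SensorTopic',1,0);
    insert into tss_offsets values('SensorTopic',2,0);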
                            

Now, it's time to look at the Consumer code.

Exactly once processing – Kafka Consumer

Let’s start with the main method. As always, we set up properties and then instantiate a Kafka consumer object. Other than the mandatory properties, we set one additional property, enable.auto.commit, to false. This setting disables the auto-commit feature, so the consumer will not commit offsets automatically back to the Kafka broker.

                                
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.*;
    import java.sql.*;
                                    
    public class SensorConsumer{
                                    
        public static void main(String[] args) throws Exception{
                String topicName = "SensorTopic";
                KafkaConsumer<String, String> consumer = null;
                int rCount;
                                    
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092,localhost:9093");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("enable.auto.commit", "false");
                                    
                consumer = new KafkaConsumer<>(props);
                TopicPartition p0 = new TopicPartition(topicName, 0);
                TopicPartition p1 = new TopicPartition(topicName, 1);
                TopicPartition p2 = new TopicPartition(topicName, 2);
                                    
                consumer.assign(Arrays.asList(p0,p1,p2));
                System.out.println("Current position p0=" + consumer.position(p0)
                                + " p1=" + consumer.position(p1)
                                + " p2=" + consumer.position(p2));
                                    
                consumer.seek(p0, getOffsetFromDB(p0));
                consumer.seek(p1, getOffsetFromDB(p1));
                consumer.seek(p2, getOffsetFromDB(p2));
                System.out.println("New positions p0=" + consumer.position(p0)
                                + " p1=" + consumer.position(p1)
                                + " p2=" + consumer.position(p2));
                                    
                System.out.println("Start Fetching Now");
                try{
                    do{
                        // poll(long) is deprecated since Kafka 2.0; newer clients use poll(Duration.ofMillis(1000))
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        System.out.println("Record polled " + records.count());
                        rCount = records.count();
                        for (ConsumerRecord<String, String> record : records){
                            saveAndCommit(consumer, record);
                        }
                    }while (rCount>0);
                }catch(Exception ex){
                    System.out.println("Exception in main.");
                }
                finally{
                        consumer.close();
                }
        }
                                    
        private static long getOffsetFromDB(TopicPartition p){
            long offset = 0;
            try{
                Class.forName("com.mysql.jdbc.Driver");
                Connection con=DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","pandey");
                                    
                // partition is a reserved word in newer MySQL versions, hence the backticks
                String sql = "select offset from tss_offsets where topic_name='" 
                            + p.topic() + "' and `partition`=" + p.partition();
                Statement stmt=con.createStatement();
                ResultSet rs = stmt.executeQuery(sql);
                if (rs.next())
                    offset = rs.getLong("offset"); // Kafka offsets are long values
                stmt.close();
                con.close();
            }catch(Exception e){
                System.out.println("Exception in getOffsetFromDB");
            }
            return offset;
        }
                                    
        private static void saveAndCommit(KafkaConsumer<String, String> c, ConsumerRecord<String, String> r){
            System.out.println("Topic=" + r.topic() + " Partition=" + r.partition() + " Offset=" + r.offset() 
                        + " Key=" + r.key() + " Value=" + r.value());
            try{
                Class.forName("com.mysql.jdbc.Driver");
                Connection con=DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","pandey");
                con.setAutoCommit(false);
                                        
                String insertSQL = "insert into tss_data values(?,?)";
                PreparedStatement psInsert = con.prepareStatement(insertSQL);
                psInsert.setString(1,r.key());
                psInsert.setString(2,r.value());
                                        
                String updateSQL = "update tss_offsets set offset=? where topic_name=? and `partition`=?";
                PreparedStatement psUpdate = con.prepareStatement(updateSQL);
                psUpdate.setLong(1,r.offset()+1);
                psUpdate.setString(2,r.topic());
                psUpdate.setInt(3,r.partition());
                                        
                psInsert.executeUpdate();
                psUpdate.executeUpdate();
                con.commit();
                con.close();
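            }catch(Exception e){
                System.out.println("Exception in saveAndCommit");
                // Stop instead of moving on: otherwise the next record would advance
                // the stored offset past this unsaved record.
                throw new RuntimeException(e);
            }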
        }
    }                                               
                            

If we want automatic group management, we should subscribe to the topic and Kafka will automatically assign partitions to the consumer. But in this example, we don't want Kafka to assign partitions to us. So, we create three partition objects, one each for the three partitions that we want to read, then we self-assign these three partitions.
Good, so the first part is taken care of. We have three partitions, and we are ready to read data from those partitions. The next thing is to adjust the offset. The three seek calls set the offset position for the three partitions. The getOffsetFromDB method reads the offset positions from the MySQL database. The current values in the database are zero, so it will return zero. That's it. We are all set. We assigned partitions, and we adjusted the offsets to appropriate positions. All we need to do is read and process.
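One detail is worth spelling out. A seek positions the consumer at the offset of the next record it will fetch, which is why saveAndCommit in the listing stores r.offset()+1 rather than r.offset(). The small stand-alone sketch below uses plain Java, with a list standing in for a partition and a hypothetical helper named readFrom standing in for a seek followed by reads. It shows what each choice would return after a restart.

```java
import java.util.List;

// Hypothetical illustration of "stored offset = next offset to read"; not Kafka API.
class NextOffsetDemo {
    // Simulated topic partition: the index in the list is the offset.
    static final List<String> PARTITION = List.of("r0", "r1", "r2");

    // Models a seek to 'position' followed by reading to the end of the partition.
    static List<String> readFrom(long position) {
        return PARTITION.subList((int) position, PARTITION.size());
    }

    public static void main(String[] args) {
        long lastProcessed = 1; // r1 was the last record saved to the database

        // Storing the processed offset itself would deliver r1 a second time.
        System.out.println(readFrom(lastProcessed));     // prints [r1, r2]

        // Storing the processed offset + 1 resumes at the first unprocessed record.
        System.out.println(readFrom(lastProcessed + 1)); // prints [r2]
    }
}
```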
The loop will keep polling Kafka as long as we are getting records. The saveAndCommit method will save each record in the database and also update the offsets.
The code for saveAndCommit is plain JDBC code. We set auto-commit to false, insert the data, update the offset, and finally execute the commit. That makes the method an atomic transaction, and we can achieve exactly-once processing.
Kafka efficiently provides us at-least-once processing. But to achieve exactly-once processing, we need to do some extra work, as we did in this example.
You can test the code. The video demonstrates the execution and test scenario. Check out the video for the demo.
In this session, we learned to take control of partition assignment and offset management from Apache Kafka. Automatic group and offset management are convenient options, but by taking control into our own hands, we have all the power to support complex requirements.
That's it for this session. See you again in the next session. Thank you for watching Learning Journal. Keep learning and keep growing.


