Welcome to Kafka tutorials at Learning Journal. We have learned quite a lot about Kafka
subscribers. In this session, we
will study how to take full control of your Kafka subscriber. So, let's start.
In our earlier sessions, we learned about two things that are managed by Kafka.
- Automatic group management and partition assignment.
- Offset and consumer position control.
Let me quickly recap both things. We already learned that to become a member of a group, you just need to provide a group name and subscribe to the topics. That's it. Kafka will make you a member of a consumer group. But have you thought about why you would want to become a member of a group? What are the benefits of creating a consumer group? You already know the answer, right? It gives the following advantages.
- Allows you to process a topic in parallel.
- Automatically manages partition assignment.
- Detects the entry, exit, or failure of a consumer and performs partition rebalancing.
Correct? So, most of the time, you want all these features. But there is a downside to automatic partition assignment: you don't have control over it. I mean, you can't decide which partitions will be assigned to which consumer; Kafka makes that decision. So, if you have built a solution with a custom partitioning strategy and you want to make use of the custom partitioning, automatic partition assignment will create problems for you. To understand this better, let me take you back to my custom partitioning example.
Problems with Kafka Consumer Groups
You remember, we used a custom partitioner to make sure that all the data for the TSS sensor will land in the first three partitions.
You have that system in place for collecting data. Now you have a real-time statistics calculation requirement for the TSS sensor. So, you need to create a consumer that reads only TSS data and calculates the required statistics. Making sense? Pause for a minute and think about it. Can you solve this problem?
One way is to read all the data, discard everything except TSS, and then calculate your stats. But you don't want to do unnecessary work. After all, it's a real-time system, and every millisecond counts. You might be thinking that you should have created a separate topic for TSS. Then it would have been easy, right? You could subscribe to the TSS topic, and the rest would be simple. But this system was designed this way for a reason, and now you don't have the luxury of redesigning it.
You can solve this easily if you have the flexibility to create a consumer and assign these three partitions to yourself. That's it. Your consumer will read these three partitions, and the rest is simple.
I think this example makes the point that in certain use cases, you may need the flexibility to take control of the partition assignment instead of relying on Kafka to do an arbitrary assignment. I will show you a relevant code example in a minute.
But let me come back to offset management and consumer positioning as well. We already know that Kafka maintains a current offset and a committed offset for each consumer. We have also seen an example of a rebalance listener where we take partial control over the offset. I am not sure if you noticed that we still have a problem in that implementation.
Let me take you back to that example and explain the problem.
So, this is the consumer that we created. Just look at the for-loop. The first step is to process the record and save the result in the database. In the next step, we keep the offset because we finished processing that message, and we will commit the offset later. Right? But those two steps are two operations, and they are not atomic. It is possible that we save the record in the database and the consumer crashes before committing the offset.
So, the rebalance listener allows us to perform cleanup and commit before the partition goes away, but it can't help us keep the processed record and the committed offset in sync. There is no way to achieve that except to make SaveToDatabase and the offset commit a single atomic transaction.
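To see why this matters, here is a tiny, self-contained simulation (this is not the actual consumer code; the class and field names are made up for illustration). It mimics a consumer that saves a record and then crashes before committing the offset:

```java
// Illustration only: a consumer that "crashes" between saving a record and
// committing its offset, leaving the two stores out of sync.
import java.util.ArrayList;
import java.util.List;

public class CrashDemo {
    final List<String> database = new ArrayList<>(); // stands in for the results table
    long committedOffset = 0;                        // stands in for Kafka's committed offset

    void process(String record, long offset, boolean crashBeforeCommit) {
        database.add(record);                        // step 1: save the result
        if (crashBeforeCommit) {
            return;                                  // the consumer dies right here
        }
        committedOffset = offset + 1;                // step 2: commit the offset
    }
}
```

After a crash between the two steps, the record is in the database but the committed offset still points at it, so a restarted consumer will read and save the same record again. That duplicate is exactly what an atomic transaction prevents.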
Great, now we understand that we may have use cases where we need to take full control of two things.
- Partition assignment - That means we don't want Kafka to assign partitions to different consumers. We want to take control and assign the desired partitions to ourselves.
- Committed offset management - That means we don't want Kafka to store the offsets. We want to maintain the committed offsets somewhere outside of Kafka. This approach allows us to develop transactional systems.
So, how to do that? Let's create an example.
Exactly-Once Processing Example
So, I will create a consumer that assigns the three TSS partitions to itself. Then, it will start reading all messages from those three partitions and insert each message into a database table. It will not commit the offset back to Kafka. Instead, it will update the current offset in another database table. The insert and update statements will be part of a single transaction, so either both will complete, or both will fail.
I will use MySQL as the database for this demo. If you don't have MySQL installed and configured, you can follow the instructions listed below.
- Install the MySQL server. On a yum-based system, the yum command will download and install it.
- Once the installation is over, start the MySQL server.
- Now that the MySQL service is up, let's make it secure by setting a password for the MySQL root account.
- Now we can start the MySQL command-line tool.
- mysql -u root -p
- Next, we need to do the following things.
- Create a database.
- Create a table to insert TSS sensor data.
- Create a table to hold TSS offsets.
- Insert three rows, one for each TSS partition, with the initial offsets.
- I created a simple SQL script to do all of this. The script is simple, and I assume it doesn't require any explanation. Let's execute it.
- At this stage, you should have the table for sensor data but no data inside it. The consumer will insert the data, so we want an empty table.
- You should also have the initial offsets table with values set to zero, so the consumer will start reading from the beginning and keep updating the offsets as it reads.
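On a yum-based system, the setup steps above might look like the following (package and service names vary by distribution; treat this as a sketch, not exact commands):

```shell
# Sketch of the MySQL setup steps; package/service names vary by distribution.
sudo yum install -y mysql-server       # download and install the MySQL server
sudo systemctl start mysqld            # start the MySQL service
sudo mysql_secure_installation         # set a password for the root account
mysql -u root -p                       # open the MySQL command-line tool
```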
The code for the script file is shown below.
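A sketch of such a script could look like this (the database, table, and column names here are assumptions for illustration, not the original ones; the topic name is also a placeholder):

```sql
-- Sketch only: names are assumed, adjust to your own schema.
CREATE DATABASE IF NOT EXISTS demo;
USE demo;

-- Table for the TSS sensor readings inserted by the consumer.
CREATE TABLE tss_data (
    id INT AUTO_INCREMENT PRIMARY KEY,
    message VARCHAR(255)
);

-- Table holding the committed offset for each TSS partition.
CREATE TABLE tss_offsets (
    topic VARCHAR(255),
    part INT,
    kafka_offset BIGINT,
    PRIMARY KEY (topic, part)
);

-- One row per TSS partition, starting from offset zero.
INSERT INTO tss_offsets VALUES
    ('SensorTopic', 0, 0),
    ('SensorTopic', 1, 0),
    ('SensorTopic', 2, 0);
```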
Now, it's time to look at the Consumer code.
Exactly-Once Processing – Kafka Consumer
Let's start with the main method. As always, we set up properties and then instantiate a Kafka consumer object. Other than the mandatory properties, we have one additional property, enable.auto.commit. Setting it to false disables the auto-commit feature, so the consumer will not automatically commit offsets back to the Kafka broker.
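As a sketch, the configuration part might look like this (the broker address and deserializer choices are placeholders, not the exact values from the example):

```java
// Minimal sketch of the consumer configuration; values are placeholders.
import java.util.Properties;

public class ConsumerConfigSketch {
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // Disable auto-commit: we will track offsets in our own database table.
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```

Note that since we self-assign partitions instead of subscribing, a group id is not required here.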
If we wanted automatic group management, we would subscribe to the topic, and Kafka would automatically assign partitions to the consumer. But in this example, we don't want Kafka to assign partitions to us. So, we create three partition objects, one for each of the three partitions that we want to read, and then we self-assign these three partitions.
Good, so the first part is taken care of: we have three partitions, and we are ready to read data from them. The next thing is to adjust the offsets. The next three lines of code set the offset position for the three partitions. The getOffsetFromDB method reads the offset positions from the MySQL database. The current values in the database are zero, so it will return zero. That's it. We are all set. We assigned the partitions, and we adjusted the offsets to the appropriate positions. All we need to do is read and process.
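The assignment and seek steps described above can be sketched like this (the topic name and partition numbers are placeholders following this example, and the fragment assumes an already constructed KafkaConsumer<String, String> named consumer plus the getOffsetFromDB helper):

```java
// Sketch: self-assign three partitions instead of subscribing, then position
// each one using the offsets stored in our MySQL table.
TopicPartition p0 = new TopicPartition("SensorTopic", 0); // topic name is a placeholder
TopicPartition p1 = new TopicPartition("SensorTopic", 1);
TopicPartition p2 = new TopicPartition("SensorTopic", 2);

consumer.assign(Arrays.asList(p0, p1, p2)); // no group management, no rebalancing

// Start each partition at the offset recorded in the database (zero on first run).
consumer.seek(p0, getOffsetFromDB(p0));
consumer.seek(p1, getOffsetFromDB(p1));
consumer.seek(p2, getOffsetFromDB(p2));
```

Because assign() bypasses group management entirely, no rebalance listener is involved, and no other consumer in any group coordination sense can take these partitions away.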
The loop keeps polling Kafka as long as we are getting records. The saveAndCommit method saves each record in the database and also updates the offset. The code for saveAndCommit is plain JDBC code. We set auto-commit to false, insert the data, update the offset, and finally execute a commit. That makes the method an atomic transaction, and we achieve an exactly-once processing scenario.
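A sketch of the saveAndCommit idea in plain JDBC could look like this (the connection setup and the tss_data / tss_offsets table and column names are assumptions for illustration, not the exact code from the example):

```java
// Sketch only: save the record and update the offset in one JDBC transaction.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class SaveAndCommitSketch {
    static void saveAndCommit(Connection conn, String topic, int part,
                              long offset, String message) throws SQLException {
        conn.setAutoCommit(false); // take over transaction control from the driver
        try (PreparedStatement insert = conn.prepareStatement(
                 "INSERT INTO tss_data (message) VALUES (?)");
             PreparedStatement update = conn.prepareStatement(
                 "UPDATE tss_offsets SET kafka_offset = ? WHERE topic = ? AND part = ?")) {
            insert.setString(1, message);
            insert.executeUpdate();            // step 1: save the record
            update.setLong(1, offset + 1);     // step 2: record the next offset to read
            update.setString(2, topic);
            update.setInt(3, part);
            update.executeUpdate();
            conn.commit();                     // both steps succeed together...
        } catch (SQLException e) {
            conn.rollback();                   // ...or both are undone together
            throw e;
        }
    }
}
```

Because the insert and the offset update commit in one transaction, a crash at any point persists either both or neither, so on restart the consumer resumes exactly where the database says it should.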
Out of the box, Kafka gives us at-least-once processing. But to achieve exactly-once processing, we need to do some extra work, as we did in this example.
You can test the code yourself. Check out the video for a demonstration of the execution and the test scenario.
In this session, we learned to take control of partition assignment and offset management from Apache Kafka. Automatic group and offset management are convenient options, but by taking control into our own hands, we get the power to support complex requirements.
That's it for this session. See you again in the next session. Thank you for watching Learning Journal. Keep learning and keep growing.