Welcome to the Apache Kafka tutorial at Learning Journal. So far, we have covered a lot of the theory and concepts behind Apache Kafka. But Kafka is, above all, about its APIs. You can classify the Kafka APIs into two parts.
- Producer APIs
- Consumer APIs.
In this session, we will cover the internals of the Producer API and also create an example producer. By the end of this video, you will have a sound understanding of the Apache Kafka producer API, and you should be able to code your own producers. So let's start.
Asynchronous Communication
I think, by now, you already know that we can use Kafka in several ways. We can use Kafka to solve complex data integration problems. We can use Kafka to create a series of validations and transformations and build complex data pipelines. We can use it to record information for later consumption, for example, recording a clickstream. We can use it to log transactions and create applications that respond in real time. We can also use it to collect data from mobile phones, smart appliances, and sensors in an IoT application. But if you look at any of these use cases, it's all about asynchronous communication among applications.
So, whatever we do with Kafka, we must have a producer that sends data to Kafka. You need to create a producer for your application to send data to Kafka. The most common way to create a Kafka producer is to use the Kafka APIs. Since the core APIs are written in Java, you must know Java to use them. However, even if you don't write Java day to day, you can still follow the concepts and the internal workings of Kafka in this discussion.
A Simple Kafka Producer
Let's jump straight into some code. I have listed the code for the simplest possible Kafka producer. Take a look at it before we get into the explanation.
Git Filename - SimpleProducer.java
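The original listing is not reproduced in this text, so here is a minimal sketch of what such a producer might look like. The topic, key, value, and configuration names are taken from the narrative below; the broker address is an assumption, and the numbered comments map to the line numbers the explanation refers to:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;   // (1) producer API package
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        // (2) topic, key, and value for our single message
        String topicName = "SimpleProducerTopic";
        String key = "Key1";
        String value = "Value-1";

        // (3) the three mandatory configurations
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // (4) create a KafkaProducer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // (5) create a ProducerRecord object
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topicName, key, value);

        // (6) send the record, then clean up
        producer.send(record);
        producer.close();
    }
}
```

Running this requires the kafka-clients jar on the classpath and a broker listening at the configured address.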
At line 1, the org.apache.kafka.clients.producer package defines the Apache Kafka producer API. If you are interested in looking at the source code for the package, it's available on GitHub. You may also want to refer to the producer API documentation.
In the above example, we want to send a string message to Kafka. Most of the time, Kafka messages are key-value pairs, so with every message you can send a key. However, the key is not mandatory; you can send a message without one. In this example, at line 2, we create the key Key1 and the value Value-1, and we want to send them to the topic SimpleProducerTopic.
If you look at the rest of the code, there are only three steps.
- Step 1 - Create a KafkaProducer object - line 4.
- Step 2 - Create a ProducerRecord object - line 5.
- Step 3 - Send the record to the producer - line 6.
That is all that we do in a Kafka Producer.
So, let’s look at the details of each step.
The first step is to create a KafkaProducer object. To create this object, you need a properties object with at least three mandatory configurations (line 3). These core configurations are:
- bootstrap.servers
- key.serializer
- value.serializer
I hope you already know about bootstrap servers. It is a list of Kafka brokers. The producer object uses this list to connect to the Kafka cluster. You can specify one or more brokers in this list. The recommendation is to provide at least two brokers, so that if one broker is down, the producer can connect to another broker from the list.
The next two properties are about the Kafka message. I already mentioned in an earlier video that Kafka attaches no meaning to the data; a message is just an array of bytes to Kafka. In this example, we are sending a string key and a string value, but Kafka accepts only arrays of bytes. So, we need a class to convert our message key and value into arrays of bytes. The activity of converting Java objects into an array of bytes is called serialization, so these two properties specify an appropriate serializer class for the key and the value. In this example, we are using a string serializer for both the key and the value. Kafka also provides other serializers, such as IntegerSerializer and DoubleSerializer. If you want to send an integer key, you should use an IntegerSerializer for the key instead of the StringSerializer. We will cover serializers in detail in another session.
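To see what a string serializer actually does, here is a JDK-only illustration of the idea. For strings, serialization boils down to UTF-8 encoding; Kafka's StringSerializer does essentially the same thing (the class name below is ours, not Kafka's):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializationDemo {
    // What a string serializer boils down to: the String becomes its UTF-8
    // byte array, because the broker only ever sees keys and values as bytes.
    public static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] keyBytes = serialize("Key1");
        System.out.println(Arrays.toString(keyBytes)); // prints [75, 101, 121, 49]
    }
}
```

An integer serializer works the same way, except that it writes the four bytes of the int instead of UTF-8 text.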
So, we define those three mandatory configurations and package them into a Java Properties object. Then we pass the Properties object to the KafkaProducer constructor and instantiate a producer. That was our first step.
We are done with the first step, and now you have a producer instantiated. We want this producer to send some messages. The second step is to create a ProducerRecord object. The ProducerRecord object requires three things: the topic name, the key, and the message value. We pass all three into the constructor and instantiate a ProducerRecord. The producer record object is our message. It is handed to the producer, so the producer can send it to the Kafka broker.
As a final step, we call the send method on the producer object and hand over the record object. That's it. Now it's the producer's responsibility to deliver this message to the broker. After sending all your messages, you need to close the producer object. Closing a producer is necessary to clean up all the resources that the producer may be using.
In a real-life project, the producer is a long-running process that keeps sending messages, but we took a simple example to understand the API.
That's it for the simple producer. You can compile it and execute it, and you can use the console consumer to test whether the message reached the broker.
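As a quick check, a console consumer command along the following lines can be used, assuming Kafka is installed locally and the broker listens on localhost:9092 (the address is an assumption):

```shell
# Read everything on the topic from the beginning.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic SimpleProducerTopic \
  --from-beginning
```

If the producer worked, you should see Value-1 printed by the consumer.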
How to Compile Kafka Producer
If you want to compile and execute the given code, you can use SBT, as demonstrated in the video. If you are new to SBT, I have video training on SBT as part of my Scala training. You will need a Scala build file to compile the above example. You can use the build file given below.
Filename – build.sbt
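The build file itself is not reproduced in this text; a minimal build.sbt for this example might look like the following. The Scala and kafka-clients versions below are assumptions, so use whatever matches your setup:

```scala
name := "SimpleProducer"
version := "1.0"
scalaVersion := "2.12.18"  // assumption: any recent Scala works for building Java sources

// The only external dependency the producer needs is the Kafka clients jar.
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.2"  // version is an assumption
```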
In the above simple example, we created two objects.
- KafkaProducer
- ProducerRecord
I recommend you look at the Java Docs for these two classes.
The KafkaProducer
The KafkaProducer gives you four variants of the constructor. They don't provide anything new; they only offer different ways to pass in your configuration values. If you look at the signature of the send method in the documentation, it returns a Java Future object of type RecordMetadata. The RecordMetadata is a kind of acknowledgment from the broker. It contains information about your message, for example, the partition id and the offset number. We will cover acknowledgments in more detail in the next session.
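As a sketch of how that Future can be used, the snippet below blocks on get() and prints the acknowledgment. Blocking on every send is for illustration only, and the helper class and method names here are ours:

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AckDemo {
    // Assumes a producer configured as in the SimpleProducer example.
    static void sendAndReport(KafkaProducer<String, String> producer)
            throws InterruptedException, ExecutionException {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("SimpleProducerTopic", "Key1", "Value-1");
        // send() returns Future<RecordMetadata>; get() waits for the broker's reply.
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("partition=" + metadata.partition()
                + ", offset=" + metadata.offset());
    }
}
```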
The ProducerRecord
The ProducerRecord offers four variants of the constructor. We used one of them and supplied the topic, message key, and message value. But there is another constructor with a more comprehensive set of options. It allows us to provide two more parameters.
- Partition Number
- Timestamp.
We will cover Kafka message partitioning in a separate session, but let me cover some basics here. Kafka comes with a default partitioner. This partitioner uses the message key to determine a partition number. If you send a few thousand messages with the same message key, they will all land in the same partition. But you know that the message key is optional, so if you don't send a message key, the default partitioner will evenly distribute the messages across the available partitions.
But what about the partition number parameter in the ProducerRecord? If you set a partition number in your ProducerRecord, you disable the default partitioner. You are, in effect, hardcoding a partition into the message itself and telling Kafka not to determine a suitable partition for it. If you hardcode it to zero, your message will go to partition zero. We will cover some uses of this parameter in later sessions.
Now let's come to the timestamp. Kafka gives a timestamp to every message. If you want to set a message timestamp before you send it to Kafka, you can use this parameter. If you don't set a timestamp, the broker will set one when the message reaches it. It is important to note the difference: the former is the time when you send the message to the broker, and the latter is the time when the broker receives it.
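For reference, the full-form constructor described above can be used as follows. Partition 0 and the current time are illustrative values, and the class name is ours:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionedRecordDemo {
    public static void main(String[] args) {
        // Full-form constructor: topic, partition, timestamp, key, value.
        // Hardcoding partition 0 disables the default partitioner for this
        // record; the explicit timestamp replaces the one the broker would
        // otherwise set when the message arrives.
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "SimpleProducerTopic",       // topic
                0,                           // partition number
                System.currentTimeMillis(),  // message timestamp
                "Key1",                      // key
                "Value-1");                  // value
        System.out.println(record);
    }
}
```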
That's it for this session. In the next session, we will cover some more details of the Kafka producer APIs.