Apache Kafka Foundation Course - Producer API

Welcome to Apache Kafka tutorial at Learning journal. So far, we covered a lot of theory and concepts of Apache Kafka. But Kafka is more about APIs. You can classify Kafka APIs into two parts.

  1. Producer APIs
  2. Consumer APIs.

In this session, we will cover internals of Producer API and also create an example producer. By the end of this video, you will have a sound understanding of Apache Kafka producer API, and you should be able to code your producers. So let's start.

Asynchronous Communication

I think, by now, you already know that we can use Kafka in several ways. We can use Kafka to solve complex data integration problems. We can use Kafka to create a series of validations, transformations and build complex data pipelines. We can use it to record information for later consumption, for example, recording clickstream. We can use it to log transactions and create applications to respond in real-time. We can also use it to collect data from your mobile phones, smart appliances, and sensors in an IOT application.
But if you look at any of these use cases, it's all about asynchronous communication among applications. So, whatever we do with Kafka, we must have a producer that will send data to Kafka. You need to create a producer for your application to send data to Kafka. The most common method to create Kafka producer is using Kafka APIs. Since core APIs are available in Java, you must know Java to be able to understand and use them. However, even if you are not in day to day Java coding, you can still understand the concepts and internal working of Kafka during this discussion.

A Simple Kafka Producer

Let's directly jump into some code. I have listed the code for the simplest Kafka producer. Just take a look at it before we jump into an explanation.
Git Filename - SimpleProducer.java

            import java.util.*;
    1.      import org.apache.kafka.clients.producer.*;                                    
            public class SimpleProducer {                                    
                public static void main(String[] args) throws Exception{

    2.              String key = "Key1";
                    String value = "Value-1";
                    String topicName = "SimpleProducerTopic";
    3.              Properties props = new Properties();
                    props.put("bootstrap.servers", "localhost:9092,localhost:9093");
                    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    4.              Producer<String, String> producer = new KafkaProducer<>(props);                                    
    5.              ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
    6.              producer.send(record);                
                    System.out.println("SimpleProducer Completed.");

At line 1, the org.apache.kafka.clients.producer package defines Apache Kafka producer API. If you are interested in looking at the source code for the package, it's available on GitHub. You may also need to refer producer API documentation

In the above example, we want to send a string message to Kafka. Most of the time, Kafka messages are Key value pair. So, with every message, you can send a key. However, the key is not mandatory. You can send a message without a key. But in this example, at line 2, we want to create a key as Key1 and a value as Value-1. And we want to send it to the topic SimpleProducerTopic.
If you look at the rest of the code, there are only three steps.

  1. Step 1 - Create a KafkaProducer object - line 4.
  2. Step 2 - Create a ProducerRecord object - line 5.
  3. Step 3 - Send the record to the producer - line 6.

That is all that we do in a Kafka Producer.
So, let’s look at the details of each step.
The first step is to create a KafkaProducer object. To create this object, you need a property object with at least three mandatory configurations (line 3).
These core configurations are.

  1. bootstrap.servers
  2. key.serializer
  3. value.serializer

I hope you already know bootstrap servers. It is a list of Kafka brokers. The producer object will use this list to connect to Kafka cluster. You can specify one or more brokers in this list. The recommendation is to provide at least two brokers, so if one broker is down, the producer can connect to the other broker from the list.
The next two properties are about Kafka message. I already mentioned in an earlier video that Kafka doesn't have any meaning for the data. A message is just an array of bytes for Kafka. In this example, we are sending a string Key and a string value. But Kafka accepts only an array of bytes. So, we need a class to convert our message key and value into an array of bytes. The activity of converting Java objects into an array of bytes is called serialization. So those two properties are to specify an appropriate serializer class for the key and value. In this example, we are using string serializer for both key and value. Kafka also provides some other serializers like IntSerializer or DoubleSerializer. If you want to send an integer key, you should use an IntSerializer for the key instead of StringSerializer. We will cover serializers in detail in another session.
So, we define those three mandatory configurations and package them into a Java properties object. Then we pass the properties object to KafkaProducer object constructor and instantiate a producer. That was our first step.
We are done with the first step, and now you have a Producer instantiated. We want this Producer to send some messages. The second step is to create a ProducerRecord object. The ProducerRecord object requires three things, The Topic name, Key, and the Message Value. We are passing all these three things into the constructor and instantiating a ProducerRecord. The producer record object is our message. It should be given to producer, so the producer can send it to Kafka broker.
As a Final step, we make a call to send method on Producer object and handover the RecordObject. That's it.
Now it's producer’s responsibility to deliver this message to the broker. After sending all your messages, you need to close the Producer object. Closing a producer is necessary to clean up all resources that producer may be using.
In a real-life project, the Producer is a long-running process that keeps sending messages, but we took a simple example to understand the API.
That's it for the simple producer, you can compile it and execute it. You can use console consumer to test if the message reached the Broker.

How to Compile Kafka Producer

If you want to compile and execute the given code, you can use SBT to compile and execute it as demonstrated in the video. If you are new to SBT, I have a video training on SBT as part of my Scala training. You will need a Scala build file to compile the above example. You can use the build file as given below.
Filename – build.sbt

    name := "KafkaTest"

    libraryDependencies ++= Seq(
        "org.apache.kafka" % "kafka-clients" % ""
        exclude("javax.jms", "jms")
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri")
        exclude("org.slf4j", "slf4j-simple")

In the above simple example, we created two objects.

  1. KafkaProduce
  2. ProducerRecord

I recommend you look at the Java Docs for these two classes.

The KafkaProducer

The KafkaProducer give you four variants of a constructor. They don't provide anything new but only offer different methods to pass in your configuration values. If you look at the signature of send method in the documentation, it returns you a Java Future object of type RecordMetadata. The RecordMetadata is a kind of acknowledgment from the broker. It contains information about your message, for example, the partition id and offset number. We will cover more details on acknowledgment in the next session.

The ProducerRecord

The ProducerRecord offers four variants of the constructor. We used one of the options and supplied the topic, message key, and message value. But there is another constructor with a more comprehensive option. It allows us to provide two more parameters.

  1. Partition Number
  2. Timestamp.

We will cover Kafka message partition in a separate session but let me cover some basics here.
Kafka comes with a default partitioner. This partitioner will use the message key to determine a partition number. If you are sending few thousand messages with same message key, they all will land in the same partition. But you know that message key is optional, so if you don't send a message key, the default partitioner will evenly distribute those messages across the available partitions.
But what about the partition number parameter in the ProducerRecord ?
If you set a partition number in your ProducerRecord, you will disable default partitioner. It is like, you are hardcoding a partition in your message itself and don't want Kafka to determine a suitable partition for your message. If you hard code it to zero, your message will go to partition zero. We will cover some usage of this parameter in later sessions.
Now let's come to Timestamp. Kafka gives a timestamp to every message. If you want to set a message time stamp before you send it to Kafka, you can use this parameter. If you don't set a timestamp, the broker will set it when the message reaches the broker. It is important to note the difference. The former is the time when you are sending a message to the broker, and later one is the time when the broker is receiving a message.
That's it for this session. In next session, we will cover some more details of Kafka producer APIs.

You will also like:

Kafka Core Concepts

Learn Apache Kafka core concepts and build a solid foundation on Apache Kafka.

Learning Journal

Pattern Matching

Scala takes the credit to bring pattern matching to the center.

Learning Journal

Apache Spark Introduction

What is Apache Spark and how it works? Learn Spark Architecture.

Learning Journal

Scala placeholder syntax

What is a scala placeholder syntax and why do we need it? Learn from experts.

Learning Journal

Higher Order functions

Scala allows you to create Higher Order functions as first class citizens.

Learning Journal