How to Create a Kafka Streams Application
The Kafka Streams API offers two types of APIs to create real-time streaming applications.
- Streams DSL
- Processor API
The Processor API is the low-level API, and it provides you with more flexibility
than the DSL. Using the Processor API requires a little extra manual work and code on
the application developer's side. However, it allows you to handle those one-off
problems that may not be solvable with the higher-level abstraction.
The Streams DSL is an easy-to-use DSL for creating real-time stream processing
applications. In this post, we will primarily focus on the Streams DSL.
Let’s start our learning with a simple Kafka Streams application. As our first
example, we want to create a simple Kafka Streams application that does the following
things.
- Connect to the Kafka cluster and start reading data from a given topic.
- We do not want to get into processing that data yet; we merely want to print the messages on the console.
This example is an excerpt from the book Kafka Streams –
Real-time Stream Processing.
For a detailed explanation of the example and much more, you can get access to
the book using the link below.
Solution – Hello Streams
The code listing below shows the code snippet for our first Kafka Streams application. At a high level, creating a Kafka Streams application is a four-step process.
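The original listing is not reproduced in this excerpt. As a rough sketch (the topic name "my-topic", the application id, and the broker address are placeholder assumptions, and the line numbers referenced in the steps below map only approximately onto this version), a minimal Hello Streams application might look like this:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class HelloStreams {
    public static void main(String[] args) {
        // Step 1: put the four basic configurations into a Properties object
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        // Step 2: define the computational logic and bundle it into a Topology
        StreamsBuilder builder = new StreamsBuilder();
        builder.<Integer, String>stream("my-topic")
               .foreach((k, v) -> System.out.println("Key = " + k + " Value = " + v));
        Topology topology = builder.build();

        // Step 3: create the KafkaStreams instance and start it
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        // Step 4: close the streams cleanly on SIGTERM
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Running this requires the kafka-streams dependency on the classpath and a reachable Kafka cluster.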
Step 1 (Line 2 – 6)
The first step is to create a Java Properties object and put the necessary configuration in the Properties object. Kafka streams API is highly configurable, and we use Java Properties object to specify those configurations. In this example, we are setting four most basic settings.
- APPLICATION_ID_CONFIG
- BOOTSTRAP_SERVERS_CONFIG
- DEFAULT_KEY_SERDE_CLASS_CONFIG
- DEFAULT_VALUE_SERDE_CLASS_CONFIG
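For reference, those four constants resolve to the literal configuration names shown in this plain-Java sketch, which uses the string keys directly and therefore needs no Kafka classes at all (the values are illustrative placeholders):

```java
import java.util.Properties;

Properties props = new Properties();
// Literal names behind the StreamsConfig constants; values are placeholders.
props.put("application.id", "hello-streams");
props.put("bootstrap.servers", "localhost:9092");
props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
```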
Every Kafka Streams application must be identified by a unique application id. If
you are executing multiple instances of your Kafka Streams application, all
instances of the same application must have the same application id. The application
id is used by the Kafka Streams API in many ways, such as isolating resources for the
same application and sharing the workload among the instances of the same application.
You will learn some specific uses of the application id as you progress with the book.
The next configuration is the bootstrap server, which you already know from
earlier chapters. A bootstrap server is a comma-separated list of host/port pairs
that the streams application uses to establish the initial connection to the
Kafka cluster.
The next two configurations are the key and the value Serdes. Serdes is a
factory class that you will be using for creating a Serde (a combination of a
serializer and a matching deserializer). In this example, we are setting an Integer
Serde for the key and a String Serde for the value. You might be wondering, why do
we need a Serde? While creating producers, we required a serializer, and consumers
needed a deserializer. So why do we need a Serde (a combination of both) for the
Streams API?
This example is a data-consuming stream application. However, a typical Kafka
Streams application internally creates a combination of a consumer and a producer.
Hence, we need a serializer as well as a deserializer for the internal
producer/consumer to work correctly. Therefore, we set a default Serde for the key
and the value.
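As a concrete sketch of what a Serde bundles together (assuming the kafka-clients dependency on the classpath; the topic name is a placeholder):

```java
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Serdes is the factory; Serdes.String() returns a ready-made Serde<String>.
Serde<String> stringSerde = Serdes.String();

// A Serde is just a serializer and its matching deserializer in one object.
Serializer<String> ser = stringSerde.serializer();
Deserializer<String> de = stringSerde.deserializer();

byte[] bytes = ser.serialize("my-topic", "hello");
String back = de.deserialize("my-topic", bytes); // round-trips to "hello"
```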
Step 2 (Line 7 – 11)
Next comes the central part of your Kafka Streams application. In this part, we define the streams computational logic that we want to execute. For our example, the computational logic is as straightforward as the following steps.
- Open a stream to a source topic – define a Kafka stream for a Kafka topic that can be used to read all the messages.
- Process the stream – for each message, print the Key and the Value.
- Create a Topology – bundle your business logic into a stream topology.
Opening a stream
We want to use the Kafka Streams DSL for defining the above computational logic. Most of the DSL APIs are available through the StreamsBuilder class. So, the first step is to create a StreamsBuilder object. After creating a builder, you can open a Kafka stream using the stream() method on the StreamsBuilder. The stream() method takes a Kafka topic name and returns a KStream object. Once you get a KStream object, you have successfully completed the first part of your computational logic – opening a stream to a source topic.
Process the stream
The KStream class provides a bunch of methods for you to build your
computational logic. We just want to visit each message (key/value pair) and apply
a println. So, the example makes use of the foreach() method on KStream to
implement the second part of our computational logic – processing the stream.
Instead of using the foreach() method, you can also use the peek() method to work
with each record in the stream. The foreach() and the peek() methods take a lambda
expression on the key/value pair, and the logic to work with each pair is implemented
in the lambda body. That’s all. That’s what we wanted to do as the computational logic.
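The practical difference between the two, as a sketch (given an existing KStream<Integer, String> named stream; the output topic name is a placeholder):

```java
// foreach() is a terminal operation: it returns void and ends the chain.
stream.foreach((key, value) -> System.out.println(key + " => " + value));

// peek() applies the same side effect but returns the KStream,
// so you can keep chaining operations after it.
stream.peek((key, value) -> System.out.println(key + " => " + value))
      .to("output-topic");
```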
Create a Topology
The Kafka Streams computational logic is known as a topology and is represented by
the Topology class. So, whatever we defined as computational logic, we can get all
of it bundled into a Topology object by making a call to the build() method on the
StreamsBuilder object. That’s all. That’s what we wanted to achieve in Step 2.
However, we implemented the computational logic by creating some intermediate
objects such as KStream and Topology. A typical application may not create and
hold these intermediate objects. Instead, we prefer to define all these
operations as a chain of methods to avoid unnecessary code clutter. The code below
shows the same logic implemented as a chain of methods.
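The chained listing itself is not included in this excerpt, but it might look roughly like this sketch (topic name is a placeholder), where neither the KStream nor the Topology is held in a named variable:

```java
StreamsBuilder builder = new StreamsBuilder();
builder.<Integer, String>stream("my-topic")
       .foreach((k, v) -> System.out.println("Key = " + k + " Value = " + v));

// build() is called inline, so no intermediate Topology variable is kept.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
```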
Step 3 (Line 12 – 14)
Once we have the Properties and the Topology, we are ready to instantiate KafkaStreams and start() it. That’s all we do in a typical Kafka Streams application. You can summarize a Kafka Streams application in just three steps.
- Configure using Properties
- Create a Topology using StreamsBuilder
- Create and start KafkaStreams using the Properties and the Topology.
Step 4 (Line 15 – 18)
A typical Kafka Streams application is an always-running application. So, once started, it keeps running until you bring it down for some maintenance reason. However, you must add a shutdown hook to handle SIGTERM. We use the shutdown hook to perform the necessary clean-up and close the KafkaStreams object.
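One common shape for such a hook, as a sketch (assuming a KafkaStreams instance named streams, inside a main() that declares throws InterruptedException):

```java
import java.util.concurrent.CountDownLatch;

// Keep main() alive until SIGTERM, then close the streams cleanly.
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close();    // stop the stream threads and clean up
    latch.countDown();  // allow main() to exit
}));
streams.start();
latch.await(); // block here until the shutdown hook fires
```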
Streams Application in a Nutshell
We have seen a simple Kafka Streams application. You must have realized that the
core of your Kafka Streams application is its Topology. A typical Kafka Streams
application defines its computational logic through one or more processor
topologies.
The next important point to notice is that your Kafka Streams application does
not execute inside the Kafka cluster or the Kafka broker. Instead, it runs in a
separate JVM instance, or on a different machine entirely. For reading and writing
data to the Kafka cluster, it creates a remote connection using the bootstrap
servers configuration. The simplicity of running your Kafka Streams application
allows you to use your favourite packaging and deployment infrastructure, such as
the Docker and Kubernetes container platforms.
Finally, you can run a single instance of your application, or alternatively, you
can start multiple instances of the same application to scale your production
deployment horizontally. The Kafka Streams API is designed to help you elastically
scale and parallelize your application by merely starting more and more instances.
These additional instances not only share the workload but also provide
automatic fault tolerance.