Welcome to Kafka tutorials at Learning Journal. We have been creating producers in our earlier sessions. So far, we have covered almost every aspect of a Kafka producer. In this session, we will conclude our discussion about Kafka producers.
In Kafka, almost everything is controlled using configurations. In our earlier examples, we used four configuration parameters. Three of them were mandatory, and the fourth was for a custom partitioner. Let me recap all of them.
Kafka Bootstrap Servers
The first parameter, bootstrap.servers, is a list of Kafka broker addresses, each given as a host and a port number. Since it is mandatory, we must specify at least one value for this parameter. The producer uses this value to connect to the Kafka cluster. Without it, the producer cannot reach the cluster. You should provide at least two addresses, because if the first broker is down, the producer can reach out to the second address. If you have a large cluster, you can provide more than two addresses. There is no harm in providing three or four.
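To make the format concrete, here is a minimal sketch in Java that sets bootstrap.servers on a plain java.util.Properties object. The three broker host names and the class name are hypothetical placeholders, not addresses from our earlier examples.

```java
import java.util.Properties;

// Sketch only: the broker host names below are hypothetical placeholders.
final class BootstrapServersSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        // Comma-separated host:port pairs. Listing more than one address
        // lets the producer connect even if the first broker is down.
        props.put("bootstrap.servers",
                "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092");
        return props;
    }

    public static void main(String[] args) {
        String servers = producerProps().getProperty("bootstrap.servers");
        for (String address : servers.split(",")) {
            System.out.println("bootstrap address: " + address);
        }
    }
}
```

You would pass the same Properties object to the producer constructor, as we did in the earlier sessions.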
Kafka Serializers & Partitioners
The second parameter is key.serializer. This parameter takes the name of the class that you want to use for serializing your key. The third parameter is value.serializer. This parameter takes the name of the class that you plan to use as the value serializer. If your key and value are both strings, you can use the same serializer class for both. However, if you are sending a record or an object as the value, reusing the key's serializer for it does not make sense.
The last one is partitioner.class. So, if you are using a custom partitioner, you should specify your class name for this parameter.
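The four parameters recapped so far could be set together roughly as in the sketch below. StringSerializer is the string serializer class that ships with the Kafka client. The broker address and the partitioner class name com.example.SensorPartitioner are hypothetical, so substitute your own.

```java
import java.util.Properties;

// Sketch only: the broker address and partitioner class are hypothetical.
final class SerializerConfigSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // Both key and value are strings here, so one serializer class serves both.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Only needed when you use a custom partitioner (hypothetical class name).
        props.put("partitioner.class", "com.example.SensorPartitioner");
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```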
We have already used all these parameters. So, I am sure that you have learned them already.
The Kafka producer provides many configuration parameters. The complete list of producer parameters is available in the Kafka documentation. Most of the parameters have a reasonable default value, so there is no need to customize many of them. I recommend that you check the documentation and read all of them at least once. The documentation is excellent, and most of the parameters are straightforward. I will cover three important parameters in this session because they have a direct impact on the reliability and performance of your Kafka producer.
Remember that these are producer configurations. So, you can set them using properties, just as you set partitioner.class. These properties take effect at the producer level, not at the topic level or the server level.
Let's start with the first configuration.
The acks configuration controls acknowledgments. When a producer sends a message to a Kafka broker, it gets a response back from the broker. The response is a RecordMetadata object or an exception.
The acks parameter can take three values: 0, 1, and all. If we set it to 0, the producer will not wait for the response. It will send the message over the network and forget about it. There are three side effects of acks being 0.
- Possible loss of messages
- High throughput
- No Retries
Since the producer is not waiting for the response, there is no guarantee that the server has received the record. So, you may lose some records.
However, since the producer is not waiting for an acknowledgment, it can send data as fast as the network can support and achieve high throughput.
The third side effect is that the producer will not even attempt a retry, because it never learns that a send has failed. Kafka is a highly available system, so the chance of losing a record is slim. However, understand that there is no guarantee. So, use this setting when the loss of a few messages is not an issue. This setting gives you the highest possible throughput.
If we set acks to 1, the producer will wait for the response. The response comes from the leader of the partition, and the acks value determines when the leader sends it. In this case, the leader responds after recording the message in its local storage.
If the leader is down and message delivery fails, the producer can retry after a few milliseconds.
This option appears to be a safe choice. However, there is a catch. You still cannot guarantee that you will not lose your message.
You might be wondering how you can lose the record if the leader has already received it. Well, you can, because at that point we have only a single copy of the message. We are not sure that it has been replicated. What if the leader crashes? You will lose your message.
Followers are fast, and they usually replicate a message quickly. However, if the leader fails before a follower has made a copy, the message will be lost. Surprisingly, in such a scenario, a message can be lost even after a successful acknowledgment.
The chance of losing your record is lower than with the earlier option, but it is still not a reliable option.
If you want the strongest delivery guarantee, all replicas in the ISR (in-sync replicas) list must copy the message successfully before the leader sends an acknowledgment.
That is what the all setting does.
If we set the acks parameter to all, the leader will acknowledge only after it receives an acknowledgment from all of the in-sync replicas. This option gives you the highest reliability but costs you the highest latency.
The all setting is the slowest option because you wait for all in-sync replicas. However, you can still achieve good throughput using asynchronous send.
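The three acks values and their trade-offs can be summarized in a small sketch. The class and its two helper methods are my own illustration, not part of the Kafka API. Only the property name acks and the values 0, 1, and all come from Kafka itself.

```java
import java.util.Properties;
import java.util.Set;

// Sketch only: the helper methods are hypothetical, not a Kafka API.
final class AcksConfigSketch {

    // The three values discussed in this session. Kafka also accepts "-1"
    // as an equivalent of "all", which this sketch leaves out.
    private static final Set<String> ALLOWED = Set.of("0", "1", "all");

    static Properties withAcks(String acks) {
        if (!ALLOWED.contains(acks)) {
            throw new IllegalArgumentException("acks must be 0, 1, or all, got: " + acks);
        }
        Properties props = new Properties();
        props.put("acks", acks);
        return props;
    }

    static String tradeOff(String acks) {
        switch (acks) {
            case "0":
                return "no wait: highest throughput, possible message loss, no retries";
            case "1":
                return "leader only: retries possible, loss if the leader fails before replication";
            case "all":
                return "all in-sync replicas: highest reliability, highest latency";
            default:
                throw new IllegalArgumentException("unknown acks value: " + acks);
        }
    }

    public static void main(String[] args) {
        for (String acks : new String[] {"0", "1", "all"}) {
            System.out.println("acks=" + acks + " -> " + tradeOff(acks));
        }
    }
}
```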
Kafka Retries and Max in flight requests
Now the next parameter. The retries parameter is a simple one. It defines how many times the producer will retry after getting an error. In older Kafka releases, the default value was 0. Since Kafka 2.1, the default is 2147483647, and the total time spent on a record is bounded by delivery.timeout.ms instead. There is another parameter, retry.backoff.ms, that controls the time between two retries. The default value for this parameter is 100 milliseconds.
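As a rough illustration of how these two parameters interact, the sketch below sets both and estimates how long the producer would spend waiting between attempts in the worst case. The helper names are hypothetical, and the estimate assumes a fixed pause between attempts and ignores the time taken by the requests themselves.

```java
import java.util.Properties;

// Sketch only: helper names are hypothetical, and the estimate assumes a
// fixed backoff between attempts and ignores request time.
final class RetryConfigSketch {

    static Properties producerProps(int retries, long backoffMs) {
        Properties props = new Properties();
        props.put("retries", String.valueOf(retries));
        props.put("retry.backoff.ms", String.valueOf(backoffMs));
        return props;
    }

    // Worst case: every attempt fails, so the producer pauses once per retry.
    static long totalBackoffMs(Properties props) {
        int retries = Integer.parseInt(props.getProperty("retries"));
        long backoffMs = Long.parseLong(props.getProperty("retry.backoff.ms"));
        return retries * backoffMs;
    }

    public static void main(String[] args) {
        Properties props = producerProps(3, 100L);
        System.out.println("worst-case backoff in ms: " + totalBackoffMs(props)); // prints 300
    }
}
```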
The next parameter is max.in.flight.requests.per.connection. This one is crucial and often poorly understood. Let me try to explain it. Suppose you are using asynchronous send with a callback function to check for errors. You are not waiting for a response, but you ultimately get the response through the callback function. So, do you know how many such messages you can send without waiting for a response? In other words, how many in-flight requests are allowed that are still not acknowledged?
That's the number defined by the max.in.flight.requests.per.connection parameter. Setting this parameter to a high value will increase memory usage, but at the same time, it will increase throughput as well. So, if you have enough memory, you may want to set it to a higher value to achieve better performance with asynchronous send.
There is a side effect of asynchronous send. Let's assume you send 10 messages to the same partition. The first five go as one batch, and that batch fails. The remaining five go as a second batch and succeed. Now the producer retries the first batch, and if the retry succeeds, the first five messages land after the second five, so you have lost your order. That's a significant side effect of asynchronous send. So, be careful if the order of delivery is critical for you. If you are looking for an ordered delivery of your messages, you have the following two options.
- Use synchronous send.
- Set max.in.flight.requests.per.connection to 1
Both options have the same result: only one request is outstanding at a time, so a retry cannot overtake a later batch. Order is critical for some use cases, especially transactional systems such as banking and inventory. If you are working on that kind of use case, set max.in.flight.requests.per.connection to 1.
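The reordering scenario can be reproduced with a toy model in plain Java. This is not how the Kafka client is implemented. It is a simplified simulation under my own assumptions: batches are numbered, exactly one batch fails on its first attempt, and the producer sends up to maxInFlight batches before looking at any response.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy simulation, not the real Kafka client: one batch fails once and is retried.
final class InFlightOrderingSketch {

    // Returns the order in which batches end up in the partition log.
    static List<Integer> deliver(int batchCount, int failingBatch, int maxInFlight) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < batchCount; i++) {
            pending.add(i);
        }
        List<Integer> log = new ArrayList<>();
        boolean alreadyFailed = false;
        while (!pending.isEmpty()) {
            // Send a window of batches without waiting for responses.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            // Process the responses: successes are appended to the log.
            List<Integer> toRetry = new ArrayList<>();
            for (int batch : inFlight) {
                if (batch == failingBatch && !alreadyFailed) {
                    alreadyFailed = true;
                    toRetry.add(batch);
                } else {
                    log.add(batch);
                }
            }
            // Failed batches are retried before any batch that was not sent yet.
            for (int i = toRetry.size() - 1; i >= 0; i--) {
                pending.addFirst(toRetry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(deliver(2, 0, 2)); // prints [1, 0]: order lost
        System.out.println(deliver(2, 0, 1)); // prints [0, 1]: order kept
    }
}
```

With two requests in flight, the second batch is written before the retried first batch. With one request in flight, the retry completes before the second batch is sent.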
Other Kafka Producer Configs
There are a few more important properties. I recommend checking the Kafka documentation for at least the following properties.
All these properties are relatively straightforward. However, if you have any doubts, you can reach out to me for clarification.
The primary objective of this session was to understand the ordering guarantee of Kafka. You should have a fair idea by now that you can preserve the order in a Kafka partition, but it comes at the cost of throughput.
That’s it for this session. In the next video, we will start exploring Kafka consumers. Thank you for watching Learning Journal.
Keep learning and keep growing.