Kafka is a high-performance, scalable messaging system, but when handling big data, the default configuration may limit the maximum performance. In this article, I'll explain how messages are generated and saved in Kafka, and how to improve performance by changing the configuration.
In short, messages are assembled into batches (named RecordBatch) and sent to the broker. The producer manages some internal queues, and each queue contains the RecordBatches that will be sent to one broker. When the send method is called, the producer looks into the internal queue and tries to append the message to a RecordBatch that is still smaller than batch.size (default value is 16KB), or creates a new RecordBatch if none has room.
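The append-or-create logic above can be sketched as a toy accumulator. This is a simplified model for illustration, not the real client code; the queue structure and size accounting are assumptions.

```python
# Toy model of the producer's record accumulator: one queue of batches,
# append to the last open batch if the record still fits under
# batch.size, otherwise start a new batch.
from collections import defaultdict

BATCH_SIZE = 16 * 1024  # mirrors the batch.size default of 16KB

class Accumulator:
    def __init__(self):
        self.queues = defaultdict(list)  # partition -> list of batches

    def append(self, partition, record: bytes):
        queue = self.queues[partition]
        # Try to fit the record into the currently open (last) batch.
        if queue and sum(len(r) for r in queue[-1]) + len(record) <= BATCH_SIZE:
            queue[-1].append(record)
        else:
            queue.append([record])  # start a fresh RecordBatch

acc = Accumulator()
for i in range(3000):
    acc.append(0, b"x" * 20)  # ~60KB of records -> several 16KB batches
```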
There is also a sender thread in the producer which is responsible for turning RecordBatches into requests (<broker node, List(ProducerBatch)>) and sending them to the broker.
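The grouping step the sender performs can be sketched like this. The leader assignment is a made-up mapping for illustration only.

```python
# Toy sketch of the sender step: drain ready batches and group them by
# the leader broker of their partition, yielding one request per node
# (<broker node, List(ProducerBatch)>).
from collections import defaultdict

leader_for_partition = {0: "broker-1", 1: "broker-2", 2: "broker-1"}

def group_by_node(batches):
    """batches: list of (partition, batch) tuples -> {node: [batch, ...]}"""
    requests = defaultdict(list)
    for partition, batch in batches:
        requests[leader_for_partition[partition]].append(batch)
    return dict(requests)

reqs = group_by_node([(0, "b0"), (1, "b1"), (2, "b2")])
```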
The details can be found in these two articles: Apache Kafka - Message Format and A Guide To The Kafka Protocol - Apache Kafka - Apache Software Foundation.
Here are some important properties of this format: a RecordBatch carries header fields such as the base offset, CRC, compression attributes and batch timestamps, and each Record consists of a key, a value, optional headers and, of course, the timestamp.
When you look into the Kafka topic data directory, you may find files like this:
```
00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000000035.log
00000000000000000035.index
00000000000000000035.timeindex
```
Kafka saves each partition as segments. When a new record comes, it is appended to the active segment. If the segment's size limit is reached, a new segment is created and becomes the active segment. Segments are named by the offset of their first record, so the segment names are incremental.
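The rolling-and-naming behaviour can be sketched as follows; the tiny size limit is an assumption for the example (the real log.segment.bytes default is 1GB).

```python
# Sketch of segment rolling: records append to the active segment; when
# the size limit would be exceeded, a new segment named after its first
# record's offset is started. File names are the offset zero-padded to
# 20 digits.
SEGMENT_BYTES = 1024  # tiny limit for the example (real default is 1GB)

segments = [{"base_offset": 0, "size": 0}]  # first segment starts at offset 0

def append(offset, record_size):
    active = segments[-1]
    if active["size"] + record_size > SEGMENT_BYTES:
        segments.append({"base_offset": offset, "size": 0})  # roll
        active = segments[-1]
    active["size"] += record_size

for offset in range(100):
    append(offset, 30)  # 100 records * 30B with a 1KB limit -> a few rolls

names = [f"{s['base_offset']:020d}.log" for s in segments]
```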
Furthermore, each segment is divided into three kinds of file: the log file, the index file and the timeindex file.

- The log file contains the actual data.
- The index file contains each indexed record's relative offset and its physical position in the log file. This reduces the lookup complexity for a record at a specific offset to O(log n).
- The timeindex file contains each indexed record's relative offset and its timestamp.
The consumer keeps reading data from the broker and decompresses it if necessary. It puts the data into an internal queue and returns the target number of records to the client.
max.poll.records (default value is 500) means the maximum number of records returned in a single call to poll().
fetch.min.bytes (default value is 1) means the minimum amount of data the broker should return for a fetch request. If insufficient data is available, the server will wait up to fetch.max.wait.ms milliseconds to accumulate data before answering the request.
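Spelled as kafka-python keyword arguments (the spellings below follow kafka-python's naming; pass them to KafkaConsumer(...) against a real broker), the consumer settings above look like this:

```python
# Consumer fetch settings; the values shown are the defaults discussed.
consumer_conf = {
    "max_poll_records": 500,   # max records returned by one poll()
    "fetch_min_bytes": 1,      # broker replies as soon as any data exists
    "fetch_max_wait_ms": 500,  # ...or after waiting this long
}
# e.g. KafkaConsumer("my-topic", bootstrap_servers=..., **consumer_conf)
```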
The default socket buffer values in the Java client are too small for a high-throughput environment. receive.buffer.bytes (default value is 64KB) and send.buffer.bytes (default value is 128KB) set the SO_RCVBUF and SO_SNDBUF sizes for socket connections respectively. I recommend setting them to a bigger value, or to -1 to use the OS default value.
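As a hedged sketch in kafka-python keyword spellings, the value below is illustrative, not a recommendation; the right size depends on the bandwidth-delay product of your links:

```python
# Raising the client socket buffers; -1 defers to the OS default.
socket_conf = {
    "receive_buffer_bytes": 1 << 20,  # 1MB SO_RCVBUF (default 64KB)
    "send_buffer_bytes": -1,          # -1 = use the OS default SO_SNDBUF
}
```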
As mentioned before, the producer always sends messages as RecordBatches. Each batch should be smaller than batch.size (default value is 16KB). Increasing batch.size will not only reduce the number of requests to the broker, but also lead to a better compression ratio when compression is enabled.
linger.ms specifies how long to wait before sending a RecordBatch, and it indirectly affects the real size of the RecordBatch. The producer groups together any records that arrive between request transmissions into a single batched request. If the system load is low and the RecordBatch is not full, the producer sender will still send this batch once it has waited for linger.ms milliseconds.

linger.ms's default value is 0, which means the producer sends messages as quickly as possible (but records that arrive between two send requests are still batched into one RecordBatch). Increasing this value not only brings the real batch size closer to batch.size and reduces the number of requests to be sent, but also increases the latency of messages.
buffer.memory (default value is 32MB) controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block.
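The three producer knobs above, written as kafka-python keyword arguments. The values are illustrative assumptions, not recommendations; the defaults are noted in the comments.

```python
# Producer buffering/batching settings for a higher-throughput setup.
producer_conf = {
    "batch_size": 64 * 1024,            # default 16KB; bigger batches, fewer requests
    "linger_ms": 50,                    # default 0; trade latency for fuller batches
    "buffer_memory": 64 * 1024 * 1024,  # default 32MB of producer-side buffering
}
# e.g. KafkaProducer(bootstrap_servers=..., **producer_conf)
```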
As throughput keeps growing, bandwidth may become the bottleneck. It's easy to tackle this by adding the compression.type parameter on the producer. Once it is configured, the producer will compress each RecordBatch before sending it to the broker. If the records are text, the compression ratio should be high and bandwidth usage will decrease significantly.
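A rough illustration of why text records compress well, using the stdlib gzip codec on a batch of similar JSON events (Kafka compresses whole RecordBatches, which is why batching improves the ratio; the payload here is made up):

```python
# Gzip a repetitive text payload and compare sizes.
import gzip
import json

# A batch of similar JSON records, like typical log events.
records = [json.dumps({"level": "INFO", "msg": "request handled", "n": i})
           for i in range(1000)]
payload = "\n".join(records).encode()

compressed = gzip.compress(payload)
ratio = len(payload) / len(compressed)  # repetitive text compresses well
```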
There are two kinds of compression.type: topic level and producer level.

If you set compression.type on the producer, the producer will compress the records and send them to the broker.

There is also a topic-level compression.type configuration. The producer's compression type is not constrained by it: when the topic-level setting names a specific codec, the broker will convert data sent from the producer into that target codec. The topic-level compression.type can also be set to producer, which is the default value and means the broker will keep the original data sent from the producer.
How should you choose a compression type? According to Cloudflare's test results in Squeezing the firehose: getting the most from Kafka compression:
(The linked post tabulates the CPU usage and compression ratio for each compression type.)
Gzip has the best compression ratio but takes a lot of CPU time. Snappy keeps a balance between CPU time and space. The new compression type zstd, added in Kafka 2.1, produces a higher compression ratio than Snappy at the cost of a little more CPU time.
These are the common configurations; you can find more in the official documentation. Further reading:
- Kafka message format
- Exploit Apache Kafka’s Message Format to Save Storage and Bandwidth
- Consuming big messages from Kafka
- How does max.poll.records affect the consumer poll
- A Practical Introduction to Kafka Storage Internals
- Kafka message codec - compress and decompress
- 20 Best Practices for Working With Apache Kafka at Scale
- Kafka Documentation
- kafka-python KafkaProducer
- Deep Dive Into Apache Kafka | Storage Internals