Generic Top

Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

Apache Kafka is a powerful, open-source, distributed, fault-tolerant event streaming platform. However, when we use Kafka to send messages larger than the configured size limit, it gives an error.

We showed how to work with Spring and Kafka in a previous tutorial. In this tutorial, we'll look at the way to send large messages with Kafka.

2. Problem Statement

Kafka configuration limits the size of messages that it's allowed to send. By default, this limit is 1MB. However, if there's a requirement to send large messages, we need to tweak these configurations as per our requirements.

For this tutorial, we're using Kafka v2.5. Let's first look into our Kafka setup before jumping to configuration.

3. Setup

Here, we're going to use the basic Kafka setup with a single broker. Also, the producer application can send messages over a defined topic to Kafka Broker by using Kafka Client. Additionally, we're using a single partition topic:

We can observe multiple interaction points here like Kafka Producer, Kafka Broker, Topic, and Kafka Consumer. Therefore, all of these need configuration updates to be able to send a large message from one end to another.

Let's look into these configs in detail to send a large message of 20MB.

3. Kafka Producer Configuration

This is the first place where our message originates. And we're using Spring Kafka to send messages from our application to the Kafka server.

Hence, the property “max.request.size” needs to be updated first. Additional details about this producer config are available in Kafka Documentation.  This is available as constant ProducerConfig.MAX_REQUEST_SIZE_CONFIG in the Kafka Client library, which is available as part of Spring Kafka dependency.

Let's configure this value to 20971520 bytes:

public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

    return new DefaultKafkaProducerFactory<>(configProps);
}

4. Kafka Topic Configuration

Our message-producing application sends messages to Kafka Broker on a defined Topic. Hence, the next requirement is to configure the used Kafka Topic. This means we need to update the “max.message.bytes” property having a default value of 1MB.

This holds the value of Kafka's largest record batch size after compression (if compression is enabled). Additional details are available in Kafka Documentation.

Let's configure this property manually at the time of topic creation using the CLI command:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520 

Alternatively, we can configure this property through Kafka Client:

public NewTopic topic() {
    NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
    Map<String, String> configs = new HashMap<>();
    configs.put("max.message.bytes", "20971520");
    newTopic.configs(configs);
    return newTopic;
}

At a minimum, we need to configure these two properties.

5. Kafka Broker Configuration

An optional configuration property, “message.max.bytes“, can be used to allow all topics on a Broker to accept messages of greater than 1MB in size.

And this holds the value of the largest record batch size allowed by Kafka after compression (if compression is enabled). Additional details are available in Kafka Documentation.

Let's add this property in Kafka Broker's “server.properties” config file:

message.max.bytes=20971520

Moreover, the maximum value among “message.max.bytes” and “max.message.bytes” will be the effective value used.

6. Consumer Configuration

Let's look into the configuration settings available for a Kafka consumer. Although these changes aren't mandatory for consuming large messages, avoiding them can have a performance impact on the consumer application. Hence, it's good to have these configs in place, too:

  • max.partition.fetch.bytes: This property limits the number of bytes a consumer can fetch from a Topic's partition. Additional details are available in Kafka Documentation. This is available as a constant named ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG in the Kafka Client library
  • fetch.max.bytes: This property limits the number of bytes a consumer can fetch from the Kafka server itself. A Kafka consumer can listen on multiple partitions as well. Additional details are available in Kafka Documentation. This is available as constant ConsumerConfig.FETCH_MAX_BYTES_CONFIG in the Kafka Client library

Therefore, to configure our consumers, we need to create a KafkaConsumerFactory. Remember we always need to use a higher value compared to Topic/Broker config:

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
    return new DefaultKafkaConsumerFactory<>(props);
}

Here we used the same config value of 20971520 Bytes for both properties because we are using a single partition Topic. However, the value of FETCH_MAX_BYTES_CONFIG should be higher than MAX_PARTITION_FETCH_BYTES_CONFIG. When we have consumer listening on multiple partitions, FETCH_MAX_BYTES_CONFIG represents the message size that can be fetched from multiple partitions. On the other hand, config MAX_PARTITION_FETCH_BYTES_CONFIG represents message fetch size from a single partition.

7. Alternatives

We saw how different configs in Kafka producer, Topic, Broker, and Kafka consumer could be updated to send large messages. However, we should generally avoid sending large messages using Kafka. The processing of large messages consumes more CPU and memory of our producer and consumer. Hence ultimately somewhat limits their processing capabilities for other tasks. Also, this can cause visibly high latency to the end-user.

Let's look into other possible options:

  1. Kafka producer provides a feature to compress messages. Additionally, it supports different compression types that we can configure using the compression.type property.
  2. We can store the large messages in a file at the shared storage location and send the location through Kafka message. This can be a faster option and has minimum processing overhead.
  3. Another option could be to split the large message into small messages of size 1KB each at the producer end. After that, we can send all these messages to a single partition using the partition key to ensure the correct order. Therefore, later, at the consumer end, we can reconstruct the large message from smaller messages.

If none of the above options suits our requirements, we can go for the earlier discussed configurations.

8. Conclusion

In this article, we covered different Kafka configurations required to send large messages greater than 1MB in size.

We covered configs needs at the Producer, Topic, on Broker, and Consumer end. However, some of these are mandatory configs, while some are optional. Additionally, consumer configs are optional but good to have to avoid negative performance impacts.

In the end, we also covered alternate possible options for sending large messages.

As always, the code example is available over on GitHub.

Generic bottom

Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> CHECK OUT THE COURSE
Generic footer banner
Comments are closed on this article!