1. Introduction

Apache Kafka is an open-source, fault-tolerant distributed event store and stream processing platform. Clients can publish events to Kafka and subscribe to streams of events: producer applications publish events, while consumer applications subscribe to and read them, thus implementing a publish-subscribe model.

In this tutorial, we’ll learn how to add custom headers to a Kafka message using a Kafka producer.

2. Setup

Kafka provides an easy-to-use Java library that we can use for creating Kafka producer clients (Producers) and consumer clients (Consumers).

2.1. Dependencies

To begin with, let’s add the Kafka Clients Java library’s Maven dependency to our project’s pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

2.2. Connection Initialization

The guide assumes we have a Kafka cluster running on our local system. Additionally, we need to create a topic and establish a connection with the Kafka cluster.

Firstly, let’s begin by creating a Kafka topic in our cluster. We can create a topic “baeldung” by referring to our Kafka Topic Creation guide.

Secondly, let’s create a new Properties instance with the bare minimum configuration required for connecting the producer to our local broker:

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Lastly, let’s create an instance of KafkaProducer that we’ll use to publish messages:

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);

The KafkaProducer constructor accepts a Properties object (or a Map) holding the producer configuration. At a minimum, this must include the bootstrap.servers property and, as in our case, the key and value serializer classes.

In a similar fashion, let’s create an instance of KafkaConsumer that we’ll use to consume messages:

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

We’ll use these producer and consumer instances to demonstrate all our coding examples.

Now that we have all the necessary dependencies and connections configured, we can write a simple application that adds custom headers to a Kafka message.

3. Publishing Messages With Custom Headers

Support for custom headers in Kafka messages was added in Kafka version 0.11.0.0. To create a Kafka message (Record), we create an instance of ProducerRecord<K,V>. A ProducerRecord holds the message value and the topic to which the message is to be published, along with optional metadata such as the key, partition, timestamp, and headers.

The ProducerRecord class provides several constructors that accept custom headers. Let’s take a look at two of them:

  • ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
  • ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

Both the ProducerRecord class constructors accept custom headers in the form of an Iterable<Header> type.

To understand, let’s create a ProducerRecord that publishes a message to the “baeldung” topic along with some custom headers:

List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", null, "message", "Hello World", headers);

producer.send(record);

Here, we create a List of Header objects and pass it to the constructor. Each header is an instance of RecordHeader, whose constructor RecordHeader(String key, byte[] value) takes the header key as a String and the header value as a byte array.
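
Since a header value is just a byte array, the producer and the consumer need to agree on how it is encoded. As a minimal sketch, the hypothetical helper class HeaderValueCodec below (it isn’t part of the Kafka API and uses only the JDK) encodes and decodes String and long values. The choice of UTF-8 text and big-endian longs is an assumption made for illustration:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of kafka-clients.
class HeaderValueCodec {

    // Encode text with an explicit charset so both sides agree on the bytes.
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // A header value may be null, so guard before decoding.
    static String decodeString(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    // Encode a long as 8 bytes in big-endian order (ByteBuffer's default).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] website = encodeString("baeldung.com");
        byte[] retries = encodeLong(3L);

        System.out.println(decodeString(website)); // baeldung.com
        System.out.println(decodeLong(retries));   // 3
    }
}
```

With such a helper, a header could be created as new RecordHeader("website", HeaderValueCodec.encodeString("baeldung.com")).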

In a similar fashion, we can use the second constructor that additionally accepts a timestamp of the record being published:

List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("website", "baeldung.com".getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", null, System.currentTimeMillis(), "message", "Hello World", headers);

producer.send(record);
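
Besides the constructors, headers can also be attached after a record has been created, because ProducerRecord.headers() returns a Headers instance that can still be modified before the record is handed to send(). The following sketch assumes the kafka-clients dependency from section 2.1. The class name HeadersAfterConstruction and the second header “source” are made up for illustration:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

// Hypothetical example class; needs kafka-clients on the classpath, but no running broker.
class HeadersAfterConstruction {

    static ProducerRecord<String, String> buildRecord() {
        // This constructor takes no headers, so the record starts with none.
        ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", "message", "Hello World");

        // Headers.add(String, byte[]) returns the Headers instance, so calls can be chained.
        record.headers()
          .add("website", "baeldung.com".getBytes(StandardCharsets.UTF_8))
          .add("source", "tutorial".getBytes(StandardCharsets.UTF_8)); // illustrative second header

        return record;
    }

    public static void main(String[] args) {
        // Print each header as key=value without contacting Kafka.
        for (Header header : buildRecord().headers()) {
            System.out.println(header.key() + "=" + new String(header.value(), StandardCharsets.UTF_8));
        }
    }
}
```

The resulting record can be published with producer.send(record) exactly like the ones built through the constructors.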

So far, we’ve created a message with custom headers and published it to Kafka.

Next, let’s implement the consumer code to consume a message and verify its custom headers.

4. Consuming Messages With Custom Headers

Firstly, we subscribe our consumer instance to the Kafka topic “baeldung”:

consumer.subscribe(Arrays.asList("baeldung"));

Secondly, we use the polling mechanism to poll for new messages from Kafka:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));

The KafkaConsumer.poll(Duration timeout) method fetches new messages from the subscribed topics, blocking for at most the time specified by the Duration parameter if no messages are immediately available. The method returns an instance of ConsumerRecords containing the fetched messages. ConsumerRecords implements Iterable<ConsumerRecord<K, V>>, so we can iterate over it directly.

Lastly, we loop through the fetched records and get the custom headers along with each message:

for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.key());
    System.out.println(record.value());

    Headers consumedHeaders = record.headers();
    for (Header header : consumedHeaders) {
        System.out.println(header.key());
        System.out.println(new String(header.value(), StandardCharsets.UTF_8));
    }
}

Here, we use the getter methods of the ConsumerRecord class to fetch the message key, value, and custom headers. The ConsumerRecord.headers() method returns an instance of Headers containing the custom headers. Headers extends Iterable<Header>, so we loop over each Header instance and fetch its key and value using the Header.key() and Header.value() methods, respectively.
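
When we’re only interested in one particular header, looping over all of them isn’t necessary, since the Headers interface also offers lastHeader(String key), which returns the most recently added header for that key or null if there is none. As a rough sketch, the hypothetical helper headerAsString below wraps this lookup and assumes the value was written as UTF-8 text. It uses a RecordHeaders instance as a stand-in for record.headers() so that it runs without a broker:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

// Hypothetical example class; needs kafka-clients on the classpath, but no running broker.
class HeaderLookup {

    // Returns the last value stored under the key, decoded as UTF-8 (an assumption about the producer).
    static Optional<String> headerAsString(Headers headers, String key) {
        Header header = headers.lastHeader(key);
        if (header == null || header.value() == null) {
            return Optional.empty();
        }
        return Optional.of(new String(header.value(), StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Stand-in for the Headers we would get from ConsumerRecord.headers().
        Headers headers = new RecordHeaders();
        headers.add("website", "baeldung.com".getBytes(StandardCharsets.UTF_8));

        System.out.println(headerAsString(headers, "website").orElse("<absent>")); // baeldung.com
        System.out.println(headerAsString(headers, "missing").orElse("<absent>")); // <absent>
    }
}
```

Inside the consumer loop, the same helper could be called as headerAsString(record.headers(), "website").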

5. Conclusion

In this article, we’ve learned how to add custom headers to a Kafka message. We looked at the different constructors available that accept custom headers with their corresponding implementations.

We then saw how we could consume a message with custom headers and verify them.

As always, the complete code for all the examples is available over on GitHub.
