
1. Introduction

In this short tutorial, we’ll see how to retrieve the last N messages from an Apache Kafka Topic.

In the first part of the article, we’ll focus on the prerequisites we need to be able to execute this operation. In the second part, we’ll build a small utility to read the messages using Java with the Kafka Java API library. Finally, we’ll provide short guidance to achieve the same results from the command line using KafkaCat.

2. Prerequisites

Retrieving the last N messages from a Kafka Topic is as simple as consuming messages starting from a well-defined offset. The offset in a Kafka Topic indicates the current position of a consumer. In a previous article, we saw how to get the number of messages in an Apache Kafka Topic by leveraging the consumer.seekToEnd() method.

Building on the same idea, we can calculate the correct starting offset with a simple subtraction: offset = lastOffset – N. For instance, with 100 messages in the topic and N = 10, we'd start reading at offset 90. We can then poll N messages from this position.

Nonetheless, this method won't work as-is if we produce records using a Transactional Producer. In this case, the offsets skip some numbers to accommodate Kafka's transactional control records (commit/abort markers). One common case in which Transactional Producers are used is when we need to process Kafka messages exactly once. Simply put, if we start reading messages at (lastOffset – N), we might consume fewer than N messages because some offsets are occupied by these control records.
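The following is a minimal sketch of one way to work around this; it's not part of the article's test code, and the slack value is a hypothetical safety margin. It assumes an already-configured KafkaConsumer like the one we'll build in the next section: we seek back a little further than N and keep only the last N data records actually returned.

// Sketch: read back a bit more than n to absorb transaction markers,
// then keep only the last n data records that are actually returned.
static Deque<ConsumerRecord<String, String>> readLastN(
  KafkaConsumer<String, String> consumer, TopicPartition partition, int n, int slack) {

    List<TopicPartition> partitions = Collections.singletonList(partition);
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);
    long endOffset = consumer.position(partition);
    consumer.seek(partition, Math.max(0, endOffset - n - slack));

    Deque<ConsumerRecord<String, String>> lastN = new ArrayDeque<>();
    // keep polling until the consumer catches up with the end offset we captured
    while (consumer.position(partition) < endOffset) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1)).records(partition)) {
            lastN.addLast(record);
            if (lastN.size() > n) {
                lastN.removeFirst(); // drop the oldest, keep only the last n
            }
        }
    }
    return lastN;
}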

3. Get the Last N Messages in a Kafka Topic with Java

First of all, we need to create a Producer and a Consumer:

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

Let’s now produce some messages:

final String TOPIC1 = "baeldung-topic";
int messagesInTopic = 100;
for (int i = 0; i < messagesInTopic; i++) {
    producer.send(new ProducerRecord<>(TOPIC1, null, MESSAGE_KEY, String.valueOf(i))).get();
}

For the sake of clarity and simplicity, let’s suppose we need to register just one partition for our consumer:

TopicPartition partition = new TopicPartition(TOPIC1, 0);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(partition);
consumer.assign(partitions);

As we mentioned before, we need to position the offset at the right place, and we can start polling:

int messagesToRetrieve = 10;
consumer.seekToEnd(partitions);
long startIndex = consumer.position(partition) - messagesToRetrieve;
consumer.seek(partition, startIndex);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));

We might want to increase the polling duration if the network is particularly slow or the number of messages to retrieve is particularly large. In that case, we should also keep in mind that holding a huge number of records in memory could exhaust resources.
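As an aside, if N is very large, one option is to cap each poll and process the records batch by batch instead of accumulating them all. This is only a sketch, with an arbitrary batch size, and it assumes we add max.poll.records to consumerProperties before the KafkaConsumer is instantiated:

// cap the batch size per poll (value chosen arbitrarily for this sketch);
// this property must be set before the KafkaConsumer is created
consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

consumer.seek(partition, startIndex);
int processed = 0;
while (processed < messagesToRetrieve) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(5));
    if (batch.isEmpty()) {
        break; // nothing more arrived within the timeout
    }
    for (ConsumerRecord<String, String> record : batch) {
        // handle each record here instead of keeping them all in a collection
        if (++processed == messagesToRetrieve) {
            break;
        }
    }
}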

Let’s now finally check if we actually retrieved the right number of messages:

int recordsReceived = 0;
for (ConsumerRecord<String, String> record : records) {
    assertEquals(MESSAGE_KEY, record.key());
    assertTrue(Integer.parseInt(record.value()) >= (messagesInTopic - messagesToRetrieve));
    recordsReceived++;
}
assertEquals(messagesToRetrieve, recordsReceived);

4. Get the Last N Messages in a Kafka Topic with KafkaCat

KafkaCat (kcat) is a command-line tool that we can use to test and debug Kafka Topics. Kafka itself provides a good number of scripts and shell tools for the same operations. Still, KafkaCat's simplicity and ease of use make it a de facto standard for operations like retrieving the last N messages of an Apache Kafka Topic. Once installed, we can retrieve the latest N messages produced to a Kafka Topic by running this simple command:

$ kafkacat -C -b localhost:9092 -t topic-name -o -<N> -e
  • -C means that we need to consume messages
  • -b indicates the location of the Kafka Broker
  • -t indicates the topic name
  • -o indicates the offset to start reading from; a negative value means we read the last N messages from the end
  • -e makes kcat exit after reading the last message

Applying this to the case we discussed above, the command to retrieve the last 10 messages from the topic named “baeldung-topic” is:

$ kafkacat -C -b localhost:9092 -t baeldung-topic -o -10 -e

5. Conclusion

In this short tutorial, we’ve seen how to consume the latest N messages of a Kafka Topic. In the first part, we used the Kafka Java API library. In the second part, we used a command-line utility called KafkaCat.

As always, the code is available over on GitHub.
