1. Overview

In this article, we'll explore a few strategies to purge data from an Apache Kafka topic.

2. Clean-Up Scenario

Before we look at the clean-up strategies, let's walk through a simple scenario that calls for purging a topic.

2.1. Scenario

Messages in Apache Kafka automatically expire after a configured retention time. Nonetheless, in some cases, we might want to delete messages immediately.

Let's imagine that a defect has been introduced in the application code that produces messages to a Kafka topic. By the time a bug fix is integrated, the topic already holds many corrupt messages that are ready for consumption.

Such issues are most common in development environments, where we want quick results, so bulk-deleting the messages is a reasonable choice.

2.2. Simulation

To simulate the scenario, let's start by creating a purge-scenario topic. We run this and all the following commands from the Kafka installation directory:

$ bin/kafka-topics.sh \
  --create --topic purge-scenario --if-not-exists \
  --partitions 2 --replication-factor 1 \
  --zookeeper localhost:2181

Next, let's use the shuf command to generate random data and feed it to the kafka-console-producer.sh script:

$ /usr/bin/shuf -i 1-100000 -n 50000000 \
  | tee -a /tmp/kafka-random-data \
  | bin/kafka-console-producer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

Note that we've used the tee command to save a copy of the simulation data in /tmp/kafka-random-data for later use.
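One detail of the shuf invocation is worth knowing: in GNU coreutils, shuf emits each value of the -i range at most once unless -r (--repeat) is given, so -n only caps the number of output lines. With -i 1-100000, the command above therefore produces a random permutation of 100,000 numbers rather than 50 million lines. A small-scale check, assuming GNU shuf and wc, shows the difference:

```shell
#!/usr/bin/env bash
# Without -r, each value in 1..10 is printed at most once,
# so asking for 50 lines still yields only 10.
shuf -i 1-10 -n 50 | wc -l      # prints 10
# With -r (--repeat), values may repeat and -n sets the exact line count.
shuf -r -i 1-10 -n 50 | wc -l   # prints 50
```

If the goal is really to produce 50 million messages, adding -r to the original command would do it, at the cost of a correspondingly larger /tmp/kafka-random-data file.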

Finally, let's verify that a consumer can consume messages from the topic:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 3
76696
49425
1744
Processed a total of 3 messages

3. Message Expiry

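The messages produced to the purge-scenario topic have a default retention period of seven days. To purge them, we can temporarily set the topic-level retention.ms property to ten seconds and wait for the messages to expire. Keep in mind that the broker only removes expired messages when its periodic retention check runs (log.retention.check.interval.ms, five minutes by default), so the deletion may take longer than the ten seconds we wait here: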

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=10000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario \
  && sleep 10

Next, let's verify that the messages have expired from the topic:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

Finally, we can restore the original retention period of seven days for the topic:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=604800000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

With this approach, Kafka purges messages across all partitions of the purge-scenario topic.
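Because retention.ms is expressed in milliseconds, it's easy to drop or add a zero by accident. A quick sanity check with shell arithmetic confirms the two values used in this section:

```shell
#!/usr/bin/env bash
# retention.ms takes milliseconds: seconds * 1000
ten_seconds_ms=$(( 10 * 1000 ))
# 7 days * 24 hours * 60 minutes * 60 seconds * 1000 milliseconds
seven_days_ms=$(( 7 * 24 * 60 * 60 * 1000 ))
echo "$ten_seconds_ms"   # prints 10000
echo "$seven_days_ms"    # prints 604800000
```

The same variables could be passed straight to kafka-configs.sh, for example as --add-config retention.ms=$seven_days_ms.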

4. Selective Record Deletion

At times, we might want to delete records from only one or more partitions of a specific topic. We can do this with the kafka-delete-records.sh script, which deletes all records in a partition that come before a given offset.

First, we need to specify the target offset for each partition in a delete-config.json configuration file.

Let's purge all messages from partition 1 by setting its offset to -1, which tells Kafka to delete everything up to the partition's high watermark:

{
  "partitions": [
    {
      "topic": "purge-scenario",
      "partition": 1,
      "offset": -1
    }
  ],
  "version": 1
}
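When several partitions need purging, writing this file by hand gets tedious. The bash function below is a hypothetical helper (write_delete_config is not shipped with Kafka) that writes the same structure for a list of partitions. It assumes the topic name needs no JSON escaping, which holds because Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-':

```shell
#!/usr/bin/env bash
# Hypothetical helper: write an offset JSON file for kafka-delete-records.sh.
# Usage: write_delete_config TOPIC OFFSET OUTPUT_FILE PARTITION [PARTITION...]
# An OFFSET of -1 means "up to the high watermark", i.e. all records.
write_delete_config() {
  local topic="$1" offset="$2" out="$3"
  shift 3
  local entries="" p
  for p in "$@"; do
    # Separate entries with a comma, except before the first one
    if [ -n "$entries" ]; then
      entries+=","
    fi
    entries+=$(printf '\n    { "topic": "%s", "partition": %d, "offset": %d }' \
      "$topic" "$p" "$offset")
  done
  # Wrap the entries in the top-level object expected by the script
  printf '{\n  "partitions": [%s\n  ],\n  "version": 1\n}\n' "$entries" > "$out"
}
```

For instance, write_delete_config purge-scenario -1 delete-config.json 1 reproduces the file above (up to whitespace), while listing more partition numbers adds one entry per partition.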

Next, let's proceed with record deletion:

$ bin/kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file delete-config.json

We can verify that we're still able to read from partition=0:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario --partition=0 \
  --max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages

However, when we read from partition=1, there will be no records to process:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --partition=1 \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

5. Delete and Recreate the Topic

Another workaround to purge all messages of a Kafka topic is to delete and recreate it. However, this only works if the delete.topic.enable property is true on the broker. It has defaulted to true since Kafka 1.0, but if it has been disabled, we can enable it when starting the Kafka server:

$ bin/kafka-server-start.sh config/server.properties \
  --override delete.topic.enable=true

To delete the topic, we can use the kafka-topics.sh script:

$ bin/kafka-topics.sh \
  --delete --topic purge-scenario \
  --zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Let's verify the deletion by listing the topics:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

After confirming that the topic is no longer listed, we can now go ahead and recreate it.
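Since kafka-topics.sh only marks the topic for deletion, a script that recreates the topic right away could race with the deletion. Here's a minimal bash sketch of a hypothetical wait_for_topic_removal helper (not part of Kafka) that polls a listing command until the topic no longer appears. The listing command is passed in as arguments, so the helper can be tried without a running broker:

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll LIST_CMD until TOPIC is no longer one of its output lines.
# Usage: wait_for_topic_removal TOPIC MAX_TRIES INTERVAL_SECS LIST_CMD [ARGS...]
# Returns 0 once the topic is gone, 1 if it's still listed after MAX_TRIES polls,
# and 2 if the listing command itself fails.
wait_for_topic_removal() {
  local topic="$1" max_tries="$2" interval="$3"
  shift 3
  local i listing
  for ((i = 1; i <= max_tries; i++)); do
    # Capture the listing first so a failing command isn't mistaken for "topic gone"
    listing=$("$@") || return 2
    # -x matches whole lines only (so "purge-scenario-2" doesn't count),
    # -F treats the topic name literally (topic names may contain dots)
    if ! printf '%s\n' "$listing" | grep -qxF -- "$topic"; then
      return 0
    fi
    # Don't sleep after the final attempt
    if (( i < max_tries )); then
      sleep "$interval"
    fi
  done
  return 1
}
```

With the broker from this article running, we might call it as wait_for_topic_removal purge-scenario 30 1 bin/kafka-topics.sh --zookeeper localhost:2181 --list and, once it returns 0, rerun the kafka-topics.sh --create command from the simulation step.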

6. Conclusion

In this tutorial, we simulated a scenario where we'd need to purge an Apache Kafka topic. Moreover, we explored multiple strategies to purge it completely or selectively across partitions.
