Reset Consumer Offset in Kafka
Last updated: January 29, 2026
1. Overview
In Apache Kafka, the consumer offset is the pointer that indicates the next message that the consumer should read from a topic partition.
Normally, we don’t need to change the committed offset in the consumer application. However, sometimes we need to change the offset to reprocess or skip the messages. The purpose of the offset change can vary, such as fixing an application bug, reprocessing downstream failures, rebuilding state, or testing.
In this tutorial, we’ll learn how to reset the consumer offset in Kafka. We’ll implement different approaches, like using the Kafka CLI tool, programmatically resetting the consumer offset, and using the Kafka Admin API.
2. Reset With Kafka CLI
The Kafka CLI tool helps view, manage, and reset consumer offsets. This would be helpful for any one-time change or during testing.
2.1. Set Up Kafka
First, we’ll configure a local Kafka instance running in KRaft mode with the docker-compose.yml config:
version: "3.8"
services:
  kafka:
    image: confluentinc/cp-kafka:7.9.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9101:9101"
    expose:
      - '29092'
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
Next, we’ll run the above service in the terminal:
$ docker compose up -d
Then, let’s create a test-topic topic using the kafka-topics command:
$ docker exec kafka kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic --partitions 2 --replication-factor 1
Let’s produce and consume a few test messages.
2.2. Producer and Consumer
We’ll produce two test messages by executing the kafka-console-producer command:
$ docker exec -it kafka kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic
>test1
>test2
Then, we’ll consume the messages by running the kafka-console-consumer command:
$ docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --group test-group
test1
test2
The consumer above reads both messages and commits the offset.
2.3. Reset With the kafka-consumer-groups Command
Let’s run the kafka-consumer-groups command with the --describe option to confirm the current committed offset:
$ docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group test-topic 0 0 0 0 console-consumer-636e6769-eead-4491-ac63-4179e2620dac /127.0.0.1 console-consumer
test-group test-topic 1 2 2 0 console-consumer-636e6769-eead-4491-ac63-4179e2620dac /127.0.0.1 console-consumer
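The LAG column is the difference between LOG-END-OFFSET and CURRENT-OFFSET for each partition. As a minimal sketch of that arithmetic (the PartitionLag class is a hypothetical name and not part of Kafka), the two rows above can be reproduced like this:

```java
// Hypothetical helper for illustration; it does not call Kafka
final class PartitionLag {

    // Lag is the number of records between the committed offset and the end of the log
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // Values copied from the describe output above
        System.out.println("partition 0 lag = " + lag(0, 0));
        System.out.println("partition 1 lag = " + lag(2, 2));
    }
}
```

With the same arithmetic, a committed offset of 0 against a log end offset of 2 gives a lag of 2, which is what we'd expect for partition 1 right after a reset to the earliest offset.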
For this example, we’ll reset the offset to the earliest available offset in each partition.
We should ensure that the consumer is stopped before resetting any offset, because the kafka-consumer-groups tool can only reset offsets while the consumer group is inactive.
First, we’ll preview the new offsets by running the kafka-consumer-groups command with the --dry-run option:
$ docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --topic test-topic --reset-offsets --to-earliest --dry-run
GROUP TOPIC PARTITION NEW-OFFSET
test-group test-topic 0 0
test-group test-topic 1 0
From the output above, we can confirm the new offset values before applying the change.
Then, let’s run the same command with the --execute option to reset the offset:
$ docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --topic test-topic --reset-offsets --to-earliest --execute
Finally, we’ll run the kafka-console-consumer command to restart the same consumer:
$ docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --group test-group
test1
test2
The consumer above replays the same two messages after the offset reset.
Here are the other --reset-offsets options for the kafka-consumer-groups command:
- --to-latest: Moves to the latest offset, so the consumer skips the existing messages and processes only new data
- --to-offset <offset>: Moves the consumer to a specific, fixed offset number
- --shift-by <n>: Moves the offset forward (+n) or backward (-n) from its current position
- --to-datetime <timestamp>: Resets the offset to the position corresponding to a specific timestamp, given as a datetime such as 2026-01-29T10:00:00.000
- --by-duration <duration>: Resets the offset to the position that lies the given duration before the current time, with the duration in the PnDTnHnMnS format (for example, PT1H)
- --from-file <file>: Resets the offsets to the values listed in a CSV file
Resetting offsets in production can cause messages to be reprocessed or skipped, so we should preview the change with --dry-run and test it in a non-production environment first.
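To make the options above more concrete, the sketch below models how a new offset could be derived for a single partition. It's an illustration only: the OffsetResetPlanner class and its method names are hypothetical, it doesn't call Kafka, and it assumes that a target outside the partition's valid range is clamped to the earliest or latest offset.

```java
// Hypothetical model of offset-reset arithmetic; it does not call Kafka
final class OffsetResetPlanner {

    // Assumption: keep the requested offset inside [earliest, latest]
    static long clamp(long requested, long earliest, long latest) {
        return Math.max(earliest, Math.min(latest, requested));
    }

    // Models --to-offset <offset>
    static long toOffset(long offset, long earliest, long latest) {
        return clamp(offset, earliest, latest);
    }

    // Models --shift-by <n>: a positive n moves forward, a negative n moves backward
    static long shiftBy(long current, long n, long earliest, long latest) {
        return clamp(current + n, earliest, latest);
    }

    public static void main(String[] args) {
        // Partition 1 from the example: earliest = 0, latest = 2, current = 2
        System.out.println("shift-by -1  -> " + shiftBy(2, -1, 0, 2));
        System.out.println("shift-by -5  -> " + shiftBy(2, -5, 0, 2));
        System.out.println("to-offset 10 -> " + toOffset(10, 0, 2));
    }
}
```

Running the real command with --dry-run remains the reliable way to see the offsets Kafka would actually apply.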
3. Programmatically Reset in Consumer
Kafka provides a client API for programmatically seeking to a particular offset in the consumer service. The new offset position can be from the beginning, latest, from a specific time, or an offset value.
For this example, we’ll implement a common use case: replaying all messages after a specific time.
3.1. Implement the Consumer Rebalance Listener
To seek from a desired offset position or timestamp, we’ll need to implement the ConsumerRebalanceListener interface. The ConsumerRebalanceListener implementation will include the seeking logic for partition assignment.
Our implementation will hold the KafkaConsumer, the replayFromTimeInEpoch timestamp, and a seekDone flag that records whether the seek has already been performed.
Let’s implement the ReplayRebalanceListener class with the onPartitionsAssigned method:
public class ReplayRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final long replayFromTimeInEpoch;
    private final AtomicBoolean seekDone = new AtomicBoolean(false);

    public ReplayRebalanceListener(KafkaConsumer<String, String> consumer, long replayFromTimeInEpoch) {
        this.consumer = consumer;
        this.replayFromTimeInEpoch = replayFromTimeInEpoch;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Required by the interface; nothing to do on revocation in this example
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Seek only once, and only when partitions were actually assigned
        if (seekDone.get() || partitions.isEmpty()) {
            return;
        }
        // Ask for the same timestamp on every assigned partition
        Map<TopicPartition, Long> partitionsTimestamp = partitions.stream()
          .collect(Collectors.toMap(Function.identity(), tp -> replayFromTimeInEpoch));
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(partitionsTimestamp);
        partitions.forEach(tp -> {
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
            // A null entry means the partition has no record at or after the timestamp
            if (offsetAndTimestamp != null) {
                consumer.seek(tp, offsetAndTimestamp.offset());
            }
        });
        seekDone.set(true);
    }
}
In the code above, we look up the offset that corresponds to the timestamp for each assigned partition and seek the consumer to it.
Kafka invokes the onPartitionsAssigned method from within poll(), after the partitions are assigned and before any records are returned.
Additionally, if the seek was already done, the method returns without seeking again. This prevents a later rebalance from triggering another replay.
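The lookup performed by offsetsForTimes can be pictured as follows: for each partition, it returns the earliest offset whose record timestamp is greater than or equal to the requested timestamp, and null when no such record exists, which is why the listener checks for null before seeking. The sketch below models that rule over an in-memory list. The TimestampLookup class and the firstOffsetAtOrAfter method are hypothetical names, the list index stands in for the offset, and nothing here calls Kafka.

```java
import java.util.List;

// Hypothetical in-memory model of a time-based offset lookup; it does not call Kafka
final class TimestampLookup {

    // Returns the first offset (list index) whose timestamp is >= target, or null if none
    static Long firstOffsetAtOrAfter(List<Long> recordTimestamps, long target) {
        for (int offset = 0; offset < recordTimestamps.size(); offset++) {
            if (recordTimestamps.get(offset) >= target) {
                return (long) offset;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Record timestamps of a single partition, in offset order
        List<Long> timestamps = List.of(1_000L, 2_000L, 3_000L);
        System.out.println(firstOffsetAtOrAfter(timestamps, 1_500L));
        System.out.println(firstOffsetAtOrAfter(timestamps, 9_000L));
    }
}
```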
3.2. Implement the Kafka Consumer Service
We’ll implement a Kafka consumer service using the above ReplayRebalanceListener class.
First, we’ll define the KafkaConsumerService class with a KafkaConsumer and a running boolean flag:
public class KafkaConsumerService {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public KafkaConsumerService(Properties consumerProps, String topic, long replayFromTimestampInEpoch) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        if (replayFromTimestampInEpoch > 0) {
            consumer.subscribe(List.of(topic),
              new ReplayRebalanceListener(consumer, replayFromTimestampInEpoch));
        } else {
            consumer.subscribe(List.of(topic));
        }
    }
}
In the code above, we subscribe the Kafka consumer to the topic and register the ReplayRebalanceListener only if replayFromTimestampInEpoch is greater than zero.
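The replay timestamp is expressed in milliseconds since the Unix epoch, which is the unit used for Kafka record timestamps. A minimal sketch of deriving such a value with java.time follows; the ReplayTimestamps class and its method names are hypothetical helpers, not part of the example project.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for building the replay timestamp
final class ReplayTimestamps {

    // Converts an ISO-8601 instant such as "2026-01-29T10:00:00Z" to epoch milliseconds
    static long fromIsoInstant(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Computes "now minus lookback" in epoch milliseconds
    static long lookback(Instant now, Duration lookback) {
        return now.minus(lookback).toEpochMilli();
    }

    public static void main(String[] args) {
        // Replay everything produced during the last 15 minutes
        System.out.println(lookback(Instant.now(), Duration.ofMinutes(15)));
    }
}
```

Passing zero or a negative value keeps the default behavior, since the constructor registers the listener only for positive timestamps.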
We’ll implement the start method in the above class to start polling for ConsumerRecords:
public void start() {
    try {
        while (running.get()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> log.info("topic={} partition={} offset={} key={} value={}",
              record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            // Commit only after the polled batch has been handled
            consumer.commitSync();
        }
    } catch (WakeupException ex) {
        // A wakeup during shutdown is expected; rethrow only if we should still be running
        if (running.get()) {
            log.error("Error in the Kafka Consumer with exception {}", ex.getMessage(), ex);
            throw ex;
        }
    } finally {
        consumer.close();
    }
}
In the code above, the consumer continuously polls the topic, logs the consumer record, and finally commits.
Also, let’s add the shutdown method to gracefully shut down the consumer service:
public void shutdown() {
    running.set(false);
    consumer.wakeup();
}
In a Spring application, we can implement the same behavior using the ConsumerSeekAware interface of Spring Kafka.
Next, we’ll test the KafkaConsumerService class.
3.3. Test the Consumer Service
We’ll implement the integration tests for the KafkaConsumerService class using Testcontainers for Kafka.
Let’s consider a test case where the consumer replays from a particular timestamp. To verify that the service has consumed and committed the messages, we’ll start a second test consumer in the same consumer group and check that no messages are left for it to read.
We’ll implement the consumeFromCommittedOffset helper method, which reads any messages that remain after the group’s committed offset:
private List<String> consumeFromCommittedOffset(String topic, String groupId) {
    List<String> values = new ArrayList<>();
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig(groupId))) {
        consumer.subscribe(Collections.singleton(topic));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
        for (ConsumerRecord<String, String> r : records) {
            values.add(r.value());
        }
    }
    return values;
}
We’ll implement a test that sends two messages and then replays the consumer from a timestamp taken after the first message:
@Test
void givenConsumerReplayIsEnabled_whenReplayTimestampIsProvided_thenConsumesFromTimestamp() {
    KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig());
    long firstMsgTs = System.currentTimeMillis();
    producer.send(new ProducerRecord<>("test-topic-1", 0, firstMsgTs, "x1", "test1"));
    producer.flush();

    // The replay timestamp falls between the first and the second message
    long baseTs = System.currentTimeMillis();
    long secondMsgTs = baseTs + 1L;
    producer.send(new ProducerRecord<>("test-topic-1", 0, secondMsgTs, "x2", "test2"));
    producer.flush();

    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig("test-group-1"),
      "test-topic-1",
      baseTs);
    new Thread(kafkaConsumerService::start).start();

    Awaitility.await()
      .atMost(45, TimeUnit.SECONDS)
      .pollInterval(1, TimeUnit.SECONDS)
      .untilAsserted(() -> {
          List<String> consumed = consumeFromCommittedOffset("test-topic-1", "test-group-1");
          assertEquals(0, consumed.size());
          assertFalse(consumed.contains("test1"));
          assertFalse(consumed.contains("test2"));
      });

    kafkaConsumerService.shutdown();
}
We’ll verify the test assertions and the KafkaConsumerService logs:
16:49:38.898 [Thread-5] INFO c.b.k.r.c.KafkaConsumerService - topic=test-topic partition=0 offset=1 key=x2 value=test2
From the test above, we confirm that the consumer starts polling from the provided timestamp and skips the earlier messages.
We should note that it’s recommended to disable ENABLE_AUTO_COMMIT_CONFIG for this consumer. Auto-commits happen periodically and independently of the rebalance, so they might cause message duplication.
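The tests rely on a getConsumerConfig helper that isn't shown here. A plausible sketch follows, with auto-commit disabled as recommended above. The ConsumerConfigSketch class name, the bootstrapServers parameter, and the choice of earliest for auto.offset.reset are assumptions rather than the project's actual code. Plain string keys are used so that the snippet has no dependency on the Kafka client library.

```java
import java.util.Properties;

// Hypothetical stand-in for the getConsumerConfig helper used in the tests
final class ConsumerConfigSketch {

    static Properties consumerConfig(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Offsets are committed explicitly with commitSync() rather than on a timer
        props.put("enable.auto.commit", "false");
        // Assumption: start from the beginning when the group has no committed offset
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfig("localhost:9092", "test-group-1"));
    }
}
```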
4. Reset With Kafka Admin API
Instead of using the Kafka CLI or changing the consumer code, we can integrate the offset reset into our existing automation tools.
Kafka provides the Admin API to programmatically administer and manage the cluster. Using this API, we can change the offset for any topic and consumer group.
4.1. Implement the Kafka Offset Admin Service
Using the AdminClient API, we can update the consumer offset for any topic and consumer group.
First, we’ll implement the ResetOffsetService class with the AdminClient instance:
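public class ResetOffsetService {
    // Logger used by the methods added below
    private static final Logger log = LoggerFactory.getLogger(ResetOffsetService.class);
    private final AdminClient adminClient;

    public ResetOffsetService(String bootstrapServers) {
        this.adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
    }
}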
Next, we’ll implement the fetchPartitions method in the above class to get the topic’s partitions:
private List<TopicPartition> fetchPartitions(String topic) throws ExecutionException, InterruptedException {
    return adminClient.describeTopics(List.of(topic))
      .values()
      .get(topic)
      .get()
      .partitions()
      .stream()
      .map(p -> new TopicPartition(topic, p.partition()))
      .toList();
}
Then, let’s implement the fetchEarliestOffsets method to fetch the earliest offset of each of these partitions:
private Map<TopicPartition, OffsetAndMetadata> fetchEarliestOffsets(List<TopicPartition> partitions) {
    Map<TopicPartition, OffsetSpec> offsetSpecs = partitions.stream()
      .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest()));
    ListOffsetsResult offsetsResult = adminClient.listOffsets(offsetSpecs);
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    partitions.forEach(tp -> {
        long offset = Optional.ofNullable(offsetsResult.partitionResult(tp))
          .map(kafkaFuture -> {
              try {
                  return kafkaFuture.get();
              } catch (InterruptedException | ExecutionException ex) {
                  log.error("Error in the Kafka Consumer reset with exception {}", ex.getMessage(), ex);
                  throw new RuntimeException(ex);
              }
          })
          .map(ListOffsetsResult.ListOffsetsResultInfo::offset)
          .orElseThrow(() -> new RuntimeException("No offset result returned for partition " + tp));
        offsets.put(tp, new OffsetAndMetadata(offset));
    });
    return offsets;
}
In the code above, we’re fetching the offset value per partition using the AdminClient’s listOffsets method and then returning the result as a Map<TopicPartition, OffsetAndMetadata> instance.
To reset the offset, we’ll first fetch the topic’s partitions, get the earliestOffsets map, and alter the offsets using the AdminClient’s alterConsumerGroupOffsets method.
Finally, we’ll implement the reset method with the topic and the consumerGroup as input in the above class:
public void reset(String topic, String consumerGroup) {
    List<TopicPartition> partitions;
    try {
        partitions = fetchPartitions(topic);
    } catch (ExecutionException | InterruptedException ex) {
        log.error("Error in fetching the partitions with exception {}", ex.getMessage(), ex);
        throw new RuntimeException(ex);
    }
    Map<TopicPartition, OffsetAndMetadata> earliestOffsets = fetchEarliestOffsets(partitions);
    try {
        adminClient.alterConsumerGroupOffsets(consumerGroup, earliestOffsets)
          .all()
          .get();
    } catch (InterruptedException | ExecutionException ex) {
        log.error("Error in the Kafka Consumer reset with exception {}", ex.getMessage(), ex);
        throw new RuntimeException(ex);
    }
}
In the code above, we’re throwing a RuntimeException in case of any failure in the above methods.
Instead of resetting to the earliest offset, we can also reset to the offset that corresponds to a particular timestamp, to the latest offset, or to any specific offset value.
4.2. Testing the Admin Service
Similar to the earlier test, we’ll implement the integration tests for the ResetOffsetService class using Testcontainers.
Let’s implement the fetchCommittedOffset method to get the partition’s committed offset:
private long fetchCommittedOffset(String groupId) throws ExecutionException, InterruptedException {
    Map<TopicPartition, OffsetAndMetadata> offsets = testAdminClient.listConsumerGroupOffsets(groupId)
      .partitionsToOffsetAndMetadata()
      .get();
    return offsets.values()
      .iterator()
      .next()
      .offset();
}
Now, we’ll write the test to verify the ResetOffsetService’s reset method:
@Test
void givenMessagesAreConsumed_whenOffsetIsReset_thenOffsetIsSetToEarliest() {
    KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig());
    producer.send(new ProducerRecord<>("test-topic-1", "msg-1"));
    producer.send(new ProducerRecord<>("test-topic-1", "msg-2"));
    producer.flush();

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig("test-group-1"));
    consumer.subscribe(List.of("test-topic-1"));
    int consumed = 0;
    while (consumed < 2) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        consumed += records.count();
    }
    consumer.commitSync();
    consumer.close();

    await().atMost(5, SECONDS)
      .pollInterval(Duration.ofMillis(300))
      .untilAsserted(() -> assertEquals(2L, fetchCommittedOffset("test-group-1")));

    resetService.reset("test-topic-1", "test-group-1");

    await().atMost(5, SECONDS)
      .pollInterval(Duration.ofMillis(300))
      .untilAsserted(() -> assertEquals(0L, fetchCommittedOffset("test-group-1")));
}
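In the code above, we’re asserting the committed offset before and after the reset: it’s 2 once both messages are consumed and 0 once the reset is applied.
Because a reset causes already-processed messages to be delivered again, consumers should be idempotent so that the replay doesn’t result in duplicate processing.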
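One common way to approach idempotency is to remember which records have already been handled and skip them when they show up again. The sketch below keeps that record in memory, keyed by topic, partition, and offset. The IdempotentHandler class is hypothetical, and a real service would typically persist this state, or rely on naturally idempotent writes such as upserts, so that it survives restarts.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory deduplication; it does not call Kafka
final class IdempotentHandler {

    private final Set<String> processed = new HashSet<>();
    private int sideEffects = 0;

    // Returns true if the record was processed now, false if it was a replayed duplicate
    boolean handle(String topic, int partition, long offset, String value) {
        String recordId = topic + "-" + partition + "@" + offset;
        if (!processed.add(recordId)) {
            return false; // already handled before the offset reset
        }
        sideEffects++; // stand-in for the real business logic
        return true;
    }

    int sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle("test-topic", 1, 0L, "test1");
        handler.handle("test-topic", 1, 1L, "test2");
        // The same records delivered again after a reset are skipped
        handler.handle("test-topic", 1, 0L, "test1");
        handler.handle("test-topic", 1, 1L, "test2");
        System.out.println("side effects = " + handler.sideEffects());
    }
}
```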
5. Conclusion
In this article, we’ve learned how to reset the consumer offset in Kafka. We’ve implemented it in multiple ways: using the Kafka CLI tool, replaying from a timestamp with the ConsumerRebalanceListener interface, and using the AdminClient API. We’ve also tested the implementations in a Docker environment.
As always, the example code can be found over on GitHub.
















