Understanding and Avoiding CommitFailedException in Kafka
Last updated: June 27, 2026
1. Overview
Kafka provides a high degree of flexibility in committing offsets for processed messages in the consumer. It includes synchronous, asynchronous, and auto-commit options.
While this reduces the complexity of the commit process, unexpected errors can still occur.
In this tutorial, we’ll learn about a commit offset-related failure in a Kafka consumer application. We’ll debug the root cause of the resulting CommitFailedException and implement a few solutions to avoid it where possible.
2. Implementing a Consumer Service to Demonstrate the Issue
Let’s build a simple consumer application that processes records sequentially.
We’ll implement the consume method in the KafkaConsumerService class:
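public class KafkaConsumerService {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);
    public KafkaConsumerService(Properties consumerProps, String topic) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(List.of(topic));
    }
    public void consume() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                // Process the whole batch on the polling thread, then commit its offsets
                records.forEach(this::simulateDBCall);
                consumer.commitSync();
            }
        } catch (WakeupException ex) {
            // A wakeup is expected during shutdown; rethrow only if we're still meant to run
            if (running.get()) {
                log.error("Error in the Kafka Consumer with exception", ex);
                throw ex;
            }
        } finally {
            consumer.close();
        }
    }
    // Not shown in the original listing: a minimal version assumed from how the tests call it
    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }
}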
In the code above, we process records sequentially and then commit the offset.
To simulate a database call, we’ll add a delay in the simulateDBCall method:
private void simulateDBCall(ConsumerRecord<String, String> record) {
    try {
        log.info("Simulating a DB call - record key {} value {}", record.key(), record.value());
        Thread.sleep(150L);
    } catch (InterruptedException ex) {
        // Restore the interrupt flag before propagating the failure
        Thread.currentThread()
          .interrupt();
        throw new RuntimeException(ex);
    }
}
The delay makes the consumer spend around 150 ms on each record before it can move on to the next one.
3. Testing the Service
Next, let’s test the consumer service. Note that we use Testcontainers in our test harness to provide a local Kafka broker for testing; full details are available in the supporting GitHub repository linked at the end of this article.
First, we’ll include the consumer-related configs in the test class:
private static Properties getConsumerConfig() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 50);
    return consumerProperties;
}
In the config above, we’re polling for one record with a maximum poll interval of 50 ms and manually committing the offset.
Next, we’ll write a test case that produces a message, consumes it, and verifies the committed offset:
@Test
void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() {
    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic");
    Thread th = new Thread(kafkaConsumerService::consume);
    th.start();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig())) {
        producer.send(new ProducerRecord<>("test-topic", "x1", "test"));
        producer.flush();
    }
    Awaitility.await()
      .atMost(30, TimeUnit.SECONDS)
      .pollInterval(2, TimeUnit.SECONDS)
      .untilAsserted(() -> {
          TopicPartition topicPartition = new TopicPartition("test-topic", 0);
          Map<TopicPartition, OffsetAndMetadata> committedOffsets;
          try (AdminClient adminClient = AdminClient.create(getAdminProps())) {
              ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app");
              committedOffsets = result.partitionsToOffsetAndMetadata()
                .get();
          }
          assertNotNull(committedOffsets);
          assertNotNull(committedOffsets.get(topicPartition));
          assertEquals(1L, committedOffsets.get(topicPartition)
            .offset());
      });
    kafkaConsumerService.shutdown();
}
Instead of a committed offset, we’ll see the following error in the log, and the test case fails:
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1280)
    ...
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:918)
    at com.baeldung.kafka.commitfailure.sequential.KafkaConsumerService.consume(KafkaConsumerService.java:34)
From the log above, we can see that the CommitFailedException was thrown while committing the offset in the consume method. The message also says that the consumer is no longer part of the group. The class documentation of CommitFailedException gives some hints about the possible reason.
4. Root Cause of the CommitFailedException
We can monitor and debug the root cause with multiple options, including container logs, CLI, and Kafka UI tools.
First, we’ll check the Kafka container logs for the error:
[2026-06-08 16:53:05,349] INFO [GroupCoordinator 1]: Preparing to rebalance group consumer-app in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Removing member consumer-consumer-app-1-3792090b-be55-4e19-bdbd-a3c66b968168 on LeaveGroup; client reason: consumer poll timeout has expired.)
...
[2026-06-08 16:53:05,354] INFO [GroupCoordinator 1]: Member MemberMetadata(memberId=consumer-consumer-app-1-3792090b-be55-4e19-bdbd-a3c66b968168, groupInstanceId=None, clientId=consumer-consumer-app-1, clientHost=/172.17.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=50, supportedProtocols=List(range, cooperative-sticky)) has left group consumer-app through explicit `LeaveGroup`; client reason: consumer poll timeout has expired...
We’ll also confirm the consumer group’s state using the kafka-consumer-groups command:
sh-4.4$ /usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group consumer-app --state
Consumer group 'consumer-app' has no active members.
GROUP         COORDINATOR (ID)       ASSIGNMENT-STRATEGY  STATE  #MEMBERS
consumer-app  8db993586dab:9092 (1)                       Empty  0
Based on the above logs and status, we can see that the consumer was removed due to a client-specific error: consumer poll timeout has expired.
Kafka checks in several ways whether a consumer is healthy and running. One is a background heartbeat thread, whose heartbeats must reach the group coordinator within the session timeout (session.timeout.ms).
Additionally, Kafka expects the consumer to call poll at least once within the maximum poll interval (max.poll.interval.ms). If the consumer doesn’t poll in time for any reason, it’s considered dead, and its partitions are reassigned to another consumer in the group.
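To make the two liveness checks concrete, here’s a rough sketch of the consumer settings behind them, written with plain string keys. The class and method names are hypothetical, and the values are what we understand to be the client defaults in recent Kafka versions, so they’re worth checking against the version in use:

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the article's code base.
class LivenessConfigSketch {

    static Properties livenessDefaults() {
        Properties props = new Properties();
        // Heartbeats from the background thread must reach the coordinator within this window
        props.put("session.timeout.ms", "45000");
        // How often the background thread sends a heartbeat (assumed default)
        props.put("heartbeat.interval.ms", "3000");
        // Maximum allowed gap between two poll() calls on the application thread (assumed default)
        props.put("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        livenessDefaults().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The session timeout of 45000 ms matches the sessionTimeoutMs value in the broker log above, while our test overrides the poll interval down to 50 ms.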
By re-checking the application logs, we can see that processing a single record took about 150 ms, three times the 50 ms we configured as MAX_POLL_INTERVAL_MS_CONFIG, so the consumer couldn’t call the poll method again in time. Consequently, the consumer was removed from the group.
So, any further attempt to commit an offset failed with the CommitFailedException. This makes sense: the consumer no longer owns the partition, and letting its commit through could corrupt the group’s offset state.
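The timing argument above boils down to simple arithmetic. As a minimal sketch, assuming the per-record time is roughly constant, a hypothetical helper like the one below checks whether a full batch can be processed before the next poll is due:

```java
// Hypothetical helper for illustration; not part of the article's code base.
class PollBudgetSketch {

    // Returns true when processing one full batch finishes before the next poll() is due
    static boolean fitsWithinPollInterval(int maxPollRecords, long perRecordMillis, long maxPollIntervalMs) {
        long batchMillis = maxPollRecords * perRecordMillis;
        return batchMillis < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // One record per poll, about 150 ms per record, 50 ms poll interval
        System.out.println(fitsWithinPollInterval(1, 150L, 50L)); // prints false
    }
}
```

With our test settings, the check fails, which is exactly the situation that gets the consumer removed from the group.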
We’ll explore a few practical ways to prevent the error.
5. Approaches to Avoid the Problem
Now that we understand how consumer polling works, we’ll address the problem by reviewing the consumer code and configuration.
We may consider enabling auto-commit, but that brings a risk of data loss and irregular commit timing. Moreover, this approach doesn’t by itself ensure at-least-once processing.
Instead, we’ll explore some relevant manual commit solutions.
5.1. Tuning the Poll Interval and Batch Size
By debugging the code, we found that the actual processing time was significantly longer than the specified maximum poll interval.
We’ll increase the maximum poll interval config:
consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300);
We can also fine-tune the batch size according to our batch processing timing and throughput requirements.
Let’s test the updated consumer and confirm that no CommitFailedException is thrown.
As a rule of thumb, it’s good to set the maximum poll interval to about three times the worst-case batch processing time to leave a safe buffer.
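The sizing rule can be turned into a small calculation. The helper below is only a sketch: its names are hypothetical, it assumes a roughly constant per-record time, and the 9000 ms interval in the second call is an illustrative value rather than a recommendation:

```java
// Hypothetical helper for illustration; not part of the article's code base.
class PollIntervalSizingSketch {

    // Poll interval that leaves a safety buffer on top of the batch processing time
    static long recommendedPollIntervalMs(int maxPollRecords, long perRecordMillis, int safetyFactor) {
        return (long) maxPollRecords * perRecordMillis * safetyFactor;
    }

    // Largest batch that still fits into a given poll interval with the same buffer
    static int maxBatchSize(long maxPollIntervalMs, long perRecordMillis, int safetyFactor) {
        return (int) (maxPollIntervalMs / (perRecordMillis * safetyFactor));
    }

    public static void main(String[] args) {
        System.out.println(recommendedPollIntervalMs(1, 150L, 3)); // prints 450
        System.out.println(maxBatchSize(9000L, 150L, 3));          // prints 20
    }
}
```

With the 150 ms delay from our simulation, the rule suggests about 450 ms for single-record polls, so the 300 ms used above is a somewhat tighter buffer, a factor of two rather than three.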
While changing the poll interval works, we may still face the issue in more complex setups, such as processing involving database transactions or network calls.
5.2. Async Processing
If we process records, including their I/O operations, on the polling thread, the above solution limits throughput and increases latency.
In a production environment, we can’t guarantee a predictable database execution time, so tuning the poll interval alone might be insufficient.
We can consider asynchronous processing of the records, so the consumer thread blocks for much shorter periods.
In Java 21 and later, we can do this by handing the records off to virtual threads.
First, we’ll include a worker thread pool and a per-partition offset tracking map in the consumer service:
public class KafkaConsumerService {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService workers;
    private final Map<TopicPartition, AtomicLong> committableOffsets = new ConcurrentHashMap<>();
    public KafkaConsumerService(Properties consumerProps, String topic) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(List.of(topic));
        workers = Executors.newVirtualThreadPerTaskExecutor();
    }
}
Next, we’ll update the consume method with async processing using the worker threads:
public void consume() {
    try {
        while (running.get()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) {
                continue;
            }
            List<CompletableFuture<Void>> futures = processAsync(records);
            try {
                CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
                  .orTimeout(700L, TimeUnit.MILLISECONDS)
                  .join();
            } catch (CompletionException ex) {
                log.error("Batch processing timed out or failed", ex);
            }
            commitOffsets();
        }
    } catch (WakeupException ex) {
        if (running.get()) {
            log.error("Error in the Kafka Consumer with exception", ex);
            throw ex;
        }
    } finally {
        commitOffsets();
        consumer.close();
    }
}
We’ll implement the processAsync method to asynchronously process the records:
private List<CompletableFuture<Void>> processAsync(ConsumerRecords<String, String> records) {
    return StreamSupport.stream(records.spliterator(), false)
      .map(record -> CompletableFuture.runAsync(() -> simulateDBCall(record), workers)
        .whenComplete((ignored, ex) -> {
            if (ex == null) {
                // Only successfully processed records become committable
                markComplete(record);
            } else {
                log.error("Failed offset and send to DLQ {} {} {}",
                  record.offset(), record.key(), ex.getMessage());
            }
        })
        // Swallow the failure so one bad record doesn't fail the whole batch
        .exceptionally(ex -> null))
      .toList();
}
In the code above, the consumer thread waits up to 700 ms for the workers to finish the batch, then commits the offsets of the records that completed. If a record fails during processing, we log the failure and can forward the record to a Dead Letter Queue (DLQ) topic.
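To see the future chain in isolation, here’s a minimal, Kafka-free sketch of the same pattern. It’s an illustration under a few assumptions: the class, the integer “records”, and the rule that negative ids fail are all hypothetical, and it uses a fixed thread pool instead of virtual threads so that it also runs on JDKs older than 21:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical stand-in for the consumer's batch handling; no Kafka classes involved.
class AsyncBatchSketch {

    // Runs every id on the pool and returns the ids that completed successfully
    static Set<Integer> processBatch(List<Integer> ids, ExecutorService workers) {
        Set<Integer> completed = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Void>> futures = ids.stream()
          .map(id -> CompletableFuture.runAsync(() -> handle(id), workers)
            .whenComplete((ignored, ex) -> {
                if (ex == null) {
                    completed.add(id); // plays the role of markComplete
                }
            })
            .exceptionally(ex -> null)) // a failed id must not fail the whole batch
          .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
          .orTimeout(5, TimeUnit.SECONDS)
          .join();
        return completed;
    }

    // Stand-in for real work: negative ids simulate a failing record
    static void handle(int id) {
        if (id < 0) {
            throw new IllegalStateException("cannot process " + id);
        }
    }

    static Set<Integer> demo() {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            return processBatch(List.of(1, -2, 3), workers);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because exceptionally converts each failure into a normal completion, allOf only fails here if the timeout expires. The failed id simply never lands in the completed set.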
Next, let’s implement the markComplete method to update the processed record offset into the committableOffsets map:
private void markComplete(ConsumerRecord<String, String> record) {
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1L))
      .accumulateAndGet(record.offset() + 1L, Math::max);
}
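Since workers can finish in any order, it helps to see how the max-based update behaves. The sketch below mirrors markComplete with plain strings as partition keys; the class and the partition name are hypothetical and only serve the illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, Kafka-free version of the offset tracker for illustration.
class OffsetTrackerSketch {

    private final Map<String, AtomicLong> committable = new ConcurrentHashMap<>();

    // Stores offset + 1 (the next offset to read) if it's higher than the current value
    void markComplete(String partition, long offset) {
        committable.computeIfAbsent(partition, k -> new AtomicLong(-1L))
          .accumulateAndGet(offset + 1L, Math::max);
    }

    // Returns -1 when nothing has completed for the partition yet
    long committableOffset(String partition) {
        AtomicLong ref = committable.get(partition);
        return ref == null ? -1L : ref.get();
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch();
        // Workers finish out of order: 7, then 5, then 6
        tracker.markComplete("test-topic-0", 7L);
        tracker.markComplete("test-topic-0", 5L);
        tracker.markComplete("test-topic-0", 6L);
        System.out.println(tracker.committableOffset("test-topic-0")); // prints 8
    }
}
```

One consequence of this design is that the tracker keeps only the highest completed offset and doesn’t detect gaps. If offset 5 had failed in the example, the value would still be 8, which is why routing failed records to a retry or DLQ topic matters.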
Finally, we’ll implement the commitOffsets method to commit the processed offset:
private void commitOffsets() {
    // Snapshot the highest processed offset per partition
    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    committableOffsets.forEach((tp, atomicOffset) -> {
        long val = atomicOffset.get();
        if (val != -1L) {
            toCommit.put(tp, new OffsetAndMetadata(val));
        }
    });
    if (toCommit.isEmpty()) {
        return;
    }
    consumer.commitSync(toCommit);
    // Reset only the entries that no worker has advanced since the snapshot
    toCommit.forEach((tp, meta) -> {
        AtomicLong ref = committableOffsets.get(tp);
        if (ref != null) {
            ref.compareAndSet(meta.offset(), -1L);
        }
    });
}
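In the method above, we build a snapshot map of the highest processed offset per partition and commit it. Afterwards, we atomically reset each tracked offset to -1, but only if no worker has advanced it in the meantime, so we neither commit the same offset twice nor lose a newer one.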
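The reset step relies on compareAndSet, whose effect is easiest to see in a tiny standalone sketch. The class and the concrete offsets below are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of the reset step in commitOffsets.
class CommitResetSketch {

    // Resets the tracker to -1 only if it still holds the committed snapshot value
    static long resetAfterCommit(AtomicLong tracker, long snapshot) {
        tracker.compareAndSet(snapshot, -1L);
        return tracker.get();
    }

    static long demo(boolean workerAdvancesDuringCommit) {
        AtomicLong tracker = new AtomicLong(10L);
        long snapshot = tracker.get(); // the value we would pass to commitSync
        if (workerAdvancesDuringCommit) {
            // A worker completes offset 11 while the commit is in flight
            tracker.accumulateAndGet(12L, Math::max);
        }
        return resetAfterCommit(tracker, snapshot);
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints -1
        System.out.println(demo(true));  // prints 12
    }
}
```

In the second case, the newer value survives the reset and is picked up by the next commit.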
Now, let’s configure the consumer with an increased batch size and max poll interval in the test class:
consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
Finally, we’ll implement a test case by producing and consuming multiple messages and verifying the offset:
@Test
void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() {
    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic");
    Thread th = new Thread(kafkaConsumerService::consume);
    th.start();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig())) {
        for (int num = 0; num < 100; num++) {
            producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num));
        }
        producer.flush();
    }
    Awaitility.await()
      .atMost(30, TimeUnit.SECONDS)
      .pollInterval(2, TimeUnit.SECONDS)
      .untilAsserted(() -> {
          TopicPartition topicPartition = new TopicPartition("test-topic", 0);
          Map<TopicPartition, OffsetAndMetadata> committedOffsets;
          try (AdminClient adminClient = AdminClient.create(getAdminProps())) {
              ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app");
              committedOffsets = result.partitionsToOffsetAndMetadata()
                .get();
          }
          assertNotNull(committedOffsets);
          assertNotNull(committedOffsets.get(topicPartition));
          assertEquals(100L, committedOffsets.get(topicPartition)
            .offset());
      });
    kafkaConsumerService.shutdown();
}
By running the test above, we have successfully verified that the committed offset matches the total number of consumed messages.
5.3. Implementing a ConsumerRebalanceListener
It’s good practice to register a ConsumerRebalanceListener so that the consumer can commit any in-flight offsets and clean up its state when partitions are revoked.
Let’s pass a ConsumerRebalanceListener to the subscribe call in the KafkaConsumerService class:
consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            commitOffsets(partitions);
        } catch (Exception ex) {
            log.error("Commit failed during rebalance", ex);
        } finally {
            partitions.forEach(committableOffsets::remove);
        }
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitions.forEach(committableOffsets::remove);
    }
});
We’ll also need to implement the commitOffsets method to commit offsets for the specific partitions:
private void commitOffsets(Collection<TopicPartition> partitions) {
    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    partitions.forEach(tp -> {
        AtomicLong ref = committableOffsets.get(tp);
        if (ref != null) {
            long val = ref.get();
            if (val != -1L) {
                toCommit.put(tp, new OffsetAndMetadata(val));
            }
        }
    });
    if (toCommit.isEmpty()) {
        return;
    }
    consumer.commitSync(toCommit);
}
In the code above, we build a snapshot map from the tracked offsets of the given partitions only and commit it.
Now, let’s run the previous section’s test method to verify the offset for the updated consumer implementation:
13:01:21.571 [virtual-158] INFO c.b.k.c.f.async.KafkaConsumerService - Simulating a db call - record key x199 value test99
The log shows the last record being processed, and the test confirms that all messages were processed and their offsets committed.
5.4. Best Practices
While we can’t prevent such exceptions entirely, we can protect the system from duplicate processing, data loss, and silently dropped messages.
We should implement idempotency on both the producer and the consumer, enforce database constraints, and make reprocessing free of side effects. Additionally, we should send failed messages to a retry or DLQ topic.
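As one possible shape for consumer-side idempotency, the sketch below remembers which records have already been handled and skips redeliveries. Everything in it is an assumption for illustration: the class is hypothetical, the record identity is built from topic, partition, and offset, and the in-memory set stands in for a durable store such as a table with a unique constraint:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical idempotent handler; the in-memory set stands in for a durable store.
class IdempotentHandlerSketch {

    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    private final AtomicInteger sideEffects = new AtomicInteger();

    // Applies the side effect only the first time a given record is seen
    boolean handleOnce(String topic, int partition, long offset) {
        String recordId = topic + "-" + partition + "@" + offset;
        if (!processed.add(recordId)) {
            return false; // duplicate delivery, e.g. after a rebalance
        }
        sideEffects.incrementAndGet(); // stand-in for the real database write
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        System.out.println(handler.handleOnce("test-topic", 0, 5L)); // prints true
        System.out.println(handler.handleOnce("test-topic", 0, 5L)); // prints false
        System.out.println(handler.sideEffectCount());               // prints 1
    }
}
```

With a guard like this, a record that’s redelivered after a failed commit doesn’t repeat its side effect.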
6. Conclusion
In this article, we learned about a common offset commit-related exception in Kafka consumer applications. We explored why the CommitFailedException happens and how to prevent it. We also implemented preventive solutions by tuning the maximum poll interval and switching to asynchronous processing.
As always, the example code can be found over on GitHub.