1. Overview

Kafka provides a high degree of flexibility in committing offsets for processed messages in the consumer. It includes synchronous, asynchronous, and auto-commit options.

While this reduces the complexity of the commit process, unexpected errors can still occur.

In this tutorial, we’ll learn about a commit offset-related failure in a Kafka consumer application. We’ll debug the root cause of the resulting CommitFailedException and implement a few solutions to avoid it where possible.

2. Implementing a Consumer Service to Demonstrate the Issue

Let’s build a simple consumer application that processes records sequentially.

We’ll implement the consume method in the KafkaConsumerService class:

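public class KafkaConsumerService {

    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public KafkaConsumerService(Properties consumerProps, String topic) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(List.of(topic));
    }

    public void consume() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                // process the batch on the polling thread, then commit its offsets
                records.forEach(this::simulateDBCall);
                consumer.commitSync();
            }
        } catch (WakeupException ex) {
            // a wakeup while still running is unexpected; otherwise it's part of shutdown
            if (running.get()) {
                log.error("Error in the Kafka Consumer with exception", ex);
                throw ex;
            }
        } finally {
            consumer.close();
        }
    }
}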

In the code above, we process records sequentially and then commit the offset.

To simulate a database call, we’ll add a delay in the simulateDBCall method used above:

private void simulateDBCall(ConsumerRecord<String, String> record) {
    try {
        log.info("Simulating a DB call - record key {} value {}", record.key(), record.value());
        Thread.sleep(150L);
    } catch (InterruptedException ex) {
        Thread.currentThread()
          .interrupt();
        throw new RuntimeException(ex);
    }
}

Because of the delay, each record takes about 150 ms to process, and the consumer thread can’t call poll again until processing finishes.

3. Testing the Service

Next, let’s test the consumer service. Note that we use Testcontainers in our test harness to provide a local Kafka broker for testing; full details are available in the supporting GitHub repository linked at the end of this article.

First, we’ll include the consumer-related configs in the test class:

private static Properties getConsumerConfig() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 50);
    
    return consumerProperties;
}

In the config above, we fetch one record per poll, disable auto-commit so that we commit offsets manually, and set the maximum poll interval to 50 ms, which is well below the 150 ms each record takes to process.

Next, we’ll write a test that produces a message, consumes it, and verifies the committed offset:

@Test
void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() {
    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic");
    Thread th = new Thread(kafkaConsumerService::consume);
    th.start();

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig())) {
        producer.send(new ProducerRecord<>("test-topic", "x1", "test"));
        producer.flush();
    }

    Awaitility.await()
      .atMost(30, TimeUnit.SECONDS)
      .pollInterval(2, TimeUnit.SECONDS)
      .untilAsserted(() -> {
          TopicPartition topicPartition = new TopicPartition("test-topic", 0);
          Map<TopicPartition, OffsetAndMetadata> committedOffsets;
          try (AdminClient adminClient = AdminClient.create(getAdminProps())) {
              ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app");
              committedOffsets = result.partitionsToOffsetAndMetadata()
                .get();
          }

          assertNotNull(committedOffsets);
          assertNotNull(committedOffsets.get(topicPartition));
          assertEquals(1L, committedOffsets.get(topicPartition)
            .offset());
      });

    kafkaConsumerService.shutdown();
}

Instead of a committed offset, we see the following error in the log, and the test fails:

org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1280)
	...
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:918)
	at com.baeldung.kafka.commitfailure.sequential.KafkaConsumerService.consume(KafkaConsumerService.java:34)

From the log above, we can see that the CommitFailedException was thrown by the commitSync call in the consume method. The message also says that the consumer is no longer part of the group. The class documentation of CommitFailedException gives some hints about the possible cause.

4. Root Cause of the CommitFailedException

We can debug the root cause with several tools, including the broker container logs, the Kafka CLI, and Kafka UI tools.

First, we’ll check the Kafka container logs for the error:

[2026-06-08 16:53:05,349] INFO [GroupCoordinator 1]: Preparing to rebalance group consumer-app in state PreparingRebalance with old generation 1 (__consumer_offsets-0) (reason: Removing member consumer-consumer-app-1-3792090b-be55-4e19-bdbd-a3c66b968168 on LeaveGroup; client reason: consumer poll timeout has expired.)
...
[2026-06-08 16:53:05,354] INFO [GroupCoordinator 1]: Member MemberMetadata(memberId=consumer-consumer-app-1-3792090b-be55-4e19-bdbd-a3c66b968168, groupInstanceId=None, clientId=consumer-consumer-app-1, clientHost=/172.17.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=50, supportedProtocols=List(range, cooperative-sticky)) has left group consumer-app through explicit `LeaveGroup`; client reason: consumer poll timeout has expired...

We’ll also confirm the consumer group’s state using the kafka-consumer-groups command:

sh-4.4$ /usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group consumer-app --state
Consumer group 'consumer-app' has no active members.

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
consumer-app              8db993586dab:9092  (1)                         Empty           0

Based on the logs and the group state above, we can see that the consumer left the group for a client-side reason: consumer poll timeout has expired.

Kafka checks whether a consumer is alive in more than one way. A background thread sends heartbeats to the group coordinator, and the coordinator must receive one before the session timeout (session.timeout.ms) elapses.

Additionally, Kafka expects the consumer to call poll at least once within the max poll interval (max.poll.interval.ms). If the consumer doesn’t poll in time, it’s considered failed, it leaves the group, and its partitions are reassigned to another consumer.

Going back to the application logs, we observe that processing a record took about 150 ms, so the consumer could not call the poll method again within the 50 ms configured through MAX_POLL_INTERVAL_MS_CONFIG. Consequently, the consumer was removed from the group.

So, any further attempt to commit an offset fails with the CommitFailedException. This makes sense: the consumer no longer owns the partition, and its commit could overwrite the progress of the new owner.
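
The timing mismatch behind this failure can be checked with a little arithmetic. The sketch below is a simplified model rather than Kafka code: the PollBudget class and its method names are hypothetical, and the model ignores the time spent inside poll and commitSync. It only compares the sequential processing time of one batch with the configured max poll interval.

```java
// Hypothetical helper, not part of the Kafka client: a simplified model of poll loop timing.
final class PollBudget {

    // With sequential processing on the polling thread, the gap between two poll()
    // calls is at least the time needed to process every record of the batch.
    static long minGapBetweenPollsMs(int recordsPerPoll, long perRecordMs) {
        return recordsPerPoll * perRecordMs;
    }

    // True when that gap is longer than max.poll.interval.ms, which is the
    // condition under which the consumer leaves the group.
    static boolean exceedsMaxPollInterval(int recordsPerPoll, long perRecordMs, long maxPollIntervalMs) {
        return minGapBetweenPollsMs(recordsPerPoll, perRecordMs) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Values from the test setup: 1 record per poll, 150 ms per record, 50 ms interval.
        System.out.println(exceedsMaxPollInterval(1, 150L, 50L)); // prints true
    }
}
```

With the values from our test, 150 ms is greater than 50 ms, so the check returns true, which matches the "consumer poll timeout has expired" reason in the broker log.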

We’ll explore a few practical ways to prevent the error.

5. Approaches to Avoid the Problem

Now that we understand how polling affects group membership, we’ll address the problem by reviewing the consumer code and configuration.

We could consider enabling auto-commit, but that brings a risk of data loss and less predictable commit timing. Moreover, auto-commit alone doesn’t guarantee at-least-once processing.

Instead, we’ll explore some relevant manual commit solutions.

5.1. Tuning the Poll Interval and Batch Size

By debugging the code, we found that the actual processing time (about 150 ms per record) was significantly longer than the specified maximum poll interval (50 ms).

We’ll increase the maximum poll interval config:

consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300);

We can also fine-tune the batch size according to our batch processing timing and throughput requirements.

Let’s rerun the test with the updated consumer configuration and confirm that no CommitFailedException is thrown.

As a rule of thumb, it’s good to set the poll interval to about 3 times the worst-case batch processing time to leave a safe buffer.
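
The rule of thumb can also be applied in the other direction, to size the batch for a fixed poll interval. The following is a minimal sketch, assuming sequential processing and a known worst-case time per record. The PollIntervalSizing class, its methods, and the fixed factor of 3 are illustrative assumptions, not Kafka APIs.

```java
// Hypothetical sizing helper that applies the "about 3 times" buffer to sequential processing.
final class PollIntervalSizing {

    static final int SAFETY_FACTOR = 3;

    // Suggested max.poll.interval.ms for a batch that is processed one record at a time.
    static long suggestedMaxPollIntervalMs(int maxPollRecords, long worstCasePerRecordMs) {
        return SAFETY_FACTOR * maxPollRecords * worstCasePerRecordMs;
    }

    // Largest max.poll.records that keeps the same buffer for a fixed poll interval.
    static int largestSafeBatch(long maxPollIntervalMs, long worstCasePerRecordMs) {
        return (int) (maxPollIntervalMs / (SAFETY_FACTOR * worstCasePerRecordMs));
    }

    public static void main(String[] args) {
        System.out.println(suggestedMaxPollIntervalMs(1, 150L)); // 3 * 1 * 150 = 450
        System.out.println(largestSafeBatch(300_000L, 150L));    // 300000 / 450 = 666
    }
}
```

Under these assumptions, a single 150 ms record calls for an interval of about 450 ms, so the 300 ms used in our test leaves a factor of 2 rather than 3. With Kafka’s default max.poll.interval.ms of 300000 ms, a batch of up to 666 such records would still keep the buffer.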

While changing the poll interval works, we may still face the issue in more complex setups, such as processing involving database transactions or network calls.

5.2. Async Processing

If we process records, including their blocking I/O, on the same thread that calls poll, then raising the poll interval alone doesn’t help throughput or latency: the consumer still handles one record at a time.

In a production environment, we can’t guarantee a predictable database execution time, so tuning only the poll interval might be insufficient.

We can consider processing the records asynchronously, so the consumer thread blocks for much shorter periods.

In Java 21 and later, we can do this by handing the records off to virtual threads.
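
To see why handing records off to workers shortens the time the polling thread is blocked, here is a small sketch that uses only java.util.concurrent and no Kafka types. The ConcurrentBatchDemo class and its methods are hypothetical. It uses a cached pool of platform threads so that it also runs on JDKs older than 21; on Java 21 and later, Executors.newVirtualThreadPerTaskExecutor() can be used in its place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: runs a batch of simulated 150 ms calls concurrently.
final class ConcurrentBatchDemo {

    // Stand-in for a blocking database call of roughly 150 ms.
    static void slowCall() {
        try {
            Thread.sleep(150L);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    // Runs batchSize slow calls concurrently and returns the elapsed wall-clock time in ms.
    static long processConcurrently(int batchSize) {
        ExecutorService workers = Executors.newCachedThreadPool();
        try {
            long start = System.nanoTime();
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                futures.add(CompletableFuture.runAsync(ConcurrentBatchDemo::slowCall, workers));
            }
            // the calling thread waits once for the whole batch
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return (System.nanoTime() - start) / 1_000_000L;
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        long elapsed = processConcurrently(30);
        System.out.println("30 records took " + elapsed + " ms; sequentially they need about 4500 ms");
    }
}
```

The exact timing depends on the machine, but since the calls overlap, the wait is close to the duration of one call instead of the sum of all of them.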

First, we’ll add an executor that runs each task on its own virtual thread, along with a per-partition offset tracking map, to the consumer service:

public class KafkaConsumerService {

    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService workers;
    private final Map<TopicPartition, AtomicLong> committableOffsets = new ConcurrentHashMap<>();

    public KafkaConsumerService(Properties consumerProps, String topic) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(List.of(topic));
        workers = Executors.newVirtualThreadPerTaskExecutor();
    }
}

Next, we’ll update the consume method with async processing using the worker threads:

public void consume() {
    try {
        while (running.get()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) {
                continue;
            }

            List<CompletableFuture<Void>> futures = processAsync(records);

            try {
                CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
                  .orTimeout(700L, TimeUnit.MILLISECONDS)
                  .join();
            } catch (CompletionException ex) {
                log.error("Batch processing timed out or failed", ex);
            }

            commitOffsets();
        }
    } catch (WakeupException ex) {
        if (running.get()) {
            log.error("Error in the Kafka Consumer with exception", ex);
            throw ex;
        }
    } finally {
        // close the consumer even if the final commit fails
        try {
            commitOffsets();
        } finally {
            consumer.close();
        }
    }
}

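We’ll implement the processAsync method to process the records asynchronously. Here, simulateDBUpdate stands in for the database work, much like simulateDBCall in the sequential version: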

private List<CompletableFuture<Void>> processAsync(ConsumerRecords<String, String> records) {
    return StreamSupport.stream(records.spliterator(), false)
      .map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers)
        .whenComplete((ignored, ex) -> {
            if (ex == null) {
                markComplete(record);
            } else {
                log.error("Failed offset and send to DLQ {} {} {}", 
                  record.offset(), record.key(), ex.getMessage());
            }
        })
        .exceptionally(ex -> null))
      .toList();
}

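In the code above, each record is processed on a worker thread, and whenComplete marks its offset as complete only when processing succeeds. The consumer thread waits for the batch, up to the timeout set in the consume method, and then commits the processed offsets. If a record fails during processing, we log the failure and can send the record to a Dead Letter Queue (DLQ) topic.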
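
It helps to know exactly what the timeout in the consume method does. The sketch below uses only java.util.concurrent; the BatchTimeoutDemo class and its helper methods are hypothetical. It shows that orTimeout applied to the allOf future ends the wait with a CompletionException caused by a TimeoutException, while the underlying task isn’t cancelled and keeps running.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; it uses only java.util.concurrent, no Kafka types.
final class BatchTimeoutDemo {

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    // Waits for the task for up to timeoutMs; returns true when the wait ended in a timeout.
    static boolean timedOut(CompletableFuture<Void> task, long timeoutMs) {
        try {
            CompletableFuture.allOf(task)
              .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
              .join();
            return false;
        } catch (CompletionException ex) {
            // join() wraps the TimeoutException raised by orTimeout
            return ex.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> slowTask = CompletableFuture.runAsync(() -> sleep(500L));
        System.out.println("timed out: " + timedOut(slowTask, 50L));
        // The timeout only ends the wait on the combined future; the task itself continues.
        slowTask.join();
        System.out.println("task completed normally: " + !slowTask.isCompletedExceptionally());
    }
}
```

For the consumer, this means that a record still in flight when the timeout fires is not part of the current commit. Its worker can still finish and mark the offset later, so the offset is picked up by a subsequent commit.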

Next, let’s implement the markComplete method to record the offset of a processed record in the committableOffsets map:

private void markComplete(ConsumerRecord<String, String> record) {
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1L))
      .accumulateAndGet(record.offset() + 1L, Math::max);
}

Finally, we’ll implement the commitOffsets method to commit the processed offset:

private void commitOffsets() {
    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    committableOffsets.forEach((tp, atomicOffset) -> {
        long val = atomicOffset.get();
        if (val != -1L) {
            toCommit.put(tp, new OffsetAndMetadata(val));
        }
    });

    if (toCommit.isEmpty()) {
        return;
    }
    consumer.commitSync(toCommit);
    toCommit.forEach((tp, meta) -> {
        AtomicLong ref = committableOffsets.get(tp);
        if (ref != null) {
            ref.compareAndSet(meta.offset(), -1L);
        }
    });
}

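In the method above, we take a snapshot of the committable offset for each partition and commit it synchronously. After the commit, compareAndSet resets an entry to -1 only if no worker has advanced it in the meantime, so offsets completed during the commit aren’t lost. Note that because markComplete keeps only the highest completed offset per partition, a failed record with a lower offset is skipped by the commit, which is why such records should go to a retry or DLQ topic.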
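
The interplay between markComplete and the reset step can be tried out without a broker. The following sketch mirrors the two operations with plain Java collections. The OffsetTrackerDemo class is hypothetical and, as a simplifying assumption, uses a String partition name as the key instead of TopicPartition.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the committableOffsets bookkeeping, without Kafka types.
final class OffsetTrackerDemo {

    // Partition name -> next offset to commit, or -1 when nothing is pending.
    private final Map<String, AtomicLong> committable = new ConcurrentHashMap<>();

    // Mirrors markComplete: keep the highest "offset + 1" seen so far.
    void markComplete(String partition, long offset) {
        committable.computeIfAbsent(partition, k -> new AtomicLong(-1L))
          .accumulateAndGet(offset + 1L, Math::max);
    }

    long pending(String partition) {
        AtomicLong ref = committable.get(partition);
        return ref == null ? -1L : ref.get();
    }

    // Mirrors the reset step: clear the entry only if it still holds the committed value.
    boolean resetIfUnchanged(String partition, long committedValue) {
        AtomicLong ref = committable.get(partition);
        return ref != null && ref.compareAndSet(committedValue, -1L);
    }

    public static void main(String[] args) {
        OffsetTrackerDemo tracker = new OffsetTrackerDemo();
        tracker.markComplete("test-topic-0", 4L);
        tracker.markComplete("test-topic-0", 2L); // finishes late, must not lower the value
        long snapshot = tracker.pending("test-topic-0");
        System.out.println(snapshot); // 5

        tracker.markComplete("test-topic-0", 9L); // a worker finishes after the snapshot
        System.out.println(tracker.resetIfUnchanged("test-topic-0", snapshot)); // false
        System.out.println(tracker.pending("test-topic-0")); // 10
    }
}
```

Out-of-order completions never lower the pending value, and an offset that arrives between the snapshot and the reset survives for the next commit.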

Now, let’s configure the consumer with an increased batch size and max poll interval in the test class:

consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);

Finally, we’ll implement a test case by producing and consuming multiple messages and verifying the offset:

@Test
void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() {
    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic");
    Thread th = new Thread(kafkaConsumerService::consume);
    th.start();

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig())) {
        for (int num = 0; num < 100; num++) {
            producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num));
        }
        producer.flush();
    }

    Awaitility.await()
      .atMost(30, TimeUnit.SECONDS)
      .pollInterval(2, TimeUnit.SECONDS)
      .untilAsserted(() -> {
          TopicPartition topicPartition = new TopicPartition("test-topic", 0);
          Map<TopicPartition, OffsetAndMetadata> committedOffsets;

          try (AdminClient adminClient = AdminClient.create(getAdminProps())) {
              ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app");
              committedOffsets = result.partitionsToOffsetAndMetadata()
                .get();
          }

          assertNotNull(committedOffsets);
          assertNotNull(committedOffsets.get(topicPartition));
          assertEquals(100L, committedOffsets.get(topicPartition)
            .offset());
      });

    kafkaConsumerService.shutdown();
}

Running the test above verifies that the committed offset matches the total number of messages produced.

5.3. Implementing a ConsumerRebalanceListener

It’s good practice to register a ConsumerRebalanceListener on the consumer, so that we can commit the offsets of already processed records and clean up any per-partition state before the partitions are handed over.

Let’s pass a ConsumerRebalanceListener to the subscribe call of the KafkaConsumer in the KafkaConsumerService class:

consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            commitOffsets(partitions);
        } catch (Exception ex) {
            log.error("Commit failed during rebalance", ex);
        } finally {
            partitions.forEach(committableOffsets::remove);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitions.forEach(committableOffsets::remove);
    }
});

We’ll also need to implement the commitOffsets method to commit offsets for the specific partitions:

private void commitOffsets(Collection<TopicPartition> partitions) {
    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();

    partitions.forEach(tp -> {
        AtomicLong ref = committableOffsets.get(tp);
        if (ref != null) {
            long val = ref.get();
            if (val != -1L) {
                toCommit.put(tp, new OffsetAndMetadata(val));
            }
        }
    });

    if (toCommit.isEmpty()) {
        return;
    }
    consumer.commitSync(toCommit);
}

In the code above, we build the snapshot map only from the offsets of the given partitions and commit it.

Now, let’s rerun the test from the previous section against the updated consumer implementation and check the application log:

13:01:21.571 [virtual-158] INFO  c.b.k.c.f.async.KafkaConsumerService - Simulating a db call - record key x199 value test99

The log shows the last record being processed, and the passing test confirms that all messages were processed and committed.

5.4. Best Practices

While we can’t prevent these exceptions entirely, we can protect the system from duplicate processing, data loss, and silently dropped messages.

We should make both the producer and the consumer idempotent, rely on database constraints to reject duplicates, and make reprocessing free of unwanted side effects. Additionally, we should send failed messages to a retry or DLQ topic.
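
As an illustration of consumer-side idempotency, the sketch below applies a side effect at most once per record ID, so a redelivered record becomes a no-op. The IdempotentHandler class is hypothetical. The in-memory set and the use of a topic-partition-offset string as the ID are assumptions made for brevity; a real system would keep this state in a durable store, for example a table with a unique constraint.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler that ignores records it has already processed.
final class IdempotentHandler {

    // IDs of records whose side effect was already applied (in-memory for this sketch only).
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger sideEffects = new AtomicInteger();

    // Applies the side effect once per record ID; returns false for a redelivery.
    boolean handle(String recordId) {
        if (!processedIds.add(recordId)) {
            return false;
        }
        sideEffects.incrementAndGet(); // stand-in for the database write
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        System.out.println(handler.handle("test-topic-0-42")); // true
        System.out.println(handler.handle("test-topic-0-42")); // false, redelivered record
        System.out.println(handler.sideEffectCount());         // 1
    }
}
```

With a handler like this, reprocessing a batch after a rebalance or a failed commit repeats the work of checking the ID, but not the side effect itself.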

6. Conclusion

In this article, we learned about one of the common offset commit-related exceptions in a Kafka consumer application. We explored why the CommitFailedException happens and how to prevent it: first by tuning the maximum poll interval, then by switching to asynchronous processing and adding a ConsumerRebalanceListener.

As always, the example code can be found over on GitHub.
