eBook – Guide Spring Cloud – NPI EA (cat=Spring Cloud)
announcement - icon

Let's get started with a Microservice Architecture with Spring Cloud:

>> Join Pro and download the eBook

eBook – Mockito – NPI EA (tag = Mockito)
announcement - icon

Mocking is an essential part of unit testing, and the Mockito library makes it easy to write clean and intuitive unit tests for your Java code.

Get started with mocking and improve your application tests using our Mockito guide:

Download the eBook

eBook – Java Concurrency – NPI EA (cat=Java Concurrency)
announcement - icon

Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.

Get started with understanding multi-threaded applications with our Java Concurrency guide:

>> Download the eBook

eBook – Reactive – NPI EA (cat=Reactive)
announcement - icon

Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:

>> Join Pro and download the eBook

eBook – Java Streams – NPI EA (cat=Java Streams)
announcement - icon

Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.

But these can also be overused and fall into some common pitfalls.

To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:

>> Join Pro and download the eBook

eBook – Jackson – NPI EA (cat=Jackson)
announcement - icon

Do JSON right with Jackson

Download the E-book

eBook – HTTP Client – NPI EA (cat=Http Client-Side)
announcement - icon

Get the most out of the Apache HTTP Client

Download the E-book

eBook – Maven – NPI EA (cat = Maven)
announcement - icon

Get Started with Apache Maven:

Download the E-book

eBook – Persistence – NPI EA (cat=Persistence)
announcement - icon

Working on getting your persistence layer right with Spring?

Explore the eBook

eBook – RwS – NPI EA (cat=Spring MVC)
announcement - icon

Building a REST API with Spring?

Download the E-book

Course – LS – NPI EA (cat=Jackson)
announcement - icon

Get started with Spring and Spring Boot, through the Learn Spring course:

>> LEARN SPRING
Course – RWSB – NPI EA (cat=REST)
announcement - icon

Explore Spring Boot 3 and Spring 6 in-depth through building a full REST API with the framework:

>> The New “REST With Spring Boot”

Course – LSS – NPI EA (cat=Spring Security)
announcement - icon

Yes, Spring Security can be complex, from the more advanced functionality within the Core to the deep OAuth support in the framework.

I built the security material as two full courses - Core and OAuth, to get practical with these more complex scenarios. We explore when and how to use each feature and code through it on the backing project.

You can explore the course here:

>> Learn Spring Security

Course – LSD – NPI EA (tag=Spring Data JPA)
announcement - icon

Spring Data JPA is a great way to handle the complexity of JPA with the powerful simplicity of Spring Boot.

Get started with Spring Data JPA through the guided reference course:

>> CHECK OUT THE COURSE

Partner – Moderne – NPI EA (cat=Spring Boot)
announcement - icon

Refactor Java code safely — and automatically — with OpenRewrite.

Refactoring big codebases by hand is slow, risky, and easy to put off. That’s where OpenRewrite comes in. The open-source framework for large-scale, automated code transformations helps teams modernize safely and consistently.

Each month, the creators and maintainers of OpenRewrite at Moderne run live, hands-on training sessions — one for newcomers and one for experienced users. You’ll see how recipes work, how to apply them across projects, and how to modernize code with confidence.

Join the next session, bring your questions, and learn how to automate the kind of work that usually eats your sprint time.

Partner – LambdaTest – NPI EA (cat=Testing)
announcement - icon

Regression testing is an important step in the release process, to ensure that new code doesn't break the existing functionality. As the codebase evolves, we want to run these tests frequently to help catch any issues early on.

The best way to ensure these tests run frequently on an automated basis is, of course, to include them in the CI/CD pipeline. This way, the regression tests will execute automatically whenever we commit code to the repository.

In this tutorial, we'll see how to create regression tests using Selenium, and then include them in our pipeline using GitHub Actions:, to be run on the LambdaTest cloud grid:

>> How to Run Selenium Regression Tests With GitHub Actions

Course – LJB – NPI EA (cat = Core Java)
announcement - icon

Code your way through and build up a solid, practical foundation of Java:

>> Learn Java Basics

1. Overview

Apache Kafka is an event streaming platform that collects, processes, stores, and integrates data at scale. Sometimes, we may want to delay the processing of messages from Kafka. An example is a customer order processing system designed to process orders after a delay of X seconds, accommodating cancellations within this timeframe.

In this article, we’ll explore consumer processing of Kafka messages with delay using Spring Kafka. Although Kafka doesn’t provide out-of-the-box support for the delayed consumption of messages, we’ll look at an alternative option for implementation.

2. Application Context

Kafka offers multiple ways to retry on errors. We’ll use this retry mechanism to delay the consumer processing of messages. Therefore, it’s worth understanding how Kafka retry works.

Let’s consider an order processing application where a customer can place an order on a UI. The user can cancel mistakenly placed orders within 10 seconds. These orders go to the Kafka topic web.orders, where our application processes them.

An external service exposes the latest order status (CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED). Our application needs to receive the message, wait for 10 seconds, and check with the external service to process the order if it’s in CONFIRMED status, i.e., the user hasn’t canceled it within the 10 seconds.

For testing, the internal orders received from web.orders.internal shouldn’t be delayed.

Let’s add a simple Order model that has orderGeneratedDateTime populated by the producer and orderProcessedTime populated by the consumer after a delayed duration:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private UUID orderId;

    private LocalDateTime orderGeneratedDateTime;

    private LocalDateTime orderProcessedTime;

    private List<String> address;

    private double price;
}

3. Kafka Listener and External Service

Next, we’ll add a listener for topic consumption and a service that exposes the status of the orders.

Let’s add a KafkaListener which reads and processes messages from topics web.orders and web.internal.orders :

@RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
public void handleOrders(String order) throws JsonProcessingException {
    Order orderDetails = objectMapper.readValue(order, Order.class);
    OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
    if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
        orderService.processOrder(orderDetails);
    }
}

It’s important to include KafkaBackoffException so that the listener allows retries. For simplicity, let’s consider that the external OrderService always returns the order status as CONFIRMED. Also, the processOrder() method sets the order processed time as the current time and saves the order into a HashMap:

@Service
public class OrderService {

    HashMap<UUID, Order> orders = new HashMap<>();

    public Status findStatusById(UUID orderId) {
        return Status.ORDER_CONFIRMED;
    }

    public void processOrder(Order order) {
        order.setOrderProcessedTime(LocalDateTime.now());
        orders.put(order.getOrderId(), order);
    }
}

4. Custom Delayed Message Listener

Spring-Kafka comes up with the KafkaBackoffAwareMessageListenerAdapter which extends AbstractAdaptableMessageListener and implements AcknowledgingConsumerAwareMessageListener. This adapter examines the backoff dueTimestamp header and either backs off the message by invoking KafkaConsumerBackoffManager or retries the processing.

Let’s now implement the DelayedMessageListenerAdapter similar to KafkaBackoffAwareMessageListenerAdapter. This adapter should provide flexibility to configure delays per topic along with a default delay of 0 seconds:

public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>> 
  implements AcknowledgingConsumerAwareMessageListener<K, V> {

    // Field declaration and constructor

    public void setDelayForTopic(String topic, Duration delay) {
        Objects.requireNonNull(topic, "Topic cannot be null");
        Objects.requireNonNull(delay, "Delay cannot be null");
        this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
        this.delaysPerTopic.put(topic, delay);
    }

    public void setDefaultDelay(Duration delay) {
        Objects.requireNonNull(delay, "Delay cannot be null");
        this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
        this.defaultDelay = delay;
    }

    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
        this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord,
          consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay)
          .toMillis(), consumer));
        invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
    }

    private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, Consumer<?, ?> consumer) {
        return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, 
          this.listenerId, 
          new TopicPartition(data.topic(), data.partition()), consumer);
    }
}

For every incoming message, this adapter first receives the record and checks the delay set for the topic. This will be set in the configuration, and if not set, it uses the default delay.

The existing implementation of KafkaConsumerBackoffManager#backOffIfNecessary method checks the difference between the context record timestamp and the current timestamp. If the difference is positive, showing no consumption due, the partition pauses and raises a KafkaBackoffException. Otherwise, it sends the record to the KafkaListener method for consumption.

5. Listener Configuration

The ConcurrentKafkaListenerContainerFactory is the default implementation of Spring Kafka which is responsible for building containers for KafkaListener. It allows us to configure the number of concurrent KafkaListener instances. Each container can be considered a logical thread pool, where each thread is responsible for listening to messages from one or more Kafka topics.

The DelayedMessageListenerAdapter needs to be configured with the listener by declaring a custom ConcurrentKafkaListenerContainerFactory. We can set the delay for specific topics like web.orders and also set a default delay of 0 for any other topics:

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory, 
  ListenerContainerRegistry registry, TaskScheduler scheduler) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);
    factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
    factory.setContainerCustomizer(container -> {
        DelayedMessageListenerAdapter<Object, Object> delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
        delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
        delayedAdapter.setDefaultDelay(Duration.ZERO);
        container.setupMessageListener(delayedAdapter);
    });
    return factory;
}

@SuppressWarnings("unchecked")
private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, 
  ConcurrentMessageListenerContainer<Object, Object> container) {
    return new DelayedMessageListenerAdapter<>((MessageListener<Object, Object>) container.getContainerProperties()
      .getMessageListener(), backOffManager, container.getListenerId());
}

private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
    return new ContainerPartitionPausingBackOffManager(registry, 
      new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
}

Notably, setting the acknowledgment mode at the RECORD level is essential to ensure that the consumer redelivers messages if an error happens during processing.

Finally, we need to define a TaskScheduler bean to resume paused partitions after the delay duration and this scheduler needs to be injected into the BackOffManager which will be used by DelayedMessageListenerAdapter:

@Bean
public TaskScheduler taskScheduler() {
    return new ThreadPoolTaskScheduler();
}

6. Testing

Let’s ensure orders on the web.orders topic undergo a 10-second delay before processing through testing:

@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
    // Given
    var orderId = UUID.randomUUID();
    Order order = Order.builder()
      .orderId(orderId)
      .price(1.0)
      .orderGeneratedDateTime(LocalDateTime.now())
      .address(List.of("41 Felix Avenue, Luton"))
      .build();

    String orderString = objectMapper.writeValueAsString(order);
    ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
    
    // When
    testKafkaProducer.send(record)
      .get();
    await().atMost(Duration.ofSeconds(1800))
      .until(() -> {
          // then
          Map<UUID, Order> orders = orderService.getOrders();
          return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds() >= 10;
      });
}

Next, we’ll test any orders to web.internal.orders follow a default delay of 0 seconds:

@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
    // Given
    var orderId = UUID.randomUUID();
    Order order = Order.builder()
      .orderId(orderId)
      .price(1.0)
      .orderGeneratedDateTime(LocalDateTime.now())
      .address(List.of("41 Felix Avenue, Luton"))
      .build();

    String orderString = objectMapper.writeValueAsString(order);
    ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
    
    // When
    testKafkaProducer.send(record)
      .get();
    await().atMost(Duration.ofSeconds(1800))
      .until(() -> {
          // Then
          Map<UUID, Order> orders = orderService.getOrders();
          System.out.println("Time...." + Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds());
          return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds() <= 1;
      });
}

7. Conclusion

In this tutorial, we explored how a Kafka consumer can delay processing messages by fixed intervals.

We can modify the implementation to dynamically set processing delays by utilizing embedded message durations as part of the message.

The code backing this article is available on GitHub. Once you're logged in as a Baeldung Pro Member, start learning and coding on the project.
Baeldung Pro – NPI EA (cat = Baeldung)
announcement - icon

Baeldung Pro comes with both absolutely No-Ads as well as finally with Dark Mode, for a clean learning experience:

>> Explore a clean Baeldung

Once the early-adopter seats are all used, the price will go up and stay at $33/year.

eBook – HTTP Client – NPI EA (cat=HTTP Client-Side)
announcement - icon

The Apache HTTP Client is a very robust library, suitable for both simple and advanced use cases when testing HTTP endpoints. Check out our guide covering basic request and response handling, as well as security, cookies, timeouts, and more:

>> Download the eBook

eBook – Java Concurrency – NPI EA (cat=Java Concurrency)
announcement - icon

Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.

Get started with understanding multi-threaded applications with our Java Concurrency guide:

>> Download the eBook

eBook – Java Streams – NPI EA (cat=Java Streams)
announcement - icon

Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.

But these can also be overused and fall into some common pitfalls.

To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:

>> Join Pro and download the eBook

eBook – Persistence – NPI EA (cat=Persistence)
announcement - icon

Working on getting your persistence layer right with Spring?

Explore the eBook

Course – LS – NPI EA (cat=REST)

announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

Partner – Moderne – NPI EA (tag=Refactoring)
announcement - icon

Modern Java teams move fast — but codebases don’t always keep up. Frameworks change, dependencies drift, and tech debt builds until it starts to drag on delivery. OpenRewrite was built to fix that: an open-source refactoring engine that automates repetitive code changes while keeping developer intent intact.

The monthly training series, led by the creators and maintainers of OpenRewrite at Moderne, walks through real-world migrations and modernization patterns. Whether you’re new to recipes or ready to write your own, you’ll learn practical ways to refactor safely and at scale.

If you’ve ever wished refactoring felt as natural — and as fast — as writing code, this is a good place to start.

Course – LS – NPI – (cat=Spring)
announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

eBook Jackson – NPI EA – 3 (cat = Jackson)