Course – LS – All
announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

Apache Kafka is an event streaming platform that collects, processes, stores, and integrates data at scale. Sometimes, we may want to delay the processing of messages from Kafka. An example is a customer order processing system designed to process orders after a delay of X seconds, accommodating cancellations within this timeframe.

In this article, we’ll explore consumer processing of Kafka messages with delay using Spring Kafka. Although Kafka doesn’t provide out-of-the-box support for the delayed consumption of messages, we’ll look at an alternative option for implementation.

2. Application Context

Kafka offers multiple ways to retry on errors. We’ll use this retry mechanism to delay the consumer processing of messages. Therefore, it’s worth understanding how Kafka retry works.

Let’s consider an order processing application where a customer can place an order on a UI. The user can cancel mistakenly placed orders within 10 seconds. These orders go to the Kafka topic web.orders, where our application processes them.

An external service exposes the latest order status (CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED). Our application needs to receive the message, wait for 10 seconds, and check with the external service to process the order if it’s in CONFIRMED status, i.e., the user hasn’t canceled it within the 10 seconds.

For testing, the internal orders received from web.orders.internal shouldn’t be delayed.

Let’s add a simple Order model that has orderGeneratedDateTime populated by the producer and orderProcessedTime populated by the consumer after a delayed duration:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private UUID orderId;

    private LocalDateTime orderGeneratedDateTime;

    private LocalDateTime orderProcessedTime;

    private List<String> address;

    private double price;
}

3. Kafka Listener and External Service

Next, we’ll add a listener for topic consumption and a service that exposes the status of the orders.

Let’s add a KafkaListener which reads and processes messages from topics web.orders and web.internal.orders :

@RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
public void handleOrders(String order) throws JsonProcessingException {
    Order orderDetails = objectMapper.readValue(order, Order.class);
    OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
    if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
        orderService.processOrder(orderDetails);
    }
}

It’s important to include KafkaBackoffException so that the listener allows retries. For simplicity, let’s consider that the external OrderService always returns the order status as CONFIRMED. Also, the processOrder() method sets the order processed time as the current time and saves the order into a HashMap:

@Service
public class OrderService {

    HashMap<UUID, Order> orders = new HashMap<>();

    public Status findStatusById(UUID orderId) {
        return Status.ORDER_CONFIRMED;
    }

    public void processOrder(Order order) {
        order.setOrderProcessedTime(LocalDateTime.now());
        orders.put(order.getOrderId(), order);
    }
}

4. Custom Delayed Message Listener

Spring-Kafka comes up with the KafkaBackoffAwareMessageListenerAdapter which extends AbstractAdaptableMessageListener and implements AcknowledgingConsumerAwareMessageListener. This adapter examines the backoff dueTimestamp header and either backs off the message by invoking KafkaConsumerBackoffManager or retries the processing.

Let’s now implement the DelayedMessageListenerAdapter similar to KafkaBackoffAwareMessageListenerAdapter. This adapter should provide flexibility to configure delays per topic along with a default delay of 0 seconds:

public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>> 
  implements AcknowledgingConsumerAwareMessageListener<K, V> {

    // Field declaration and constructor

    public void setDelayForTopic(String topic, Duration delay) {
        Objects.requireNonNull(topic, "Topic cannot be null");
        Objects.requireNonNull(delay, "Delay cannot be null");
        this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
        this.delaysPerTopic.put(topic, delay);
    }

    public void setDefaultDelay(Duration delay) {
        Objects.requireNonNull(delay, "Delay cannot be null");
        this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
        this.defaultDelay = delay;
    }

    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
        this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord,
          consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay)
          .toMillis(), consumer));
        invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
    }

    private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, Consumer<?, ?> consumer) {
        return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, 
          this.listenerId, 
          new TopicPartition(data.topic(), data.partition()), consumer);
    }
}

For every incoming message, this adapter first receives the record and checks the delay set for the topic. This will be set in the configuration, and if not set, it uses the default delay.

The existing implementation of KafkaConsumerBackoffManager#backOffIfNecessary method checks the difference between the context record timestamp and the current timestamp. If the difference is positive, showing no consumption due, the partition pauses and raises a KafkaBackoffException. Otherwise, it sends the record to the KafkaListener method for consumption.

5. Listener Configuration

The ConcurrentKafkaListenerContainerFactory is the default implementation of Spring Kafka which is responsible for building containers for KafkaListener. It allows us to configure the number of concurrent KafkaListener instances. Each container can be considered a logical thread pool, where each thread is responsible for listening to messages from one or more Kafka topics.

The DelayedMessageListenerAdapter needs to be configured with the listener by declaring a custom ConcurrentKafkaListenerContainerFactory. We can set the delay for specific topics like web.orders and also set a default delay of 0 for any other topics:

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory, 
  ListenerContainerRegistry registry, TaskScheduler scheduler) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);
    factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
    factory.setContainerCustomizer(container -> {
        DelayedMessageListenerAdapter<Object, Object> delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
        delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
        delayedAdapter.setDefaultDelay(Duration.ZERO);
        container.setupMessageListener(delayedAdapter);
    });
    return factory;
}

@SuppressWarnings("unchecked")
private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, 
  ConcurrentMessageListenerContainer<Object, Object> container) {
    return new DelayedMessageListenerAdapter<>((MessageListener<Object, Object>) container.getContainerProperties()
      .getMessageListener(), backOffManager, container.getListenerId());
}

private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
    return new ContainerPartitionPausingBackOffManager(registry, 
      new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
}

Notably, setting the acknowledgment mode at the RECORD level is essential to ensure that the consumer redelivers messages if an error happens during processing.

Finally, we need to define a TaskScheduler bean to resume paused partitions after the delay duration and this scheduler needs to be injected into the BackOffManager which will be used by DelayedMessageListenerAdapter:

@Bean
public TaskScheduler taskScheduler() {
    return new ThreadPoolTaskScheduler();
}

6. Testing

Let’s ensure orders on the web.orders topic undergo a 10-second delay before processing through testing:

@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
    // Given
    var orderId = UUID.randomUUID();
    Order order = Order.builder()
      .orderId(orderId)
      .price(1.0)
      .orderGeneratedDateTime(LocalDateTime.now())
      .address(List.of("41 Felix Avenue, Luton"))
      .build();

    String orderString = objectMapper.writeValueAsString(order);
    ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
    
    // When
    testKafkaProducer.send(record)
      .get();
    await().atMost(Duration.ofSeconds(1800))
      .until(() -> {
          // then
          Map<UUID, Order> orders = orderService.getOrders();
          return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds() >= 10;
      });
}

Next, we’ll test any orders to web.internal.orders follow a default delay of 0 seconds:

@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
    // Given
    var orderId = UUID.randomUUID();
    Order order = Order.builder()
      .orderId(orderId)
      .price(1.0)
      .orderGeneratedDateTime(LocalDateTime.now())
      .address(List.of("41 Felix Avenue, Luton"))
      .build();

    String orderString = objectMapper.writeValueAsString(order);
    ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
    
    // When
    testKafkaProducer.send(record)
      .get();
    await().atMost(Duration.ofSeconds(1800))
      .until(() -> {
          // Then
          Map<UUID, Order> orders = orderService.getOrders();
          System.out.println("Time...." + Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds());
          return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds() <= 1;
      });
}

7. Conclusion

In this tutorial, we explored how a Kafka consumer can delay processing messages by fixed intervals.

We can modify the implementation to dynamically set processing delays by utilizing embedded message durations as part of the message.

As always, the source code for the examples is available over on GitHub.

Course – LS – All
announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

res – REST with Spring (eBook) (everywhere)
Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments