1. Introduction

By default in Spring AMQP, a failed message is re-queued for another round of consumption. Consequently, an infinite consumption loop may occur, causing an unstable situation and a waste of resources.

While using a Dead Letter Queue is a standard way to deal with failed messages, we may want to retry the message consumption and return the system to a normal state.

In this tutorial, we'll present two different ways of implementing a retry strategy named Exponential Backoff.

2. Prerequisites

Throughout this tutorial, we'll use RabbitMQ, a popular AMQP implementation. For further instructions on how to configure and use RabbitMQ with Spring, we can refer to the Spring AMQP article.

For the sake of simplicity, we'll also use a Docker image for our RabbitMQ instance, though any RabbitMQ instance listening on port 5672 will do.

Let's start a RabbitMQ Docker container:

docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management

In order to implement our examples, we need to add a dependency on spring-boot-starter-amqp. The latest version is available on Maven Central:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.4.RELEASE</version>
    </dependency>
</dependencies>

3. A Blocking Way

Our first approach uses Spring Retry. We'll create a simple queue and a consumer configured to wait for some time between retries of the failed message.

First, let's create our queue:

@Bean
public Queue blockingQueue() {
    return QueueBuilder.nonDurable("blocking-queue").build();
}

Secondly, let's configure a backoff strategy in RetryOperationsInterceptor and wire it in a custom RabbitListenerContainerFactory:

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
      .backOffOptions(1000, 3.0, 10000)
      .maxAttempts(5)
      .recoverer(observableRecoverer())
      .build();
}

@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
  ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

As shown above, we're configuring an initial interval of 1000ms and a multiplier of 3.0, up to a maximum wait time of 10000ms. In addition, after five attempts the retries are exhausted and the message is handed to the recoverer instead of being retried again.
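To make these numbers concrete, here is a minimal sketch of the wait times such a configuration should produce. It doesn't use Spring Retry; the BackoffSchedule class and its waits method are hypothetical names introduced only for illustration, and the sketch assumes the interval is multiplied after every failed attempt and capped at the maximum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Retry: it only reproduces the arithmetic.
class BackoffSchedule {

    // Pauses (in ms) between consecutive attempts. The interval grows by
    // `multiplier` after each failure and never exceeds `maxInterval`.
    static List<Long> waits(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double interval = initialInterval;
        // maxAttempts attempts are separated by maxAttempts - 1 pauses
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add((long) Math.min(interval, maxInterval));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        System.out.println(waits(1000, 3.0, 10000, 5)); // [1000, 3000, 9000, 10000]
    }
}
```

With our settings, the fourth pause would be 27000ms without the cap, so it's limited to 10000ms.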

Let's add our consumer and force a failed message by throwing an exception:

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
    logger.info("Processing message from blocking-queue: {}", payload);

    throw new Exception("exception occured!");
}

Finally, let's create a test and send two messages to our queue:

@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    observableRecoverer.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
    }

    latch.await();
}

Keep in mind that the CountDownLatch is only used as a test fixture.

Let's run the test and check our log output:

2020-02-18 21:17:55.638  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875  ERROR : java.lang.Exception: exception occured!
2020-02-18 21:18:18.858  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:19.860  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:22.863  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:31.867  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.871  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!

As can be seen, this log correctly shows the exponential wait time between each retry. While our backoff strategy works, our consumer is blocked until the retries have been exhausted. A trivial improvement is to make our consumer execute concurrently by setting the concurrency attribute of @RabbitListener:

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")

However, a retried message still blocks a consumer instance. Therefore, the application can suffer from latency issues.
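As a rough illustration of that cost, the sketch below estimates how long a failing message occupies a consumer and how long a batch of failing messages takes to drain. The BlockingCost class and its methods are hypothetical, and the model assumes that processing time is negligible compared to the pauses and that each consumer handles one message at a time.

```java
// Hypothetical back-of-the-envelope model, not Spring AMQP code.
class BlockingCost {

    // Total time (ms) one always-failing message keeps a consumer busy:
    // the sum of the capped pauses between its attempts.
    static long blockedMillis(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        long total = 0;
        double interval = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            total += (long) Math.min(interval, maxInterval);
            interval *= multiplier;
        }
        return total;
    }

    // Time (ms) until the last of `messages` failing messages has exhausted
    // its retries, given `consumers` consumers working in parallel.
    static long drainMillis(int messages, int consumers, long blockedPerMessage) {
        int rounds = (messages + consumers - 1) / consumers; // ceiling division
        return rounds * blockedPerMessage;
    }

    public static void main(String[] args) {
        long perMessage = blockedMillis(1000, 3.0, 10000, 5);
        System.out.println(perMessage);                    // 23000
        System.out.println(drainMillis(2, 1, perMessage)); // 46000
        System.out.println(drainMillis(2, 2, perMessage)); // 23000
    }
}
```

The 46 seconds estimated for two messages and one consumer is in line with the log above, which spans from 21:17:55 to 21:18:41.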

In the next section, we'll present a non-blocking way to implement a similar strategy.

4. A Non-blocking Way

An alternative way involves a number of retry queues coupled with message expiration. When a message expires in a queue that has a dead letter exchange configured, it is routed to the corresponding dead letter queue. So, if the DLQ consumer sends the message back to its original queue, we're essentially doing a retry loop.

As a result, the number of retry queues used is the number of retries that will occur.

First, let's create the dead letter queue for our retry queues:

@Bean
public Queue retryWaitEndedQueue() {
    return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}

Let's add a consumer on the retry dead letter queue. This consumer's sole responsibility is sending back the message to its original queue:

@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception {
    MessageProperties props = message.getMessageProperties();

    rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), 
      props.getHeader("x-original-routing-key"), message);
}

Secondly, let's create a wrapper object for our retry queues. This object will hold the exponential backoff configuration:

public class RetryQueues {
    private Queue[] queues;
    private long initialInterval;
    private double factor;
    private long maxWait;

    // constructor, getters and setters
}

Thirdly, let's define three retry queues:

@Bean
public Queue retryQueue1() {
    return QueueBuilder.nonDurable("retry-queue-1")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue2() {
    return QueueBuilder.nonDurable("retry-queue-2")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue3() {
    return QueueBuilder.nonDurable("retry-queue-3")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public RetryQueues retryQueues() {
    return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}
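The RetryQueues bean above carries the backoff parameters, but its lookup methods aren't listed here. As a minimal sketch of how getQueueName and getTimeToWait could be derived from those parameters, the class below uses plain queue names instead of Spring Queue objects. RetryQueuesSketch and retriesExhausted are hypothetical names, and the formula (initial interval times factor to the power of the retry count, capped at the maximum wait) is an assumption rather than the actual implementation.

```java
// Hypothetical stand-in for RetryQueues, holding queue names instead of Queue beans.
class RetryQueuesSketch {
    private final String[] queueNames;
    private final long initialInterval;
    private final double factor;
    private final long maxWait;

    RetryQueuesSketch(long initialInterval, double factor, long maxWait, String... queueNames) {
        this.initialInterval = initialInterval;
        this.factor = factor;
        this.maxWait = maxWait;
        this.queueNames = queueNames;
    }

    // No retry queue is left once every queue has been used once.
    boolean retriesExhausted(int retryCount) {
        return retryCount >= queueNames.length;
    }

    // The n-th retry (counting from 0) goes to the n-th retry queue.
    String getQueueName(int retryCount) {
        return queueNames[retryCount];
    }

    // Assumed formula: initialInterval * factor^retryCount, capped at maxWait.
    long getTimeToWait(int retryCount) {
        double time = initialInterval * Math.pow(factor, retryCount);
        return (long) Math.min(time, maxWait);
    }

    public static void main(String[] args) {
        RetryQueuesSketch queues = new RetryQueuesSketch(
          1000, 3.0, 10000, "retry-queue-1", "retry-queue-2", "retry-queue-3");
        for (int retry = 0; !queues.retriesExhausted(retry); retry++) {
            System.out.println(queues.getQueueName(retry) + " -> " + queues.getTimeToWait(retry) + "ms");
        }
    }
}
```

Under this assumption, each retry queue always receives messages with the same expiration: 1000ms for the first, 3000ms for the second, and 9000ms for the third.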

Then, we need an interceptor to handle the message consumption:

public class RetryQueuesInterceptor implements MethodInterceptor {

    // fields and constructor

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
            try {
                int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
                sendToNextRetryQueue(messageAndChannel, retryCount);
            } catch (Throwable t) {
                // ...
                throw new RuntimeException(t);
            }
        });
    }

    // tryConsume, ack, tryGetRetryCountOrFail and sendToNextRetryQueue omitted
}

In the case of the consumer returning successfully, we simply acknowledge the message.

However, if the consumer throws an exception and there are attempts left, we send the message to the next retry queue:

private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
    String retryQueueName = retryQueues.getQueueName(retryCount);

    rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
        MessageProperties props = m.getMessageProperties();
        props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
        props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
        props.setHeader("x-original-exchange", props.getReceivedExchange());
        props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());

        return m;
    });

    mac.channel.basicReject(mac.message.getMessageProperties()
      .getDeliveryTag(), false);
}
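The method above stores the retry count in the x-retried-count header as a string. The code that reads it back isn't listed, so here is a minimal sketch of that step, using a plain Map in place of MessageProperties. The RetryCountHeader class and both of its methods are hypothetical, and treating a missing header as a count of zero is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: reads the retry counter written by sendToNextRetryQueue.
class RetryCountHeader {
    static final String HEADER = "x-retried-count";

    // A message that was never retried has no header, so we assume a count of 0.
    static int retryCount(Map<String, Object> headers) {
        Object value = headers.get(HEADER);
        return value == null ? 0 : Integer.parseInt(value.toString());
    }

    // Another retry is possible while an unused retry queue remains.
    static boolean hasAttemptsLeft(int retryCount, int retryQueueCount) {
        return retryCount < retryQueueCount;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(retryCount(headers)); // 0

        headers.put(HEADER, String.valueOf(2));
        System.out.println(retryCount(headers));                     // 2
        System.out.println(hasAttemptsLeft(retryCount(headers), 3)); // true
    }
}
```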

Again, let's wire our interceptor in a custom RabbitListenerContainerFactory:

@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(
  ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

Finally, we define our main queue and a consumer which simulates a failed message:

@Bean
public Queue nonBlockingQueue() {
    return QueueBuilder.nonDurable("non-blocking-queue")
      .build();
}

@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", 
  ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
    logger.info("Processing message from non-blocking-queue: {}", payload);

    throw new Exception("Error occured!");
}

Let's create another test and send two messages:

@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    retryQueues.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
    }

    latch.await();
}

Then, let's launch our test and check the log:

2020-02-19 10:31:40.640  INFO : Processing message from non-blocking-queue: non-blocking message 1
2020-02-19 10:31:40.656  INFO : Processing message from non-blocking-queue: non-blocking message 2
2020-02-19 10:31:41.620  INFO : Processing message from non-blocking-queue: non-blocking message 1
2020-02-19 10:31:41.623  INFO : Processing message from non-blocking-queue: non-blocking message 2
2020-02-19 10:31:44.415  INFO : Processing message from non-blocking-queue: non-blocking message 1
2020-02-19 10:31:44.420  INFO : Processing message from non-blocking-queue: non-blocking message 2
2020-02-19 10:31:52.751  INFO : Processing message from non-blocking-queue: non-blocking message 1
2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured!
2020-02-19 10:31:52.829  INFO : Processing message from non-blocking-queue: non-blocking message 2
2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!

Again, we see an exponential wait time between each retry. However, instead of blocking until every attempt is made, the messages are processed concurrently.

While this setup is quite flexible and helps alleviate latency issues, there is a common pitfall. Indeed, RabbitMQ removes an expired message only when it reaches the head of the queue. Therefore, a message with a longer expiration period blocks every message queued behind it, even those that have already expired. For this reason, a retry queue must only contain messages having the same expiration value.
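To see why mixed expiration values are a problem, here is a simplified model of head-only expiry. It's a sketch rather than a description of RabbitMQ internals: the HeadOfQueueExpiry class is hypothetical, and it assumes that all messages are published at time 0 and that a message can leave the queue only after it has expired and every message ahead of it has left.

```java
import java.util.Arrays;

// Hypothetical model of a FIFO queue where only the head is checked for expiry.
class HeadOfQueueExpiry {

    // Given the TTL (ms) of each message in queue order, returns the time (ms)
    // at which each message is actually dead-lettered.
    static long[] leaveTimes(long[] ttlsInQueueOrder) {
        long[] leave = new long[ttlsInQueueOrder.length];
        long headFreeAt = 0;
        for (int i = 0; i < ttlsInQueueOrder.length; i++) {
            // A message waits for its own TTL and for the messages ahead of it
            headFreeAt = Math.max(headFreeAt, ttlsInQueueOrder[i]);
            leave[i] = headFreeAt;
        }
        return leave;
    }

    public static void main(String[] args) {
        // A 9000ms message queued ahead of a 1000ms message delays it until 9000ms
        System.out.println(Arrays.toString(leaveTimes(new long[] {9000, 1000}))); // [9000, 9000]

        // With a uniform TTL, every message leaves on time
        System.out.println(Arrays.toString(leaveTimes(new long[] {1000, 1000}))); // [1000, 1000]
    }
}
```

Using one retry queue per expiration value, as we did above, keeps every queue in the second situation.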

5. Conclusion

As shown above, event-based systems can implement an exponential backoff strategy to improve resiliency. While implementing such solutions can be trivial, it's important to realize that a certain solution can be well adapted to a small system, but cause latency issues in high-throughput ecosystems.

The source code is available over on GitHub.
