1. Overview

In this article, we’ll learn how to apply retry logic to CompletableFuture objects. First, we’ll retry the task wrapped within a CompletableFuture. Then, we’ll use the CompletableFuture API to chain multiple instances, which lets us re-execute the task when the future completes exceptionally.

2. Retrying the Task

A simple approach to retrying a task is to leverage the decorator pattern and implement it in the classical OOP style, with classes and interfaces. Alternatively, we can choose a more concise, functional approach that takes advantage of higher-order functions.

Initially, we’ll declare a function that takes as a parameter a Supplier<T> and the maximum number of invocations. After that, we’ll use a while loop and a try-catch block to invoke the function multiple times, if needed. Finally, we’ll preserve the original data type by returning yet another Supplier<T>:

static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
    return () -> {
        int retries = 0;
        while (retries < maxRetries) {
            try {
                return supplier.get();
            } catch (Exception e) {
                retries++;
            }
        }
        throw new IllegalStateException(String.format("Task failed after %s attempts", maxRetries));
    };
}

We can further improve this decorator by allowing for the definition of specific exceptions to be retried or by introducing a delay between invocations. But, for simplicity’s sake, let’s proceed with creating the CompletableFuture based on this function decorator:

static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
    Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
    return CompletableFuture.supplyAsync(retryableSupplier);
}
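
The two improvements mentioned earlier, retrying only specific exceptions and pausing between invocations, could be sketched roughly as follows. This is an illustrative variant and not part of the original code: the delayMillis and shouldRetry parameters and the pause() helper are assumptions introduced for this example.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

class RetryWithDelay {

    // Hypothetical variant of retryFunction(): retries only the exceptions accepted
    // by shouldRetry and waits delayMillis between failed attempts.
    static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries,
                                         long delayMillis, Predicate<RuntimeException> shouldRetry) {
        return () -> {
            int retries = 0;
            while (retries < maxRetries) {
                try {
                    return supplier.get();
                } catch (RuntimeException e) {
                    if (!shouldRetry.test(e)) {
                        throw e; // not retryable: fail immediately
                    }
                    retries++;
                    if (retries < maxRetries) {
                        pause(delayMillis); // no need to wait after the last attempt
                    }
                }
            }
            throw new IllegalStateException(String.format("Task failed after %s attempts", maxRetries));
        };
    }

    // Assumed helper: sleeps and converts an interruption into an unchecked exception.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Since the result is still a Supplier<T>, it can be passed to CompletableFuture.supplyAsync() in the same way as the simpler decorator. Note that the pause blocks the thread running the supplier, which may or may not be acceptable depending on the executor in use.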

Now, let’s proceed with writing tests for this functionality. To begin, we’ll need a method that will be retried by our CompletableFuture. For this purpose, we’ll design a method that fails four times by throwing RuntimeExceptions and successfully completes on the fifth attempt, returning an integer value:

AtomicInteger retriesCounter = new AtomicInteger(0);

@BeforeEach
void beforeEach() {
    retriesCounter.set(0);
}

int failFourTimesThenReturn(int returnValue) {
    int retryNr = retriesCounter.get();
    if (retryNr < 4) {
        retriesCounter.set(retryNr + 1);
        throw new RuntimeException();
    }
    return returnValue;
}

Now, we can finally test our retryTask() function and assert that the expected value is returned. Also, we can check the number of invocations by interrogating the retriesCounter:

@Test
void whenRetryingTask_thenReturnsCorrectlyAfterFourInvocations() {
    Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);

    CompletableFuture<Integer> result = retryTask(codeToRun, 10);

    assertThat(result.join()).isEqualTo(100);
    assertThat(retriesCounter).hasValue(4);
}

Furthermore, if we call the same function with a smaller value for the maxRetries parameter, we expect the future to complete exceptionally. The original IllegalStateException should be wrapped in a CompletionException, but the original error message should be preserved:

@Test
void whenRetryingTask_thenThrowsExceptionAfterThreeInvocations() {
    Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);

    CompletableFuture<Integer> result = retryTask(codeToRun, 3);

    assertThatThrownBy(result::join)
      .isInstanceOf(CompletionException.class)
      .hasMessageContaining("IllegalStateException: Task failed after 3 attempts");
}
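
When a caller needs the underlying exception rather than the CompletionException wrapper, it can be read through getCause(). Here is a minimal sketch of that idea; failureCause() is a hypothetical helper introduced only for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UnwrapCauseExample {

    // Hypothetical helper: waits for the future and returns the exception that
    // made it fail, or null if it completed normally.
    static Throwable failureCause(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            // join() wraps the supplier's exception; the original is the cause
            return e.getCause();
        }
    }
}
```

For a future created by retryTask() that exhausts its attempts, the returned cause would be the IllegalStateException thrown by the decorator.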

3. Retrying the CompletableFuture

The CompletableFuture API provides options for handling exceptions as they arise. As a result, we can utilize methods such as exceptionally() instead of creating our function decorator.

3.1. Unsafe Retry

The exceptionally() method enables us to specify an alternative function that will be invoked when the initial invocation completes with an exception. For instance, if we intend to retry the invocation two times, we can utilize the fluent API to add two of these fallbacks:

static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
    return CompletableFuture.supplyAsync(supplier)
      .exceptionally(__ -> supplier.get())
      .exceptionally(__ -> supplier.get());
}

Since we need a variable number of retries, let’s refactor the code and use a for loop instead:

static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionally(__ -> supplier.get());
    }
    return cf;
}

We can test retryUnsafe() using the same test helpers and expect similar outcomes. However, there is a subtle difference if the initial supplier completes before all the exceptionally() fallbacks have been attached. In that case, the function will still be retried the specified number of times, but the retries will run on the calling thread (the main thread in our test), so we lose asynchrony.

To illustrate this, we can insert a 100-millisecond pause just before the for loop that attaches the exceptionally() fallbacks:

static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    // sleep() is assumed to be a test helper that wraps Thread.sleep()
    sleep(100L);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionally(__ -> supplier.get());
    }
    return cf;
}

Following that, we’ll modify the failFourTimesThenReturn() test method to log the attempt number and the current thread name on each invocation of this method. Now, let’s re-run the test and check the console:

invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: main
invocation: 2, thread: main
invocation: 3, thread: main
invocation: 4, thread: main

As anticipated, the subsequent invocations were executed by the main thread. This can become problematic if the initial invocation is quick, but the subsequent ones are expected to be slower.
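
The same effect can be reproduced in isolation with a future that has already failed before the fallback is attached. The sketch below is illustrative only; the class and method names are made up for this example:

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallyThreadDemo {

    // Returns the name of the thread that ran the exceptionally() fallback
    // for a future that was already completed exceptionally.
    static String fallbackThreadName() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated failure"));

        // The future is already complete, so the non-async fallback runs
        // right away on the thread that attaches it.
        return failed
          .exceptionally(__ -> Thread.currentThread().getName())
          .join();
    }
}
```

Calling fallbackThreadName() from the main thread should return that thread's name, which mirrors what the log output above shows for the retries.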

3.2. Retry Asynchronously

We can address this concern by ensuring that the subsequent invocations are carried out asynchronously. To facilitate this, a dedicated method was introduced to the API, starting with Java 12. By using exceptionallyAsync(), we’ll ensure all the retries will be performed asynchronously, regardless of the speed at which the initial CompletableFuture completes:

static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionallyAsync(__ -> supplier.get());
    }
    return cf;
}

Let’s quickly run the test and examine the logs:

invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: ForkJoinPool.commonPool-worker-1
invocation: 2, thread: ForkJoinPool.commonPool-worker-1
invocation: 3, thread: ForkJoinPool.commonPool-worker-2
invocation: 4, thread: ForkJoinPool.commonPool-worker-2

As expected, none of the invocations were executed by the main thread.
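
Both supplyAsync() and exceptionallyAsync() also have overloads that accept an Executor, which can be useful when the retries shouldn't run on the common pool. A possible sketch, assuming Java 12 or later; the retryOn() name and the executor parameter are illustrative and not part of the original example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

class RetryWithExecutor {

    // Hypothetical variant of retryExceptionallyAsync(): the first attempt and
    // every retry are submitted to the given executor.
    static <T> CompletableFuture<T> retryOn(Executor executor, Supplier<T> supplier, int maxRetries) {
        CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier, executor);
        for (int i = 0; i < maxRetries; i++) {
            cf = cf.exceptionallyAsync(__ -> supplier.get(), executor);
        }
        return cf;
    }
}
```

The caller remains responsible for the executor's lifecycle, for example shutting down an ExecutorService once the returned future has completed.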

3.3. Nesting CompletableFutures

If we need a solution compatible with versions of Java prior to 12, we can manually enhance the first example to achieve full asynchrony. To accomplish this, we must ensure that the fallback is executed asynchronously within a new CompletableFuture:

cf.exceptionally(__ -> CompletableFuture.supplyAsync(supplier))

However, the code above won’t compile because the data types don’t match. We can fix it in three steps. First, we need to double-nest the initial future, which we can easily do with completedFuture():

CompletableFuture<CompletableFuture<T>> temp = cf.thenApply(value -> CompletableFuture.completedFuture(value));

Now the types are matching, so we can safely apply the exceptionally() fallback:

temp = temp.exceptionally(__ -> CompletableFuture.supplyAsync(supplier));

Lastly, we’ll use thenCompose() to flatten the object and go back to the original type:

cf = temp.thenCompose(t -> t);

Finally, let’s combine everything and create a CompletableFuture with a variable number of asynchronous fallbacks. Additionally, let’s take advantage of the fluent API, method references, and utility functions to keep the code concise:

static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.thenApply(CompletableFuture::completedFuture)
          .exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
          .thenCompose(Function.identity());
    }
    return cf;
}
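
To check that this variant keeps every invocation off the calling thread, we could record the thread names seen by a failing supplier. The sketch below repeats retryNesting() so that it is self-contained; the threadsUsedByRetries() helper and its flaky supplier are hypothetical and exist only for this demonstration:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

class RetryNestingDemo {

    // Same logic as retryNesting() above, repeated to keep the example runnable.
    static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries) {
        CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
        for (int i = 0; i < maxRetries; i++) {
            cf = cf.thenApply(CompletableFuture::completedFuture)
              .exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
              .thenCompose(Function.identity());
        }
        return cf;
    }

    // Hypothetical helper: runs a supplier that fails twice and returns the
    // names of all threads on which the supplier was invoked.
    static Set<String> threadsUsedByRetries() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        AtomicInteger calls = new AtomicInteger();
        Supplier<Integer> flaky = () -> {
            threads.add(Thread.currentThread().getName());
            if (calls.incrementAndGet() <= 2) {
                throw new RuntimeException("simulated failure");
            }
            return 100;
        };
        retryNesting(flaky, 5).join();
        return threads;
    }
}
```

Because each fallback only calls supplyAsync(), the supplier itself always runs on an asynchronous worker, so the returned set shouldn't contain the name of the thread that called threadsUsedByRetries().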

4. Conclusion

In this article, we explored the concept of retrying invocations of a function within a CompletableFuture. We began by delving into the implementation of the decorator pattern in a functional style, allowing us to retry the function itself.

Subsequently, we leveraged the CompletableFuture API to accomplish the same task while maintaining the asynchronous flow. Along the way, we looked at the exceptionallyAsync() method introduced in Java 12, which fits this purpose well. Finally, we presented an alternative approach that relies solely on methods from the original Java 8 API.

As always, we can find working code examples over on GitHub.
