1. Overview

In this article, we’ll explore the details of the CompletableFuture.allOf() method and understand the differences between using it and calling join() on multiple separate CompletableFuture instances. We’ll discover that allOf() enables us to proceed with our flow in a non-blocking way while also ensuring atomicity.

2. join() vs. allOf()

CompletableFuture is a powerful feature introduced in Java 8, facilitating and promoting the creation of non-blocking code. In this article, we’ll focus on two methods that enable parallel code execution: join() and allOf().

Let’s start by analyzing the inner workings of these two methods. After that, we’ll look at how each of them accomplishes a common goal: executing code in parallel and then merging the results. For the code snippets in this article, we’ll use two helper methods that block a pool thread for a period of time and then either return some data or throw an exception:

// Simulates a slow task: sleeps on a pool thread, then completes with the given value
private CompletableFuture<String> waitAndReturn(long millis, String value) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(millis);
            return value;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
}

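// Simulates a slow task that fails: sleeps, then always completes exceptionally
private CompletableFuture<String> waitAndThrow(long millis) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(millis);
        } finally {
            // Throwing from finally guarantees the failure, even if the sleep is interrupted
            throw new RuntimeException();
        }
    });
}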

2.1. join()

The CompletableFuture API exposes the join() method as a way of retrieving the value of the Future object by blocking the calling thread until the execution is completed. We should notice that the caller thread will be blocked even if the execution of the CompletableFuture is happening on a different thread:

CompletableFuture<String> future = waitAndReturn(1_000, "Harry");
assertEquals("Harry", future.join());

Furthermore, if the CompletableFuture completes with an error, join() will throw an unchecked CompletionException (a subclass of RuntimeException) that wraps the original exception as its cause:

CompletableFuture<String> futureError = waitAndThrow(1_000);
assertThrows(RuntimeException.class, futureError::join);
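To look more closely at what join() throws, here is a minimal sketch that lets a task fail and then inspects the exception. The class JoinErrorDemo and the method joinFailureCause() are hypothetical names used only for illustration; CompletableFuture and CompletionException are the only JDK types involved:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinErrorDemo {

    // Hypothetical helper: returns the cause wrapped by the exception that join() throws
    static Throwable joinFailureCause() {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            failing.join(); // blocks until the task has failed
            return null;    // not reached, because the task always fails
        } catch (CompletionException e) {
            // join() wraps the original exception in an unchecked CompletionException
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinFailureCause());
    }
}
```

Unwrapping the cause this way is useful when we want to react to the original exception type rather than to the wrapper.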

2.2. allOf()

The static method allOf() allows us to combine multiple CompletableFuture instances and returns a CompletableFuture<Void>. The resulting future completes only when all of the given futures have completed. Moreover, if any of the given futures completes exceptionally, the resulting future completes exceptionally as well. It’s important to understand that allOf() is not a blocking method: it returns immediately, without waiting for any of the futures to complete:

CompletableFuture<String> f1 = waitAndReturn(1_000, "Harry");
CompletableFuture<String> f2 = waitAndReturn(2_000, "Ron");

CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(f1, f2);

However, in order to extract the values, we’ll need to call additional methods of the API. For example, if we call join() on the resulting CompletableFuture<Void>, the thread will wait for the two composing CompletableFuture objects to complete, each in its own thread. In other words, the caller thread will be blocked for as long as it takes the longest Future to complete:

combinedFutures.join();

Since the main thread already waited the two seconds, f1 and f2 are now completed, and the subsequent calls like join() or get() will be executed immediately:

assertEquals("Harry", f1.join());
assertEquals("Ron", f2.join());
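The non-blocking nature of allOf() can be observed without any timing by completing the futures by hand. The following is a small sketch under that assumption; the class AllOfNonBlockingDemo and the method combinedStates() are hypothetical names, not part of the article's code:

```java
import java.util.concurrent.CompletableFuture;

class AllOfNonBlockingDemo {

    // Hypothetical helper: records whether the combined future is done at three points in time
    static boolean[] combinedStates() {
        CompletableFuture<String> f1 = new CompletableFuture<>();
        CompletableFuture<String> f2 = new CompletableFuture<>();

        // allOf() returns right away, although neither future has completed yet
        CompletableFuture<Void> combined = CompletableFuture.allOf(f1, f2);
        boolean doneBefore = combined.isDone();

        f1.complete("Harry");
        boolean doneAfterFirst = combined.isDone();

        f2.complete("Ron");
        boolean doneAfterBoth = combined.isDone();

        return new boolean[] {doneBefore, doneAfterFirst, doneAfterBoth};
    }

    public static void main(String[] args) {
        for (boolean state : combinedStates()) {
            System.out.println(state);
        }
    }
}
```

The combined future is only reported as done once the last of its inputs has completed.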

2.3. Executing Code in Parallel

As we can notice from the previous examples, we can execute CompletableFutures in parallel and combine the results simply by calling join() on each of them:

CompletableFuture<String> f1 = waitAndReturn(1_000, "Harry");
CompletableFuture<String> f2 = waitAndReturn(2_000, "Ron");

sayHello(f1.join());
sayHello(f2.join());

Or by iterating through a Collection or Stream of CompletableFutures, calling join() on each of them, and using their results:

Stream.of(f1, f2).map(CompletableFuture::join).forEach(this::sayHello);

The question at hand is whether using the static allOf() method before iterating and joining all the CompletableFutures will have any impact on the final result:

CompletableFuture.allOf(f1, f2).join();
Stream.of(f1, f2).map(CompletableFuture::join).forEach(this::sayHello);

There are two significant distinctions between the two approaches: error handling and the capacity to proceed in a non-blocking manner. Let’s delve into each of them and understand their particularities.

3. Error Handling

One of the main differences between the two approaches is that if we skip the call to allOf(), we’ll join and process the results of the CompletableFutures one by one. Consequently, we can end up with partial processing of the values.

In other words, if one of the CompletableFutures throws an exception, it will break the chain and stop processing. In some cases, this can cause errors because the prior elements were already processed:

CompletableFuture<String> f1 = waitAndReturn(1_000, "Harry");
CompletableFuture<String> f2 = waitAndThrow(1_000);
CompletableFuture<String> f3 = waitAndReturn(1_000, "Ron");

Stream.of(f1, f2, f3)
  .map(CompletableFuture::join)
  .forEach(this::sayHello);

On the other hand, we can combine the three instances using allOf() and then invoke the join() method to achieve all-or-nothing behavior. If any of the futures failed, allOf(...).join() throws before the Stream starts, so we either process all elements or none of them:

CompletableFuture.allOf(f1, f2, f3).join();
Stream.of(f1, f2, f3)
  .map(CompletableFuture::join) 
  .forEach(this::sayHello);
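To make the difference concrete, the sketch below runs both variants against the same three futures and records which names were processed before the failure surfaced. It assumes already-completed futures to keep the outcome deterministic; the class PartialProcessingDemo and all of its methods are hypothetical, and a list named greeted stands in for the article's sayHello():

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class PartialProcessingDemo {

    // Hypothetical fixture: a successful future, a failed one, and another successful one
    static List<CompletableFuture<String>> sampleFutures() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return Arrays.asList(
          CompletableFuture.completedFuture("Harry"),
          failed,
          CompletableFuture.completedFuture("Ron"));
    }

    // Joins the futures one by one, so earlier elements are processed before a later failure
    static List<String> joinOneByOne(List<CompletableFuture<String>> futures) {
        List<String> greeted = new ArrayList<>();
        try {
            futures.stream().map(CompletableFuture::join).forEach(greeted::add);
        } catch (CompletionException e) {
            // the failure surfaces only after "Harry" has already been processed
        }
        return greeted;
    }

    // Waits for all futures first, so a failure surfaces before anything is processed
    static List<String> allOfThenJoin(List<CompletableFuture<String>> futures) {
        List<String> greeted = new ArrayList<>();
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            futures.stream().map(CompletableFuture::join).forEach(greeted::add);
        } catch (CompletionException e) {
            // allOf(...).join() failed, so the stream was never started
        }
        return greeted;
    }

    public static void main(String[] args) {
        System.out.println(joinOneByOne(sampleFutures()));
        System.out.println(allOfThenJoin(sampleFutures()));
    }
}
```

With these inputs, the first variant has processed one name by the time the exception is thrown, while the second has processed none.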

4. Non-Blocking Code

One of the advantages of allOf() is that it allows us to continue our flow in a non-blocking way. Since the return type is a CompletableFuture<Void>, we can use thenAccept() to process the data when it arrives without blocking the thread:

CompletableFuture.allOf(f1, f2, f3)
  .thenAccept(__ -> sayHelloToAll(f1.join(), f2.join(), f3.join()));

Similarly, if we need to merge the data from the different Futures, we can use the thenApply() method instead. For instance, we can concatenate the values of the three futures and continue the non-blocking flow with the resulting String:

CompletableFuture<String> names = CompletableFuture.allOf(f1, f2, f3)
  .thenApply(__ -> f1.join() + "," + f2.join() + "," + f3.join());
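When the number of futures isn't fixed, the same idea can be wrapped in a generic helper that turns a list of futures into a single future holding a list of results. This is only a sketch: the class AllOfResults and the methods allAsList() and demo() are hypothetical names and not part of the CompletableFuture API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResults {

    // Hypothetical helper: completes with all results, in the order of the input list
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
          .thenApply(ignored -> futures.stream()
            .map(CompletableFuture::join) // does not block: every future is already complete here
            .collect(Collectors.toList()));
    }

    // Small usage example with already-completed futures
    static List<String> demo() {
        return allAsList(Arrays.asList(
          CompletableFuture.completedFuture("Harry"),
          CompletableFuture.completedFuture("Ron"))).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The join() calls inside thenApply() are safe here because the function only runs after allOf() has completed successfully.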

Moreover, if we don’t leave the async world and continue the CompletableFuture chain, we’ll be able to leverage its own mechanism for error handling and recovery, the exceptionally() method. For example, if one of the CompletableFutures completes with an exception, we can simply log it and continue the flow with a default value:

CompletableFuture<String> names = CompletableFuture.allOf(f1, f2, f3)
  .thenApply(__ -> f1.join() + "," + f2.join() + "," + f3.join())
  .exceptionally(err -> {
      System.out.println("oops, there was a problem! " + err.getMessage());
      return "names not found!";
  });
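One detail worth keeping in mind is that the Throwable passed to exceptionally() at the end of such a chain is typically a CompletionException wrapping the original failure. The sketch below unwraps it before building the fallback value; the class AllOfRecoveryDemo and its methods are hypothetical and exist only for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AllOfRecoveryDemo {

    // Hypothetical helper: joins two names, or falls back to a message built from the root cause
    static String namesOrFallback(CompletableFuture<String> f1, CompletableFuture<String> f2) {
        return CompletableFuture.allOf(f1, f2)
          .thenApply(ignored -> f1.join() + "," + f2.join())
          .exceptionally(err -> {
              // unwrap the CompletionException to reach the original exception
              Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                ? err.getCause()
                : err;
              return "fallback: " + cause.getMessage();
          })
          .join();
    }

    static String succeeding() {
        return namesOrFallback(
          CompletableFuture.completedFuture("Harry"),
          CompletableFuture.completedFuture("Ron"));
    }

    static String failing() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return namesOrFallback(CompletableFuture.completedFuture("Harry"), failed);
    }

    public static void main(String[] args) {
        System.out.println(succeeding());
        System.out.println(failing());
    }
}
```

The final join() is there only so the sketch can return a plain String; in a non-blocking flow, we would keep chaining on the returned CompletableFuture instead.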

5. Conclusions

In this article, we learned how to use CompletableFuture’s join() to execute code in parallel and merge the results. We also saw the advantage of allOf().join(), which allows us to process the data atomically. In other words, the flow will only continue when all the constituent CompletableFuture objects have completed successfully.

Lastly, we discovered that we can use allOf() without calling join() at all. This allows us to use the results of multiple CompletableFutures while continuing the non-blocking flow. We achieved this through other useful methods of the API, such as thenApply(), thenAccept(), and exceptionally().
