1. Introduction

Guava provides us with ListenableFuture with an enriched API over the default Java Future. Let's see how we can use this to our advantage.

2. Future, ListenableFuture and Futures

Let's have a brief look at what these different classes are and how they are related to each other.

2.1. Future

Since Java 5, we can use java.util.concurrent.Future to represent asynchronous tasks.

A Future gives us access to the result of a task that has already completed or might complete in the future, along with support for canceling it.
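As a quick refresher, the following minimal sketch uses only the JDK to show both capabilities: blocking on get() for a result and canceling a task that is still running. The class and method names are hypothetical and exist only for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class PlainFutureSketch {

    // Submits a task and blocks on get() until the result is available.
    static int awaitResult() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> task = executor.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(100); // simulated work
                return 5;
            });
            return task.get(2, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    // Cancels a task before it completes and reports whether the future ended up canceled.
    static boolean cancelRunningTask() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> task = executor.submit(() -> {
                TimeUnit.SECONDS.sleep(10); // long enough to still be pending when we cancel
                return 5;
            });
            boolean cancelled = task.cancel(true); // true = interrupt the worker if it has started
            return cancelled && task.isCancelled() && task.isDone();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(awaitResult());
        System.out.println(cancelRunningTask());
    }
}
```

Notice that the only way to react to completion here is to block on get() or poll isDone(), which is the gap that ListenableFuture fills.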

2.2. ListenableFuture

One lacking feature when using java.util.concurrent.Future is the ability to add listeners to run on completion, which is a common feature provided by most popular asynchronous frameworks.

Guava solves this problem by allowing us to attach listeners to its com.google.common.util.concurrent.ListenableFuture.

2.3. Futures

Guava provides us with the convenience class com.google.common.util.concurrent.Futures to make it easier to work with its ListenableFuture.

This class provides various ways of interacting with ListenableFuture, among which is the support for adding success/failure callbacks and allowing us to coordinate multiple futures with aggregations or transformations.

3. Simple Usage

Let's now see how we can use ListenableFuture in its simplest ways: creating futures and adding callbacks.

3.1. Creating ListenableFuture

The simplest way we can obtain a ListenableFuture is by submitting a task to a ListeningExecutorService (much like how we would use a normal ExecutorService to obtain a normal Future):

ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);

ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
    TimeUnit.MILLISECONDS.sleep(500); // long running task
    return 5;
});

Notice how we use the MoreExecutors class to decorate our ExecutorService as a ListeningExecutorService. We can refer to Thread Pool's Implementation in Guava to learn more about MoreExecutors.

If we already have an API that builds its result as a FutureTask and we need a ListenableFuture instead, we can create the task with Guava's concrete implementation, ListenableFutureTask:

// old api
public FutureTask<String> fetchConfigTask(String configKey) {
    return new FutureTask<>(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
    return ListenableFutureTask.create(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

We need to be aware that these tasks won't run unless we submit them to an Executor. Interacting directly with ListenableFutureTask is uncommon and is done only in rare scenarios (e.g., implementing our own ExecutorService). Refer to Guava's AbstractListeningExecutorService for practical usage.

We can also use com.google.common.util.concurrent.SettableFuture if our asynchronous task can't use the ListeningExecutorService or the provided Futures utility methods, and we need to set the future value manually. For more complex usage, we can also consider com.google.common.util.concurrent.AbstractFuture.
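Here is a minimal sketch of the SettableFuture approach, assuming a hypothetical fetchConfig() method that does its work on a plain thread instead of a ListeningExecutorService. The worker completes the future manually with set() or setException():

```java
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

final class SettableFutureSketch {

    // Hypothetical bridge: runs the work on a plain thread and completes the future by hand.
    static ListenableFuture<String> fetchConfig(String configKey) {
        SettableFuture<String> result = SettableFuture.create();
        Thread worker = new Thread(() -> {
            try {
                if (configKey.isEmpty()) {
                    throw new IllegalArgumentException("empty config key");
                }
                result.set(configKey + ".value"); // completes the future successfully
            } catch (RuntimeException e) {
                result.setException(e); // completes the future with a failure
            }
        });
        worker.start();
        return result; // callers only see the ListenableFuture view
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchConfig("config.0").get());
    }
}
```

Returning the future as a ListenableFuture keeps the set() and setException() methods out of the callers' reach.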

3.2. Adding Listeners/Callbacks

One way we can add a listener to a ListenableFuture is by registering a callback with Futures.addCallback(), providing us access to the result or exception when success or failure occurs:

Executor listeningExecutor = Executors.newSingleThreadExecutor();

ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask();
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        // do on success
    }

    @Override
    public void onFailure(Throwable t) {
        // do on failure
    }
}, listeningExecutor);

We can also add a listener by adding it directly to the ListenableFuture. Note that this listener will run when the future completes either successfully or exceptionally. Also, note that we don't have access to the result of the asynchronous task:

Executor listeningExecutor = Executors.newSingleThreadExecutor();

int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);

ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask();
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);

4. Complex Usage

Let's now see how we can use these futures in more complex scenarios.

4.1. Fan-In

We may sometimes need to invoke multiple asynchronous tasks and collect their results, usually called a fan-in operation.

Guava provides us with two ways of doing this. However, we should be careful in selecting the correct method depending on our requirements. Let's assume we need to coordinate the following asynchronous tasks:

ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");

One way of fanning in multiple futures is to use the Futures.allAsList() method. This allows us to collect the results of all futures if all of them succeed, in the order of the provided futures. If any one of these futures fails or is canceled, then the combined future fails or is canceled as well:

ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // do on all futures success
    }

    @Override
    public void onFailure(Throwable t) {
        // handle on at least one failure
    }
}, someExecutor);

If we need to collect results of all asynchronous tasks, regardless of whether they failed or not, we can use Futures.successfulAsList(). This will return a list whose results will have the same order as the tasks passed into the argument, and the failed tasks will have null assigned to their respective positions in the list:

ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // handle results. If task2 failed, then configResults.get(1) == null
    }

    @Override
    public void onFailure(Throwable t) {
        // handle failure
    }
}, listeningExecutor);

We should be careful in the above usage that if the future task normally returns null on success, it will be indistinguishable from a failed task (which also sets the result as null).
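The following sketch illustrates this ambiguity and one possible way around it: inspecting the original futures once the combined future is done. The class, the helper names, and the "FAILED" marker are assumptions made for this example, and the inputs are already-completed futures so that the snippet stays self-contained:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

final class SuccessfulAsListSketch {

    // Returns true if an already-completed future failed or was canceled.
    static boolean failed(ListenableFuture<?> done) {
        try {
            Futures.getDone(done);
            return false;
        } catch (ExecutionException | CancellationException e) {
            return true;
        }
    }

    // Labels each position so that a failure and a legitimate null result can be told apart.
    static List<String> describe(List<ListenableFuture<String>> tasks) throws Exception {
        List<String> results = Futures.successfulAsList(tasks).get();
        List<String> described = new ArrayList<>();
        for (int i = 0; i < tasks.size(); i++) {
            described.add(failed(tasks.get(i)) ? "FAILED" : String.valueOf(results.get(i)));
        }
        return described;
    }

    static List<String> demo() throws Exception {
        ListenableFuture<String> ok = Futures.immediateFuture("config.0");
        ListenableFuture<String> broken = Futures.immediateFailedFuture(new IllegalStateException("boom"));
        ListenableFuture<String> nullResult = Futures.immediateFuture(null);
        return describe(Arrays.asList(ok, broken, nullResult));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints [config.0, FAILED, null]
    }
}
```

Without the failed() check, positions 1 and 2 would both simply hold null.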

4.2. Fan-In with Combiners

If we have a requirement to coordinate multiple futures that return different results, the above solution may not suffice. In this case, we can use the combiner variants of the fan-in operations to coordinate this mix of futures.

Similar to the simple fan-in operations, Guava provides us with two variants: Futures.whenAllSucceed(), which succeeds only when all tasks complete successfully, and Futures.whenAllComplete(), which succeeds even if some tasks fail.

Let's see how we can use Futures.whenAllSucceed() to combine different result types from multiple futures:

ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
    .call(() -> {
        int cartId = Futures.getDone(cartIdTask);
        String customerName = Futures.getDone(customerNameTask);
        List<String> cartItems = Futures.getDone(cartItemsTask);
        return new CartInfo(cartId, customerName, cartItems);
    }, someExecutor);

Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
    @Override
    public void onSuccess(@Nullable CartInfo result) {
        //handle on all success and combination success
    }

    @Override
    public void onFailure(Throwable t) {
        //handle on either task fail or combination failed
    }
}, listeningExecService);

If we need to allow some tasks to fail, we can use Futures.whenAllComplete(). While the semantics are mostly similar to the above, we should be aware that the failed futures will throw an ExecutionException when Futures.getDone() is called on them.
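As a minimal sketch of this behavior, the combiner below tolerates a failed customer name lookup by catching the ExecutionException and substituting a fallback value. The class name, the "unknown" fallback, and the summary format are assumptions for illustration, and directExecutor() is used only to keep the example short:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;

final class WhenAllCompleteSketch {

    static ListenableFuture<String> summarize(
            ListenableFuture<Integer> cartIdTask, ListenableFuture<String> customerNameTask) {
        return Futures.whenAllComplete(cartIdTask, customerNameTask)
            .call(() -> {
                // A failure here is not handled, so it fails the combined future.
                int cartId = Futures.getDone(cartIdTask);
                String customerName;
                try {
                    customerName = Futures.getDone(customerNameTask);
                } catch (ExecutionException e) {
                    customerName = "unknown"; // hypothetical fallback for a failed lookup
                }
                return cartId + ":" + customerName;
            }, MoreExecutors.directExecutor());
    }

    static String demo() throws Exception {
        ListenableFuture<Integer> cartIdTask = Futures.immediateFuture(42);
        ListenableFuture<String> customerNameTask =
            Futures.immediateFailedFuture(new IllegalStateException("lookup failed"));
        return summarize(cartIdTask, customerNameTask).get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints 42:unknown
    }
}
```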

4.3. Transformations

Sometimes we need to convert the result of a future once successful. Guava provides us with two ways to do so with Futures.transform() and Futures.lazyTransform().

Let's see how we can use Futures.transform() to transform the result of a future. This can be used as long as the transformation computation is not heavy:

ListenableFuture<List<String>> cartItemsTask = service.getCartItems();

// Function here is com.google.common.base.Function, the type Futures.transform() accepts
Function<List<String>, Integer> itemCountFunc = cartItems -> {
    assertNotNull(cartItems);
    return cartItems.size();
};

ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);

We can also use Futures.lazyTransform() to apply a transformation function to a java.util.concurrent.Future. We need to keep in mind that this option doesn't return a ListenableFuture but a normal java.util.concurrent.Future and that the transformation function applies every time get() is invoked on the resulting future.
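To make the repeated evaluation visible, this small sketch counts how often the transformation function runs when we call get() twice. The class name and the counter are assumptions for illustration, and the input is an already-completed future:

```java
import com.google.common.util.concurrent.Futures;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyTransformSketch {

    // Returns how many times the transformation ran for two get() calls.
    static int countInvocations() throws Exception {
        AtomicInteger invocations = new AtomicInteger();
        Future<List<String>> cartItemsTask = Futures.immediateFuture(Arrays.asList("apple", "pear"));

        Future<Integer> itemCountTask = Futures.lazyTransform(cartItemsTask, cartItems -> {
            invocations.incrementAndGet();
            return cartItems.size();
        });

        itemCountTask.get(); // applies the function
        itemCountTask.get(); // applies the function again
        return invocations.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countInvocations()); // prints 2
    }
}
```

For this reason, the function passed to lazyTransform() should be cheap and free of side effects.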

4.4. Chaining Futures

We may come across situations where our futures need to call other futures. In such cases, Guava provides us with Async variants of its methods (such as Futures.submitAsync() and Futures.transformAsync()) to safely chain these futures to execute one after the other.

Let's see how we can use Futures.submitAsync() to call a future from inside the Callable that is submitted:

AsyncCallable<String> asyncConfigTask = () -> {
    ListenableFuture<String> configTask = service.fetchConfig("config.a");
    TimeUnit.MILLISECONDS.sleep(500); //some long running task
    return configTask;
};

ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);

In case we want true chaining, where the result of one future is fed into the computation of another future, we can use Futures.transformAsync():

ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
    ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
    TimeUnit.MILLISECONDS.sleep(500); // some long running task
    return generatePasswordTask;
};

ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);

Guava also provides us with Futures.scheduleAsync() and Futures.catchingAsync() to submit a scheduled task and to provide fallback tasks on error recovery, respectively. While they cater to different scenarios, we won't discuss them in detail since they are similar to the other Async calls.
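For completeness, here is a brief sketch of Futures.catchingAsync(), assuming a hypothetical fetchDefaultConfig() fallback that is consulted only when the primary future fails with an IOException. The directExecutor() is used for brevity because the fallback function is trivial:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;

final class CatchingAsyncSketch {

    // Hypothetical fallback source; in real code this could be another asynchronous call.
    static ListenableFuture<String> fetchDefaultConfig() {
        return Futures.immediateFuture("config.default");
    }

    // Recovers from IOException only; any other failure propagates unchanged.
    static ListenableFuture<String> withFallback(ListenableFuture<String> primary) {
        return Futures.catchingAsync(
            primary,
            IOException.class,
            e -> fetchDefaultConfig(),
            MoreExecutors.directExecutor());
    }

    public static void main(String[] args) throws Exception {
        ListenableFuture<String> failing = Futures.immediateFailedFuture(new IOException("down"));
        System.out.println(withFallback(failing).get()); // prints config.default
    }
}
```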

5. Usage Dos and Don'ts

Let's now investigate some common pitfalls we may encounter when working with futures and how to avoid them.

5.1. Working vs. Listening Executors

It is important to understand the difference between the working executor and the listening executor when using Guava futures. For example, let's say we have an asynchronous task to fetch configs:

public ListenableFuture<String> fetchConfig(String configKey) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

Let's also say that we want to attach a listener to the above future:

ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);

Notice that the lExecService here is the executor that is running our asynchronous task, while the listeningExecutor is the executor on which our listener is invoked.

As seen above, we should always consider separating these two executors to avoid scenarios where our listeners and workers compete for the same thread pool resources. Sharing the same executor may cause our heavy-duty tasks to starve the listener executions, or a badly written heavyweight listener may end up blocking our important heavy-duty tasks.

5.2. Be Careful With directExecutor()

While we can use MoreExecutors.directExecutor() and MoreExecutors.newDirectExecutorService() in unit testing to make it easier to handle asynchronous executions, we should be careful using them in production code.

When we obtain executors from the above methods, any tasks that we submit to them, be they heavyweight tasks or listeners, will be executed on the current thread. This can be dangerous if the current execution context is one that requires high throughput.

For example, using a directExecutor and submitting a heavyweight task to it in the UI thread will automatically block our UI thread.

We could also face a scenario where our listener ends up slowing down all our other listeners (even the ones that aren't involved with directExecutor). This is because Guava executes all listeners in a while loop in their respective Executors, but the directExecutor will cause the listener to run in the same thread as the while loop.
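The sketch below makes the threading behavior observable. With directExecutor(), a listener on a pending future runs on whichever thread completes that future, and a listener on an already-completed future runs on the thread that registers it. The class, method, and thread names are assumptions for illustration:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class DirectExecutorSketch {

    // The listener runs inside set(), on the thread that completes the future.
    static String listenerThreadForPendingFuture() throws InterruptedException {
        SettableFuture<String> future = SettableFuture.create();
        AtomicReference<String> listenerThread = new AtomicReference<>();
        future.addListener(
            () -> listenerThread.set(Thread.currentThread().getName()),
            MoreExecutors.directExecutor());

        Thread completer = new Thread(() -> future.set("done"), "completer-thread");
        completer.start();
        completer.join();
        return listenerThread.get();
    }

    // The future is already done, so the listener runs inside addListener() on the caller.
    static boolean listenerRunsOnCallerWhenAlreadyDone() {
        ListenableFuture<String> done = Futures.immediateFuture("done");
        AtomicReference<Thread> listenerThread = new AtomicReference<>();
        done.addListener(
            () -> listenerThread.set(Thread.currentThread()),
            MoreExecutors.directExecutor());
        return listenerThread.get() == Thread.currentThread();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(listenerThreadForPendingFuture()); // prints completer-thread
        System.out.println(listenerRunsOnCallerWhenAlreadyDone()); // prints true
    }
}
```

In both cases, a slow listener would delay the thread it happens to land on, which is why directExecutor() is best reserved for lightweight listeners.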

5.3. Nesting Futures is Bad

When working with chained futures, we should be careful not to call one from inside another future in such a way that it creates nested futures:

public ListenableFuture<String> generatePassword(String username) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return username + "123";
    });
}

String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
    final String username = firstName.replaceAll("[^a-zA-Z]+", "")
        .concat("@service.com");
    return generatePassword(username);
});

If we ever see code that has ListenableFuture<ListenableFuture<V>>, then we should know that this is a badly written future because there is a chance that cancellation and completion of the outer future may race, and the cancellation may not propagate to the inner future.

If we see the above scenario, we should use the Async variants of the Futures methods, such as Futures.submitAsync() and Futures.transformAsync(), to safely unwrap these chained futures in a connected fashion.
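As a sketch of the fix, the nested example can be rewritten with Futures.submitAsync() so that the caller receives a flat ListenableFuture<String>. The class name, the passwordFor() and demo() methods, and the explicit executor parameter are assumptions made to keep the snippet self-contained:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class FlattenNestedFutureSketch {

    static ListenableFuture<String> generatePassword(ListeningExecutorService lExecService, String username) {
        return lExecService.submit(() -> {
            return username + "123";
        });
    }

    // submitAsync() unwraps the inner future, so no ListenableFuture<ListenableFuture<String>> appears.
    static ListenableFuture<String> passwordFor(ListeningExecutorService lExecService, String firstName) {
        return Futures.submitAsync(() -> {
            String username = firstName.replaceAll("[^a-zA-Z]+", "").concat("@service.com");
            return generatePassword(lExecService, username);
        }, lExecService);
    }

    static String demo() throws Exception {
        ListeningExecutorService lExecService =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
        try {
            return passwordFor(lExecService, "john").get(2, TimeUnit.SECONDS);
        } finally {
            lExecService.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints john@service.com123
    }
}
```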

5.4. Be Careful With JdkFutureAdapters.listenInPoolThread()

Guava recommends that the best way we can leverage its ListenableFuture is by converting all our code that uses Future to ListenableFuture. 

If this conversion is not feasible in some scenarios, Guava provides us with adapters to do this using the JdkFutureAdapters.listenInPoolThread() overloads. While this may seem helpful, Guava warns us that these are heavyweight adapters and should be avoided where possible.
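For the cases where we can't avoid it, a minimal sketch of the adapter might look like the following. The class name, the executors, and the "legacy-result" value are assumptions for illustration. We use the overload that takes an Executor so that the thread waiting on the legacy future comes from a pool we control:

```java
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class JdkFutureAdapterSketch {

    static String adaptLegacyFuture() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        ExecutorService adapterPool = Executors.newSingleThreadExecutor();
        try {
            // A plain JDK Future, standing in for an API we can't change.
            Future<String> legacy = worker.submit(() -> {
                return "legacy-result";
            });

            // If the legacy future is still pending, a thread from adapterPool waits on it
            // once the first listener is added.
            ListenableFuture<String> adapted = JdkFutureAdapters.listenInPoolThread(legacy, adapterPool);

            CountDownLatch listenerRan = new CountDownLatch(1);
            adapted.addListener(listenerRan::countDown, MoreExecutors.directExecutor());
            if (!listenerRan.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener did not run in time");
            }
            return adapted.get();
        } finally {
            worker.shutdownNow();
            adapterPool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(adaptLegacyFuture()); // prints legacy-result
    }
}
```

The cost is that a thread may sit blocked for as long as the legacy future is pending, which is why the adapter is considered heavyweight.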

6. Conclusion

In this article, we have seen how we can use Guava's ListenableFuture to enrich our usage of futures and how to use the Futures API to make it easier to work with these futures.

We have also seen some common errors that we may make when working with these futures and the provided executors.

As always, the full source code with our examples is available over on GitHub.
