Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we’ll play with RxJava’s Completable type, which represents a computation result without an actual value.

2. RxJava Dependency

Let’s add the RxJava 2 dependency to our Maven project:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.2</version>
</dependency>

We can usually find the latest version on Maven Central.

3. Completable Type

Completable is similar to Observable, except that it emits only a completion or an error signal and never any items. The Completable class contains several convenience methods for creating an instance or obtaining one from different reactive sources.

We can spawn an instance that completes immediately by using Completable.complete().

Then, we can observe its state by using DisposableCompletableObserver:

Completable
  .complete()
  .subscribe(new DisposableCompletableObserver() {
    @Override
    public void onComplete() {
        System.out.println("Completed!");
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
});

Additionally, we can construct a Completable instance from Callable, Action, and Runnable:

Completable.fromRunnable(() -> {});
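
As a minimal sketch of the three factory methods mentioned above, the following example wraps a side effect in each of them. The class name CompletableFactories and the log list are hypothetical names used only for illustration. It also shows that the wrapped code runs on subscription rather than at creation time:

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

public class CompletableFactories {

    // Hypothetical helper: records the order in which the side effects run.
    public static List<String> run() {
        List<String> log = new ArrayList<>();

        Completable fromRunnable = Completable.fromRunnable(() -> log.add("runnable"));
        Completable fromAction = Completable.fromAction(() -> log.add("action"));
        // The value returned by the Callable is ignored by the Completable.
        Completable fromCallable = Completable.fromCallable(() -> log.add("callable"));

        // Nothing has been executed so far: the work starts on subscription.
        log.add("before subscribe");

        fromRunnable.subscribe();
        fromAction.subscribe();
        fromCallable.subscribe();

        return log;
    }
}
```

Since no scheduler is involved, each subscribe() call runs its side effect synchronously, so the "before subscribe" entry lands first in the log.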

Also, we can get Completable instances from other types, either by using the Completable.fromXxx() factory methods, such as fromPublisher() or fromSingle(), or by calling ignoreElement() on Maybe and Single sources and ignoreElements() on Flowable and Observable sources:

Flowable<String> flowable = Flowable
  .just("request received", "user logged in");
Completable flowableCompletable = Completable
  .fromPublisher(flowable);
Completable singleCompletable = Single.just(1)
  .ignoreElement();
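
To round out the conversions, here is a short sketch covering the remaining base types. The class name CompletableConversions is a hypothetical name chosen for illustration. It relies on blockingGet(), which returns null when a Completable completes normally and the Throwable otherwise:

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;

public class CompletableConversions {

    // Hypothetical helper: returns true if every converted source completes.
    public static boolean allComplete() {
        // Flowable and Observable drop their items via ignoreElements()
        Completable fromObservable = Observable.just(1, 2, 3).ignoreElements();
        Completable fromFlowable = Flowable.range(1, 5).ignoreElements();

        // Maybe (like Single) holds at most one item, hence ignoreElement()
        Completable fromMaybe = Maybe.just("value").ignoreElement();
        Completable fromEmptyMaybe = Maybe.empty().ignoreElement();

        // blockingGet() yields null on normal completion
        return fromObservable.blockingGet() == null
          && fromFlowable.blockingGet() == null
          && fromMaybe.blockingGet() == null
          && fromEmptyMaybe.blockingGet() == null;
    }
}
```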

4. Chaining Completables

We can chain Completables in many real-world use cases where we only care about whether an operation succeeds:

  • All-or-nothing actions, like doing a PUT request to update a remote object followed by a local database update upon success
  • Post-factum logging and journaling
  • Orchestration of several actions, e.g. running an analytics job after an ingestion action gets completed

We’ll keep the examples simple and problem-agnostic. Suppose we have several Completable instances:

Completable first = Completable
  .fromSingle(Single.just(1));
Completable second = Completable
  .fromRunnable(() -> {});
Throwable throwable = new RuntimeException();
Completable error = Single.error(throwable)
  .ignoreElement();

To combine two Completables into a single one, we can use the andThen() operator:

first
  .andThen(second)
  .test()
  .assertComplete();

We can chain as many Completables as needed. However, if at least one of the sources fails, the resulting Completable won’t fire onComplete() either:

first
  .andThen(second)
  .andThen(error)
  .test()
  .assertError(throwable);
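
To make the ordering visible, the following sketch records which steps actually run when the middle one fails. The class name AndThenOrdering and the step names are hypothetical, loosely modeled on the all-or-nothing use case from the list above:

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

public class AndThenOrdering {

    // Hypothetical helper: returns the steps that were executed.
    public static List<String> run() {
        List<String> log = new ArrayList<>();

        Completable updateRemote = Completable.fromAction(() -> log.add("remote update"));
        Completable failing = Completable.fromAction(() -> {
            log.add("failing step");
            throw new IllegalStateException("step failed");
        });
        Completable updateLocal = Completable.fromAction(() -> log.add("local update"));

        // andThen() subscribes to the next source only after the previous one completes,
        // so the last step is never started once the middle step fails.
        updateRemote
          .andThen(failing)
          .andThen(updateLocal)
          .test()
          .assertError(IllegalStateException.class);

        return log;
    }
}
```

The returned log contains the first two steps only, which suggests andThen() is a reasonable fit when later steps must not run after a failure.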

Furthermore, if one of the sources is infinite or never reaches onComplete() for some reason, the resulting Completable will fire neither onComplete() nor onError().

The good news is that we can still test this scenario:

...
  .andThen(Completable.never())
  .test()
  .assertNotComplete();
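
One possible way to guard against such a chain, shown here only as a sketch and not as part of the original examples, is the timeout() operator. The class name NeverWithTimeout is hypothetical, and a TestScheduler is assumed so that virtual time can be advanced without real waiting:

```java
import io.reactivex.Completable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NeverWithTimeout {

    // Hypothetical helper: returns true if the stalled chain ends with a timeout error.
    public static boolean timesOut() {
        TestScheduler scheduler = new TestScheduler();

        TestObserver<Void> observer = Completable.complete()
          .andThen(Completable.never())
          .timeout(1, TimeUnit.SECONDS, scheduler)
          .test();

        // Before the virtual second elapses, nothing has been signalled.
        observer.assertNotTerminated();

        // Advance virtual time to trigger the timeout.
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        observer.assertError(TimeoutException.class);

        return observer.errorCount() == 1;
    }
}
```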

5. Composing Series of Completables

Imagine we have a bunch of Completables. As a practical use case, suppose we need to register a user within several separate subsystems.

To join all Completables into a single one, we can use the merge() family of methods. The merge() operator allows subscribing to all sources.

The resulting instance completes once all of the sources are completed. Additionally, it terminates with onError when any of the sources emits an error:

Completable.mergeArray(first, second)
  .test()
  .assertComplete();

Completable.mergeArray(first, second, error)
  .test()
  .assertError(throwable);
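
When every source should get a chance to run even if one of them fails, the merge family also offers delay-error variants such as mergeArrayDelayError(). The sketch below assumes three hypothetical subsystems (billing, mailing, audit) and a hypothetical class name UserRegistration:

```java
import io.reactivex.Completable;
import java.util.ArrayList;
import java.util.List;

public class UserRegistration {

    // Hypothetical helper: returns the subsystems that registered the user.
    public static List<String> register() {
        List<String> registered = new ArrayList<>();

        Completable billing = Completable.fromAction(() -> registered.add("billing"));
        Completable mailing = Completable.error(new IllegalStateException("mailing is down"));
        Completable audit = Completable.fromAction(() -> registered.add("audit"));

        // The error is held back until all sources have terminated,
        // so the audit registration still runs after the mailing failure.
        Completable.mergeArrayDelayError(billing, mailing, audit)
          .test()
          .assertError(IllegalStateException.class);

        return registered;
    }
}
```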

Let’s move on to a slightly different use case. Let’s say we need to execute an action for every element obtained from a Flowable.

Then, we want a single Completable that covers both the completion of the upstream and all the element-level actions. The flatMapCompletable() operator helps in this case:

Completable allElementsCompletable = Flowable
  .just("request received", "user logged in")
  .flatMapCompletable(message -> Completable
      .fromRunnable(() -> System.out.println(message))
  );
allElementsCompletable
  .test()
  .assertComplete();

Similarly, the above method is available for the rest of the base reactive classes, like Observable, Maybe, or Single.

As a practical context for flatMapCompletable(), we could think about decorating every item with some side effect. We can write a log entry per completed element or make a storage snapshot upon each successful action.
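
The per-element logging idea could look roughly like this. The class name ElementJournal and the journal list are hypothetical, and an Observable source is used to show the operator on a type other than Flowable:

```java
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

public class ElementJournal {

    // Hypothetical helper: writes one journal entry per element, then a final marker.
    public static List<String> journal() {
        List<String> journal = new ArrayList<>();

        Observable
          .just("request received", "user logged in")
          // One Completable per element, each performing the side effect
          .flatMapCompletable(message -> Completable
              .fromAction(() -> journal.add("logged: " + message)))
          // Runs only after the upstream and all element-level actions have completed
          .andThen(Completable.fromAction(() -> journal.add("all done")))
          .test()
          .assertComplete();

        return journal;
    }
}
```

Because everything here runs synchronously on the calling thread, the entries appear in source order. With asynchronous inner Completables, the order would not be guaranteed.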

Finally, we may need to construct a Completable from a couple of other sources and get it terminated as soon as either of them completes. That’s where amb operators can help.

The amb prefix is a short-hand for “ambiguous”, implying the uncertainty about which Completable exactly gets completed. For example, ambArray():

Completable.ambArray(first, Completable.never(), second)
  .test()
  .assertComplete();

Note that the above Completable may also terminate with onError() instead of onComplete(), depending on which source Completable terminates first:

Completable.ambArray(error, first, second)
  .test()
  .assertError(throwable);

Also, once the first source terminates, the remaining sources are guaranteed to be disposed of.

That means all remaining running Completables are stopped via Disposable.dispose() and corresponding CompletableObservers will be unsubscribed.

As a practical example, we can use amb() when we stream a backup file to several equivalent remote storages. We complete the process once the first backup finishes, or repeat the process upon error.
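
The disposal of the losing sources can be observed with a small sketch. The class name BackupRace and the two storage subjects are hypothetical. CompletableSubject is used here only to control when each source terminates:

```java
import io.reactivex.Completable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.CompletableSubject;
import java.util.concurrent.atomic.AtomicBoolean;

public class BackupRace {

    // Hypothetical helper: returns true if the slower source gets disposed.
    public static boolean loserIsDisposed() {
        CompletableSubject fastStorage = CompletableSubject.create();
        CompletableSubject slowStorage = CompletableSubject.create();
        AtomicBoolean slowDisposed = new AtomicBoolean(false);

        TestObserver<Void> observer = Completable
          .ambArray(
            fastStorage,
            slowStorage.doOnDispose(() -> slowDisposed.set(true)))
          .test();

        // Neither storage has finished yet.
        observer.assertNotTerminated();

        // The first storage finishes and wins the race.
        fastStorage.onComplete();
        observer.assertComplete();

        // The losing source was disposed and no longer has observers.
        return slowDisposed.get() && !slowStorage.hasObservers();
    }
}
```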

6. Conclusion

In this article, we briefly reviewed the Completable type of RxJava.

We started with different options for obtaining Completable instances and then chained and composed Completables by using the andThen(), merge(), flatMapCompletable(), and amb…() operators.

We can find the source for all code samples over on GitHub.
