1. Overview

In this tutorial, we’re going to learn about RxJava hooks. We’ll be creating short examples to demonstrate how the hooks work in different situations.

2. What Are RxJava Hooks?

As the name suggests, RxJava hooks allow us to hook into the lifecycle of Observable, Completable, Maybe, Flowable, and Single. In addition, RxJava allows us to add lifecycle hooks to the schedulers returned by Schedulers. We can also use the hooks to specify a global error handler.

In RxJava 1, the class RxJavaHooks is used to define the hooks. In RxJava 2, however, the hooking mechanism was completely rewritten: RxJavaHooks is no longer available, and we use RxJavaPlugins to implement the lifecycle hooks instead.

The RxJavaPlugins class has a number of setter methods to set the hooks. These hooks are global. Once a hook is set, it stays active until we either call the reset() method of the RxJavaPlugins class, which clears all hooks, or call the setter method for that individual hook again (passing null removes it).
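To make the global behavior concrete, here's a simplified model of the mechanism. It isn't RxJava's actual source: MiniPlugins and MiniSource are hypothetical names, and the sketch only assumes that a hook lives in a static field and is applied whenever a source is assembled. That is why setting a hook once affects every sequence until it's removed:

```java
import java.util.function.Function;

public class GlobalHookModel {

    // Hypothetical stand-in for a reactive source such as Observable
    static final class MiniSource {
        final String name;
        MiniSource(String name) { this.name = name; }
    }

    // Hypothetical stand-in for RxJavaPlugins: one static slot per hook
    static final class MiniPlugins {
        private static volatile Function<MiniSource, MiniSource> onAssembly;

        static void setOnAssembly(Function<MiniSource, MiniSource> hook) {
            onAssembly = hook; // passing null removes the hook
        }

        static void reset() {
            onAssembly = null; // a full reset would clear every hook slot
        }

        static MiniSource onAssembly(MiniSource source) {
            Function<MiniSource, MiniSource> hook = onAssembly;
            return hook == null ? source : hook.apply(source);
        }
    }

    // Every factory method routes the new source through the hook
    static MiniSource create(String name) {
        return MiniPlugins.onAssembly(new MiniSource(name));
    }

    public static void main(String[] args) {
        MiniPlugins.setOnAssembly(s -> new MiniSource("hooked-" + s.name));
        System.out.println(create("a").name); // hooked-a

        MiniPlugins.reset();
        System.out.println(create("b").name); // b
    }
}
```

Because the real hooks are global in the same way, test suites that install them typically call RxJavaPlugins.reset() in a teardown method so one test can't affect the next.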

3. Hook for Error Handling

We can use the setErrorHandler() method to handle errors that can’t be delivered, either because the downstream has already reached its terminal state or because the subscriber has no onError handler. Let’s see how we can implement an error handler and test it:

RxJavaPlugins.setErrorHandler(throwable -> {
    hookCalled = true;
});

Observable.error(new IllegalStateException()).subscribe();

assertTrue(hookCalled);

Not every error reaches the handler as-is. RxJava checks whether the error is one of the named bug cases, which pass through unchanged; any other error is wrapped in an UndeliverableException. The exceptions treated as bug cases are:

  • OnErrorNotImplementedException – when the user forgets to add the onError handler in the subscribe() method
  • MissingBackpressureException – due to either an operator bug or concurrent onNext
  • IllegalStateException – when general protocol violations occur
  • NullPointerException – standard null pointer exception
  • IllegalArgumentException – due to invalid user input
  • CompositeException – due to a crash while handling an exception
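The pass-through-or-wrap decision can be modeled roughly as follows. This is a simplified sketch rather than RxJava's source: Undeliverable is a hypothetical stand-in for UndeliverableException, and only the three JDK exception types from the list are checked, since the RxJava-specific ones aren't available without the library on the classpath:

```java
import java.io.IOException;

public class BugCaseModel {

    // Hypothetical stand-in for RxJava's UndeliverableException
    static final class Undeliverable extends RuntimeException {
        Undeliverable(Throwable cause) { super(cause); }
    }

    // Errors that point to a programming bug are passed through unchanged
    static boolean isBug(Throwable error) {
        return error instanceof IllegalStateException
            || error instanceof NullPointerException
            || error instanceof IllegalArgumentException;
    }

    // Everything else is wrapped before it reaches the global error handler
    static Throwable prepare(Throwable error) {
        return isBug(error) ? error : new Undeliverable(error);
    }

    public static void main(String[] args) {
        Throwable bug = prepare(new IllegalArgumentException("bad input"));
        Throwable other = prepare(new IOException("disk full"));

        System.out.println(bug.getClass().getSimpleName());   // IllegalArgumentException
        System.out.println(other.getClass().getSimpleName()); // Undeliverable
        System.out.println(other.getCause().getMessage());    // disk full
    }
}
```

For a real error handler, the practical consequence is that when it receives an UndeliverableException, the original error is available through getCause().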

4. Hooks for Completable

The RxJava Completable has two lifecycle hooks. Let’s take a look at them now.

4.1. setOnCompletableAssembly

RxJava calls this hook when it instantiates operators and sources on Completable. The hook function receives the newly assembled Completable as its argument and returns the Completable to use, so we can inspect, wrap, or replace it:

RxJavaPlugins.setOnCompletableAssembly(completable -> {
    hookCalled = true;
    return completable;
});

Completable.fromSingle(Single.just(1));

assertTrue(hookCalled);

4.2. setOnCompletableSubscribe

RxJava calls this hook before a subscriber subscribes to a Completable:

RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
    hookCalled = true;
    return observer;
});

Completable.fromSingle(Single.just(1)).test();

assertTrue(hookCalled);

5. Hooks for Observable

Next, let’s take a look at RxJava’s three lifecycle hooks for Observable.

5.1. setOnObservableAssembly

RxJava calls this hook when it instantiates operators and sources on Observable:

RxJavaPlugins.setOnObservableAssembly(observable -> {
    hookCalled = true;
    return observable;
});

Observable.range(1, 10);

assertTrue(hookCalled);

5.2. setOnObservableSubscribe

RxJava calls this hook before a subscriber subscribes to an Observable:

RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
    hookCalled = true;
    return observer;
});

Observable.range(1, 10).test();

assertTrue(hookCalled);

5.3. setOnConnectableObservableAssembly

This hook is intended for ConnectableObservable, a variant of Observable. The only difference is that it doesn’t begin emitting items when it’s subscribed to, but only when its connect() method is called:

RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
    hookCalled = true;
    return connectableObservable;
});

Observable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6. Hooks for Flowable

Now, let’s take a look at the lifecycle hooks defined for Flowable.

6.1. setOnFlowableAssembly

RxJava calls this hook when it instantiates operators and sources on Flowable:

RxJavaPlugins.setOnFlowableAssembly(flowable -> {
    hookCalled = true;
    return flowable;
});

Flowable.range(1, 10);

assertTrue(hookCalled);

6.2. setOnFlowableSubscribe

RxJava calls this hook before a subscriber subscribes to a Flowable:

RxJavaPlugins.setOnFlowableSubscribe((flowable, subscriber) -> {
    hookCalled = true;
    return subscriber;
});

Flowable.range(1, 10).test();

assertTrue(hookCalled);

6.3. setOnConnectableFlowableAssembly

RxJava calls this hook when it instantiates operators and sources on ConnectableFlowable. Like the ConnectableObservable, ConnectableFlowable also begins emitting items only when we call its connect() method:

RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
    hookCalled = true;
    return connectableFlowable;
});

Flowable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6.4. setOnParallelAssembly

A ParallelFlowable processes the items of a flow on multiple parallel rails. RxJava calls the setOnParallelAssembly() hook when it instantiates operators and sources on ParallelFlowable:

RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
    hookCalled = true;
    return parallelFlowable;
});

Flowable.range(1, 10).parallel();

assertTrue(hookCalled);

7. Hooks for Maybe

Maybe has two lifecycle hooks as well.

7.1. setOnMaybeAssembly

RxJava calls this hook when it instantiates operators and sources on Maybe:

RxJavaPlugins.setOnMaybeAssembly(maybe -> {
    hookCalled = true;
    return maybe;
});

Maybe.just(1);

assertTrue(hookCalled);

7.2. setOnMaybeSubscribe

RxJava calls this hook before a subscriber subscribes to a Maybe:

RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
    hookCalled = true;
    return observer;
});

Maybe.just(1).test();

assertTrue(hookCalled);

8. Hooks for Single

RxJava defines the same two basic hooks for Single as well.

8.1. setOnSingleAssembly

RxJava calls this hook when it instantiates operators and sources on Single:

RxJavaPlugins.setOnSingleAssembly(single -> {
    hookCalled = true;
    return single;
});

Single.just(1);

assertTrue(hookCalled);

8.2. setOnSingleSubscribe

RxJava calls this hook before a subscriber subscribes to a Single:

RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
    hookCalled = true;
    return observer;
});

Single.just(1).test();

assertTrue(hookCalled);

9. Hooks for Schedulers

Like the reactive types above, the schedulers also have a set of hooks. RxJava defines a common hook that gets called whenever a task is scheduled on any scheduler. In addition, we can implement hooks that are specific to the individual schedulers.

9.1. setScheduleHandler

RxJava calls this hook when we use any of the schedulers for an operation:

RxJavaPlugins.setScheduleHandler((runnable) -> {
    hookCalled = true;
    return runnable;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

hookCalled = false;

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled);

Since we reset hookCalled between the two runs, the final assertion passes only if the hook also fires for the computation() scheduler. This shows that the same hook applies regardless of which scheduler we use.
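Because the hook returns the Runnable that will actually execute, it can also wrap the original task, for example to count or time executions. The sketch below shows that decorator idea with plain JDK types only: an ExecutorService stands in for a scheduler, and countingHook and runAll are hypothetical names. With RxJava 2 on the classpath, a lambda of the same shape could be passed to setScheduleHandler:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ScheduleDecoratorSketch {

    static final AtomicInteger executedTasks = new AtomicInteger();

    // Same shape as a schedule hook: take the task, return the task that actually runs
    static final Function<Runnable, Runnable> countingHook = task -> () -> {
        executedTasks.incrementAndGet();
        task.run();
    };

    // Submits n no-op tasks through the hook and returns how many were counted
    static int runAll(int n) {
        executedTasks.set(0);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            pool.execute(countingHook.apply(() -> { }));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executedTasks.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(3)); // 3
    }
}
```

An AtomicInteger is used for the counter because the wrapped tasks run on a worker thread, not on the thread that installed the hook.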

9.2. Hooks for Computation Scheduler

The computation scheduler has two hooks – namely, setInitComputationSchedulerHandler and setComputationSchedulerHandler.

RxJava calls the hook we set using setInitComputationSchedulerHandler when it initializes the computation scheduler, and it calls the hook we set using setComputationSchedulerHandler each time we obtain the scheduler through Schedulers.computation(). Since initialization happens only once, the init hook has to be registered before Schedulers is first used:

RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled && initHookCalled);
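The difference between the two kinds of hook can be modeled as follows. This is a simplified sketch, not RxJava's implementation: MiniScheduler and both handler fields are hypothetical, Supplier plays the role of the Callable that RxJava 2 passes to the init hook, and the default instance is created lazily here, whereas RxJava performs the init step when the Schedulers class is first loaded:

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class InitHookModel {

    // Hypothetical stand-in for a Scheduler
    static final class MiniScheduler {
        final String label;
        MiniScheduler(String label) { this.label = label; }
    }

    // Hypothetical hook slots
    static Function<Supplier<MiniScheduler>, MiniScheduler> initHandler;
    static Function<MiniScheduler, MiniScheduler> accessHandler;

    private static MiniScheduler defaultInstance;

    static MiniScheduler computation() {
        if (defaultInstance == null) {
            // Init hook: consulted once, when the default instance is created
            Supplier<MiniScheduler> factory = () -> new MiniScheduler("default");
            defaultInstance = initHandler == null ? factory.get() : initHandler.apply(factory);
        }
        // Access hook: consulted on every call
        return accessHandler == null ? defaultInstance : accessHandler.apply(defaultInstance);
    }

    // Installs counting hooks, requests the scheduler three times, returns the counts
    static int[] demo() {
        defaultInstance = null;
        int[] calls = new int[2]; // [0] = init hook, [1] = access hook
        initHandler = factory -> { calls[0]++; return factory.get(); };
        accessHandler = scheduler -> { calls[1]++; return scheduler; };

        computation();
        computation();
        computation();
        return calls;
    }

    public static void main(String[] args) {
        int[] calls = demo();
        System.out.println(calls[0] + " " + calls[1]); // 1 3
    }
}
```

The init hook runs once and can replace the default scheduler entirely, while the access hook runs on every request and can substitute a different scheduler each time.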

9.3. Hooks for IO Scheduler

The IO scheduler also has two hooks – namely, setInitIoSchedulerHandler and setIoSchedulerHandler:

RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.io())
  .test();

assertTrue(hookCalled && initHookCalled);

9.4. Hooks for Single Scheduler

Now, let’s see the hooks for the single scheduler:

RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

assertTrue(hookCalled && initHookCalled);

9.5. Hooks for NewThread Scheduler

Like the other schedulers, the new-thread scheduler also defines two hooks:

RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 15)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.newThread())
  .test();

assertTrue(hookCalled && initHookCalled);

10. Conclusion

In this tutorial, we’ve learned what the various RxJava lifecycle hooks are and how we can implement them. Among these hooks, the error handling hook is the most noteworthy one. However, we can use the others for auditing purposes, such as logging the number of subscribers, and for other specific use cases.

And, as usual, all of the short examples we’ve discussed here can be found over on GitHub.
