1. Overview

In this article, we’ll be looking at ways of testing code written using RxJava.

The typical flow we create with RxJava consists of an Observable and an Observer. The observable is a source of data in the form of a sequence of elements. One or more observers subscribe to it to receive the emitted events.

Typically, the observer and the observable run in separate threads in an asynchronous fashion, which makes the code hard to test in a traditional way.

Fortunately, RxJava provides a TestSubscriber class which gives us the ability to test asynchronous, event-driven flows.

2. Testing RxJava – the Traditional Way

Let’s start with an example. We have a sequence of letters that we want to zip with a sequence of integers starting from 1.

Our test should assert that a subscriber listening to the events emitted by the zipped observable receives the letters zipped with the integers.
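
To make the expected values concrete: zip pairs elements by position and stops as soon as the shorter sequence runs out, so five letters produce exactly five results even though the integer sequence is much longer. Here is a plain-Java sketch of the same pairing, with no RxJava involved. The class and method names are made up for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of RxJava: computes by hand what the
// zipped observable is expected to emit.
final class ExpectedZip {

    // Pairs each letter with its 1-based position, stopping at the end of the list.
    static List<String> zipWithIndexFromOne(List<String> letters) {
        List<String> result = new ArrayList<>();
        int index = 1;
        for (String letter : letters) {
            result.add(index + "-" + letter);
            index++;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(zipWithIndexFromOne(Arrays.asList("A", "B", "C", "D", "E")));
        // prints [1-A, 2-B, 3-C, 4-D, 5-E]
    }
}
```

These are the five strings the tests in this article check for.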

Writing such a test in a traditional way means that we need to keep a list of results and update that list from an observer. Because the test adds elements to a plain list of strings and asserts on it right after subscribing, our observable and observer need to work in the same thread. They cannot work asynchronously.

And so we would be missing one of the biggest advantages of RxJava – processing of events in separate threads.

Here’s what that limited version of the test would look like:

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
List<String> results = new ArrayList<>();
Observable<String> observable = Observable
  .from(letters)
  .zipWith(
    Observable.range(1, Integer.MAX_VALUE),
    (string, index) -> index + "-" + string);

observable.subscribe(results::add);

assertThat(results, notNullValue());
assertThat(results, hasSize(5));
assertThat(results, hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));

We’re aggregating results from the observer by adding elements to a results list. The observer and the observable work in the same thread, so subscribe() returns only after all elements have been emitted, and the assertions run against a fully populated list.
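
For contrast, here is roughly what the list-based approach would need if the events arrived from another thread. This is a minimal sketch using only JDK classes. The background executor stands in for an asynchronous observable, and the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an asynchronous source; no RxJava involved.
final class ManualAsyncCollect {

    // Emits the letters on a background thread and waits until all have arrived.
    static List<String> collect(List<String> letters) {
        List<String> results = new CopyOnWriteArrayList<>(); // safe to write from another thread
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService producer = Executors.newSingleThreadExecutor();
        try {
            producer.submit(() -> {
                int index = 1;
                for (String letter : letters) {
                    results.add(index + "-" + letter);
                    index++;
                }
                done.countDown(); // plays the role of the completion signal
            });
            // Without this wait, assertions could run before anything was emitted.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producer did not finish in time");
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            producer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect(Arrays.asList("A", "B", "C", "D", "E")));
    }
}
```

The thread-safe list and the latch are bookkeeping that every such test would have to repeat by hand.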

3. Testing RxJava Using a TestSubscriber

RxJava comes with a TestSubscriber class that allows us to write tests that work with asynchronous processing of events. It is a regular subscriber that we subscribe to the observable and that records every notification it receives.

In a test, we can examine the state of a TestSubscriber and make assertions on that state:

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<String> observable = Observable
  .from(letters)
  .zipWith(
    Observable.range(1, Integer.MAX_VALUE),
    (string, index) -> index + "-" + string);

observable.subscribe(subscriber);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
assertThat(
  subscriber.getOnNextEvents(),
  hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));

We are passing a TestSubscriber instance to a subscribe() method on the observable. Then we can examine the state of this subscriber.

TestSubscriber has some very useful assertion methods that we’ll use to validate our expectations. The subscriber should receive 5 elements emitted by the observable, and we assert that by calling the assertValueCount() method.

We can examine all events that a subscriber received by calling the getOnNextEvents() method.

Calling the assertCompleted() method checks that the stream the observer is subscribed to has completed. The assertNoErrors() method asserts that no errors were signaled to the subscriber.

4. Testing Expected Exceptions

Sometimes in our processing, when an observable is emitting events or an observer is processing events, an error occurs. The TestSubscriber has a special method for examining error state – the assertError() method that takes the type of an exception as an argument:

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<String> observable = Observable
  .from(letters)
  .zipWith(
    Observable.range(1, Integer.MAX_VALUE),
    (string, index) -> index + "-" + string)
  .concatWith(Observable.error(new RuntimeException("error in Observable")));

observable.subscribe(subscriber);

subscriber.assertError(RuntimeException.class);
subscriber.assertNotCompleted();

We create an observable that is joined with another observable using the concatWith() method. The second observable emits nothing and signals a RuntimeException straight away, so the error arrives after the five zipped values. We can check the type of that exception on the TestSubscriber by calling the assertError() method.

An observer that receives an error stops processing and never receives the completion notification. We can check that with the assertNotCompleted() method.
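
The assertions used so far all inspect notifications that the subscriber has recorded. The idea can be sketched with JDK classes only. RecordingObserver and its replay() helper are hypothetical names for this illustration. RxJava's TestSubscriber is more complete and, unlike this sketch, is meant to be used across threads:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical recorder illustrating the idea; this is not RxJava's TestSubscriber.
final class RecordingObserver<T> {
    final List<T> values = new ArrayList<>();
    final List<Throwable> errors = new ArrayList<>();
    int completions = 0;

    void onNext(T value) { values.add(value); }
    void onError(Throwable error) { errors.add(error); }
    void onCompleted() { completions++; }

    // Feeds the items, then ends with either an error or a completion, never both.
    static <T> RecordingObserver<T> replay(List<T> items, Throwable failure) {
        RecordingObserver<T> observer = new RecordingObserver<>();
        for (T item : items) {
            observer.onNext(item);
        }
        if (failure != null) {
            observer.onError(failure);
        } else {
            observer.onCompleted();
        }
        return observer;
    }

    public static void main(String[] args) {
        RecordingObserver<String> failed =
            replay(Arrays.asList("1-A", "2-B"), new RuntimeException("error in Observable"));
        System.out.println(failed.values);        // prints [1-A, 2-B]
        System.out.println(failed.errors.size()); // prints 1
        System.out.println(failed.completions);   // prints 0
    }
}
```

In this model, checking for an error of a given type means looking into the errors list, and checking for non-completion means verifying that completions is still zero.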

5. Testing Time-Based Observable

Let’s say that we have an Observable that emits one event per second and we want to test that behavior with a TestSubscriber.

We can define a time-based Observable using the Observable.interval() method, passing the period, its TimeUnit, and the scheduler to run on:

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestScheduler scheduler = new TestScheduler();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS, scheduler);

Observable<String> observable = Observable.from(letters)
  .zipWith(tick, (string, index) -> index + "-" + string);

observable.subscribeOn(scheduler)
  .subscribe(subscriber);

The tick observable will emit a new value every second.

At the beginning of the test we are at time zero, so our TestSubscriber has received no values and is not completed:

subscriber.assertNoValues();
subscriber.assertNotCompleted();

To emulate time passing in our test, we use the TestScheduler class. We can simulate one second passing by calling the advanceTimeBy() method on the TestScheduler:

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

The advanceTimeBy() method will make the observable produce one event. Because interval() starts counting from 0, that event is "0-A". We can assert that one event was produced by calling the assertValueCount() method:

subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");

Our list of letters has 5 elements, so the observable needs several more ticks before it has emitted all events. To emulate that, we move the virtual clock to the 6-second mark with the advanceTimeTo() method, which takes an absolute time rather than a delta:

scheduler.advanceTimeTo(6, TimeUnit.SECONDS);
 
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
assertThat(subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C", "3-D", "4-E"));

After advancing the virtual time, we can run assertions on the TestSubscriber. We verify that all events were produced by calling the assertValueCount() method.
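
To see why no real waiting is involved, it can help to model virtual time by hand. The following single-threaded sketch is an illustration only, not RxJava's TestScheduler. VirtualClock, schedule(), and ticks() are hypothetical names, and the two advance methods merely mirror the relative and absolute semantics used above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, single-threaded model of a virtual-time scheduler.
final class VirtualClock {

    private static final class Task {
        final long dueSeconds;
        final Runnable action;

        Task(long dueSeconds, Runnable action) {
            this.dueSeconds = dueSeconds;
            this.action = action;
        }
    }

    // Tasks ordered by the virtual time at which they become due.
    private final PriorityQueue<Task> queue =
        new PriorityQueue<>((a, b) -> Long.compare(a.dueSeconds, b.dueSeconds));
    private long nowSeconds = 0;

    // Registers an action that becomes due after the given delay.
    void schedule(long delaySeconds, Runnable action) {
        queue.add(new Task(nowSeconds + delaySeconds, action));
    }

    // Relative move: advance the clock by a delta.
    void advanceTimeBy(long deltaSeconds) {
        advanceTimeTo(nowSeconds + deltaSeconds);
    }

    // Absolute move: run everything that is due up to the target time.
    void advanceTimeTo(long targetSeconds) {
        while (!queue.isEmpty() && queue.peek().dueSeconds <= targetSeconds) {
            Task task = queue.poll();
            nowSeconds = task.dueSeconds;
            task.action.run();
        }
        nowSeconds = Math.max(nowSeconds, targetSeconds);
    }

    // Schedules `count` ticks, one per virtual second, with values 0, 1, 2, ...
    static List<Long> ticks(VirtualClock clock, int count) {
        List<Long> emitted = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            long value = i;
            clock.schedule(i + 1, () -> emitted.add(value));
        }
        return emitted;
    }

    public static void main(String[] args) {
        VirtualClock clock = new VirtualClock();
        List<Long> emitted = ticks(clock, 5);
        System.out.println(emitted); // prints []
        clock.advanceTimeBy(1);
        System.out.println(emitted); // prints [0]
        clock.advanceTimeTo(6);
        System.out.println(emitted); // prints [0, 1, 2, 3, 4]
    }
}
```

Nothing runs until the clock is moved, which is why the assertions at time zero see no values, and moving the clock is just a loop over a queue, so the test finishes instantly.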

6. Conclusion

In this article, we examined ways of testing observers and observables in RxJava. We looked at a way of testing emitted events, errors and time-based observables.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.
