1. Introduction

Following the introduction to RxJava article, we’re going to look at aggregate and mathematical operators.

Each of these operators must wait for the source Observable to complete before it can emit its result. Because of this, they are dangerous to use on Observables that may represent very long or infinite sequences.

All the examples use an instance of TestSubscriber, a variety of Subscriber designed for unit testing: it can perform assertions, inspect received events, or wrap a mocked Subscriber.

Now, let’s start looking at the Mathematical operators.

2. Setup

To use the mathematical operators, we’ll need to add the rxjava-math dependency to the pom.xml:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava-math</artifactId>
    <version>1.0.0</version>
</dependency>

Or, for a Gradle project:

compile 'io.reactivex:rxjava-math:1.0.0'

3. Mathematical Operators

The MathObservable class is dedicated to mathematical operations. Its operators work on another Observable whose items can be evaluated as numbers.

3.1. Average

The average operator emits a single value – the average of all values emitted by the source.

Let’s see that in action:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.averageInteger(sourceObservable).subscribe(subscriber);

subscriber.assertValue(10);

There are four similar operators for dealing with primitive values: averageInteger, averageLong, averageFloat, and averageDouble.
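Note that the asserted value is 10 even though the arithmetic mean of 1 to 20 is 10.5, which suggests that the Integer variant drops the fractional part. The following plain-Java sketch doesn't use rxjava-math, and its class and method names are purely illustrative; it only contrasts integer division with a floating-point average:

```java
import java.util.stream.IntStream;

class AverageSketch {

    // Integer-style average: sum / count with integer division (fraction dropped).
    // Assumes a non-empty array.
    static int averageInteger(int[] values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum / values.length;
    }

    // Floating-point average keeps the fractional part.
    static double averageDouble(int[] values) {
        return IntStream.of(values).average()
          .orElseThrow(IllegalArgumentException::new);
    }

    public static void main(String[] args) {
        int[] oneToTwenty = IntStream.rangeClosed(1, 20).toArray();
        System.out.println(averageInteger(oneToTwenty)); // 10
        System.out.println(averageDouble(oneToTwenty));  // 10.5
    }
}
```

If the fractional part matters, the Double variant is probably the safer choice.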

3.2. Max

The max operator emits the largest encountered number.

Let’s see that in action:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.max(sourceObservable).subscribe(subscriber);

subscriber.assertValue(20);

It’s important to note that the max operator has an overloaded method that takes a comparison function.

Because the mathematical operators can also work on objects that can be treated as numbers, this overload makes it possible to compare custom types or to apply a custom ordering to standard types.

Let’s define the Item class:

class Item {
    private Integer id;

    // standard constructors, getter, and setter
}

We can now define the itemObservable and then use the max operator to emit the Item with the highest id:

Item five = new Item(5);
List<Item> list = Arrays.asList(
  new Item(1), 
  new Item(2), 
  new Item(3), 
  new Item(4), 
  five);
Observable<Item> itemObservable = Observable.from(list);

TestSubscriber<Item> subscriber = TestSubscriber.create();

MathObservable.from(itemObservable)
  .max(Comparator.comparing(Item::getId))
  .subscribe(subscriber);

subscriber.assertValue(five);
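A comparator can also change the ordering of standard types. As a rough sketch, the following uses the Stream API instead of MathObservable so that it runs without RxJava; the maxBy helper and the word list are made up for illustration. It picks the "largest" string first alphabetically and then by length:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class MaxComparatorSketch {

    // Returns the largest element according to the supplied ordering.
    static <T> T maxBy(List<T> items, Comparator<? super T> comparator) {
        return items.stream().max(comparator)
          .orElseThrow(() -> new IllegalArgumentException("empty input"));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("pear", "fig", "banana", "kiwi");

        // Natural (alphabetical) ordering
        System.out.println(maxBy(words, Comparator.<String>naturalOrder())); // pear

        // Custom ordering: longest word wins
        System.out.println(maxBy(words, Comparator.comparingInt(String::length))); // banana
    }
}
```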

3.3. Min

The min operator emits a single item containing the smallest element from the source:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.min(sourceObservable).subscribe(subscriber);

subscriber.assertValue(1);

The min operator has an overloaded method that accepts a comparator instance:

Item one = new Item(1);
List<Item> list = Arrays.asList(
  one, 
  new Item(2), 
  new Item(3), 
  new Item(4), 
  new Item(5));
TestSubscriber<Item> subscriber = TestSubscriber.create();
Observable<Item> itemObservable = Observable.from(list);

MathObservable.from(itemObservable)
  .min(Comparator.comparing(Item::getId))
  .subscribe(subscriber);

subscriber.assertValue(one);

3.4. Sum

The sum operator emits a single value that represents the sum of all of the numbers emitted by the source Observable:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.sumInteger(sourceObservable).subscribe(subscriber);

subscriber.assertValue(210);

Similar primitive-specialized operators are available: sumInteger, sumLong, sumFloat, and sumDouble.
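Choosing between the Integer and Long variants matters when the total may exceed the int range. The sketch below is plain Java, not rxjava-math, and makes no claim about how the library handles overflow; the helper names are hypothetical. It only shows the difference between 32-bit and 64-bit accumulation:

```java
class SumSketch {

    // 32-bit accumulation: silently wraps around on overflow.
    static int sumAsInt(int[] values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    // 64-bit accumulation: has room for totals beyond Integer.MAX_VALUE.
    static long sumAsLong(int[] values) {
        long total = 0L;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        int[] large = {Integer.MAX_VALUE, 1};
        System.out.println(sumAsInt(large));  // -2147483648
        System.out.println(sumAsLong(large)); // 2147483648
    }
}
```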

4. Aggregate Operators

4.1. Concat

The concat operator joins the emissions of two or more Observables into a single sequence, one source after another, without interleaving them.

Let’s now define two Observables and concatenate them:

List<Integer> listOne = Arrays.asList(1, 2, 3, 4);
Observable<Integer> observableOne = Observable.from(listOne);

List<Integer> listTwo = Arrays.asList(5, 6, 7, 8);
Observable<Integer> observableTwo = Observable.from(listTwo);

TestSubscriber<Integer> subscriber = TestSubscriber.create();

Observable<Integer> concatObservable = observableOne
  .concatWith(observableTwo);

concatObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3, 4, 5, 6, 7, 8);

In more detail, the concat operator doesn't subscribe to each additional Observable passed to it until the previous one completes.

For this reason, concatenating a “hot” Observable, which begins emitting items immediately, leads to the loss of any items it emits before all the previous Observables have completed.
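The one-source-at-a-time ordering can be pictured with the Stream API's Stream.concat. This is only an analogy, not RxJava code, and the consumptionOrder helper is a name invented for this sketch. It records the order in which elements are pulled from each source:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConcatSketch {

    // Logs each element as it is consumed, tagged with the source it came from.
    static List<String> consumptionOrder(List<Integer> first, List<Integer> second) {
        List<String> log = new ArrayList<>();
        Stream<Integer> tracedFirst = first.stream().peek(i -> log.add("first:" + i));
        Stream<Integer> tracedSecond = second.stream().peek(i -> log.add("second:" + i));

        // The second stream is only consumed once the first is exhausted.
        Stream.concat(tracedFirst, tracedSecond).collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(consumptionOrder(Arrays.asList(1, 2), Arrays.asList(3, 4)));
        // [first:1, first:2, second:3, second:4]
    }
}
```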

4.2. Count

The count operator emits the number of items emitted by the source.

Let’s count the items emitted by an Observable:

List<String> lettersList = Arrays.asList(
  "A", "B", "C", "D", "E", "F", "G");
TestSubscriber<Integer> subscriber = TestSubscriber.create();

Observable<Integer> sourceObservable = Observable
  .from(lettersList).count();
sourceObservable.subscribe(subscriber);

subscriber.assertValue(7);

If the source Observable terminates with an error, count passes the error notification along without emitting an item. However, if the source doesn’t terminate at all, count neither emits an item nor terminates.

There is also the countLong operator, which emits a Long value, for sequences that may exceed the capacity of an Integer.

4.3. Reduce

The reduce operator reduces all emitted elements into a single element by applying the accumulator function.

This process continues until all the items are emitted. Then the Observable returned by reduce emits the final value returned from the function.

Now, let’s see how to reduce a list of Strings by concatenating them in reverse order:

List<String> list = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
TestSubscriber<String> subscriber = TestSubscriber.create();

Observable<String> reduceObservable = Observable.from(list)
  .reduce((letter1, letter2) -> letter2 + letter1);
reduceObservable.subscribe(subscriber);

subscriber.assertValue("GFEDCBA");
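To see why the result comes out reversed, it helps to trace the accumulator step by step. The following plain-Java sketch doesn't use RxJava, and the trace helper is hypothetical. It applies the same function from left to right, seeding the accumulation with the first item, and records every intermediate value:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ReduceTraceSketch {

    // Applies the accumulator left to right and records each intermediate result.
    static List<String> trace(List<String> items, BinaryOperator<String> accumulator) {
        List<String> steps = new ArrayList<>();
        String current = null;
        for (String item : items) {
            // The first item seeds the accumulation; later items are folded in.
            current = (current == null) ? item : accumulator.apply(current, item);
            steps.add(current);
        }
        return steps;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("A", "B", "C", "D");
        System.out.println(trace(letters, (letter1, letter2) -> letter2 + letter1));
        // [A, BA, CBA, DCBA]
    }
}
```

Only the last of these intermediate values would be emitted by a reduce-style operator.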

4.4. Collect

The collect operator is similar to the reduce operator, but it’s dedicated to collecting elements into a single mutable data structure.

It requires two parameters:

  • a function that returns the empty mutable data structure
  • a function that, when given the data structure and an emitted item, modifies the data structure appropriately

Let’s see how to collect the items emitted by an Observable into a set:

List<String> list = Arrays.asList("A", "B", "C", "B", "B", "A", "D");
TestSubscriber<HashSet<String>> subscriber = TestSubscriber.create();

Observable<HashSet<String>> reduceListObservable = Observable
  .from(list)
  .collect(HashSet::new, HashSet::add);
reduceListObservable.subscribe(subscriber);

subscriber.assertValues(new HashSet<>(list));
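For comparison, the Stream API has a collect method with the same supplier-plus-accumulator shape, along with a third combiner argument that is only needed for parallel streams. The sketch below is a Stream analogy rather than RxJava code, and the toSet helper is a name chosen for this example:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

class CollectSketch {

    // supplier: creates the empty container
    // accumulator: adds one item to the container
    // combiner: merges two partial containers (used by parallel streams)
    static HashSet<String> toSet(List<String> items) {
        return items.stream()
          .collect(HashSet::new, HashSet::add, HashSet::addAll);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("A", "B", "C", "B", "B", "A", "D");
        System.out.println(toSet(list).size()); // 4
    }
}
```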

4.5. ToList

The toList operator works just like the collect operation, but collects all elements into a single list – think about Collectors.toList() from the Stream API:

Observable<Integer> sourceObservable = Observable.range(1, 5);
TestSubscriber<List<Integer>> subscriber = TestSubscriber.create();

Observable<List<Integer>> listObservable = sourceObservable
  .toList();
listObservable.subscribe(subscriber);

subscriber.assertValue(Arrays.asList(1, 2, 3, 4, 5));
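The Stream API counterpart mentioned above might look like the following sketch. One detail to keep in mind: judging by the examples in this article, Observable.range takes a start and a count, whereas IntStream.range takes a start and an exclusive end. The rangeToList helper is hypothetical and bridges the two conventions:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ToListSketch {

    // Builds the list of `count` consecutive integers beginning at `start`.
    static List<Integer> rangeToList(int start, int count) {
        return IntStream.range(start, start + count)
          .boxed()
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(rangeToList(1, 5));  // [1, 2, 3, 4, 5]
        System.out.println(rangeToList(10, 5)); // [10, 11, 12, 13, 14]
    }
}
```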

4.6. ToSortedList

The toSortedList operator works just like toList, but the emitted list is sorted:

Observable<Integer> sourceObservable = Observable.range(10, 5);
TestSubscriber<List<Integer>> subscriber = TestSubscriber.create();

Observable<List<Integer>> listObservable = sourceObservable
  .toSortedList();
listObservable.subscribe(subscriber);

subscriber.assertValue(Arrays.asList(10, 11, 12, 13, 14));

As we can see, toSortedList uses the natural ordering by default, but it’s possible to provide a custom comparison function. Let’s sort the integers in reverse order using a custom sort function:

Observable<Integer> sourceObservable = Observable.range(10, 5);
TestSubscriber<List<Integer>> subscriber = TestSubscriber.create();

Observable<List<Integer>> listObservable 
  = sourceObservable.toSortedList((int1, int2) -> int2 - int1);
listObservable.subscribe(subscriber);

subscriber.assertValue(Arrays.asList(14, 13, 12, 11, 10));
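A subtraction-based comparison works for small values like these, but it can overflow when the operands are far apart. The plain-Java sketch below, with illustrative names and no RxJava dependency, compares it with Comparator.reverseOrder(), which is likely the safer option for a descending sort:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class DescendingSortSketch {

    // Subtraction-based descending comparison, as in the example above.
    static final Comparator<Integer> BY_SUBTRACTION = (int1, int2) -> int2 - int1;

    // Overflow-safe descending comparison from the standard library.
    static final Comparator<Integer> REVERSE_ORDER = Comparator.reverseOrder();

    // Returns a descending copy of the input, leaving the input untouched.
    static List<Integer> sortDescending(List<Integer> values) {
        List<Integer> copy = new ArrayList<>(values);
        copy.sort(REVERSE_ORDER);
        return copy;
    }

    public static void main(String[] args) {
        // In descending order 1 should come before Integer.MIN_VALUE,
        // so a correct comparator returns a negative number here.
        System.out.println(BY_SUBTRACTION.compare(1, Integer.MIN_VALUE) > 0); // true (overflow)
        System.out.println(REVERSE_ORDER.compare(1, Integer.MIN_VALUE) < 0);  // true

        System.out.println(sortDescending(Arrays.asList(10, 11, 12, 13, 14)));
        // [14, 13, 12, 11, 10]
    }
}
```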

4.7. ToMap

The toMap operator converts the sequence of items emitted by an Observable into a map keyed by a specified key function.

In particular, the toMap operator has different overloaded methods that require one, two or three of the following parameters:

  1. the keySelector that produces a key from the item
  2. the valueSelector that produces from the emitted item the actual value that will be stored in the map
  3. the mapFactory that creates the collection that will hold the items

Let’s start by defining a simple Book class:

class Book {
    private String title;
    private Integer year;

    // standard constructors, getters, and setters
}

We can now see how it’s possible to convert a series of emitted Book items to a Map, having the book title as key and the year as the value:

Observable<Book> bookObservable = Observable.just(
  new Book("The North Water", 2016), 
  new Book("Origin", 2017), 
  new Book("Sleeping Beauties", 2017)
);
TestSubscriber<Map<String, Integer>> subscriber = TestSubscriber.create();

Observable<Map<String, Integer>> mapObservable = bookObservable
  .toMap(Book::getTitle, Book::getYear, HashMap::new);
mapObservable.subscribe(subscriber);

subscriber.assertValue(new HashMap<String, Integer>() {{
  put("The North Water", 2016);
  put("Origin", 2017);
  put("Sleeping Beauties", 2017);
}});
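One thing to watch for is what happens when two items produce the same key. As far as the RxJava Javadoc describes it, the resulting map keeps the latest item for a given key. The plain-Java sketch below imitates that behaviour with Map.put; it isn't RxJava code, and the simplified Book class and the choice of the year as key are for illustration only:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ToMapDuplicateKeySketch {

    // Minimal stand-in for the Book class used in this article.
    static final class Book {
        final String title;
        final int year;

        Book(String title, int year) {
            this.title = title;
            this.year = year;
        }
    }

    // Keys each title by year; a later book replaces an earlier one with the same year.
    static Map<Integer, String> titleByYear(List<Book> books) {
        Map<Integer, String> result = new HashMap<>();
        for (Book book : books) {
            result.put(book.year, book.title);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> byYear = titleByYear(Arrays.asList(
          new Book("The North Water", 2016),
          new Book("Origin", 2017),
          new Book("Sleeping Beauties", 2017)));

        System.out.println(byYear.size());    // 2
        System.out.println(byYear.get(2017)); // Sleeping Beauties
    }
}
```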

4.8. ToMultimap

When mapping, it is very common that many values share the same key. The data structure that maps one key to multiple values is called a multimap.

This can be achieved with the toMultimap operator, which converts the sequence of items emitted by an Observable into a map keyed by a specified key function, where each key is associated with a collection of values.

In addition to the parameters of the toMap operator, this operator accepts a collectionFactory, which specifies the collection type in which the values for each key are stored. Let’s see how this can be done:

Observable<Book> bookObservable = Observable.just(
  new Book("The North Water", 2016), 
  new Book("Origin", 2017), 
  new Book("Sleeping Beauties", 2017)
);
TestSubscriber<Map<Integer, Collection<String>>> subscriber = TestSubscriber.create();

Observable<Map<Integer, Collection<String>>> multiMapObservable = bookObservable.toMultimap(
  Book::getYear, 
  Book::getTitle, 
  () -> new HashMap<>(), 
  (key) -> new ArrayList<>()
);
multiMapObservable.subscribe(subscriber);

subscriber.assertValue(new HashMap<Integer, Collection<String>>() {{
    put(2016, Arrays.asList("The North Water"));
    put(2017, Arrays.asList("Origin", "Sleeping Beauties"));
}});
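Readers familiar with the Stream API may recognise this as the equivalent of Collectors.groupingBy. The following sketch is an analogy only, using a simplified stand-in for the Book class, and produces the same year-to-titles structure without RxJava:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MultimapSketch {

    // Minimal stand-in for the Book class used in this article.
    static final class Book {
        final String title;
        final int year;

        Book(String title, int year) {
            this.title = title;
            this.year = year;
        }
    }

    // Groups titles by year: the classifier plays the role of the key selector,
    // the mapping function the value selector, and toList() the collection factory.
    static Map<Integer, List<String>> titlesByYear(List<Book> books) {
        return books.stream().collect(Collectors.groupingBy(
          book -> book.year,
          Collectors.mapping(book -> book.title, Collectors.toList())));
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> grouped = titlesByYear(Arrays.asList(
          new Book("The North Water", 2016),
          new Book("Origin", 2017),
          new Book("Sleeping Beauties", 2017)));

        System.out.println(grouped.get(2016)); // [The North Water]
        System.out.println(grouped.get(2017)); // [Origin, Sleeping Beauties]
    }
}
```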

5. Conclusion

In this article, we explored the mathematical and aggregate operators available within RxJava, along with simple examples of how to use each.

As always, all code examples in this article can be found over on GitHub.
