Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Introduction

The popularity of RxJava has led to the creation of multiple third-party libraries that extend its functionality.

Many of those libraries were an answer to typical problems that developers were dealing with when using RxJava. RxRelay is one of these solutions.

2. Dealing With a Subject

Simply put, a Subject acts as a bridge between Observable and Observer. Since it’s an Observer, it can subscribe to one or more Observables and receive events from them.

Also, given it’s at the same time an Observable, it can reemit events or emit new events to its subscribers. More information about the Subject can be found in this article.

One of the issues with Subject is that after it receives onComplete() or onError() – it’s no longer able to move data. Sometimes it’s the desired behavior, but sometimes it’s not.

In cases when such behavior isn’t desired, we should consider using RxRelay.

3. Relay

A Relay is basically a Subject, but without the ability to call onComplete() and onError(), thus it’s constantly able to emit data.

This allows us to create bridges between different types of API without worrying about accidentally triggering the terminal state.

To use RxRelay we need to add the following dependency to our project:

<dependency>
  <groupId>com.jakewharton.rxrelay2</groupId>
  <artifactId>rxrelay</artifactId>
  <version>1.2.0</version>
</dependency>

4. Types of Relay

There’re three different types of Relay available in the library. We’ll quickly explore all three here.

4.1. PublishRelay

This type of Relay will reemit all events once the Observer has subscribed to it.

The events will be emitted to all subscribers:

public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
    PublishRelay<Integer> publishRelay = PublishRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    
    publishRelay.subscribe(firstObserver);
    firstObserver.assertSubscribed();
    publishRelay.accept(5);
    publishRelay.accept(10);
    publishRelay.subscribe(secondObserver);
    secondObserver.assertSubscribed();
    publishRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    
    // second receives only the last event
    secondObserver.assertValue(15);
}

There’s no buffering of events in this case, so this behavior is similar to a cold Observable.

4.2. BehaviorRelay

This type of Relay will reemit the most recent observed event and all subsequent events once the Observer has subscribed:

public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    behaviorRelay.accept(5);     
    behaviorRelay.subscribe(firstObserver);
    behaviorRelay.accept(10);
    behaviorRelay.subscribe(secondObserver);
    behaviorRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(10, 15);
}

When we’re creating the BehaviorRelay we can specify the default value, which will be emitted, if there’re no other events to emit.

To specify the default value we can use createDefault() method:

public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertValue(1);
}

If we don’t want to specify the default value, we can use the create() method:

public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

4.3. ReplayRelay

This type of Relay buffers all events it has received and then reemits it to all subscribers that subscribe to it:

 public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    replayRelay.subscribe(firstObserver);
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.subscribe(secondObserver);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(5, 10, 15);
}

All elements are buffered and all subscribers will receive the same events, so this behavior is similar to the cold Observable.

When we’re creating the ReplayRelay we can provide maximal buffer size and time to live for events.

To create the Relay with limited buffer size we can use the createWithSize() method. When there’re more events to be buffered than the set buffer size, previous elements will be discarded:

public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertValues(15, 20);
}

We can also create ReplayRelay with max time to leave for buffered events using the createWithTime() method:

public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
    SingleScheduler scheduler = new SingleScheduler();
    ReplayRelay<Integer> replayRelay =
      ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
    long current =  scheduler.now(TimeUnit.MILLISECONDS);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    Thread.sleep(3000);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

5. Custom Relay

All types described above extend the common abstract class Relay, this gives us an ability to write our own custom Relay class.

To create a custom Relay we need to implement three methods: accept(), hasObservers() and subscribeActual().

Let’s write a simple Relay that will reemit event to one of the subscribers chosen at random:

public class RandomRelay extends Relay<Integer> {
    Random random = new Random();

    List<Observer<? super Integer>> observers = new ArrayList<>();

    @Override
    public void accept(Integer integer) {
        int observerIndex = random.nextInt() % observers.size();
        observers.get(observerIndex).onNext(integer);
    }

    @Override
    public boolean hasObservers() {
        return observers.isEmpty();
    }

    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observers.add(observer);
        observer.onSubscribe(Disposables.fromRunnable(
          () -> System.out.println("Disposed")));
    }
}

We can now test that only one subscriber will receive the event:

public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
    RandomRelay randomRelay = new RandomRelay();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    randomRelay.subscribe(firstObserver);
    randomRelay.subscribe(secondObserver);
    randomRelay.accept(5);
    if(firstObserver.values().isEmpty()) {
        secondObserver.assertValue(5);
    } else {
        firstObserver.assertValue(5);
        secondObserver.assertEmpty();
    }
}

6. Conclusion

In this tutorial, we had a look at RxRelay, a type similar to Subject but without the ability to trigger the terminal state.

More information can be found in the documentation. And, as always all the code samples can be found over on GitHub.

Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – Junit (guide) (cat=Reactive)
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.