I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE

1. Introduction

The Java Flow API was introduced in Java 9 as an implementation of the Reactive Streams Specification.

In this tutorial, we'll first investigate reactive streams. Then, we'll learn about their relation to RxJava and the Flow API.

2. What Are Reactive Streams?

The Reactive Manifesto introduced Reactive Streams to specify a standard for asynchronous stream processing with non-blocking backpressure.

The scope of the Reactive Streams Specification is to define a minimal set of interfaces to achieve those ends: Publisher, Subscriber, Subscription, and Processor.

The Flow API originates from the specification. RxJava precedes it, but since 2.0, RxJava has supported the spec, too.

We'll go deep into both, but first, let's see a practical use case.

3. Use Case

For this tutorial, we'll use a live stream video service as our use case.

A live video stream, contrary to on-demand video streaming, does not depend on the consumer. Therefore, the server publishes the stream at its own pace, and it's the consumer's responsibility to adapt.

In its simplest form, our model consists of a video stream publisher and a video player as the subscriber.

Let's implement VideoFrame as our data item:

public class VideoFrame {
    private long number;
    // additional data fields

    // constructor, getters, setters
}

Then let's go through our Flow API and RxJava implementations one-by-one.

4. Implementation With Flow API

The Flow APIs in JDK 9 correspond to the Reactive Streams Specification. With the Flow API, if the application initially requests N items, then the publisher pushes at most N items to the subscriber.
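To make the "at most N items" rule concrete, here is a minimal sketch that uses only the JDK. The class RequestDemandDemo and its nested BoundedSubscriber are hypothetical names introduced for illustration, and plain integers stand in for video frames. The subscriber requests a fixed number of items once, while the publisher submits more than that:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's code base.
class RequestDemandDemo {

    // Subscriber that signals its whole demand once and never asks again.
    static final class BoundedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch demandMet;
        private final int demand;

        BoundedSubscriber(int demand) {
            this.demand = demand;
            this.demandMet = new CountDownLatch(demand);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(demand); // the only demand ever signalled
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            demandMet.countDown();
        }

        @Override
        public void onError(Throwable throwable) { }

        @Override
        public void onComplete() { }
    }

    // Submits `published` items to a subscriber that requested only `requested`.
    static List<Integer> run(int requested, int published) {
        BoundedSubscriber subscriber = new BoundedSubscriber(requested);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        try {
            publisher.subscribe(subscriber);
            for (int i = 0; i < published; i++) {
                publisher.submit(i); // items beyond the demand wait in the buffer
            }
            subscriber.demandMet.await(5, TimeUnit.SECONDS);
            Thread.sleep(200); // grace period: any extra delivery would show up here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            publisher.close();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10));
    }
}
```

Calling run(3, 10) should return only the first three items, because the remaining seven stay in the subscription's buffer until more demand is signalled. The sketch assumes the number of published items stays below the default buffer size, so submit never blocks.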

The Flow API interfaces are all nested in the java.util.concurrent.Flow class. They are semantically equivalent to their respective Reactive Streams counterparts.

Let's implement VideoStreamServer as the publisher of VideoFrame.

public class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
  
    public VideoStreamServer() {
        super(Executors.newSingleThreadExecutor(), 5);
    }
}

We have extended our VideoStreamServer from SubmissionPublisher instead of directly implementing Flow::Publisher. SubmissionPublisher is the JDK's implementation of Flow::Publisher for asynchronous communication with subscribers, so it lets our VideoStreamServer emit at its own pace.

It also helps with backpressure and buffer handling: when SubmissionPublisher::subscribe is called, it creates an instance of BufferedSubscription and adds the new subscription to its chain of subscriptions. BufferedSubscription can buffer issued items up to SubmissionPublisher#maxBufferCapacity.
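The buffering behaviour can be observed with a small JDK-only sketch. BufferOverflowDemo and StalledSubscriber are hypothetical names used for illustration. The subscriber never requests anything, so every offered item has to stay in the buffer, and once the buffer is full, offer invokes the drop handler instead of blocking. Note that, according to the SubmissionPublisher documentation, the enforced capacity may be rounded up to a power of two, so the sketch reads the actual value from getMaxBufferCapacity():

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the tutorial's code base.
class BufferOverflowDemo {

    // Subscriber that never signals demand, so offered items pile up in the buffer.
    static final class StalledSubscriber implements Flow.Subscriber<Long> {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // intentionally no request(...)
        }

        @Override
        public void onNext(Long item) { }

        @Override
        public void onError(Throwable throwable) { }

        @Override
        public void onComplete() { }
    }

    // Returns { enforced buffer capacity, number of dropped items }.
    static int[] run(int requestedCapacity, int offered) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger dropped = new AtomicInteger();
        SubmissionPublisher<Long> publisher =
          new SubmissionPublisher<>(executor, requestedCapacity);
        try {
            publisher.subscribe(new StalledSubscriber());
            for (long i = 0; i < offered; i++) {
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet(); // called when the buffer has no free slot
                    return false;              // give up on this item, do not retry
                });
            }
            return new int[] { publisher.getMaxBufferCapacity(), dropped.get() };
        } finally {
            publisher.close();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = run(5, 20);
        System.out.println("capacity=" + result[0] + ", dropped=" + result[1]);
    }
}
```

With a requested capacity of 5 and 20 offered items, some items are accepted into the buffer and the rest are reported to the drop handler. This is the same mechanism our live stream relies on when the player can't keep up.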

Now let's define VideoPlayer, which consumes a stream of VideoFrame. Hence, it must implement Flow::Subscriber.

public class VideoPlayer implements Flow.Subscriber<VideoFrame> {
   
    Flow.Subscription subscription = null;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(VideoFrame item) {
        log.info("play #{}" , item.getNumber());
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("There is an error in video streaming:{}" , throwable.getMessage());

    }

    @Override
    public void onComplete() {
        log.info("Video has ended");
    }
}

VideoPlayer subscribes to VideoStreamServer. After a successful subscription, the VideoPlayer::onSubscribe method is called, and it requests one frame. VideoPlayer::onNext receives the frame and requests a new one. The number of requested frames depends on the use case and the Subscriber implementation.

Finally, let's put things together:

VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());

// submit video frames

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
    streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
        subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber()
        + " dropped because of backpressure"));
        return true;
    });
}, 0, 1, TimeUnit.MILLISECONDS);

sleep(1000);

5. Implementation With RxJava

RxJava is a Java implementation of ReactiveX. The ReactiveX (or Reactive Extensions) project aims to provide a reactive programming concept. It's a combination of the Observer pattern, the Iterator pattern, and functional programming.

The latest major version of RxJava is 3.x. RxJava has supported Reactive Streams since version 2.x with its Flowable base class, but it offers a much larger API than Reactive Streams, with several base classes such as Flowable, Observable, Single, and Completable.

Flowable, the Reactive Streams compliant component, is a flow of 0 to N items with backpressure handling. Flowable implements Publisher from Reactive Streams. Therefore, many RxJava operators accept Publisher directly and allow direct interoperation with other Reactive Streams implementations.

Now, let's create our video stream generator, which is an infinite lazy stream:

Stream<VideoFrame> videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> {
    // sleep for 1ms;
    return new VideoFrame(videoFrame.getNumber() + 1);
});

Then we define a Flowable instance to generate frames on a separate thread:

Flowable<VideoFrame> videoFlowable = Flowable
  .fromStream(videoStream)
  .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()));

An infinite stream is enough for our use case, but if we need a more flexible way to generate the stream, then Flowable.create is a good choice:

Flowable
  .create(new FlowableOnSubscribe<VideoFrame>() {
      AtomicLong frame = new AtomicLong();
      @Override
      public void subscribe(@NonNull FlowableEmitter<VideoFrame> emitter) {
          // stop emitting once the subscriber has cancelled
          while (!emitter.isCancelled()) {
              emitter.onNext(new VideoFrame(frame.incrementAndGet()));
              // sleep for 1 ms to simulate delay
          }
      }
  }, /* Set Backpressure Strategy Here */)

Then, at the next step, VideoPlayer subscribes to this Flowable and observes items on a separate thread.

videoFlowable
  .observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
  .subscribe(item -> {
      log.info("play #" + item.getNumber());
      // sleep for 30 ms to simulate frame display
  });

And finally, we'll configure the backpressure strategy. If we want to stop the video in case of frame loss, we have to use BackpressureOverflowStrategy.ERROR, which signals an error when the buffer is full:

Flowable
  .fromStream(videoStream)
  .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
  .onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR)
  .observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
  .subscribe(item -> {
       log.info("play #" + item.getNumber());
       // sleep for 30 ms to simulate frame display
  });

6. Comparison of RxJava and Flow API

Even in these two simple implementations, we can see how rich RxJava's API is, especially for buffer management, error handling, and backpressure strategy. Its fluent API gives us more options with fewer lines of code. Now let's consider more complicated cases.

Assume that our player can't display video frames without a codec. With the Flow API, we need to implement a Processor that simulates the codec and sits between the server and the player. With RxJava, we can do it with Flowable::flatMap or Flowable::map.
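As a rough idea of what such a Processor involves, here is a JDK-only sketch modelled on the transform processor pattern from the SubmissionPublisher documentation. All names (CodecProcessorDemo, TransformProcessor, CollectingPlayer) are hypothetical, frames are reduced to plain Long numbers, and "decoding" is just a string conversion standing in for a real codec:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical demo class, not part of the tutorial's code base.
class CodecProcessorDemo {

    // One-to-one transforming stage: a Subscriber upstream, a Publisher downstream.
    static final class TransformProcessor<T, R> extends SubmissionPublisher<R>
      implements Flow.Processor<T, R> {

        private final Function<? super T, ? extends R> transform;
        private Flow.Subscription upstream;

        TransformProcessor(Function<? super T, ? extends R> transform) {
            this.transform = transform;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(transform.apply(item)); // forward the transformed item downstream
            upstream.request(1);           // then ask upstream for the next one
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            close();
        }
    }

    // Stand-in for the player: collects the decoded frames it receives.
    static final class CollectingPlayer implements Flow.Subscriber<String> {
        final List<String> played = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String frame) {
            played.add(frame);
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<String> run(int frames) {
        CollectingPlayer player = new CollectingPlayer();
        TransformProcessor<Long, String> codec =
          new TransformProcessor<Long, String>(n -> "decoded#" + n);
        try (SubmissionPublisher<Long> server = new SubmissionPublisher<>()) {
            codec.subscribe(player);  // wire downstream first
            server.subscribe(codec);  // then connect the codec to the server
            for (long i = 0; i < frames; i++) {
                server.submit(i);
            }
        } // closing the server completes the codec, which completes the player
        try {
            player.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(player.played);
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Even this simple one-to-one stage needs its own class with subscription bookkeeping, which is roughly what a single map call expresses in RxJava.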

Or let's imagine that our player is also going to broadcast live translation audio, so we have to combine streams of video and audio from separate publishers. With RxJava, we can use Flowable::combineLatest, but with the Flow API, it is not an easy task.

It is possible to write a custom Processor that subscribes to both streams and sends the combined data to our VideoPlayer, but the implementation is a headache.

7. Why Flow API?

At this point, we may ask: what is the philosophy behind the Flow API?

If we search for Flow API usages in the JDK, we find them in the java.net.http and jdk.internal.net.http packages.
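For instance, HttpRequest.BodyPublisher in java.net.http extends Flow.Publisher<ByteBuffer>, so a request body can be consumed by any Flow.Subscriber. The following sketch (Java 11+) illustrates this without any network call. BodyPublisherDemo and CollectingSubscriber are hypothetical names, and subscribing to a body publisher directly, outside of an HttpClient, is done here only for demonstration:

```java
import java.io.ByteArrayOutputStream;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's code base.
class BodyPublisherDemo {

    // Collects all published byte buffers and exposes them as a UTF-8 string.
    static final class CollectingSubscriber implements Flow.Subscriber<ByteBuffer> {
        final CompletableFuture<String> result = new CompletableFuture<>();
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no backpressure needed here
        }

        @Override
        public void onNext(ByteBuffer buffer) {
            byte[] chunk = new byte[buffer.remaining()];
            buffer.get(chunk);
            bytes.write(chunk, 0, chunk.length);
        }

        @Override
        public void onError(Throwable throwable) {
            result.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            result.complete(bytes.toString(StandardCharsets.UTF_8));
        }
    }

    // Reads a string body back through the Flow API interfaces.
    static String readBody(String text) {
        HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofString(text);
        CollectingSubscriber subscriber = new CollectingSubscriber();
        publisher.subscribe(subscriber); // BodyPublisher is a Flow.Publisher<ByteBuffer>
        try {
            return subscriber.result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readBody("hello flow"));
    }
}
```

The subscriber here is an ordinary Flow.Subscriber, which shows how the JDK's own HTTP client exposes its body handling through the same interfaces we implemented for VideoPlayer.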

Furthermore, we can find adapters in the Reactor project or the Reactive Streams package. For example, org.reactivestreams.FlowAdapters has methods for converting Flow API interfaces to Reactive Streams ones and vice versa. Therefore, it helps interoperability between the Flow API and libraries with Reactive Streams support.

All of these facts help us understand the purpose of the Flow API: it was created to be a group of reactive specification interfaces in the JDK without relying on third parties. Moreover, Java expects the Flow API to be accepted as the standard set of interfaces for the reactive specification and to be used in the JDK or other Java-based libraries that implement the reactive specification for middleware and utilities.

8. Conclusions

In this tutorial, we've had an introduction to the Reactive Streams Specification, the Flow API, and RxJava.

Furthermore, we've seen a practical example of Flow API and RxJava implementations for a live video stream.

However, not all aspects of the Flow API and RxJava, such as Flow::Processor, Flowable::map, Flowable::flatMap, or the backpressure strategies, are covered here.

As always, you can find the tutorial's complete code over on GitHub.
