1. Introduction

Ratpack is a framework built on top of the Netty engine, which allows us to quickly build HTTP applications. We’ve already covered its basic usage in previous articles. This time, we’ll show how to use its streaming API to implement reactive applications.

2. A Quick Recap on Reactive Streams

Before getting into the actual implementation, let’s first do a quick recap on what constitutes a reactive application. According to the Reactive Manifesto, such applications must have the following properties:

  • Responsive
  • Resilient
  • Elastic
  • Message Driven

So, how do Reactive Streams help us achieve any of those properties? Well, in this context, message-driven doesn’t necessarily imply the use of messaging middleware. Instead, what is actually required to address this point is asynchronous request processing and support for non-blocking backpressure.

Ratpack’s reactive support uses the Reactive Streams API standard for the JVM as the base for its implementation. As such, it allows interoperability with other compatible frameworks like Project Reactor and RxJava.

3. Using Ratpack’s Streams Class

Ratpack’s Streams class provides several utility methods to create Publisher instances, which we can then use to create data processing pipelines.

A good starting point is the publish() method, which we can use to create a Publisher from any Iterable:

Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();

Here, LoggingSubscriber is a test implementation of the Subscriber interface that just logs every object emitted by the Publisher. It also includes a helper method block() that, as the name suggests, blocks the caller until the publisher emits all its objects or produces an error.

Running the test case, we’ll see the expected sequence of events:

onSubscribe: sub=7311908
onNext: sub=7311908, value=hello
onNext: sub=7311908, value=hello again
onComplete: sub=7311908
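The LoggingSubscriber class itself isn't listed here, so the following is only a minimal sketch of what such a subscriber could look like. To keep it runnable with the JDK alone, it assumes java.util.concurrent.Flow (which mirrors the Reactive Streams interfaces) and the JDK's SubmissionPublisher instead of Ratpack's Streams; the class name LoggingSubscriberSketch and the demo() helper are hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the article's LoggingSubscriber, written against the JDK Flow API.
class LoggingSubscriberSketch<T> implements Flow.Subscriber<T> {
    private final CountDownLatch finished = new CountDownLatch(1);
    private final AtomicInteger received = new AtomicInteger();
    private volatile Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("onSubscribe: sub=" + hashCode());
        subscription.request(1); // ask for a single item at a time
    }

    @Override
    public void onNext(T value) {
        received.incrementAndGet();
        System.out.println("onNext: sub=" + hashCode() + ", value=" + value);
        subscription.request(1); // signal that we are ready for the next item
    }

    @Override
    public void onError(Throwable error) {
        System.out.println("onError: sub=" + hashCode() + ", error=" + error);
        finished.countDown();
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete: sub=" + hashCode());
        finished.countDown();
    }

    // Blocks the caller until onComplete or onError has been signalled.
    public void block() {
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public int getReceived() {
        return received.get();
    }

    // Usage: publish two strings and wait until the subscriber has seen both.
    static int demo() {
        LoggingSubscriberSketch<String> sub = new LoggingSubscriberSketch<>();
        try (SubmissionPublisher<String> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            pub.submit("hello");
            pub.submit("hello again");
        } // close() signals onComplete after the submitted items are delivered
        sub.block();
        return sub.getReceived();
    }
}
```

The latch is what makes block() possible: both terminal signals release it, so a test can wait for the stream to finish before asserting on the number of received items.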

Another useful method is yield(). It has a single Function parameter that receives a YieldRequest object and returns the next object to emit:

@Test
public void whenYield_thenSuccess() {
    
    Publisher<String> pub = Streams.yield((t) -> {
        return t.getRequestNum() < 5 ? "hello" : null;
    });
    
    LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
    pub.subscribe(sub);
    sub.block();
    assertEquals(5, sub.getReceived());
}

The YieldRequest parameter allows us to implement logic based on the number of objects emitted so far, using its getRequestNum() method. In our example, we use this information to define the end condition, which we signal by returning a null value.
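To make the contract more concrete, here's a rough sketch of how a yield-style publisher could be built. It is not Ratpack's implementation: it assumes the JDK's java.util.concurrent.Flow interfaces, a single-threaded caller, and a plain LongFunction in place of YieldRequest. The names YieldSketch, yielding(), and demo() are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.LongFunction;

class YieldSketch {

    // Calls `producer` with the number of items emitted so far, emits every
    // non-null result, and completes the stream on the first null.
    static <T> Flow.Publisher<T> yielding(LongFunction<T> producer) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long demand;      // requested but not yet emitted
            private long emitted;     // value passed to the producer
            private boolean draining; // guards against re-entrant request() calls
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (draining) {
                    return; // called from inside onNext; the outer loop keeps going
                }
                draining = true;
                while (demand > 0 && !done) {
                    T next = producer.apply(emitted);
                    if (next == null) {
                        done = true;
                        subscriber.onComplete();
                    } else {
                        demand--;
                        emitted++;
                        subscriber.onNext(next);
                    }
                }
                draining = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Usage: same end condition as the test above, consumed one item at a time.
    static List<String> demo() {
        Flow.Publisher<String> pub = yielding(n -> n < 5 ? "hello" : null);
        List<String> received = new ArrayList<>();
        pub.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription s;
            @Override public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(1); }
            @Override public void onNext(String item) { received.add(item); s.request(1); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return received; // five "hello" strings
    }
}
```

The producer is only invoked while there is outstanding demand, which is how a pull-based function ends up respecting backpressure.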

Now, let’s see how to create a Publisher for periodic events:

@Test
public void whenPeriodic_thenSuccess() {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
        return t < 5 ? String.format("hello %d", t) : null;
    });

    LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
    pub.subscribe(sub);
    sub.block();
    assertEquals(5, sub.getReceived());
}

The returned publisher uses the ScheduledExecutorService to call the producer function periodically until it returns a null value. The producer function receives an integer value that corresponds to the number of objects already emitted, which we use to terminate the stream.
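The scheduling idea can be sketched with the JDK alone. The code below is an illustration under a few assumptions rather than Ratpack's implementation: it uses java.util.concurrent.Flow, it simply skips a tick when the subscriber has no outstanding demand, and the names PeriodicSketch, periodically(), and demo() are hypothetical:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;

class PeriodicSketch {

    // Calls `producer` once per period until it returns null.
    static <T> Flow.Publisher<T> periodically(ScheduledExecutorService executor,
                                              Duration period, IntFunction<T> producer) {
        return subscriber -> {
            AtomicLong demand = new AtomicLong();
            AtomicBoolean stopped = new AtomicBoolean();
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) {
                    demand.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
                }
                @Override public void cancel() { stopped.set(true); }
            });
            executor.schedule(new Runnable() {
                private int emitted; // number of items produced so far

                @Override public void run() {
                    if (stopped.get()) {
                        return;
                    }
                    if (demand.get() > 0) { // assumption: ticks without demand are skipped
                        T next = producer.apply(emitted);
                        if (next == null) {
                            stopped.set(true);
                            subscriber.onComplete();
                            return; // do not reschedule
                        }
                        emitted++;
                        demand.decrementAndGet();
                        subscriber.onNext(next);
                    }
                    executor.schedule(this, period.toMillis(), TimeUnit.MILLISECONDS);
                }
            }, period.toMillis(), TimeUnit.MILLISECONDS);
        };
    }

    // Usage: five items, 10 ms apart, then completion.
    static List<String> demo() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        Flow.Publisher<String> pub = periodically(executor, Duration.ofMillis(10),
            t -> t < 5 ? String.format("hello %d", t) : null);
        pub.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription s;
            @Override public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(1); }
            @Override public void onNext(String item) { received.add(item); s.request(1); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return received;
    }
}
```

Rescheduling from inside the task, instead of using scheduleAtFixedRate(), keeps the stop condition simple: once the producer returns null, the task just doesn't schedule itself again.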

4. Using TransformablePublisher

Taking a closer look at Streams’ methods, we can see that they usually return a TransformablePublisher. This interface extends Publisher with several utility methods that, much like what we find in Project Reactor’s Flux and Mono, make it easier to create complex processing pipelines from individual steps.

As an example, let’s use the map method to transform a sequence of integers into strings:

@Test
public void whenMap_thenSuccess() throws Exception {
    TransformablePublisher<String> pub = Streams.yield( t -> {
        return t.getRequestNum() < 5 ? t.getRequestNum() : null;
      })
      .map(v -> String.format("item %d", v));
    
    ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList());
    assertTrue("should succeed", result.isSuccess());
    assertEquals("should have 5 items",5,result.getValue().size());
}

Here, the actual execution happens inside a thread pool managed by the test utility class ExecHarness. Since yieldSingle() expects a Promise, we use toList() to adapt our publisher. This method collects all items emitted by the publisher and stores them in a List.

As stated in the documentation, we must take care when using this method. Applying it to an unbounded publisher can quickly make the JVM run out of memory! To avoid this situation, we should keep its use mostly restricted to unit tests.

Besides map(), TransformablePublisher has several useful operators:

  • filter(): filters upstream objects based on a Predicate
  • take(): emits just the first n objects from the upstream Publisher
  • wiretap(): adds an observation point where we can inspect data and events as they flow through the pipeline
  • reduce(): reduces upstream objects to a single value
  • transform(): injects a regular Publisher in the stream
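Conceptually, each of these operators wraps the upstream Publisher and sits between it and the downstream Subscriber. The sketch below shows that idea for a map-style step. It assumes the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher rather than Ratpack's classes, it doesn't handle exceptions thrown by the mapping function, and the names MapOperatorSketch, map(), and demo() are hypothetical:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

class MapOperatorSketch {

    // Returns a publisher that applies `mapper` to every upstream item.
    static <I, O> Flow.Publisher<O> map(Flow.Publisher<I> upstream,
                                        Function<? super I, ? extends O> mapper) {
        return downstream -> upstream.subscribe(new Flow.Subscriber<I>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // request() and cancel() pass straight through to upstream
                downstream.onSubscribe(subscription);
            }

            @Override
            public void onNext(I item) {
                downstream.onNext(mapper.apply(item));
            }

            @Override
            public void onError(Throwable error) {
                downstream.onError(error);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        });
    }

    // Usage: turn the integers 0..4 into strings, as in the test above.
    static List<String> demo() {
        List<String> received = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = new CompletableFuture<>();
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            Flow.Publisher<String> mapped = map(source, v -> String.format("item %d", v));
            mapped.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { received.add(item); }
                @Override public void onError(Throwable t) { done.completeExceptionally(t); }
                @Override public void onComplete() { done.complete(null); }
            });
            for (int i = 0; i < 5; i++) {
                source.submit(i);
            }
        }
        done.join(); // wait for onComplete
        return received;
    }
}
```

Because a one-to-one step like this doesn't change the number of items, it can hand the upstream Subscription directly to the downstream subscriber. Operators such as filter() or take() can't do that, since they need to adjust demand themselves.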

5. Using buffer() with Non-Compliant Publishers

In some scenarios, we must deal with a Publisher that sends more items to its subscribers than requested. To address those scenarios, Ratpack’s Streams class offers a buffer() method, which keeps those extra items in memory until subscribers consume them.

To illustrate how this works, let’s create a simple non-compliant Publisher that ignores the number of requested items. Instead, it will always produce at least 5 items more than requested:

private class NonCompliantPublisher implements Publisher<Integer> {

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        log.info("subscribe");
        subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
    }
    
    private class NonCompliantSubscription implements Subscription {
        private Subscriber<? super Integer> subscriber;
        private int recurseLevel = 0;

        public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            log.info("request: n={}", n);
            if ( recurseLevel > 0 ) {
               return;
            }
            recurseLevel++;
            for (int i = 0 ; i < (n + 5) ; i ++ ) {
                subscriber.onNext(i);
            }
            subscriber.onComplete();
        }

        @Override
        public void cancel() {
        }
    }
}

First, let’s test this publisher using our LoggingSubscriber. We’ll use the take() operator so that it receives just the first item:

@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
      .wiretap(new LoggingAction(""))
      .take(1);
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

Running this test, we see that despite receiving a cancel() request, our non-compliant publisher keeps producing new items:

RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=583189145, value=0
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - : event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... more spurious data events
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145

Now, let’s add a buffer() step to this stream. We’ll add two wiretap steps to log events before and after it, so its effect becomes more apparent:

@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
      .wiretap(new LoggingAction("before buffer"))
      .buffer()
      .wiretap(new LoggingAction("after buffer"))
      .take(1);
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

This time, running this code produces a different log sequence:

LoggingSubscriber - onSubscribe: sub=675852144
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - subscribe
RatpackStreamsUnitTest - before buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - before buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... more data events
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=675852144, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=675852144

The “before buffer” messages show that our non-compliant publisher was able to send all values after the first call to request. Nevertheless, downstream values were still sent one by one, respecting the amount requested by the LoggingSubscriber.
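The following sketch illustrates the idea behind such a buffering step: it accepts whatever upstream pushes, queues it, and hands items downstream only when they've been requested. It is a simplified, single-threaded illustration built on the JDK's java.util.concurrent.Flow interfaces, not Ratpack's buffer() implementation, and the names BufferSketch, buffer(), nonCompliant(), and demo() are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Flow;

class BufferSketch {

    // Queues every upstream item and releases only as many as downstream requested.
    static <T> Flow.Publisher<T> buffer(Flow.Publisher<T> upstream) {
        return downstream -> upstream.subscribe(new Flow.Subscriber<T>() {
            private final Deque<T> queue = new ArrayDeque<>();
            private long demand; // requested by downstream, not yet delivered
            private boolean upstreamDone;
            private boolean finished;
            private boolean draining;

            @Override
            public void onSubscribe(Flow.Subscription up) {
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) {
                        if (finished) {
                            return;
                        }
                        demand += n;
                        up.request(n); // forward demand; upstream may overshoot
                        drain();
                    }
                    @Override public void cancel() {
                        finished = true;
                        queue.clear();
                        up.cancel();
                    }
                });
            }

            @Override public void onNext(T item) {
                if (!finished) {
                    queue.add(item);
                    drain();
                }
            }

            @Override public void onError(Throwable t) {
                if (!finished) {
                    finished = true;
                    queue.clear();
                    downstream.onError(t);
                }
            }

            @Override public void onComplete() {
                upstreamDone = true;
                drain();
            }

            private void drain() {
                if (draining) {
                    return; // already emitting further up the call stack
                }
                draining = true;
                while (!finished && demand > 0 && !queue.isEmpty()) {
                    demand--;
                    downstream.onNext(queue.poll());
                }
                if (!finished && upstreamDone && queue.isEmpty()) {
                    finished = true;
                    downstream.onComplete();
                }
                draining = false;
            }
        });
    }

    // Flow-based variant of the non-compliant publisher: always sends n + 5 items, once.
    static Flow.Publisher<Integer> nonCompliant() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;
            @Override public void request(long n) {
                if (started) {
                    return;
                }
                started = true;
                for (int i = 0; i < n + 5; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onComplete();
            }
            @Override public void cancel() { }
        });
    }

    // Usage: request 1, then 2, then 10 items and record how many have arrived each time.
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        List<Integer> sizes = new ArrayList<>();
        Flow.Subscription[] handle = new Flow.Subscription[1];
        buffer(nonCompliant()).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { handle[0] = s; }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        handle[0].request(1);  sizes.add(received.size());
        handle[0].request(2);  sizes.add(received.size());
        handle[0].request(10); sizes.add(received.size());
        return sizes; // [1, 3, 6]
    }
}
```

Even though the upstream publisher pushes all six items after the very first request, the subscriber only ever sees as many as it has asked for. The price is memory: the queue is unbounded in this sketch.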

6. Using batch() with Slow Subscribers

Another scenario that can decrease an application’s throughput is when downstream subscribers request data in small amounts. Our LoggingSubscriber is a good example: it requests just a single item at a time.

In real-world applications, this can lead to a lot of context switches, which will hurt the overall performance. A better approach is to request a larger number of items at a time. The batch() method allows an upstream publisher to use more efficient request sizes while allowing downstream subscribers to use smaller request sizes.

Let’s see how this works in practice. As before, we’ll start with a stream without batch:

@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
      .wiretap(new LoggingAction(""));
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

Here, CompliantPublisher is just a test Publisher that produces integers up to, but excluding, the value passed to the constructor. Let’s run it to see the non-batched behavior:

CompliantPublisher - subscribe
LoggingSubscriber - onSubscribe: sub=-779393331
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - request: requested=1, available=10
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-779393331, value=0
... more data events omitted
CompliantPublisher - request: requested=1, available=1
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=-779393331

The output shows that the producer emits values one by one. Now, let’s add a batch() step to our pipeline, so the upstream publisher produces up to five items at a time:

@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {
    
    TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
      .wiretap(new LoggingAction("before batch"))
      .batch(5, Action.noop())
      .wiretap(new LoggingAction("after batch"));
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

The batch() method takes two arguments: the number of items requested on each request() call and an Action to handle discarded items, that is, items requested but not consumed. This situation can arise if there’s an error or a downstream subscriber calls cancel(). Let’s see the resulting execution log:

LoggingSubscriber - onSubscribe: sub=-1936924690
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - subscribe
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=10
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... first batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=6
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=5}]
... second batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=1
RatpackStreamsUnitTest - before batch: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-1936924690, value=0
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... downstream data events omitted
LoggingSubscriber - onComplete: sub=-1936924690

We can see that the publisher now gets requests for five items at a time. Notice that the producer receives several requests even before the logging subscriber gets the first item. The reason is that, in this test scenario, execution is single-threaded, so batch() continues to buffer items until it gets the onComplete() signal.
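To show how request sizes can be decoupled, here's a simplified sketch of a batching step. It is not Ratpack's batch(): it assumes the JDK's java.util.concurrent.Flow interfaces and single-threaded execution, it asks upstream for a new batch only when its queue is empty, and it omits the discard Action. Because of that, its request timing differs from the log above. The names BatchSketch, batch(), range(), and demo() are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Flow;

class BatchSketch {

    // Serves downstream requests of any size from a queue refilled in batches of `batchSize`.
    static <T> Flow.Publisher<T> batch(int batchSize, Flow.Publisher<T> upstream) {
        return downstream -> upstream.subscribe(new Flow.Subscriber<T>() {
            private final Deque<T> queue = new ArrayDeque<>();
            private Flow.Subscription up;
            private long demand;     // requested by downstream, not yet delivered
            private int outstanding; // requested from upstream, not yet received
            private boolean upstreamDone, finished, draining;

            @Override public void onSubscribe(Flow.Subscription subscription) {
                up = subscription;
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) {
                        if (!finished) { demand += n; drain(); }
                    }
                    @Override public void cancel() {
                        finished = true; queue.clear(); up.cancel();
                    }
                });
            }
            @Override public void onNext(T item) {
                if (outstanding > 0) outstanding--;
                queue.add(item);
                drain();
            }
            @Override public void onError(Throwable t) {
                if (!finished) { finished = true; queue.clear(); downstream.onError(t); }
            }
            @Override public void onComplete() { upstreamDone = true; drain(); }

            private void drain() {
                if (draining) return; // already running further up the call stack
                draining = true;
                while (!finished) {
                    if (demand > 0 && !queue.isEmpty()) {
                        demand--;
                        downstream.onNext(queue.poll());
                    } else if (queue.isEmpty() && upstreamDone) {
                        finished = true;
                        downstream.onComplete();
                    } else if (demand > 0 && outstanding == 0 && !upstreamDone) {
                        outstanding = batchSize;
                        up.request(batchSize); // one large request instead of many small ones
                    } else {
                        break;
                    }
                }
                draining = false;
            }
        });
    }

    // Compliant test source: emits 0..max-1 on demand and records every request size.
    static Flow.Publisher<Integer> range(int max, List<Long> requestLog) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private long demand;
            private boolean draining, done;
            @Override public void request(long n) {
                if (done) return;
                requestLog.add(n);
                demand += n;
                if (draining) return;
                draining = true;
                while (demand > 0 && next < max && !done) { demand--; subscriber.onNext(next++); }
                if (next >= max && !done) { done = true; subscriber.onComplete(); }
                draining = false;
            }
            @Override public void cancel() { done = true; }
        });
    }

    // Usage: the subscriber asks for one item at a time, upstream sees requests of five.
    static String demo() {
        List<Long> upstreamRequests = new ArrayList<>();
        List<Integer> received = new ArrayList<>();
        batch(5, range(10, upstreamRequests)).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription s;
            @Override public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(1); }
            @Override public void onNext(Integer item) { received.add(item); s.request(1); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return "upstream requests=" + upstreamRequests + ", received=" + received.size();
    }
}
```

In this sketch, ten single-item requests from the subscriber turn into just two upstream requests of five items each.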

7. Using Streams in Web Applications

Ratpack supports using reactive streams in conjunction with its asynchronous web framework.

7.1. Receiving a Data Stream

For incoming data, the Request object available through the handler’s Context provides the getBodyStream() method, which returns a TransformablePublisher of ByteBuf objects.

From this publisher, we can build our processing pipeline:

@Bean
public Action<Chain> uploadFile() {
    
    return chain -> chain.post("upload", ctx -> {
        TransformablePublisher<? extends ByteBuf> pub = ctx.getRequest().getBodyStream();
        pub.subscribe(new Subscriber<ByteBuf>() {
            private Subscription sub;
            @Override
            public void onSubscribe(Subscription sub) {
                this.sub = sub;
                sub.request(1);
            }

            @Override
            public void onNext(ByteBuf t) {
                try {
                    // ... do something useful with received data
                    sub.request(1);
                }
                finally {
                    // DO NOT FORGET to RELEASE !
                    t.release();
                }
            }

            @Override
            public void onError(Throwable t) {
                ctx.getResponse().status(500);
            }

            @Override
            public void onComplete() {
                ctx.getResponse().status(202);
            }
        }); 
    });
}

There are a couple of details to consider when implementing the subscribers. First, we must ensure that we call ByteBuf’s release() method at some point. Failing to do so will lead to memory leaks. Second, any asynchronous processing must use Ratpack’s primitives only. Those include Promise, Blocking, and similar constructs.

7.2. Sending a Data Stream

The most direct way to send a data stream is to use Response.sendStream(). This method takes a ByteBuf publisher argument and sends data to the client, applying backpressure as required to avoid overflowing it:

@Bean
public Action<Chain> download() {
    return chain -> chain.get("download", ctx -> {
        ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512));
    });
}

As simple as it is, this method has a downside: it won’t set any headers by itself, including Content-Length, which might be an issue for clients:

$ curl -v --output data.bin http://localhost:5050/download
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
... download progress messages omitted
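The transfer-encoding: chunked header explains how a client can still find the end of the body without a Content-Length: in HTTP/1.1 chunked encoding, every chunk is prefixed with its own size in hexadecimal, and a zero-length chunk terminates the body. The sketch below builds such a body by hand purely for illustration; in practice, the server takes care of this framing. The names ChunkedEncodingSketch, encode(), and demo() are hypothetical:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

class ChunkedEncodingSketch {

    // Frames each chunk as "<size in hex>\r\n<data>\r\n" and ends with a zero-length chunk.
    static byte[] encode(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            if (chunk.length == 0) {
                continue; // a zero-length chunk would terminate the body early
            }
            writeAscii(out, Integer.toHexString(chunk.length) + "\r\n");
            out.write(chunk, 0, chunk.length);
            writeAscii(out, "\r\n");
        }
        writeAscii(out, "0\r\n\r\n"); // last chunk, no trailers
        return out.toByteArray();
    }

    private static void writeAscii(ByteArrayOutputStream out, String text) {
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        out.write(bytes, 0, bytes.length);
    }

    // Usage: two chunks of 5 and 11 bytes.
    static String demo() {
        byte[] body = encode(List.of(
            "hello".getBytes(StandardCharsets.US_ASCII),
            "hello again".getBytes(StandardCharsets.US_ASCII)));
        return new String(body, StandardCharsets.US_ASCII);
        // "5\r\nhello\r\nb\r\nhello again\r\n0\r\n\r\n"
    }
}
```

This framing is a natural fit for streaming, since the sender never needs to know the total size in advance.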

A better alternative is to use the handler’s Context render() method, passing a ResponseChunks object. In this case, the response will use the “chunked” transfer encoding method. The most straightforward way to create a ResponseChunks instance is through one of the static methods available in this class:

@Bean
public Action<Chain> downloadChunks() {
    return chain -> chain.get("downloadChunks", ctx -> {
        ctx.render(ResponseChunks.bufferChunks("application/octetstream",
          new RandomBytesPublisher(1024,512)));
    });
}

With this change, the response now includes the content-type header:

$ curl -v --output data.bin http://localhost:5050/downloadChunks
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/octetstream
<
... progress messages omitted

7.3. Using Server-Sent Events

Support for Server-Sent Events (SSE) also uses the render() method. In this case, however, we use ServerSentEvents to adapt items coming from a Publisher to Event objects that include some metadata along with an event payload:

@Bean
public Action<Chain> quotes() {
    ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> {
        evt
          .id(Long.toString(idSeq.incrementAndGet()))
          .event("quote")
          .data( q -> q.toString());
    });
    
    return chain -> chain.get("quotes", ctx -> ctx.render(sse));
}

Here, QuotesService is just a sample service that creates a Publisher that produces random quotes at regular intervals. The second argument is a function that prepares the event for sending. This includes adding an id, an event type, and the payload itself.

We can use curl to test this method, yielding an output that shows a sequence of random quotes, along with event metadata:

$ curl -v http://localhost:5050/quotes
... request messages omitted
< HTTP/1.1 200 OK
< content-type: text/event-stream;charset=UTF-8
< transfer-encoding: chunked
... other response headers omitted
id: 10
event: quote
data: Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]

... more quotes
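The id, event, and data lines in this output follow the text/event-stream wire format: each field goes on its own line as "name: value", and a blank line ends the event. As a small illustration of that format, independent of Ratpack, the hypothetical helper below renders a single event by hand:

```java
class SseFormatSketch {

    // Renders one event in text/event-stream format; id and event are optional.
    static String format(String id, String event, String data) {
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        if (event != null) {
            sb.append("event: ").append(event).append('\n');
        }
        // multi-line payloads need one "data:" line per line of text
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // a blank line terminates the event
        return sb.toString();
    }

    // Usage: the same event shown in the curl output above.
    static String demo() {
        return format("10", "quote",
            "Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]");
    }
}
```

On the client side, the event name is what lets a browser's EventSource route the message to a listener registered for "quote" events.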

7.4. Broadcasting WebSocket Data

We can pipe data from any Publisher to a WebSocket connection using WebSockets.websocketBroadcast():

@Bean
public Action<Chain> quotesWS() {
    Publisher<String> pub = Streams.transformable(quotesService.newTicker())
      .map(Quote::toString);
    return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub));
}

Here, we use the same QuotesService we’ve seen before as the event source for broadcasting quotes to clients. Let’s use curl again to simulate a WebSocket client:

$ curl --include -v \
     --no-buffer \
     --header "Connection: Upgrade" \
     --header "Upgrade: websocket" \
     --header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
     --header "Sec-WebSocket-Version: 13" \
     http://localhost:5050/quotes-ws
... request messages omitted
< HTTP/1.1 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
< upgrade: websocket
upgrade: websocket
< connection: upgrade
connection: upgrade
< sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=

<
<Quote [ts=2021-10-11T01:39:42.915Z, symbol=ORCL, value=63.0]
... more quotes omitted
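The sec-websocket-accept header in this response is how the server proves it understood the upgrade request. As defined by the WebSocket protocol (RFC 6455), the server appends a fixed GUID to the client's Sec-WebSocket-Key, hashes the result with SHA-1, and Base64-encodes the digest. Ratpack and Netty do this for us; the hypothetical helper below only reproduces the calculation for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

class WebSocketAcceptSketch {

    // Fixed GUID defined by RFC 6455 for the opening handshake.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Computes the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key.
    static String acceptFor(String key) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((key + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required on every Java platform, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    // Usage: the sample key from RFC 6455.
    static String demo() {
        return acceptFor("dGhlIHNhbXBsZSBub25jZQ=="); // "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
    }
}
```

Passing the key from our curl command to acceptFor() should reproduce the sec-websocket-accept value shown in the response above.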

8. Conclusion

In this article, we’ve explored Ratpack’s support for reactive streams and how to apply it in different scenarios.

As usual, the full source code of the examples can be found over on GitHub.
