I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we're going to learn how to use the Reactive HTTP client from Jetty. We'll be demonstrating its usage with different Reactive libraries by creating small test cases.

2. What is Reactive HttpClient?

Jetty's HttpClient allows us to perform blocking HTTP requests. When we're dealing with a Reactive API, however, the standard HTTP client isn't a direct fit, because it doesn't expose Reactive Streams types. To fill this gap, Jetty provides a wrapper around the HttpClient APIs that also supports the Reactive Streams API.

The Reactive HttpClient is used to either consume or produce a stream of data over HTTP calls.

The example we're going to demonstrate here will have a Reactive HTTP client, which will communicate with a Jetty server using different Reactive libraries. We'll also talk about the request and response events provided by Reactive HttpClient.

We recommend reading our articles on Project Reactor, RxJava, and Spring WebFlux to get a better understanding of Reactive programming concepts and their terminology.

3. Maven Dependencies

Let's start the example by adding dependencies for Reactive Streams, Project Reactor, RxJava, Spring WebFlux, and Jetty's Reactive HttpClient to our pom.xml. Along with these, we'll add the Jetty Server dependency so that we can create the server:

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>9.4.19.v20190610</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.12.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.11</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>5.1.9.RELEASE</version>
</dependency>

4. Creating the Server and the Client

Now let's create a server and add a request handler that simply writes the request body to the response:

public class RequestHandler extends AbstractHandler {
    @Override
    public void handle(String target, Request jettyRequest, HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
        jettyRequest.setHandled(true);
        response.setContentType(request.getContentType());
        IO.copy(request.getInputStream(), response.getOutputStream());
    }
}

...

Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();
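
The heart of the handler above is a plain stream copy from the request body to the response body. As a minimal sketch of that echo step in isolation, here is the same idea with JDK streams only; the class and method names (EchoSketch, echo) are made up for illustration, and InputStream#transferTo stands in for Jetty's IO.copy:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Jetty: mimics what the handler does with the body.
class EchoSketch {

    // Copies every byte of the "request" to the "response" and returns the response bytes.
    static byte[] echo(byte[] requestBody) {
        try (InputStream in = new ByteArrayInputStream(requestBody);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            in.transferTo(out); // JDK 9+ stand-in for IO.copy(in, out)
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Convenience wrapper for text bodies, assuming UTF-8.
    static String echoText(String body) {
        return new String(echo(body.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
    }
}
```

Because the handler only copies bytes, whatever content type and body the client sends is what it should get back, which is what the later sections rely on.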

And then we can write the HttpClient:

HttpClient httpClient = new HttpClient();
httpClient.start();

Now that we've created the client and server, let's see how we can transform this blocking HTTP Client into a non-blocking one and create the request:

Request request = httpClient.newRequest("http://localhost:8080/"); 
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();

So here, the ReactiveRequest wrapper provided by Jetty made our blocking HTTP Client reactive. Let's proceed and see its usage with different reactive libraries.

5. ReactiveStreams Usage

Jetty's HttpClient natively supports Reactive Streams, so let's begin there.

Now, Reactive Streams is just a set of interfaces, so, for our testing, let's implement a simple blocking subscriber:

public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
    BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);

    @Override
    public void onSubscribe(Subscription subscription) { 
        subscription.request(1); 
    }
  
    @Override 
    public void onNext(ReactiveResponse response) { 
        sink.offer(response);
    } 
   
    @Override 
    public void onError(Throwable failure) { } 

    @Override 
    public void onComplete() { }

    public ReactiveResponse block() throws InterruptedException {
        return sink.poll(5, TimeUnit.SECONDS);
    }   
}

Note that we needed to call Subscription#request as per the JavaDoc which states that “No events will be sent by a Publisher until demand is signaled via this method.” 

Also, note that we've added a safety mechanism so that our test can bail out if it hasn't seen the value in 5 seconds.
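
To see the effect of demand without a server, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones, together with SubmissionPublisher. This is an assumption made for the sake of a dependency-free example; the names DemandSketch, QueueSubscriber, and publishOnce are hypothetical and not part of Jetty:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class DemandSketch {

    // Same shape as BlockingSubscriber, but the amount of demand is configurable.
    static class QueueSubscriber<T> implements Flow.Subscriber<T> {
        private final BlockingQueue<T> sink = new LinkedBlockingQueue<>(1);
        private final long demand;

        QueueSubscriber(long demand) {
            this.demand = demand;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (demand > 0) {
                subscription.request(demand); // without this call, onNext is never invoked
            }
        }

        @Override
        public void onNext(T item) {
            sink.offer(item);
        }

        @Override
        public void onError(Throwable failure) { }

        @Override
        public void onComplete() { }

        T block(long timeout, TimeUnit unit) throws InterruptedException {
            return sink.poll(timeout, unit); // null if nothing arrived in time
        }
    }

    // Publishes a single item and waits for it; returns null on timeout.
    static String publishOnce(long demand, long timeoutMillis) {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            QueueSubscriber<String> subscriber = new QueueSubscriber<>(demand);
            publisher.subscribe(subscriber);
            publisher.submit("response");
            return subscriber.block(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With a demand of 1, publishOnce should hand back the submitted item; with a demand of 0, the item stays buffered in the publisher and the call falls through to the timeout, which is the situation the safety mechanism guards against.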

And now, we can quickly test our HTTP request:

BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());

6. Project Reactor Usage

Let’s now see how we can use the Reactive HttpClient with Project Reactor. The publisher creation is pretty much the same as in the previous section.

After the publisher creation, let's use the Mono class from Project Reactor to get a reactive response:

ReactiveResponse response = Mono.from(publisher).block();

And then, we can test the resulting response:

Assert.assertNotNull(response);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
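
Conceptually, turning a publisher into a single blocking value means taking the first item, cancelling the rest, and waiting for the result. The following is a rough sketch of that idea with JDK types only (java.util.concurrent.Flow and CompletableFuture); it is not how Reactor implements Mono, and the names FirstValueSketch, first, and demo are made up for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FirstValueSketch {

    // Completes with the first item of the source, or with null if it finishes empty.
    static <T> CompletableFuture<T> first(Flow.Publisher<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // we only ever want one item
            }

            @Override
            public void onNext(T item) {
                subscription.cancel(); // ignore anything after the first item
                result.complete(item);
            }

            @Override
            public void onError(Throwable failure) {
                result.completeExceptionally(failure);
            }

            @Override
            public void onComplete() {
                result.complete(null); // no effect if a value was already delivered
            }
        });
        return result;
    }

    // Publishes two items and blocks for the first one, with a 5 second safety timeout.
    static String demo() {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            CompletableFuture<String> firstItem = first(publisher);
            publisher.submit("one");
            publisher.submit("two");
            return firstItem.orTimeout(5, TimeUnit.SECONDS).join();
        }
    }
}
```

Unlike the hand-written subscriber from the previous section, this version also propagates errors to the caller instead of silently waiting for the timeout.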

6.1. Spring WebFlux Usage

The conversion of the blocking HTTP Client into a reactive one is easy when used with Spring WebFlux. Spring WebFlux ships with a reactive client, WebClient, that can be used with various HTTP Client libraries. We can use this as an alternative to using straight Project Reactor code.

So first, let's wrap Jetty's HTTP Client using JettyClientHttpConnector to bind it to the WebClient:

ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);

And then pass this connector to the WebClient to perform the non-blocking HTTP requests:

WebClient client = WebClient.builder().clientConnector(clientConnector).build();

Next, let's do the actual HTTP call with the Reactive HTTP Client that we just created and test the result:

String responseContent = client.post()
  .uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
  .body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
  .retrieve()
  .bodyToMono(String.class)
  .block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);

7. RxJava2 Usage

Let's now move on and see how the Reactive HTTP client is used with RxJava2.

While we're here, let's mutate our example just a bit to now include a body in the request:

ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
  .content(ReactiveRequest.Content
    .fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
  .build();
Publisher<String> publisher = reactiveRequest
  .response(ReactiveResponse.Content.asString());

The code ReactiveResponse.Content.asString() converts the response body to a string. It is also possible to discard the response using the ReactiveResponse.Content.discard() method if we're only interested in the status of the request.
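
A response body generally arrives as a series of byte chunks, so converting it to a string means gathering the chunks before decoding. The sketch below illustrates that idea with JDK types only; it is not Jetty's implementation, and the names ChunkDecodingSketch, asString, and demo are hypothetical:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;

class ChunkDecodingSketch {

    // Joins all chunks first and decodes once, so that a multi-byte character
    // split across two chunks is still decoded correctly.
    static String asString(List<ByteBuffer> chunks, Charset charset) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        for (ByteBuffer chunk : chunks) {
            byte[] copy = new byte[chunk.remaining()];
            chunk.duplicate().get(copy); // duplicate() leaves the caller's position untouched
            bytes.write(copy, 0, copy.length);
        }
        return new String(bytes.toByteArray(), charset);
    }

    // Splits a UTF-8 body in the middle of a two-byte character and reassembles it.
    static String demo() {
        byte[] body = "h\u00e9llo".getBytes(StandardCharsets.UTF_8); // 6 bytes
        List<ByteBuffer> chunks = List.of(
          ByteBuffer.wrap(body, 0, 2),   // 'h' plus the first byte of the accented 'e'
          ByteBuffer.wrap(body, 2, 4));  // the second byte plus "llo"
        return asString(chunks, StandardCharsets.UTF_8);
    }
}
```

Decoding each chunk on its own would corrupt the character that straddles the chunk boundary, which is why the sketch decodes only once at the end.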

Now, we can see that getting a response using RxJava2 is actually quite similar to Project Reactor. Basically, we just use Single instead of Mono:

String responseContent = Single.fromPublisher(publisher)
  .blockingGet();

Assert.assertEquals("Hello World!", responseContent);

8. Request and Response Events

The Reactive HTTP client emits a number of events during the execution. They are categorized as request events and response events. These events are helpful to peek into the lifecycle of a Reactive HTTP client.

This time, let's make our reactive request slightly differently by using the HTTP Client instead of the request:

ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
  .content(ReactiveRequest.Content
    .fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
  .build();

And now let's get a Publisher of HTTP request events:

Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();

Now, let's use RxJava once again. This time, we'll create a list that holds the event types and populate it by subscribing to the request events as they happen:

List<ReactiveRequest.Event.Type> requestEventTypes = new ArrayList<>();

Flowable.fromPublisher(requestEvents)
  .map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());

Then, since we're in a test, we can block our response and verify:

int actualStatus = response.blockingGet().getStatus();

Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);

Similarly, we can subscribe to the response events as well. Since that subscription looks much the same, we've shown only the request events here. The complete implementation with both request and response events can be found in the GitHub repository, linked at the end of this article.
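
The pattern used for the request events, collecting values from a side subscription into a list, can be sketched without Jetty or RxJava as follows. This example relies on the JDK's SubmissionPublisher, and the Phase enum is a placeholder invented for this sketch; its values are not Jetty's event types. Since events may be delivered on another thread, the sketch uses a thread-safe list:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class EventCollectionSketch {

    // Placeholder lifecycle phases; NOT the event types emitted by Jetty.
    enum Phase { START, BODY, END }

    // Publishes every phase in order and returns what the subscriber observed.
    static List<Phase> collect() {
        List<Phase> seen = new CopyOnWriteArrayList<>(); // written from a publisher thread
        CompletableFuture<Void> done;
        try (SubmissionPublisher<Phase> events = new SubmissionPublisher<>()) {
            done = events.consume(seen::add); // completes when the publisher is closed
            for (Phase phase : Phase.values()) {
                events.submit(phase);
            }
        }
        done.orTimeout(5, TimeUnit.SECONDS).join(); // wait until all events were delivered
        return seen;
    }
}
```

Waiting for the completion signal before inspecting the list avoids asserting on a partially filled collection.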

9. Conclusion

In this tutorial, we've learned about the ReactiveStreams HttpClient provided by Jetty, its usage with the various Reactive libraries and the lifecycle events associated with a Reactive request.

All of the code snippets mentioned in the article can be found in our GitHub repository.
