Partner – Microsoft – NPI (cat= Spring)
announcement - icon

Azure Spring Apps is a fully managed service from Microsoft (built in collaboration with VMware), focused on building and deploying Spring Boot applications on Azure Cloud without worrying about Kubernetes.

And, the Enterprise plan comes with some interesting features, such as commercial Spring runtime support, a 99.95% SLA and some deep discounts (up to 47%) when you are ready for production.

>> Learn more and deploy your first Spring Boot app to Azure.

You can also ask questions and leave feedback on the Azure Spring Apps GitHub page.

1. Overview

In this tutorial, we’ll deep dive into Java reactive programming to solve an interesting problem of how to read Flux<DataBuffer> into a single InputStream.

2. Request Setup

As a first step to solving the problem of reading Flux<DataBuffer> into a single InputStream, we’ll use the Spring reactive WebClient for making a GET request. Further, we can use one of the public API endpoints hosted by gorest.co.in for such testing scenarios:

String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";

Next, let’s define the getWebClient() method for getting a new instance of the WebClient class:

static WebClient getWebClient() {
    WebClient.Builder webClientBuilder = WebClient.builder();
    return webClientBuilder.build();
}

At this point, we’re ready to make a GET request to the /public/v2/users endpoint. However, we must get the response body as a Flux<DataBuffer> object. So, let’s move on to the next section about BodyExtractors to do precisely this.

3. BodyExtractors and DataBufferUtils

We can use the toDataBuffers() method of the BodyExtractors class available in spring-webflux to extract the response body into Flux<DataBuffer>.

Let’s go ahead and create body as an instance of Flux<DataBuffer> type:

Flux<DataBuffer> body = client
  .get(
  .uri(REQUEST_ENDPOINT)
    .exchangeToFlux( clientResponse -> {
        return clientResponse.body(BodyExtractors.toDataBuffers());
    });

Next, as we require to collect these streams of DataBuffer into a single InputStream, a good strategy to achieve this is by using PipedInputStream and PipedOutputStream.

Further, we intend to write to the PipedOutputStream and eventually read from the PipedInputStream. So, let’s see how we can create these two connected streams:

PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);

We must note that the default size is 1024 bytes. However, we expect that the collected result from the Flux<DataBuffer> could exceed the default value. Therefore, we need to explicitly specify a larger value for the size, which in this case is 1024*10.

Finally, we use the write() utility method available in the DataBufferUtils class for writing body as a publisher to outputStream:

DataBufferUtils.write(body, outputStream).subscribe();

We must note that we connected inputStream to outputStream at the time of declaration. So, we’re good to read from inputStream. Let’s move on to the next section to see this in action.

4. Reading From the PipedInputStream

First, let’s defined a helper method, readContent(), to read an InputStream as a String object:

String readContent(InputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    contentStringBuffer.append(new String(tmp));
    return String.valueOf(contentStringBuffer);
}

Next, because it’s a typical practice to read the PipedInputStream in a different thread, let’s create the readContentFromPipedInputStream() method  that internally spawns a new thread to read contents from the PipedInputStream into a String object by calling the readContent() method:

String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stream.close();
    }

    return String.valueOf(contentStringBuffer);
}

At this stage, our code is ready to use for a simulation. Let’s see it in action:

WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));

As we’re dealing with an asynchronous system, we’re delaying the read by an arbitrary 3 secs before reading from the stream so that we’re able to see the complete response. Additionally, at the time of logging, we’re inserting a newline character to break the long output to multiple lines.

Finally, let’s verify the output generated by the code execution:

20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"[email protected]","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"[email protected]","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"[email protected]","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"[email protected]","gender":"female","status":"inactive"}
]

That’s it! It looks like we’ve got it all right.

5. Conclusion

In this article, we used the concept of piped streams and the utility methods available in the BodyExtractors and DataBufferUtils classes to read Flux<DataBuffer> into a single InputStream.

As always, the complete source code for the tutorial is available over on GitHub.

Course – LS (cat=Spring)

Get started with Spring and Spring Boot, through the Learn Spring course:

>> THE COURSE
res – Junit (guide) (cat=Reactive)
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.