1. Overview

In this tutorial, we'll take a deep dive into Java reactive programming to solve an interesting problem: how to read a Flux<DataBuffer> into a single InputStream.

2. Request Setup

As a first step, we'll use the Spring reactive WebClient to make a GET request. For testing, we can use one of the public API endpoints hosted by gorest.co.in:

String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";

Next, let's define the getWebClient() method for getting a new instance of the WebClient class:

static WebClient getWebClient() {
    WebClient.Builder webClientBuilder = WebClient.builder();
    return webClientBuilder.build();
}

At this point, we're ready to make a GET request to the /public/v2/users endpoint. However, we must get the response body as a Flux<DataBuffer> object. So, let's move on to the next section about BodyExtractors to do precisely this.

3. BodyExtractors and DataBufferUtils

We can use the toDataBuffers() method of the BodyExtractors class available in spring-webflux to extract the response body into Flux<DataBuffer>.

Let's go ahead and create body as an instance of Flux<DataBuffer> type:

WebClient client = getWebClient();
Flux<DataBuffer> body = client
  .get()
  .uri(REQUEST_ENDPOINT)
  // extract the raw response body as a stream of DataBuffer chunks
  .exchangeToFlux(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()));

Next, since we need to collect this stream of DataBuffer chunks into a single InputStream, a good strategy is to use a PipedInputStream and a PipedOutputStream.

We intend to write to the PipedOutputStream and eventually read from the PipedInputStream. So, let's see how we can create these two connected streams:

PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);

Note that the default pipe size is 1024 bytes. Since we expect the response collected from the Flux<DataBuffer> to exceed that, we explicitly specify a larger size, which in this case is 1024*10 bytes. Once the pipe is full, further writes block until a reader consumes some of the data.
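
To get a feel for the pipe size, here's a minimal JDK-only sketch. The PipeCapacityDemo class and the bufferedAfterWrite() method are illustrative names, not part of this tutorial's code. It writes into a pipe on the calling thread and reports how many bytes are buffered, and it assumes that the write fits into the pipe because nothing is draining it:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

class PipeCapacityDemo {

    // Writes `count` bytes into a pipe of size `capacity` and returns the number
    // of bytes that are then available on the read side.
    // Assumption: count <= capacity, since no reader is draining the pipe here.
    static int bufferedAfterWrite(int capacity, int count) {
        try (PipedOutputStream out = new PipedOutputStream();
             PipedInputStream in = new PipedInputStream(out, capacity)) {
            out.write(new byte[count]);
            return in.available();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(bufferedAfterWrite(1024 * 10, 4096)); // prints 4096
    }
}
```

If count exceeded capacity in this sketch, the write() call would block, as no other thread is reading from the pipe.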

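Finally, we use the write() utility method available in the DataBufferUtils class to write body, as a publisher, to outputStream. Since write() doesn't release the data buffers it has written, we subscribe with releaseConsumer() to avoid leaking them:

DataBufferUtils.write(body, outputStream).subscribe(DataBufferUtils.releaseConsumer());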

Since we connected inputStream to outputStream right after creating them, we can now read from inputStream. Let's move on to the next section to see this in action.

4. Reading From the PipedInputStream

First, let's define a helper method, readContent(), to read an InputStream as a String object:

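import java.nio.charset.StandardCharsets;

String readContent(InputStream stream) throws IOException {
    // Size the buffer to what is currently buffered in the stream;
    // bytes that arrive later are not picked up by this single read
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    // Decode only the bytes actually read, assuming a UTF-8 encoded response
    return new String(tmp, 0, Math.max(byteCount, 0), StandardCharsets.UTF_8);
}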
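
Because readContent() returns only the bytes that are already buffered when it's called, a variant that reads until end-of-stream can be useful when the writer closes its end of the pipe. The following is a sketch under that assumption, with a hypothetical readFully() helper and a ByteArrayInputStream standing in for the real stream:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ReadUntilEof {

    // Collects every byte until read() signals end-of-stream (-1), then decodes
    // the whole payload at once so multi-byte characters are never split.
    static String readFully(InputStream stream) {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        try {
            int n;
            while ((n = stream.read(chunk)) != -1) {
                collected.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] json = "[{\"id\":1}]".getBytes(StandardCharsets.UTF_8);
        System.out.println(readFully(new ByteArrayInputStream(json)));
    }
}
```

On a pipe whose write end is never closed, such a loop would keep waiting for more data, so it only fits when the producer reliably closes the stream.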

Next, because it's typical practice to read a PipedInputStream in a different thread from the one that writes to it, let's create the readContentFromPipedInputStream() method. Internally, it spawns a new thread that reads the contents of the PipedInputStream into a String object by calling the readContent() method:

String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stream.close();
    }

    return String.valueOf(contentStringBuffer);
}

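At this stage, our code is ready to use for a simulation. The getResponseAsInputStream() helper used below isn't listed in this article; presumably, it wraps the steps from the previous section and returns the connected PipedInputStream. Let's see it in action: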

WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));

As we're dealing with an asynchronous system, we delay the read by an arbitrary 3 seconds so that the complete response has a chance to arrive before we read from the stream. Additionally, when logging, we insert a newline character after each closing brace to break the long output into multiple lines.

Finally, let's verify the output generated by the code execution:

20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"[email protected]","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"[email protected]","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"[email protected]","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"[email protected]","gender":"female","status":"inactive"}
]

That's it! It looks like we've got it all right.
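
The arbitrary 3-second delay can be avoided when the producer closes the write end of the pipe, because the reader can then simply read until end-of-stream. Here's a JDK-only sketch of that idea, in which a plain writer thread stands in for the WebFlux subscriber. The PipeWithoutSleep class and roundTrip() method are hypothetical names used only for illustration:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class PipeWithoutSleep {

    // Sends `payload` through a pipe in 512-byte chunks from a background thread
    // and reads it back on the calling thread until end-of-stream.
    static String roundTrip(String payload, int pipeSize) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        try (PipedInputStream in = new PipedInputStream(pipeSize)) {
            PipedOutputStream out = new PipedOutputStream(in);
            Thread writer = new Thread(() -> {
                // Closing the write end is what signals end-of-stream to the reader
                try (PipedOutputStream closing = out) {
                    for (int i = 0; i < bytes.length; i += 512) {
                        closing.write(bytes, i, Math.min(512, bytes.length - i));
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            // readAllBytes() (Java 9+) blocks until the writer closes its end
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5000; i++) {
            sb.append("{\"id\":").append(i).append('}');
        }
        String result = roundTrip(sb.toString(), 1024);
        System.out.println("round trip intact: " + result.equals(sb.toString()));
    }
}
```

The payload here is far larger than the pipe, yet nothing is lost, since the writer blocks whenever the pipe is full and resumes as the reader drains it. In the WebClient pipeline, the analogous step would be closing outputStream once the Flux terminates, since DataBufferUtils.write() doesn't close it on its own.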

5. Conclusion

In this article, we used the concept of piped streams and the utility methods available in the BodyExtractors and DataBufferUtils classes to read Flux<DataBuffer> into a single InputStream.

As always, the complete source code for the tutorial is available over on GitHub.
