Convert DataBuffer to Mono in Reactor
Last updated: December 21, 2025
1. Introduction
In modern Java applications, Project Reactor and the Spring WebFlux framework have made stream processing and non-blocking I/O the standard for building scalable microservices. When handling large binary payloads, such as file uploads or image transfers, WebFlux defaults to streaming the data as a Flux<DataBuffer>.
While a Flux<DataBuffer> is efficient for backpressure and low memory consumption, it’s often not the format we need for final processing, especially when interacting with APIs or libraries that require the entire binary content as a standard Java byte[].
In this article, we’ll explore the underlying concepts of DataBuffer and Flux and walk through a strategy for efficiently converting a stream of DataBuffer chunks into a single Mono<byte[]>.
2. Project Setup
To start, we’ll add the spring-boot-starter-webflux dependency. It includes Project Reactor (reactor-core), Spring WebFlux, and the necessary data buffer utilities:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>4.0.0</version>
</dependency>
3. Understanding Reactor and Data Buffers
Before we implement the conversion, we must understand the three components involved: DataBuffer, Flux, and our target data type Mono<byte[]>.
3.1. What Is a Data Buffer?
In Spring’s reactive stack, a DataBuffer is an abstraction over a contiguous region of bytes, typically backed by a byte array, a java.nio.ByteBuffer, or a Netty ByteBuf. Unlike a simple byte[], a DataBuffer may use pooled memory (as Netty-backed buffers do) to reduce garbage collection overhead, making it efficient for high-throughput applications.
It acts as a lightweight wrapper that holds raw bytes and tracks separate read and write positions, allowing components to read parts of the data without needing to know the underlying memory management details.
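To make the read and write positions concrete, here’s a minimal sketch. It assumes the heap-based DefaultDataBufferFactory, and the DataBufferPositionsSketch class and its method are hypothetical names used only for illustration:

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

// Hypothetical helper (not part of Spring): shows how reads and writes move the positions
final class DataBufferPositionsSketch {

    // Returns { readPosition, writePosition, readableByteCount } after a partial read
    static int[] positionsAfterReading(String content, int bytesToRead) {
        DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
        byte[] source = content.getBytes(StandardCharsets.UTF_8);

        DataBuffer buffer = factory.allocateBuffer(source.length);
        buffer.write(source); // advances the write position by source.length

        byte[] head = new byte[bytesToRead];
        buffer.read(head); // advances the read position by bytesToRead

        return new int[] { buffer.readPosition(), buffer.writePosition(), buffer.readableByteCount() };
    }
}
```

Calling positionsAfterReading("hello", 2) leaves the read position at 2 and the write position at 5, so three bytes remain readable.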
3.2. Role of Flux<DataBuffer> in WebFlux
In WebFlux, large payloads, such as file uploads or large JSON responses, are not read into memory all at once. Instead, they are streamed as a sequence of chunks, each wrapped in a DataBuffer. This streaming behavior is represented by a Flux<DataBuffer>.
Using a Flux allows for backpressure, where the consumer can signal to the producer (e.g., the underlying TCP connection) how fast it can handle data, preventing memory exhaustion. This approach is fundamental to building scalable, non-blocking applications.
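As a rough illustration of demand signaling, the sketch below uses plain integers as a stand-in for DataBuffer chunks and Reactor’s BaseSubscriber to request only a fixed number of elements. The BackpressureSketch class and its method are hypothetical names chosen for this example:

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo: the subscriber, not the producer, decides how many elements flow
final class BackpressureSketch {

    static List<Integer> receiveWithDemand(int available, int demand) {
        List<Integer> received = new ArrayList<>();

        Flux.range(1, available).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(demand); // signal how many elements we can handle
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value); // no further request, so the producer stops here
            }
        });
        return received;
    }
}
```

With ten elements available and a demand of three, the subscriber receives only the first three elements, and the rest are never produced.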
3.3. Why Do We Need Mono<byte[]>?
Our desired output, Mono<byte[]>, signifies a single, eventual result (the complete byte array) wrapped within a reactive publisher. The byte[] contains the entire binary payload, aggregated from all the individual DataBuffer chunks.
While the Flux<DataBuffer> is excellent for streaming, the Mono<byte[]> is necessary when the entire payload is needed at once, such as calculating a file hash, saving the content to a blocking API, or deserializing an entire object in one go.
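To illustrate one such use case, here’s a small sketch that hashes a complete payload with the JDK’s MessageDigest. The PayloadHashSketch class is a hypothetical helper, and SHA-256 is just an example algorithm:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: needs the whole payload as a byte[] in this simple form
final class PayloadHashSketch {

    static String sha256Hex(byte[] payload) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(payload);

            // Render each hash byte as two lowercase hex characters
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }
}
```

Once we have a Mono<byte[]>, we could apply this helper with the map operator, for example by passing PayloadHashSketch::sha256Hex.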
4. The Conversion Strategy and Implementation
The conversion process is conceptually simple but must be performed correctly using reactive operators to ensure non-blocking execution and proper resource management. The strategy involves two main steps:
4.1. Aggregating the Flux Stream
Since the Flux emits multiple DataBuffer objects, the first step is to combine them into a single, contiguous buffer. The static method DataBufferUtils.join() will help us achieve it. This method takes a Flux<DataBuffer> and returns a Mono<DataBuffer>. It handles the iterative process of reading bytes from each buffer in the stream and writing them into a new, single, large DataBuffer until the source Flux completes.
Memory management is crucial in reactive programming. Data buffers are often allocated from pools and must be explicitly released to prevent memory leaks and pool exhaustion.
The good news is that the DataBufferUtils.join() method is designed to manage this process. It automatically releases the individual buffers from the source Flux as they are aggregated. The resulting single Mono<DataBuffer> is then ready for us to use, but we are still responsible for releasing that final aggregated buffer.
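A minimal sketch of this aggregation step might look like the following. It assumes heap-based buffers created with DefaultDataBufferFactory.wrap(), and the JoinSketch class is a hypothetical name used for illustration:

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo: joins several chunks and reports the size of the aggregated buffer
final class JoinSketch {

    static Mono<Integer> joinedSize(String... chunks) {
        DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
        Flux<DataBuffer> source = Flux.fromArray(chunks)
          .<DataBuffer>map(chunk -> factory.wrap(chunk.getBytes(StandardCharsets.UTF_8)));

        return DataBufferUtils.join(source)
          .map(joined -> {
              try {
                  return joined.readableByteCount();
              } finally {
                  // the aggregated buffer is ours to release
                  DataBufferUtils.release(joined);
              }
          });
    }
}
```

For the two chunks "This is a " and "long test string.", the joined buffer holds 27 readable bytes.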
4.2. Extracting the Final byte[] Array
The final step is to take the single, aggregated DataBuffer from the resulting Mono and convert its contents into a standard Java byte[]. We achieve this by allocating a byte[] sized to the buffer’s readableByteCount() and copying the content into it with the DataBuffer.read() method.
We’ll wrap this extraction in a try-finally block to ensure the memory for the final aggregated buffer is released after the conversion is complete, regardless of success or failure.
We’ll now follow the two steps above to create the complete, non-blocking utility method:
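import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DataBufferConverter {

    public Mono<byte[]> toByteArray(Flux<DataBuffer> data) {
        return DataBufferUtils
          .join(data) // aggregate all chunks into a single buffer
          .flatMap(dataBuffer -> {
              try {
                  // copy the readable bytes into a plain array
                  byte[] bytes = new byte[dataBuffer.readableByteCount()];
                  dataBuffer.read(bytes);
                  return Mono.just(bytes);
              } finally {
                  // always release the aggregated buffer
                  DataBufferUtils.release(dataBuffer);
              }
          });
    }
}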
This utility uses DataBufferUtils.join() for aggregation and wraps the final extraction in a try-finally block to guarantee the final buffer’s release after conversion to byte[].
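Because joining loads the whole payload into memory, we may also want to cap its size. DataBufferUtils.join() has an overload that accepts a maximum byte count and signals a DataBufferLimitException when the limit is exceeded. The sketch below is one possible variant, where the BoundedConverterSketch class, its chunks() helper, and the maxBytes parameter are hypothetical names:

```java
import java.nio.charset.StandardCharsets;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical variant of the converter that rejects payloads larger than maxBytes
final class BoundedConverterSketch {

    static Mono<byte[]> toByteArray(Flux<DataBuffer> data, int maxBytes) {
        return DataBufferUtils.join(data, maxBytes) // errors with DataBufferLimitException above the limit
          .map(buffer -> {
              try {
                  byte[] bytes = new byte[buffer.readableByteCount()];
                  buffer.read(bytes);
                  return bytes;
              } finally {
                  DataBufferUtils.release(buffer);
              }
          });
    }

    // Illustration-only helper that turns strings into heap-based DataBuffer chunks
    static Flux<DataBuffer> chunks(String... parts) {
        DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
        return Flux.fromArray(parts)
          .<DataBuffer>map(part -> factory.wrap(part.getBytes(StandardCharsets.UTF_8)));
    }
}
```

A sensible limit depends on the application, so we’d typically derive it from configuration rather than hard-coding it.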
5. Testing the Conversion Flow
We’ll now write a simple JUnit test to verify that the byte array is correctly reconstructed from the chunked Flux. We’ll use DefaultDataBufferFactory to simulate the reactive stream environment:
import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;

public class DataBufferConverterTest {

    private static final String TEST_CONTENT = "This is a long test string.";
    private final DataBufferConverter converter = new DataBufferConverter();
    private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();

    @Test
    void givenFluxOfDataBuffers_whenConvertedToByteArray_thenContentMatches() {
        // Setup: First, we'll manually create two DataBuffer chunks for the input Flux
        byte[] part1 = "This is a ".getBytes(StandardCharsets.UTF_8);
        byte[] part2 = "long test string.".getBytes(StandardCharsets.UTF_8);
        DataBuffer buffer1 = factory.allocateBuffer(part1.length);
        buffer1.write(part1);
        DataBuffer buffer2 = factory.allocateBuffer(part2.length);
        buffer2.write(part2);
        Flux<DataBuffer> sourceFlux = Flux.just(buffer1, buffer2);

        // Act & Assert: Here we perform conversion and block for direct assertion
        byte[] resultBytes = converter.toByteArray(sourceFlux).block();
        byte[] expectedBytes = TEST_CONTENT.getBytes(StandardCharsets.UTF_8);
        assertArrayEquals(expectedBytes, resultBytes, "The reconstructed byte array should match original");
    }
}
Here, we manually created two chunked DataBuffer segments and sent them through the converter. We then performed a synchronous assertion to verify that the reconstructed byte[] content matches the original.
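As an alternative to blocking, we could verify the same flow with Reactor’s StepVerifier. This sketch assumes the reactor-test dependency is on the test classpath, which the starter from our setup doesn’t provide. To stay self-contained, it inlines a copy of the conversion logic in a hypothetical StepVerifierSketch class instead of using a JUnit test:

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

// Hypothetical sketch: verifies the conversion without calling block() ourselves
final class StepVerifierSketch {

    static Duration verifyConversion() {
        DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
        Flux<DataBuffer> source = Flux.just("This is a ", "long test string.")
          .<DataBuffer>map(part -> factory.wrap(part.getBytes(StandardCharsets.UTF_8)));

        // Inlined copy of the join-and-extract logic, for illustration only
        Mono<byte[]> result = DataBufferUtils.join(source)
          .map(buffer -> {
              try {
                  byte[] bytes = new byte[buffer.readableByteCount()];
                  buffer.read(bytes);
                  return bytes;
              } finally {
                  DataBufferUtils.release(buffer);
              }
          });

        byte[] expected = "This is a long test string.".getBytes(StandardCharsets.UTF_8);

        // Expect exactly one matching element followed by completion
        return StepVerifier.create(result)
          .expectNextMatches(actual -> Arrays.equals(expected, actual))
          .verifyComplete();
    }
}
```

If the emitted array differs or the Mono terminates with an error, verifyComplete() throws an AssertionError, so the check fails just like a regular assertion would.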
6. Conclusion
In this article, we first explored the fundamental difference between the streaming nature of Flux<DataBuffer> and our target, the complete payload represented by Mono<byte[]>.
We detailed the conversion strategy involving aggregation, buffer handling, and final extraction. We then implemented a utility method using DataBufferUtils.join() and ensured proper memory management by manually releasing the final aggregated DataBuffer in the flatMap operator.
This clean, non-blocking pattern allows us to seamlessly integrate high-performance reactive data streams with traditional APIs requiring a single byte[] payload.
As always, the implementation’s source code is available over on GitHub.















