1. Introduction

Parallel-collectors is a small library that provides a set of Java Stream API collectors for parallel processing, without the drawbacks of standard Parallel Streams, which were designed for CPU-bound tasks.

2. Maven Dependencies

If we want to start using the library, we need to add a single entry in Maven’s pom.xml file:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.6.0</version>
</dependency>

Or a single line in Gradle’s build file:

implementation 'com.pivovarit:parallel-collectors:2.6.0'

The newest version can be found on Maven Central.

3. Parallel Streams Caveats

Parallel Streams were one of Java 8’s highlights, but they turned out to be a good fit only for CPU-heavy processing.

The reason is that Parallel Streams are internally backed by a JVM-wide shared ForkJoinPool, which provides limited parallelism and is used by all Parallel Streams running on a single JVM instance.
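
To make the shared pool visible, here's a minimal JDK-only sketch that records which threads execute a trivial parallel stream. The class and method names (CommonPoolSketch, workerThreadNames) are made up for illustration:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolSketch {

    // Runs a trivial parallel stream and records which threads executed it
    static Set<String> workerThreadNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 1_000)
          .parallel()
          .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Size of the pool shared by every parallel stream in this JVM
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        // Typically the calling thread plus ForkJoinPool.commonPool-worker-* threads
        System.out.println("Threads used: " + workerThreadNames());
    }
}
```

On most machines, the common pool's parallelism is tied to the number of available processors, so a handful of blocking calls can be enough to occupy every worker.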

For example, imagine we have a list of IDs, and we want to use them to fetch a list of users. Assuming that the operation is expensive and actually worth parallelizing, we may want to use Parallel Streams to achieve our objective:

List<Integer> ids = Arrays.asList(1, 2, 3); 
List<String> results = ids.parallelStream() 
  .map(i -> fetchById(i)) // each operation takes one second
  .collect(Collectors.toList()); 

System.out.println(results); // [user-1, user-2, user-3]

And indeed, we can see that there’s a noticeable speedup. But it becomes problematic if we start running multiple parallel blocking operations… in parallel. This might quickly saturate the pool and result in potentially huge latencies. That’s why it’s important to build bulkheads by creating separate thread pools – to prevent unrelated tasks from influencing each other’s execution.

To provide a custom ForkJoinPool instance, we could leverage the known trick of starting the stream from within a task submitted to that pool, but this approach relies on undocumented behavior and was faulty until JDK 10. We can read more in the issue itself: [JDK-8190974].
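
For reference, the trick usually looks roughly like the sketch below. It relies on undocumented behavior, so it's shown only to illustrate the idea. CustomPoolSketch and fetchAll are hypothetical names, and fetchById is a stand-in for the real remote call:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class CustomPoolSketch {

    // Hypothetical stand-in for the expensive lookup used in the article
    static String fetchById(int id) {
        return "user-" + id;
    }

    static List<String> fetchAll(List<Integer> ids, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // A parallel stream started from inside a ForkJoinPool task
            // runs its work in that pool instead of the common one
            return pool.submit(() -> ids.parallelStream()
                .map(CustomPoolSketch::fetchById)
                .collect(Collectors.toList()))
              .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList(1, 2, 3), 2)); // [user-1, user-2, user-3]
    }
}
```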

4. Parallel Collectors in Action

Parallel Collectors, as the name suggests, are just standard Stream API Collectors that allow performing additional operations in parallel at the collect() phase.

The ParallelCollectors class (which mirrors the standard Collectors class) is a facade providing access to the whole functionality of the library.

If we wanted to redo the above example, we could simply write:

ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
  .collect(ParallelCollectors.parallel(i -> fetchById(i), toList(), executor, 4));

System.out.println(results.join()); // [user-1, user-2, user-3]

The result is the same. However, we were able to provide our custom thread pool and specify our custom parallelism level, and the result arrived wrapped in a CompletableFuture instance without blocking the current thread. 

Standard Parallel Streams, on the other hand, couldn’t achieve any of those.
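
To get a feeling for what the collector saves us from writing by hand, here's a rough JDK-only equivalent built on CompletableFuture. This is a sketch of the general idea and not the library's implementation. The names AsyncMappingSketch and mapAsync are hypothetical, and unlike the library, this version has no separate parallelism parameter, so concurrency is bounded only by the size of the executor's pool:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class AsyncMappingSketch {

    static <T, R> CompletableFuture<List<R>> mapAsync(
      List<T> items, Function<T, R> mapper, Executor executor) {
        // Submit one task per element to the provided executor
        List<CompletableFuture<R>> futures = items.stream()
          .map(item -> CompletableFuture.supplyAsync(() -> mapper.apply(item), executor))
          .collect(Collectors.toList());

        // Complete once all tasks are done, then gather results in encounter order
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
          .thenApply(ignored -> futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<List<String>> results =
              mapAsync(Arrays.asList(1, 2, 3), i -> "user-" + i, executor);
            System.out.println(results.join()); // [user-1, user-2, user-3]
        } finally {
            executor.shutdown();
        }
    }
}
```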

4.1. Collect in Parallel Using Standard Collectors

If we want to process a Stream in parallel and collect the results, we can simply use ParallelCollectors.parallel and provide the desired Collector, just like with the standard Stream API:

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
  .collect(parallel(i -> fetchById(i), toList(), executor, 4));

assertThat(results.join()).containsExactly("user-1", "user-2", "user-3");

4.2. Collect in Parallel to Stream

If the previously mentioned API methods aren’t flexible enough, we can always collect all items into a Stream instance, and then process it just like any other Stream instance inside a CompletableFuture:

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<Stream<String>> results = ids.stream()
  .collect(parallel(i -> fetchById(i), executor, 4));

assertThat(results.join()).containsExactly("user-1", "user-2", "user-3");

4.3. ParallelCollectors.parallelToStream()

The above examples focused on use cases where it was desired to receive a result wrapped in a CompletableFuture, but if we just want to block the calling thread and process results in completion order, we can go for parallelToStream():

List<Integer> ids = Arrays.asList(1, 2, 3);

Stream<String> result = ids.stream()
  .collect(parallelToStream(i -> fetchByIdWithRandomDelay(i), executor, 4));

assertThat(result).contains("user-1", "user-2", "user-3");

In this case, we can expect the collector to return the results in a different order each time since we introduced a random processing delay. Hence, we used the order-insensitive contains() assertion in our test instead of containsExactly().

4.4. ParallelCollectors.parallelToOrderedStream()

If we want to ensure that results are emitted in the original order, we can leverage parallelToOrderedStream():

List<Integer> ids = Arrays.asList(1, 2, 3);

Stream<String> result = ids.stream()
  .collect(parallelToOrderedStream(ParallelCollectorsUnitTest::fetchByIdWithRandomDelay, executor, 4));

assertThat(result).containsExactly("user-1", "user-2", "user-3");

In this case, the collector always maintains the original order, but it might be slower than parallelToStream(), since a result can’t be emitted until all preceding ones are ready.
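
The difference between completion order and encounter order can be mimicked with plain CompletableFutures. The sketch below only illustrates the two orderings and isn't how the library implements them. OrderingSketch and its methods are hypothetical, and fetchByIdWithRandomDelay is a stand-in for the helper used in the examples above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

class OrderingSketch {

    // Hypothetical stand-in: sleeps for a random time, then returns "user-<id>"
    static String fetchByIdWithRandomDelay(int id) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(50));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "user-" + id;
    }

    // Results are recorded as soon as each task finishes
    static List<String> inCompletionOrder(List<Integer> ids, Executor executor) {
        Queue<String> completed = new ConcurrentLinkedQueue<>();
        CompletableFuture<?>[] futures = ids.stream()
          .map(id -> CompletableFuture
            .supplyAsync(() -> fetchByIdWithRandomDelay(id), executor)
            .thenAccept(completed::add))
          .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        return new ArrayList<>(completed);
    }

    // All tasks are started first, then joined in the order of the input list,
    // so a slow early element holds back faster later ones
    static List<String> inEncounterOrder(List<Integer> ids, Executor executor) {
        List<CompletableFuture<String>> futures = ids.stream()
          .map(id -> CompletableFuture.supplyAsync(() -> fetchByIdWithRandomDelay(id), executor))
          .collect(Collectors.toList());
        return futures.stream()
          .map(CompletableFuture::join)
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Integer> ids = Arrays.asList(1, 2, 3);
            System.out.println(inCompletionOrder(ids, executor)); // order varies between runs
            System.out.println(inEncounterOrder(ids, executor));  // [user-1, user-2, user-3]
        } finally {
            executor.shutdown();
        }
    }
}
```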

5. Limitations

At the time of writing, parallel-collectors doesn’t work with infinite streams even if short-circuiting operations are used – it’s a design limitation imposed by Stream API internals. Simply put, Streams treat collectors as non-short-circuiting operations, so the stream needs to process all upstream elements before getting terminated.
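
We can observe this behavior with the standard Stream API alone. In the sketch below, the plain Collectors.toList() stands in for any collector, and the class and method names are made up for illustration. It counts how many upstream elements get pulled when limit() is applied after collect() versus before it:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class NonShortCircuitingSketch {

    // limit() comes after collect(), so the collector has already consumed everything
    static int pulledWhenCollectingFirst(int upstreamSize) {
        AtomicInteger pulled = new AtomicInteger();
        IntStream.range(0, upstreamSize).boxed()
          .peek(i -> pulled.incrementAndGet())
          .collect(Collectors.toList())
          .stream()
          .limit(2)
          .collect(Collectors.toList());
        return pulled.get();
    }

    // limit() comes before collect(), so the sequential stream can stop early
    static int pulledWhenLimitingFirst(int upstreamSize) {
        AtomicInteger pulled = new AtomicInteger();
        IntStream.range(0, upstreamSize).boxed()
          .peek(i -> pulled.incrementAndGet())
          .limit(2)
          .collect(Collectors.toList());
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println(pulledWhenCollectingFirst(1_000)); // 1000
        System.out.println(pulledWhenLimitingFirst(1_000));   // 2
    }
}
```

With an infinite upstream, the first variant would never return, which is exactly the situation described above.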

The other limitation is that short-circuiting operations don’t interrupt the remaining tasks after short-circuiting. This is a limitation of CompletableFuture, which doesn’t propagate interruptions to executing threads.
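
The CompletableFuture behavior is easy to reproduce in isolation. In the following sketch (CancellationSketch is a hypothetical name), we cancel a future while its task is still running and check whether the worker thread ever saw an interrupt:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class CancellationSketch {

    // Returns true only if the worker thread was interrupted after cancel(true)
    static boolean workerInterruptedAfterCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                started.countDown();
                try {
                    Thread.sleep(200); // simulate a blocking call
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
                return "user-1";
            }, executor);

            started.await();     // make sure the task is already running
            future.cancel(true); // marks the future as cancelled, but the task keeps running
            finished.await();    // the task still runs to completion
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerInterruptedAfterCancel()); // false
    }
}
```

The mayInterruptIfRunning flag passed to cancel() has no effect in CompletableFuture, so the task isn't interrupted and keeps occupying its thread until it finishes on its own.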

6. Conclusion

We saw how the parallel-collectors library lets us perform parallel processing with custom Java Stream API Collectors, giving us control over the thread pool and the parallelism level while returning results in the non-blocking style of CompletableFuture.

As always, code snippets are available over on GitHub.

For further reading, see the parallel-collectors library on GitHub, the author’s blog, and the author’s Twitter account.
