Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we’ll explore various techniques for partitioning a Java 8 Stream based on a fixed maximum size.

We’ll start by revisiting how to accomplish this with Lists. Subsequently, we’ll enhance our approach with Stream-specific features, such as lazy evaluation and parallel processing.

2. Partitioning a List

There are various ways of partitioning a List in Java. One easy way is to start by computing the index of the last batch from the desired batch size and the size of the source list. Despite its name, nrOfFullBatches holds the zero-based index of the last batch, which may be only partially filled:

static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
    int nrOfFullBatches = (source.size() - 1) / batchSize;
    // ...
}

To partition the source list into smaller sub-lists, we first compute the start and end indices of each batch, keeping in mind that the last batch may be smaller than the others:

int startIndex = batch * batchSize;
int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
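
To make the index arithmetic concrete, here is a minimal sketch that lists the bounds of every batch for a given source size. The class BatchBoundsSketch and the method batchBounds() are hypothetical names used only for illustration, and the end index is exclusive, as it is for List.subList():

```java
import java.util.ArrayList;
import java.util.List;

final class BatchBoundsSketch {

    // Returns one {startIndex, endIndex} pair per batch; endIndex is exclusive.
    static List<int[]> batchBounds(int sourceSize, int batchSize) {
        List<int[]> bounds = new ArrayList<>();
        if (sourceSize == 0) {
            return bounds;
        }
        // Zero-based index of the last batch (the article's nrOfFullBatches).
        int lastBatch = (sourceSize - 1) / batchSize;
        for (int batch = 0; batch <= lastBatch; batch++) {
            int startIndex = batch * batchSize;
            int endIndex = (batch == lastBatch) ? sourceSize : (batch + 1) * batchSize;
            bounds.add(new int[] { startIndex, endIndex });
        }
        return bounds;
    }

    public static void main(String[] args) {
        // For 8 elements and a batch size of 3, prints 0..3, 3..6 and 6..8.
        for (int[] b : batchBounds(8, 3)) {
            System.out.println(b[0] + ".." + b[1]);
        }
    }
}
```

For 8 elements and a batch size of 3, the last batch index is (8 - 1) / 3 = 2, so the third batch covers only the indices 6 and 7.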

Finally, we can add some validations and cover the corner cases, for example, when the source list is empty or when batchSize isn’t a positive number:

static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
    if (batchSize <= 0) {
        throw new IllegalArgumentException(String.format("The batchSize must be greater than 0. Actual value: %s", batchSize));
    }
    if (source.isEmpty()) {
        return Stream.empty();
    }
    int nrOfFullBatches = (source.size() - 1) / batchSize;
    return IntStream.rangeClosed(0, nrOfFullBatches)
      .mapToObj(batch -> {
          int startIndex = batch * batchSize;
          int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
          return source.subList(startIndex, endIndex);
      });
}

Now, let’s test the solution. For an input list of numbers from 1 to 8 and a batch size of 3, we’ll expect three sub-lists:

@Test
void whenPartitionList_thenReturnThreeSubLists() {
    List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8);

    Stream<List<Integer>> result = partitionList(source, 3);

    assertThat(result).containsExactlyInAnyOrder(
      List.of(1, 2, 3),
      List.of(4, 5, 6),
      List.of(7, 8)
    );
}
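
The corner cases mentioned earlier can be exercised in the same way. The sketch below mirrors the partitionList() method so that it compiles on its own; the class name PartitionListEdgeCases is an assumption made for this example:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class PartitionListEdgeCases {

    // Same logic as partitionList() above, repeated so the sketch is self-contained.
    static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("The batchSize must be greater than 0. Actual value: " + batchSize);
        }
        if (source.isEmpty()) {
            return Stream.empty();
        }
        int lastBatch = (source.size() - 1) / batchSize;
        return IntStream.rangeClosed(0, lastBatch)
          .mapToObj(batch -> {
              int startIndex = batch * batchSize;
              int endIndex = (batch == lastBatch) ? source.size() : (batch + 1) * batchSize;
              return source.subList(startIndex, endIndex);
          });
    }

    public static void main(String[] args) {
        // A batch size larger than the list yields a single, shorter batch.
        System.out.println(partitionList(List.of(1, 2), 5).collect(Collectors.toList())); // prints [[1, 2]]
        // An empty source yields an empty stream.
        System.out.println(partitionList(List.of(), 3).count()); // prints 0
        // A non-positive batch size is rejected before any stream is created.
        try {
            partitionList(List.of(1, 2), 0);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Let’s also keep in mind that subList() returns views backed by the source list, so a caller who needs independent batches should copy them.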

3. Partitioning a Parallel Stream

Streams come with distinctive characteristics, such as lazy evaluation and the capacity for parallel processing. We can take advantage of these features by creating a custom Collector.

Moreover, given that the desired return type is a list of sub-lists, we’ll also make use of certain functions already defined by Collectors.toList(), which we’ll refer to as the downstream collector:

static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
    return source.collect(partitionBySize(batchSize, Collectors.toList()));
}

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    return Collector.of( ... );
}

We can create a Collector using the static factory method Collector.of(). Let’s consult the documentation and see what each of the parameters represents:

  • supplier – The supplier function for the new collector
  • accumulator – The accumulator function for the new collector
  • combiner – The combiner function for the new collector
  • finisher – The finisher function for the new collector
  • characteristics – The collector characteristics for the new collector
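
Before building the partitioning collector, it may help to see all five parameters in a much smaller example. The following sketch defines a hypothetical totalLength() collector that sums the lengths of strings, using a one-element int array as its mutable container:

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

final class CollectorOfSketch {

    // Hypothetical example collector: sums the lengths of all strings in the stream.
    static Collector<String, int[], Integer> totalLength() {
        return Collector.of(
          () -> new int[1],                                       // supplier: a fresh mutable container
          (acc, s) -> acc[0] += s.length(),                       // accumulator: folds one element in
          (left, right) -> { left[0] += right[0]; return left; }, // combiner: merges two partial results
          acc -> acc[0],                                          // finisher: container to final result
          Collector.Characteristics.UNORDERED                     // a sum doesn't depend on element order
        );
    }

    public static void main(String[] args) {
        int total = Stream.of("a", "bb", "ccc").parallel().collect(totalLength());
        System.out.println(total); // prints 6
    }
}
```

The partitioning collector follows the same shape, only with a richer container and with a downstream collector doing part of the work.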

Now, let’s systematically walk through each of them, creating and understanding their functionality one by one.

3.1. The Supplier

We’ll use a temporary object to accumulate the data and split it into batches. This accumulator is typically concealed as an implementation detail.

Upon completion of the collection operation, we invoke the finisher function, which transforms this accumulator into the final result returned by the collector. The first parameter of the factory method Collector.of() will be a function that supplies an instance of our custom Accumulator.

This temporary accumulator encapsulates a list of values and the fixed batch size. Furthermore, it provides the caller with the flexibility to specify a listener that is notified when a batch reaches its capacity. Additionally, it includes a generic field to accommodate a downstream collector:

static class Accumulator<T, A> {
    private final List<T> values = new ArrayList<>();
    private final int batchSize;
    private A downstreamAccumulator;
    private final BiConsumer<A, List<T>> batchFullListener;

    // constructor
}

The accumulator is an implementation detail that callers never see. For this reason, we’ll create it as a static nested class, and we’ll favor package-private access.

Now, let’s write a method that accepts a new value. After adding it to the list, if the size of the list reaches the batchSize, we’ll notify the listener and then clear the values:

void add(T value) {
    values.add(value);
    if (values.size() == batchSize) {
        batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
        values.clear();
    }
}
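
To see the listener in action, here is a minimal sketch of the class with an assumed constructor, since the original elides it; its parameter order simply mirrors the way the Supplier calls it later on. The wrapper class AccumulatorSketch and the helper fullBatches() are hypothetical, and the downstream container is a plain list of batches:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

final class AccumulatorSketch {

    static class Accumulator<T, A> {
        private final List<T> values = new ArrayList<>();
        private final int batchSize;
        private A downstreamAccumulator;
        private final BiConsumer<A, List<T>> batchFullListener;

        // Assumed constructor: batch size, downstream container, then the listener.
        Accumulator(int batchSize, A downstreamAccumulator, BiConsumer<A, List<T>> batchFullListener) {
            this.batchSize = batchSize;
            this.downstreamAccumulator = downstreamAccumulator;
            this.batchFullListener = batchFullListener;
        }

        void add(T value) {
            values.add(value);
            if (values.size() == batchSize) {
                // Hand over a copy, because values is cleared right afterwards.
                batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
                values.clear();
            }
        }
    }

    // Feeds 1..count into an accumulator and returns the batches the listener received.
    static List<List<Integer>> fullBatches(int count, int batchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        Accumulator<Integer, List<List<Integer>>> acc =
          new Accumulator<>(batchSize, batches, (all, batch) -> all.add(batch));
        for (int i = 1; i <= count; i++) {
            acc.add(i);
        }
        return batches;
    }

    public static void main(String[] args) {
        // 7 and 8 stay pending inside the accumulator until a finisher flushes them.
        System.out.println(fullBatches(8, 3)); // prints [[1, 2, 3], [4, 5, 6]]
    }
}
```

Only complete batches reach the listener, which is why the finisher discussed later has to deal with the remaining values.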

Let’s create the Supplier that instantiates this Accumulator. When a batch is full, we’ll delegate to the downstream accumulator, in our case, the one coming from Collectors.toList():

(acc, values) -> downstream.accumulator().accept(acc, values)

Finally, we can re-write this BiConsumer using method reference and create our Supplier:

Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
  batchSize,
  downstream.supplier().get(),
  downstream.accumulator()::accept
);

3.2. The Accumulator

The second argument when creating a custom Collector will be a BiConsumer that accepts an Accumulator and the new value. In our case, we’ll simply delegate to the Accumulator and allow it to add the value to the current batch:

BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);

3.3. The Combiner

The combiner is a function that accepts two Accumulators and provides a way of merging them together. Firstly, we need to merge their downstreamAccumulators using the downstream’s combiner. After that, we can stream all the values accumulated by one of the accumulators and add them to the other one:

BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> {
    acc1.downstreamAccumulator = downstream.combiner()
      .apply(acc1.downstreamAccumulator, acc2.downstreamAccumulator);
    acc2.values.forEach(acc1::add);
    return acc1;
};

Let’s refactor the code and encapsulate this logic inside the Accumulator class itself:

static class Accumulator<T, A> {
    private final List<T> values = new ArrayList<>();
    private final int batchSize;
    private A downstreamAccumulator;
    private final BiConsumer<A, List<T>> batchFullListener;

    // constructor

    void add(T value) {
        // ...  
    }

    Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
        this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
        other.values.forEach(this::add);
        return this;
    }
}

This simplifies our combiner into a one-liner:

BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());
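
A small worked example shows what happens to the pending values during a merge. In this sketch, the constructor and the accessors completedBatches() and pendingValues() are assumptions added for illustration, and the downstream combiner simply appends one list of batches to the other:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;

final class CombineSketch {

    static class Accumulator<T, A> {
        private final List<T> values = new ArrayList<>();
        private final int batchSize;
        private A downstreamAccumulator;
        private final BiConsumer<A, List<T>> batchFullListener;

        // Assumed constructor, matching the arguments the Supplier passes in.
        Accumulator(int batchSize, A downstreamAccumulator, BiConsumer<A, List<T>> batchFullListener) {
            this.batchSize = batchSize;
            this.downstreamAccumulator = downstreamAccumulator;
            this.batchFullListener = batchFullListener;
        }

        void add(T value) {
            values.add(value);
            if (values.size() == batchSize) {
                batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
                values.clear();
            }
        }

        Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
            this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
            other.values.forEach(this::add);
            return this;
        }

        // Hypothetical accessors, present only so the sketch can show its state.
        A completedBatches() { return downstreamAccumulator; }
        List<T> pendingValues() { return values; }
    }

    static Accumulator<Integer, List<List<Integer>>> mergeExample() {
        BiConsumer<List<List<Integer>>, List<Integer>> listener = (all, batch) -> all.add(batch);
        Accumulator<Integer, List<List<Integer>>> left = new Accumulator<>(3, new ArrayList<>(), listener);
        Accumulator<Integer, List<List<Integer>>> right = new Accumulator<>(3, new ArrayList<>(), listener);
        List.of(1, 2).forEach(left::add);           // left: no full batch, pending [1, 2]
        List.of(3, 4, 5, 6, 7).forEach(right::add); // right: batch [3, 4, 5], pending [6, 7]
        return left.combine(right, (a, b) -> { a.addAll(b); return a; });
    }

    public static void main(String[] args) {
        Accumulator<Integer, List<List<Integer>>> merged = mergeExample();
        System.out.println(merged.completedBatches()); // prints [[3, 4, 5], [1, 2, 6]]
        System.out.println(merged.pendingValues());    // prints [7]
    }
}
```

The second batch mixes pending values from both sides, which illustrates why the collector can’t promise to keep elements in encounter order.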

3.4. The Finisher

As previously mentioned, we must establish a means to convert this custom Accumulator into the ultimate result: the List of Lists. This is another place where we can rely on the downstream collector to aggregate all the batches into a single list.

Additionally, if the accumulator isn’t empty, indicating the presence of values from the last incomplete batch, we need to ensure that these values are consolidated before invoking the downstream finisher:

Function<Accumulator<T, A>, R> finisher = acc -> {
    if (!acc.values.isEmpty()) {
        downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
    }
    return downstream.finisher().apply(acc.downstreamAccumulator);
};

3.5. The Collector Characteristics

Our collector is suitable for use with parallel streams. In a parallel reduction, the stream framework gives each thread its own Accumulator, obtained from the supplier, and merges the partial results using the combiner, so the Accumulator itself needs no synchronization. A consequence of merging partially filled batches in this way is that we can’t guarantee the order of the elements.

Collector characteristics can be used to optimize reduction implementations. Based on the considerations we’ve outlined, we’ll set the characteristics parameter to Collector.Characteristics.UNORDERED:

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    // ...
    return Collector.of(
      supplier,
      accumulator,
      combiner,
      finisher,
      Collector.Characteristics.UNORDERED
    );
}
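
Any collector reports its characteristics through the characteristics() method, which gives us a quick way to compare our choice with the built-in collectors. A minimal sketch, where the class CharacteristicsSketch and the helper characteristicsOf() are hypothetical names:

```java
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class CharacteristicsSketch {

    // Thin helper so the calls below read naturally.
    static Set<Collector.Characteristics> characteristicsOf(Collector<?, ?, ?> collector) {
        return collector.characteristics();
    }

    public static void main(String[] args) {
        // toSet() is documented as an unordered collector, whereas toList() keeps encounter order.
        System.out.println(characteristicsOf(Collectors.toSet())
          .contains(Collector.Characteristics.UNORDERED));  // prints true
        System.out.println(characteristicsOf(Collectors.toList())
          .contains(Collector.Characteristics.UNORDERED));  // prints false
    }
}
```

Declaring UNORDERED places our collector in the same category as Collectors.toSet(): the result doesn’t promise to follow the encounter order of the source.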

3.6. The Full Solution

We now understand the roles played by each function used in collector creation. Let’s revisit the whole method before proceeding with the tests:

static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
    return source.collect(partitionBySize(batchSize, Collectors.toList()));
}

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
      batchSize, 
      downstream.supplier().get(), 
      downstream.accumulator()::accept
    );

    BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);

    BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());

    Function<Accumulator<T, A>, R> finisher = acc -> {
        if (!acc.values.isEmpty()) {
            downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
        }
        return downstream.finisher().apply(acc.downstreamAccumulator);
    };
    
    return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED);
}

During testing, we’ll no longer be able to assert the values within each batch. Consequently, our assertions will focus solely on verifying the count and sizes of the batches. For instance, when partitioning a parallel stream that contains integers between 1 and 8 with a batchSize of 3, we’ll generate two complete batches, each containing three elements, and one batch with two elements:

@Test
void whenPartitionStream_thenReturnThreeSubLists() {
    Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();

    List<List<Integer>> result = partitionStream(source, 3);

    assertThat(result)
      .hasSize(3)
      .satisfies(batch -> assertThat(batch).hasSize(3), atIndex(0))
      .satisfies(batch -> assertThat(batch).hasSize(3), atIndex(1))
      .satisfies(batch -> assertThat(batch).hasSize(2), atIndex(2));
}
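
Since the downstream collector is a parameter, we aren’t limited to Collectors.toList(). As a sketch under a few assumptions (the Accumulator constructor, which the listings above elide, and the hypothetical helper batchSizes()), the following self-contained version plugs in a downstream that keeps only the size of each batch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PartitionBySizeSketch {

    static class Accumulator<T, A> {
        private final List<T> values = new ArrayList<>();
        private final int batchSize;
        private A downstreamAccumulator;
        private final BiConsumer<A, List<T>> batchFullListener;

        // Assumed constructor; the article doesn't show it.
        Accumulator(int batchSize, A downstreamAccumulator, BiConsumer<A, List<T>> batchFullListener) {
            this.batchSize = batchSize;
            this.downstreamAccumulator = downstreamAccumulator;
            this.batchFullListener = batchFullListener;
        }

        void add(T value) {
            values.add(value);
            if (values.size() == batchSize) {
                batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
                values.clear();
            }
        }

        Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
            this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
            other.values.forEach(this::add);
            return this;
        }
    }

    static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
        Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
          batchSize, downstream.supplier().get(), downstream.accumulator()::accept);
        BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);
        BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());
        Function<Accumulator<T, A>, R> finisher = acc -> {
            if (!acc.values.isEmpty()) {
                // Flush the last, incomplete batch before finishing.
                downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
            }
            return downstream.finisher().apply(acc.downstreamAccumulator);
        };
        return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED);
    }

    // Partitions the integers 1..elementCount in parallel and keeps only each batch's size.
    static List<Integer> batchSizes(int elementCount, int batchSize) {
        Collector<List<Integer>, ?, List<Integer>> sizesOnly =
          Collectors.mapping(List::size, Collectors.toList());
        return IntStream.rangeClosed(1, elementCount)
          .boxed()
          .parallel()
          .collect(partitionBySize(batchSize, sizesOnly));
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(8, 3)); // prints [3, 3, 2]
    }
}
```

The incomplete batch always comes last here, because only the finisher hands it to the downstream collector.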

4. Partitioning a Stream Using Guava

To avoid potential errors, we can use a proven third-party library rather than building a Collector from scratch. For instance, Google’s Guava provides a concise way of partitioning a Stream into an Iterable of Lists of the same element type.

Firstly, let’s add the dependency to our pom.xml:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.0.0-jre</version>
</dependency>

Now, we can simply use the static method Iterables.partition(). This function accepts an Iterable and the desired batch size as its parameters:

static <T> Iterable<List<T>> partitionUsingGuava(Stream<T> source, int batchSize) {
    return Iterables.partition(source::iterator, batchSize);
}

The only distinction in our testing approach lies in the altered return type, now an Iterable. To assert the batch sizes, we’ll copy each batch of the Iterable into an ArrayList. Besides this adjustment, the testing procedure remains unchanged:

@Test
void whenPartitionParallelStreamWithGuava_thenReturnThreeSubLists() {
    Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();

    Iterable<List<Integer>> result = partitionUsingGuava(source, 3);

    assertThat(result)
      .map(ArrayList::new)
      .hasSize(3)
      .satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(0))
      .satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(1))
      .satisfies(batch -> assertThat(batch).asList().hasSize(2), atIndex(2));
}
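
If adding a dependency isn’t an option, the idea of consuming the stream through its iterator can also be sketched with the JDK alone. This is not Guava’s implementation, only a simplified illustration; the class LazyPartitionSketch and the method partitionLazily() are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

final class LazyPartitionSketch {

    // Wraps the stream's iterator and builds each batch only when it is requested.
    static <T> Iterator<List<T>> partitionLazily(Stream<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("The batchSize must be greater than 0. Actual value: " + batchSize);
        }
        Iterator<T> elements = source.iterator();
        return new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return elements.hasNext();
            }

            @Override
            public List<T> next() {
                if (!elements.hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> batch = new ArrayList<>(batchSize);
                while (elements.hasNext() && batch.size() < batchSize) {
                    batch.add(elements.next());
                }
                return batch;
            }
        };
    }

    public static void main(String[] args) {
        // An infinite stream works, because only the requested batches are materialized.
        Iterator<List<Integer>> batches = partitionLazily(Stream.iterate(1, i -> i + 1), 3);
        System.out.println(batches.next()); // prints [1, 2, 3]
        System.out.println(batches.next()); // prints [4, 5, 6]
    }
}
```

Unlike the custom Collector, this approach pulls elements one at a time in encounter order, so the batches keep their order. As a stream can be traversed only once, the resulting iterator is single-use as well.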

5. Conclusion

In this article, we explored various ways of partitioning a Stream in Java. We started by recalling how we can split a List into smaller sub-lists of a fixed size. Following that, we discussed the advantages of Streams and parallel Streams, and we created our own custom Collector for them.

Finally, we ventured into Guava’s API, which enables us to accomplish the same functionality effortlessly using the static method Iterables.partition().

As always, the complete source code for the examples is available over on GitHub.
