
1. Overview

In this tutorial, we'll explore how to accomplish batch processing of Stream data in Java. We'll see examples using both native Java features and some third-party libraries.

2. What Does Batch Processing of Stream Data Mean?

Batch processing of Stream data in Java refers to the practice of dividing a big dataset into smaller, more manageable chunks and processing them in order. In this scenario, the data source for processing comes from a data stream.

This can be advantageous for several reasons, including improving data-processing efficiency, handling very large datasets that may not fit in memory all at once, and providing a way to process data in parallel across several processors.

However, there are various issues that can occur when implementing batch processing:

  • Setting an acceptable batch size: The overhead of processing each batch may become significant if the batch size is too small. However, if the batch size is too large, processing each batch may take too long, which can cause delays in the stream-processing pipeline.
  • State management: In order to keep track of intermediate results or to guarantee that each batch is processed consistently with prior batches, it's often required to preserve the state between batches when employing batch processing. The complexity of working with dispersed systems increases the difficulty of state management.
  • Fault tolerance: When processing large datasets in batches, it's critical to ensure that processing can be continued if a failure occurs. This can be difficult since it may be necessary to store huge quantities of the intermediate state in order to resume processing.

In this article, for simplicity and clarity, we'll only focus on the batch processing of Stream data in Java and not on how to address the above-mentioned issues.

3. Batch Processing With Java Streams API

Before we start, we must note a couple of crucial concepts. First, we have the Streams API, a major feature introduced in Java 8. From the Streams API, we'll use the Stream class.

In this case, we need to remember that a stream can be consumed only once. If we try to operate on the same stream a second time, we get an IllegalStateException. A quick example shows this behavior:

Stream<String> coursesStream = Stream.of("Java", "Frontend", "Backend", "Fullstack");
Stream<Integer> coursesStreamLength = coursesStream.map(String::length);
// we get java.lang.IllegalStateException
Stream<String> emphasisCourses = coursesStream.map(course -> course + "!");
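
We can turn the snippet above into a self-contained sketch that catches the exception instead of crashing (the class name is ours, chosen for illustration):

```java
import java.util.stream.Stream;

public class StreamReuseDemo {
    public static void main(String[] args) {
        Stream<String> coursesStream = Stream.of("Java", "Frontend", "Backend", "Fullstack");
        // first operation: linking an intermediate operation consumes the stream
        coursesStream.map(String::length);
        try {
            // second operation on the same stream fails
            coursesStream.map(course -> course + "!");
        } catch (IllegalStateException e) {
            // typically "stream has already been operated upon or closed"
            System.out.println("Caught: " + e.getMessage());
        }
    }
}
```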

Second, we'll write most of the examples in the following sections in a functional style. Some of the examples do have side effects, which we should try to avoid when programming functionally.

Before we build our code examples, let's define our test data stream, batch size, and expected batch result.

Our data stream will be a Stream of Integer values:

Stream<Integer> data = IntStream.range(0, 34).boxed();

Then, our batch size is 10:

private final int BATCH_SIZE = 10;

And finally, let's define our expected batches:

private final List<Integer> firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
private final List<Integer> secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
private final List<Integer> thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
private final List<Integer> fourthBatch = List.of(30, 31, 32, 33);

Next, let's look at some examples.

4. Using an Iterator

The first approach uses a custom implementation from the Iterator interface. We define a CustomBatchIterator class, and we can set the batch size when initializing a new instance of the Iterator.

Let's jump into the code:

public class CustomBatchIterator<T> implements Iterator<List<T>> {
    private final int batchSize;
    private List<T> currentBatch;
    private final Iterator<T> iterator;
    public CustomBatchIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.iterator = sourceIterator;
    }
    @Override
    public List<T> next() {
        return currentBatch;
    }
    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch != null && !currentBatch.isEmpty();
    }
    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (iterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(iterator.next());
        }
    }
}

Here, we've overridden the hasNext() and next() methods of the Iterator interface in our CustomBatchIterator class. The hasNext() method prepares the next batch of data by calling prepareNextBatch() and returns true if that batch is non-empty. We then call next() to obtain the freshly prepared batch.

The prepareNextBatch() method initializes currentBatch as an empty list with an initial capacity of batchSize, then populates it with elements from the source iterator until the batch is full or the source iterator runs out of elements, whichever happens first.

The next step is to add two static methods to our class:

public class CustomBatchIterator<T> implements Iterator<List<T>> {

    // other methods

    public static <T> Stream<List<T>> batchStreamOf(Stream<T> stream, int batchSize) {
        return stream(new CustomBatchIterator<>(stream.iterator(), batchSize));
    }
    private static <T> Stream<T> stream(Iterator<T> iterator) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }
}

Our batchStreamOf() method generates a stream of batches from a data stream. It accomplishes this by instantiating the CustomBatchIterator class and handing it to the stream() method, which produces a Stream from an Iterator.

Our stream() method creates a Spliterator (a special iterator that supports both sequential and parallel traversal) from the Iterator using the Spliterators.spliteratorUnknownSize() method and then passes the Spliterator to the StreamSupport.stream() method to build the stream.

Now, it's time to test our implementation:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingSpliterator_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    CustomBatchIterator.batchStreamOf(data, BATCH_SIZE).forEach(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

In the above test, we pass our data stream and batch size to the batchStreamOf() method. Then, we check that we got four batches after data processing.

5. Using Collection API

Our next example uses the Collection API and is more straightforward than the first one.

Let's see our test case:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCollectionAPI_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = data.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
      .values();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

We use the Collectors.groupingBy() method from the Java Streams API in this code snippet to group the elements of the data stream by a key computed with the it -> it / BATCH_SIZE lambda expression. Since this is integer division, the expression maps each run of BATCH_SIZE consecutive elements to the same key.

Then, we invoke the resulting map's values() method to retrieve a collection of lists of elements, which we store in the result variable.
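
One thing to keep in mind is that groupingBy() returns a HashMap by default, so the iteration order of values() isn't guaranteed. If we need the batches in their original order, we can pass a TreeMap factory to the three-argument groupingBy() overload. Here's a minimal sketch (the OrderedBatches class and batches() method are ours, for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class OrderedBatches {
    // Groups stream elements into batches using a TreeMap factory,
    // so values() iterates the batches in batch-index order.
    public static List<List<Integer>> batches(Stream<Integer> data, int batchSize) {
        return new ArrayList<>(data
          .collect(Collectors.groupingBy(it -> it / batchSize, TreeMap::new, Collectors.toList()))
          .values());
    }

    public static void main(String[] args) {
        List<List<Integer>> result = batches(IntStream.range(0, 34).boxed(), 10);
        System.out.println(result.size());  // 4
        System.out.println(result.get(3));  // [30, 31, 32, 33]
    }
}
```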

We can use the parallel() method from Stream for large datasets. However, we need to keep in mind that the order of execution is out of our control and may change between runs.

Let's check our test case using parallel():

@Test
public void givenAStreamOfData_whenIsProcessingInBatchParallelUsingCollectionAPI_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = data.parallel()
      .collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
      .values();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

6. RxJava

RxJava is a Java version of ReactiveX, which is a library for composing asynchronous and event-based programs using observable sequences. We can use it in conjunction with the Streams API to do batch processing in Java.

First, let's add its dependency in our pom.xml file:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.1.5</version>
</dependency>

Our next step is to implement the test case:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingRxJavaV3_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Observable.fromStream(data)
      .buffer(BATCH_SIZE)
      .subscribe(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

To divide the data stream into manageable chunks, this code uses the buffer() operator from the RxJava library, with each chunk's size determined by the BATCH_SIZE variable.

Additionally, we create an Observable from the data stream using the Observable.fromStream() method. We call the Observable's buffer() method with BATCH_SIZE as the argument. The Observable's items are collected into lists of the chosen size, and each list is emitted as a new item in the stream.

The result is an Observable, and the subscribe() method is invoked on it with result::add as its argument. This creates a subscription to the Observable, and every time the Observable emits an item, the add() method of the result collection is invoked. In this scenario, the Observable's output consists of lists of batched elements.

7. Vavr

Vavr is a functional programming library with immutable collections and other functional data structures.

In this case, we add its dependency in our pom.xml file:

<dependency>
    <groupId>io.vavr</groupId>
    <artifactId>vavr</artifactId>
    <version>1.0.0-alpha-4</version>
</dependency>

Now, let's see the test case in action:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingVavr_thenFourBatchesAreObtained() {
    List<List<Integer>> result = Stream.ofAll(data)
      .toList()
      .grouped(BATCH_SIZE)
      .toList();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

The Stream.ofAll() method transforms the data stream into a Vavr Stream. Then, we use Stream's toList() method to turn it into a List. This List is passed to the grouped() method along with the value BATCH_SIZE, which returns an ordered List of inner lists, each holding up to BATCH_SIZE elements taken in order from the original list.

Note that the Stream and List classes in the above test are from the io.vavr.collection package and not from java.util.

8. Reactor

The next option for batch processing is the Reactor library. Reactor is a Java library for reactive programming that offers several operators for working with streams, including batch processing. In this case, we'll work with Flux to do the batch processing.

For this example, let's add the dependency to our pom.xml file:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.1</version>
</dependency>

Let's implement our test case:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingReactor_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Flux.fromStream(data)
      .buffer(BATCH_SIZE)
      .subscribe(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

To create a Flux from a java.util.stream.Stream object, we use the Flux.fromStream() method. This is handy when we want to process the elements of the stream using the reactive operators provided by the Flux class.

The buffer() operator batches elements into fixed-size lists. When the Flux emits a new element, it's added to the current list. When the list reaches the configured size, the Flux emits it, and a new list is started. This can be useful for batch-processing optimizations, such as lowering the number of database queries or network requests.

Finally, the subscribe() method attaches a subscriber to the Flux. The subscriber receives the items emitted by the Flux and adds them to the result object. The subscribe() method returns a Disposable object, which we can use to cancel the subscription when it's no longer required.

9. Apache Commons

We can use a powerful library such as Apache Commons Collections to perform batch processing.

Let's add its dependency in our pom.xml file:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>

Our test implementation is straightforward:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingApacheCommon_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>(ListUtils
      .partition(data.collect(Collectors.toList()), BATCH_SIZE));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

The partition() method is an Apache Commons ListUtils utility method that accepts a List and a size. It produces a List of Lists, where each inner List contains at most the given number of elements. Note that the data stream is converted into a List before being passed to the partition() method.
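
For reference, the core idea behind partition() can be sketched with plain-JDK subList() views (our PartitionSketch helper is hypothetical and not part of Apache Commons):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {
    // Splits a list into consecutive chunks of at most `size` elements.
    // Like ListUtils.partition(), each chunk is a subList() view backed
    // by the source list, so no elements are copied.
    public static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            parts.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 34; i++) {
            data.add(i);
        }
        List<List<Integer>> parts = partition(data, 10);
        System.out.println(parts.size());  // 4
        System.out.println(parts.get(3));  // [30, 31, 32, 33]
    }
}
```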

10. Guava

Next, we have the Guava library. Guava offers a variety of utility methods for working with collections, including batch processing.

Let's add the dependency in our pom.xml file:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

Now, let's see our working example:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingGuava_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Iterators.partition(data.iterator(), BATCH_SIZE).forEachRemaining(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

The Iterators.partition() method can help break a large dataset into smaller chunks for processing, such as analyzing the data in parallel or loading it into a database in batches.

We use the Iterators.partition() method to split an Iterator of data into a series of smaller Iterators. We pass it the Iterator obtained from our data stream, along with BATCH_SIZE.

11. Cyclops

Finally, we have the Cyclops library, which is based on the jool library. Cyclops React includes several operators for interacting with streams, including some for batch processing.

Let's add its dependency to our pom.xml:

<dependency>
    <groupId>com.oath.cyclops</groupId>
    <artifactId>cyclops</artifactId>
    <version>10.4.1</version>
</dependency>

And let's look at the code for our last example:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclops_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    ReactiveSeq.fromStream(data)
      .grouped(BATCH_SIZE)
      .toList()
      .forEach(value -> result.add(value.collect(Collectors.toList())));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

The ReactiveSeq class represents a reactive sequence. The ReactiveSeq.fromStream() method turns the data stream into a reactive sequence. Then, the data is grouped into batches of BATCH_SIZE, and the processed data is collected into a collection of Integer lists.

However, we can get a lazy, functional style using LazySeq. In this case, we only need to replace ReactiveSeq with LazySeq:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclopsLazy_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    LazySeq.fromStream(data)
      .grouped(BATCH_SIZE)
      .toList()
      .forEach(value -> result.add(value.collect(Collectors.toList())));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

12. Conclusion

In this article, we learned several approaches to accomplish batch processing of Streams in Java. We explored several alternatives, from Java native APIs to some popular libraries like RxJava, Vavr, and Apache Commons.

As usual, all code samples are available over on GitHub.
