Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

The Java Stream API provides various methods that allow modifications of the stream elements. However, the actions inside these methods have to be non-interfering and stateless. Otherwise, this would result in incorrect behavior and output.

In this tutorial, we’ll discuss the common mistakes made while modifying the elements in a Java Stream and the correct way to do it.

2. Change the State of a Stream Element

Let’s take an example of a list of Person class:

public class Person {
    private String name;
    private String email;

    public Person(String name, String email) {
        this.name = name;
        this.email = email;
    }
    //standard getters and setters..
}

We’ll modify the email ID of the Person elements inside a stream and convert it to uppercase.

2.1. Modify With forEach() Method

Let’s start with a very basic way of doing this by simply iterating over the list using the method forEach():

@Test
void givenPersonList_whenUpdatePersonEmailByInterferingWithForEach_thenPersonEmailUpdated() {
    personList.stream().forEach(e -> e.setEmail(e.getEmail().toUpperCase()));

    personList.stream().forEach(e -> assertEquals(e.getEmail(), e.getEmail().toUpperCase()));
}

In the above method, while iterating over the list of Person objects, the email of each of the elements is converted to uppercase. It looks legitimate but it violates the principle of non-interference. It means that in a stream pipeline, we should never modify the original source.

Unless the stream source is concurrent, modifying a stream’s data source during the execution of a stream pipeline can cause exceptions, incorrect answers, or nonconformant behavior.

2.2. Modify With peek() Method

Let’s now look at the peek() method. We’re often tempted to use it for modifying the properties of the elements inside a stream:

@Test
void givenPersonList_whenUpdatePersonEmailByInterferingWithPeek_thenPersonEmailUpdated() {
    personList.stream()
      .peek(e -> e.setEmail(e.getEmail().toUpperCase()))
      .collect(Collectors.toList());

    personList.forEach(e -> assertEquals(e.getEmail(), e.getEmail().toUpperCase()));
}

Again, by updating the source personList we’re repeating the same mistake mentioned in the earlier section.

2.3. Modify With map() Method

The method forEach() is a terminal operation in a stream pipeline. However, map(), like peek() is an intermediate operation that returns a Stream. In map() we’ll create a new Person object with email in uppercase and then collect it into a new list:

@Test
void givenPersonList_whenUpdatePersonEmailWithMapMethod_thenPersonEmailUpdated() {
    List<Person> newPersonList = personList.stream()
      .map(e -> new Person(e.getName(), e.getEmail().toUpperCase()))
      .collect(Collectors.toList());

    newPersonList.forEach(e -> assertEquals(e.getEmail(), e.getEmail().toUpperCase()));
}

In the above method, we didn’t modify the original list. Instead, we created a new list newPersonList out of it. Hence, it’s non-interfering. It’s also stateless because the results of the actions inside it don’t affect each other. Mostly, they operate independently. These principles are recommended, regardless of whether it’s a sequential or a parallel processing.

Considering immutability is one of the essences of functional programming, we can try to create an immutable Person class:

public class ImmutablePerson {

    private String name;
    private String email;

    public ImmutablePerson(String name, String email) {
        this.name = name;
        this.email = email;
    }

    public ImmutablePerson withEmail(String email) {
        return new ImmutablePerson(this.name, email);
    }
    //Standard getters
}

The ImmutablePerson class doesn’t have any setter methods. However, it provides a method withEmail() that returns a new ImmutablePerson with email in uppercase.

Now, let’s use it while modifying the elements in the stream:

@Test
void givenPersonList_whenUpdateImmutablePersonEmailWithMapMethod_thenPersonEmailUpdated() {
    List<ImmutablePerson> newImmutablePersonList = immutablePersonList.stream()
      .map(e -> e.withEmail(e.getEmail().toUpperCase()))
      .collect(Collectors.toList());

    newImmutablePersonList.forEach(e -> assertEquals(e.getEmail(), e.getEmail().toUpperCase()));
}

With this, we’re enforcing non-interference.

3. Remove Element From a Stream

Performing structural changes in a stream is even trickier. This is a costlier operation than modification and hence if care isn’t taken, it might lead to inconsistent and undesirable outcomes. Let’s explore this in detail.

3.1. Remove Element With forEach() Method

What if we want to remove a few elements from a stream? For example, let’s remove the person with the name John from the list:

@Test
void givenPersonList_whenRemoveWhileIterating_thenThrowException() {
    assertThrows(NullPointerException.class, () -> {
        personList.stream().forEach(e -> {
            if(e.getName().equals("John")) {
                personList.remove(e);
            }
        });
    });
}

We tried to modify the structure of the list in the forEach() method while iterating. Surprisingly, this results in NullPointerException unlike the forEach() in an ArrayList which throws ConcurrentModificationException:

@Test
void givenPersonList_whenRemoveWhileIteratingWithForEach_thenThrowException() {
    assertThrows(ConcurrentModificationException.class, () -> {
        personList.forEach(e -> {
            if(e.getName().equals("John")) {
                personList.remove(e);
            }
        });
    });
}

3.2. Remove Element With CopyOnWriteArrayList

CopyOnWriteArrayList is a thread-safe version of ArrayList. While iterating on it elements can be removed:

@Test
void givenPersonList_whenRemoveWhileIterating_thenPersonRemoved() {
    assertEquals(4, personList.size());
    
    CopyOnWriteArrayList<Person> cps = new CopyOnWriteArrayList<>(personList);
    cps.stream().forEach(e -> {
        if(e.getName().equals("John")) {
            cps.remove(e);
        }
    });

    assertEquals(3, cps.size());
}

It can prevent interference among multiple threads but it’s too costly because, for every write operation, it creates a snapshot.

3.3. Remove Element With filter() Method

The Java Stream API provides the method filter() to remove elements in a more elegant way:

@Test
void givenPersonList_whenRemovePersonWithFilter_thenPersonRemoved() {
    assertEquals(4, personList.size());

    List<Person> newPersonList = personList.stream()
      .filter(e -> !e.getName().equals("John"))
      .collect(Collectors.toList());

    assertEquals(3, newPersonList.size());
}

In the above method, filter() allows only those Person objects to move forward in the pipeline that don’t have a name, John. Again, the predicate used inside the filter method should be non-interfering and stateless. It also looks simpler, easy to understand and troubleshoot.

4. Conclusion

In this article, we’ve looked at the correct way of modifying the elements in a stream. It’s important that pipeline processing should be non-interfering and stateless. Otherwise, this could result in unexpected results.

As usual, the code used in this article can be found over on GitHub.

Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – REST with Spring (eBook) (everywhere)
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.