1. Overview

In this tutorial, we'll look at several ways to handle exceptions in Project Reactor. Operators introduced in the code examples are defined in both the Mono and Flux classes. However, we'll only focus on methods in the Flux class.

2. Maven Dependencies

Let's start by adding the Reactor core dependency:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.9</version>
</dependency>

The StepVerifier class used in the tests throughout this tutorial comes from the separate reactor-test artifact, which needs to be on the test classpath as well.

3. Throwing Exceptions Directly in a Pipeline Operator

The simplest way to handle an Exception is by throwing it. If something abnormal happens during the processing of a stream element, we can throw an Exception with the throw keyword as if it were a normal method execution.

Let's assume we need to parse a stream of Strings to Integers. If an element isn't a numeric String, we'll need to throw an Exception.
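
What counts as a numeric String needs a precise definition. Note that String.matches succeeds only when the entire input matches the pattern, so a pattern such as \\D matches nothing but a single non-digit character. The following is a minimal sketch of one possible check, assuming that only unsigned decimal digits count as numeric; the class name NumericCheck and the method isNumeric are hypothetical and used for illustration only:

```java
import java.util.regex.Pattern;

class NumericCheck {
    // Assumption: "numeric" means one or more digits, with no sign or decimal point.
    private static final Pattern DIGITS = Pattern.compile("\\d+");

    // Hypothetical helper: true only if the whole input consists of digits.
    static boolean isNumeric(String input) {
        return input != null && DIGITS.matcher(input).matches();
    }
}
```

With this definition, "1" and "2" pass the check while "1.5" doesn't. An input that passes can still be too large for an int, in which case Integer.parseInt throws a NumberFormatException of its own.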

It's a common practice to use the map operator for such a conversion:

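// Converts a String to an Integer, rejecting anything that isn't made up of digits only
Function<String, Integer> mapper = input -> {
    // matches() has to match the entire input, so we test for "digits only" and negate
    if (!input.matches("\\d+")) {
        throw new NumberFormatException();
    } else {
        return Integer.parseInt(input);
    }
};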

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.map(mapper);

As we can see, the operator throws an Exception if an input element is invalid. When we throw the Exception this way, Reactor catches it and signals an error downstream:

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

This solution works, but it's not elegant. As specified in the Reactive Streams specification, rule 2.13, an operator must return normally. Reactor helped us by converting the Exception to an error signal. However, we could do better.

Essentially, reactive streams rely on the onError method to indicate a failure condition. In most cases, this condition must be triggered by an invocation of the error method on the Publisher. Using an Exception for this use case brings us back to traditional programming.

4. Handling Exceptions in the handle Operator

Similar to the map operator, we can use the handle operator to process items in a stream one by one. The difference is that Reactor provides the handle operator with an output sink, allowing us to apply more complicated transformations.

Let's update our example from the previous section to use the handle operator:

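// Emits the parsed Integer through the sink, or signals an error for non-numeric input
BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
    // matches() has to match the entire input, so we test for "digits only" and negate
    if (!input.matches("\\d+")) {
        sink.error(new NumberFormatException());
    } else {
        sink.next(Integer.parseInt(input));
    }
};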

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.handle(handler);

Unlike the map operator, the handle operator receives a functional consumer, called once for each element. This consumer has two parameters: an element coming from upstream and a SynchronousSink that builds an output to be sent downstream.

If the input element is a numeric String, we call the next method on the sink, providing it with the Integer converted from the input. If it isn't a numeric String, we'll indicate the situation by calling the error method with an Exception object.

Notice that an invocation of the error method will cancel the subscription to the upstream and invoke the onError method on the downstream. Such collaboration of error and onError is the standard way to handle Exceptions in reactive streams.

Let's verify the output stream:

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

5. Handling Exceptions in the flatMap Operator

Another commonly used operator that supports error handling is flatMap. This operator transforms input elements into Publishers, then flattens the Publishers into a new stream. We can take advantage of these Publishers to signify an erroneous state.

Let's try the same example using flatMap:

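// Wraps the parsed Integer in a Mono, or returns an error Mono for non-numeric input
Function<String, Publisher<Integer>> mapper = input -> {
    // matches() has to match the entire input, so we test for "digits only" and negate
    if (!input.matches("\\d+")) {
        return Mono.error(new NumberFormatException());
    } else {
        return Mono.just(Integer.parseInt(input));
    }
};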

Flux<String> inFlux = Flux.just("1", "1.5", "2");
Flux<Integer> outFlux = inFlux.flatMap(mapper);

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NumberFormatException.class)
    .verify();

Unsurprisingly, the result is the same as before.

Notice the only difference between handle and flatMap regarding error handling is that the handle operator calls the error method on a sink, while flatMap calls it on a Publisher.

If we're dealing with a stream represented by a Flux object, we can also use concatMap to handle errors. This method behaves in much the same way as flatMap, but it subscribes to the inner Publishers one at a time and preserves the order of the source elements instead of interleaving their results.

6. Avoiding NullPointerException

This section covers the handling of null references, which often cause a NullPointerException, one of the most commonly encountered exceptions in Java. To avoid it, we usually compare a variable with null and take a different execution path if the variable is actually null. It's tempting to do the same in reactive streams:

Function<String, Integer> mapper = input -> {
    if (input == null) {
        return 0;
    } else {
        return Integer.parseInt(input);
    }
};

We may think that a NullPointerException won't occur because we already handled the case when the input value is null. However, the reality tells a different story:

Flux<String> inFlux = Flux.just("1", null, "2");
Flux<Integer> outFlux = inFlux.map(mapper);

StepVerifier.create(outFlux)
    .expectNext(1)
    .expectError(NullPointerException.class)
    .verify();

Apparently, a NullPointerException triggered an error downstream, meaning our null check didn't work.

To understand why that happened, we need to go back to the Reactive Streams specification. Rule 2.13 of the specification says that “calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller”.

As required by the specification, Reactor signals a NullPointerException as soon as it encounters the null value, so the element never reaches our map function.
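
This restriction isn't specific to Reactor. As a rough illustration, the JDK's own SubmissionPublisher, which implements the java.util.concurrent.Flow interfaces that correspond to Reactive Streams, rejects null items as well. The sketch uses that JDK class rather than Reactor, and the class name NullItemDemo and the method rejectsNull are hypothetical:

```java
import java.util.concurrent.SubmissionPublisher;

class NullItemDemo {
    // Returns true if the JDK publisher refuses to accept a null item.
    static boolean rejectsNull() {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.submit(null); // documented to throw NullPointerException
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }
}
```

The publisher refuses the null item before any subscriber gets involved, so no null check placed further down the stream has a chance to run.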

As a result, there's nothing we can do about a null value once it reaches a stream. We can't handle it or convert it to a non-null value before passing it downstream. The only way to avoid a NullPointerException is to make sure that null values never make it into the pipeline.
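
One way to keep null values out is to drop them before the data ever becomes a stream. The following is a minimal sketch using only the JDK collections API; the class name NullFilter and the method withoutNulls are hypothetical, and whether to drop null values or replace them with a default depends on the application:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

class NullFilter {
    // Hypothetical helper: returns a copy of the input with all null elements removed.
    static List<String> withoutNulls(List<String> raw) {
        return raw.stream()
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Arrays.asList tolerates null elements, unlike List.of.
        List<String> cleaned = withoutNulls(Arrays.asList("1", null, "2"));
        System.out.println(cleaned); // prints [1, 2]
    }
}
```

The cleaned list can then be passed to Flux.fromIterable, and the mapper no longer needs a null check of its own.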

7. Conclusion

In this article, we've walked through Exception handling in Project Reactor. We discussed a couple of examples and clarified the process. We also covered a special case that can occur when processing a reactive stream: the NullPointerException.

As usual, the source code for our application is available over on GitHub.
