Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we’ll explore various ways of accessing the first element of a Flux with Spring 5 WebFlux.

Firstly, we’ll use non-blocking methods of the API, such as next() and take(). After that, we’ll see how to achieve the same thing with the help of elementAt() method, where we need to specify the index.

Finally, we’ll learn about the blocking methods of the API, and we’ll use blockFirst() to access the first element of the flux.

2. Test Setup

For the code examples in this article, we’ll use the Payment class, which only has one field, the payment amount:

public class Payment {
    private final int amount;
    // constructor and getter
}

In the tests, we’ll construct a flux of payments using the test helper method called fluxOfThreePayments:

private Flux<Payment> fluxOfThreePayments() {
    return Flux.just(paymentOf100, new Payment(200), new Payment(300));
}

After that, we’ll use Spring Reactor’s StepVerifier to test the results.

3. next()

First, let’s try the next() method. This method will return the first element of the flux, wrapped into the reactive Mono type:

@Test
void givenAPaymentFlux_whenUsingNext_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().next();

    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

On the other hand, if we’ll call next() on an empty flux, the result will be an empty Mono. Consequently, blocking it will return null:

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.next();

    StepVerifier.create(firstPayment)
      .verifyComplete();
}

4. take()

The take() method of a reactive flux is equivalent to limit() for Java 8 Streams. In other words, we can use take(1) to limit the flux to exactly one element and then use it in a blocking or non-blocking way:

@Test
void givenAPaymentFlux_whenUsingTake_thenGetTheFirstPaymentAsFlux() {
    Flux<Payment> firstPaymentFlux = fluxOfThreePayments().take(1);

    StepVerifier.create(firstPaymentFlux)
      .expectNext(paymentOf100)
      .verifyComplete();
}

Similarly, for an empty flux, take(1) will return an empty flux:

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyFlux() {
    Flux<Payment> emptyFlux = Flux.empty();

    Flux<Payment> firstPaymentFlux = emptyFlux.take(1);

    StepVerifier.create(firstPaymentFlux)
      .verifyComplete();
}

5. elementAt()

The Flux API also offers the elementAt() method. We can use elementAt(0) to get the first element of a flux in a non-blocking way:

@Test
void givenAPaymentFlux_whenUsingElementAt_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().elementAt(0);

    StepVerifier.create(firstPayment)
      .expectNext(paymentOf100)
      .verifyComplete();
}

Though, if the index passed as a parameter is greater than the number of elements emitted by the flux, an error will be emitted:

@Test
void givenAEmptyFlux_whenUsingElementAt_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.elementAt(0);

    StepVerifier.create(firstPayment)
      .expectError(IndexOutOfBoundsException.class);
}

6. blockFirst()

Alternatively, we can also use blockFirst(). Though, as the name suggests, this is a blocking method. As a result, if we use blockFirst(), we’ll be leaving the reactive world, and we’ll lose all its benefits:

@Test
void givenAPaymentFlux_whenUsingBlockFirst_thenGetTheFirstPayment() {
    Payment firstPayment = fluxOfThreePayments().blockFirst();

    assertThat(firstPayment).isEqualTo(paymentOf100);
}

7. toStream()

Finally, we can convert the flux to a Java 8 stream and then access the first element:

@Test
void givenAPaymentFlux_whenUsingToStream_thenGetTheFirstPaymentAsOptional() {
    Optional<Payment> firstPayment = fluxOfThreePayments().toStream()
      .findFirst();

    assertThat(firstPayment).contains(paymentOf100);
}

But, yet again, if we do this, we won’t be able to continue using the reactive pipelines.

8. Conclusion

In this article, we discussed the API of Java’s reactive streams. We’ve seen various ways of accessing the first element of a Flux, and we learned that we should stick to the non-blocking solutions if we want to use the reactive pipelines.

As always, code from this article can be found over on GitHub.

Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – Junit (guide) (cat=Reactive)
Comments are closed on this article!