Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Introduction

In this article, we’ll discuss various options provided by Spring WebFlux to cancel an ongoing Flux. First, we’ll have a quick overview of Flux in the context of Reactive Programming. Next, we’ll look into why we might need to cancel an ongoing Flux.

We’ll look into various methods provided by Spring WebFlux to both explicitly and automatically cancel the subscription. We’ll drive our simple example using JUnit tests to verify that the system behaves as expected. Finally, we’ll see how we can perform post-cancelation cleanup, enabling us to reset the system to the desired state after the cancelation.

Let’s start with a quick overview of Flux first.

2. What Is Flux?

Spring WebFlux is a Reactive web framework that provides powerful features for building asynchronous, non-blocking applications. One of the key features of Spring WebFlux is its ability to handle fluxes. A Flux is a reactive data stream that can emit zero or more items. It can be created from various sources, such as a database query, a network call, or an in-memory collection.

A related term that we should be aware of in this context is the subscription, which represents the connection between a data source (i.e., the publisher) and a data consumer (i.e., the subscriber). The subscription maintains a state that reflects whether it is active or not. It can also be used to cancel the connection, which stops the emission of data by the Flux and lets the publisher free up any resources it holds. Typical scenarios where we might want to cancel an ongoing subscription include a user canceling a request or a timeout occurring.

3. Benefits of Canceling an Ongoing Flux

In Spring WebFlux, it is important to cancel an ongoing Flux that is no longer needed to ensure efficient use of system resources and prevent potential memory leaks. Here are some reasons:

  • Backpressure: Reactive programming uses backpressure to regulate the data flow between the publisher and subscriber. If the subscriber cannot keep up with the pace of the publisher, backpressure is used to slow down or stop the flow of data. If a subscription that is no longer needed isn't canceled, the publisher may keep producing data that nobody consumes, which can cause buffers to grow and potentially lead to memory leaks.
  • Resource Management: An active subscription can hold system resources such as memory, CPU, and network connections, which can cause resource exhaustion if left unchecked. Canceling the subscription allows these resources to be released and made available for other tasks.
  • Performance: By terminating a subscription early, the system can avoid unnecessary processing and reduce the response time, resulting in better overall system performance.

4. Maven Dependencies

Let’s take a very simple example of some sensor data coming as a Flux, and we want to cancel the data emission based on the subscription using various options provided by WebFlux.

To get started, we need to add the following key dependencies:

  • spring-boot-starter-webflux: It bundles all the required dependencies to start building reactive web applications using Spring WebFlux, including the Reactor library for reactive programming, and Netty as the default embedded server.
  • reactor-spring: It is a module in the Reactor project that provides integration with Spring Framework.
  • reactor-test: It provides testing support for Reactive Streams.

Now, let’s declare these dependencies in the project POM:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectreactor</groupId>
        <artifactId>reactor-spring</artifactId>
        <version>${reactor-spring.version}</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5. Canceling Ongoing Flux in WebFlux

In Spring WebFlux, we can perform explicit cancelation using dispose(), or it can occur implicitly when using certain operators that call cancel() on the Subscription object. These operators include:

  • takeUntil()
  • takeWhile()
  • take(long n)
  • take(Duration d)

Looking at this in a bit more detail, we’ll find that these operators internally call the cancel() method on the Subscription object, which is passed as an argument to the Subscriber’s onSubscribe() method.
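To make this mechanism concrete, here is a minimal sketch of a takeUntil-style subscriber. It is an illustration only: it uses the JDK's java.util.concurrent.Flow interfaces instead of Reactor so that it runs without extra dependencies, and the TakeUntilSketch class with its range() and collectUntil() helpers are hypothetical names, not Reactor's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

final class TakeUntilSketch {

    // Hypothetical synchronous publisher emitting start..start+count-1 (not Reactor's Flux.range)
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private boolean cancelled;
            private boolean done;

            @Override
            public void request(long n) {
                // Emit while there is demand, data left, and no cancel signal
                long remaining = n;
                while (remaining > 0 && !cancelled && next < start + count) {
                    remaining--;
                    subscriber.onNext(next++);
                }
                if (!cancelled && !done && next == start + count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    // Collects items and cancels the subscription as soon as `stop` matches
    static List<Integer> collectUntil(Flow.Publisher<Integer> source, Predicate<Integer> stop) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;          // keep the handle so we can cancel later
                s.request(Long.MAX_VALUE); // unbounded demand, for simplicity
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                if (stop.test(item)) {
                    subscription.cancel(); // tells the publisher to stop emitting
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        // prints [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println(collectUntil(range(1, 10), reading -> reading == 8));
    }
}
```

The subscriber never sees 9 or 10 because the publisher checks the cancel flag before every emission. Reactor's operators are considerably more involved, but the cancel() call on the Subscription plays the same role.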

Let’s discuss these operators next.

5.1. Cancel Using takeUntil() Operator

Let’s build upon our example of sensor data. We want to continue receiving the data from the data stream until we encounter the value 8, at which point we want to cancel the emission of any more data:

@Test
void givenOngoingFlux_whenTakeUntil_thenFluxCancels() {
    Flux<Integer> sensorData = Flux.range(1, 10);
    List<Integer> result = new ArrayList<>();

    sensorData.takeUntil(reading -> reading == 8)
      .subscribe(result::add);
    assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7, 8);
}

This code snippet uses the Flux API to create a stream of integers and manipulate it using various operators. Firstly, the sequence of integers ranging from 1 to 10 is created using Flux.range(). Then, the takeUntil() operator is applied, which expects a predicate to specify that the Flux should continue emitting integers until the value reaches 8.

Finally, the subscribe() method is invoked, which causes the emission of values by the Flux until the takeUntil() predicate evaluates to true. Within the subscribe() method, every new integer emitted is added to a List<Integer>, allowing for the capture and manipulation of emitted values.

It’s important to note that the subscribe() method is essential to trigger the emission of values from Flux, and without it, no values would be emitted since Flux would have no subscription. Once the condition specified by the takeUntil() operator is true, the subscription is automatically canceled, and Flux stops emitting values. The test result confirms that the result list only contains Integer values up to 8, demonstrating the cancelation of any further data emission.

5.2. Cancel Using takeWhile() Operator

Next, let’s consider a scenario where we want the subscription to continue emitting data as long as the sensor reading stays less than 8. Here we can leverage the takeWhile() operator, which expects the continuation predicate:

@Test
void givenOngoingFlux_whenTakeWhile_thenFluxCancels() {
    List<Integer> result = new ArrayList<>();
    Flux<Integer> sensorData = Flux.range(1, 10)
      .takeWhile(reading -> reading < 8)
      .doOnNext(result::add);

    sensorData.subscribe();
    assertThat(result).containsExactly(1, 2, 3, 4, 5, 6, 7);
}

Like takeUntil(), the takeWhile() operator expects a predicate. As long as the predicate evaluates to true, the data stream emits data. As soon as the predicate evaluates to false, the subscription is canceled, and no more data is emitted, so the value that failed the predicate isn't part of the result. Note that here, we've used the doOnNext() operator when setting up the Flux to add each emitted value to the list.

After that, we call sensorData.subscribe() to trigger the emission.
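The two operators differ at the boundary: takeUntil() keeps the element that triggers the stop, while takeWhile() drops the first element that fails the predicate. The following sketch mirrors that difference using only plain Java (Stream.takeWhile() and a loop) rather than Reactor; BoundarySketch and its method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BoundarySketch {

    // Exclusive boundary: stops before the first reading that fails the predicate
    static List<Integer> whileLessThan(int limit) {
        return IntStream.rangeClosed(1, 10)
          .takeWhile(reading -> reading < limit)
          .boxed()
          .collect(Collectors.toList());
    }

    // Inclusive boundary: keeps the reading that matches the stop condition, then stops
    static List<Integer> untilEquals(int target) {
        List<Integer> taken = new ArrayList<>();
        for (int reading = 1; reading <= 10; reading++) {
            taken.add(reading);
            if (reading == target) {
                break;
            }
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(whileLessThan(8)); // prints [1, 2, 3, 4, 5, 6, 7]
        System.out.println(untilEquals(8));   // prints [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```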

5.3. Cancel Using take(long n) Operator

Next, let’s look at the take() operator, which limits the number of elements we take from a potentially unbounded reactive data stream. Let’s create a Flux of Integers ranging from 1 to Integer.MAX_VALUE and then take the first 10 elements:

@Test
void givenOngoingFlux_whenTake_thenFluxCancels() {
    Flux<Integer> sensorData = Flux.range(1, Integer.MAX_VALUE);
    List<Integer> result = new ArrayList<>();

    sensorData.take(10)
      .subscribe(result::add);
    Assertions.assertThat(result)
      .containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

Here, again the subscription is canceled after the first 10 elements, and our result list confirms this.

5.4. Cancel Using take(Duration d) Operator

Another potential scenario where we might want to cancel any further data emission is when a certain time has elapsed, and after that, we aren’t interested in any further emission. In this case, we specify a time window for the Flux and stop receiving anything emitted after that window closes:

@Test
void givenAnOnGoingFlux_whenTimeout_thenCancelsFlux() {
    Flux<Integer> sensorData = Flux.interval(Duration.ZERO, Duration.ofSeconds(2))
      .map(i -> i.intValue() + 10)
      .take(5);

    Flux<Integer> canceledByTimeout = sensorData.take(Duration.ofSeconds(3));

    StepVerifier.create(canceledByTimeout)
      .expectNext(10, 11)
      .expectComplete()
      .verify();
}

To start with, we create a Flux of integers using the interval() operator, which emits values starting from 0 at an interval of every 2 seconds. We then map each emitted value to an Integer by adding 10. Next, we use the take() operator to limit the number of emitted values to 5. This means that the Flux will only emit the first 5 values and then complete.

We then create a new Flux called canceledByTimeout by applying the take(Duration) operator with a duration of 3 seconds. Since the source emits at 0 and 2 seconds, with the next value due at 4 seconds, the canceledByTimeout Flux emits only the first 2 values from sensorData and then completes.

Here we are using the StepVerifier. The StepVerifier is a utility provided by the reactor-test library that helps to verify the behavior of a Flux or Mono stream by setting up expectations on the expected events and then verifying that the events are emitted in the expected order and with the expected values.

In our case, the expected order and values are 10 and 11, and we are also verifying that the Flux completes using expectComplete() without emitting any additional values.

It’s important to note that the subscribe() method is not explicitly called because it is internally called when we invoke verify(). This means that the events are only emitted when we run the StepVerifier and not when we create the Flux stream.
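The reason only two values fit into the three-second window can be checked with plain arithmetic. The sketch below is a simplified timing model, not Reactor code: it assumes ticks land exactly on schedule and treats a tick at the very edge of the window as excluded. TakeDurationArithmetic and valuesWithin() are hypothetical names:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class TakeDurationArithmetic {

    // Models interval(initialDelay, period).map(i -> i + offset).take(maxItems).take(window)
    static List<Integer> valuesWithin(Duration initialDelay, Duration period,
      int offset, int maxItems, Duration window) {
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < maxItems; i++) {
            // The i-th tick is scheduled at initialDelay + i * period
            Duration emittedAt = initialDelay.plus(period.multipliedBy(i));
            if (emittedAt.compareTo(window) >= 0) {
                break; // assumption: a tick at or after the window's end is not delivered
            }
            values.add(i + offset);
        }
        return values;
    }

    public static void main(String[] args) {
        // Ticks at 0s, 2s, 4s, ...; only 0s and 2s fall inside the 3s window
        System.out.println(valuesWithin(Duration.ZERO, Duration.ofSeconds(2),
          10, 5, Duration.ofSeconds(3))); // prints [10, 11]
    }
}
```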

5.5. Cancel Using dispose() Method

Next, let’s see how we can do explicit cancelation by invoking dispose(), which belongs to the Disposable interface. Simply put, Disposable is an interface that serves as a one-way mechanism for cancelation. It enables the disposal of a resource or the cancelation of a subscription.
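As a rough illustration of what "one-way" means, the sketch below models a handle with the same two methods that Disposable exposes, dispose() and isDisposed(). The Handle interface and handleFor() factory are hypothetical stand-ins written with the JDK only, not Reactor's implementation. Once disposed, the handle stays disposed, and repeated calls don't run the cancel action again:

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class DisposeSketch {

    // Hypothetical stand-in mirroring the dispose()/isDisposed() pair
    interface Handle {
        void dispose();
        boolean isDisposed();
    }

    // Wraps a cancel action so that it runs at most once
    static Handle handleFor(Runnable cancelAction) {
        AtomicBoolean disposed = new AtomicBoolean(false);
        return new Handle() {
            @Override
            public void dispose() {
                // Only the first caller flips the flag and triggers the cancel action
                if (disposed.compareAndSet(false, true)) {
                    cancelAction.run();
                }
            }

            @Override
            public boolean isDisposed() {
                return disposed.get();
            }
        };
    }

    public static void main(String[] args) {
        Handle handle = handleFor(() -> System.out.println("Subscription canceled"));
        handle.dispose();
        handle.dispose(); // no second message
        System.out.println(handle.isDisposed()); // prints true
    }
}
```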

Let’s set up an example where we have a Flux of integers that emits values from 1 to 10 with a delay of 1 second between them. We’ll subscribe to the Flux to print the values on the console. We’ll then cause the thread to sleep for 5 seconds and then call dispose():

@Test
void givenAnOnGoingFlux_whenDispose_thenCancelsFluxExplicitly() throws InterruptedException {
    Flux<Integer> flux = Flux.range(1, 10)
      .delayElements(Duration.ofSeconds(1));

    AtomicInteger count = new AtomicInteger(0);
    Disposable disposable = flux.subscribe(i -> {
        System.out.println("Received: " + i);
        count.incrementAndGet();
    }, e -> System.err.println("Error: " + e.getMessage())
    );

    Thread.sleep(5000);
    System.out.println("Will Dispose the Flux Next");
    disposable.dispose();
    if(disposable.isDisposed()) {
        System.out.println("Flux Disposed");
    }
    assertEquals(4, count.get());
}

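Here, we cause the thread to sleep for 5 seconds and then invoke dispose(), which cancels the subscription. With one value arriving per second, four values have typically been received by then, since the fifth is due right around the five-second mark. This makes the final assertion timing-sensitive.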

6. Cleanup After Cancelation

It’s important to understand that canceling an ongoing subscription doesn't automatically clean up every resource our own code has acquired. Therefore, it's important to do any cleanup and state reset once the Flux has been canceled or completed. We can use the provided doOnCancel() and doFinally() operators to achieve this.

To simplify our test, we’ll print the appropriate messages once the flux is canceled. However, in the real-world scenario, this step can do any resource cleanup, like the closing of a connection, for example.

Let’s quickly test that when the Flux is canceled, our desired strings are printed as part of post-cancelation cleanup:

@Test
void givenAFluxIsCanceled_whenDoOnCancelAndDoFinally_thenMessagePrinted() throws InterruptedException {

    List<Integer> result = new ArrayList<>();
    PrintStream originalOut = System.out;
    PrintStream mockPrintStream = mock(PrintStream.class);
    System.setOut(mockPrintStream);

    try {
        Flux<Integer> sensorData = Flux.interval(Duration.ofMillis(100))
          .doOnCancel(() -> System.out.println("Flux Canceled"))
          .doFinally(signalType -> {
              if (signalType == SignalType.CANCEL) {
                  System.out.println("Flux Completed due to Cancelation");
              } else {
                  System.out.println("Flux Completed due to Completion or Error");
              }
          })
          .map(i -> ThreadLocalRandom.current().nextInt(1, 1001))
          .doOnNext(result::add);

        Disposable subscription = sensorData.subscribe();

        Thread.sleep(1000);
        subscription.dispose();

        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
        Mockito.verify(mockPrintStream, times(2)).println(captor.capture());

        assertThat(captor.getAllValues()).contains("Flux Canceled", "Flux Completed due to Cancelation");
    } finally {
        // Restore the real stream so that later tests aren't affected by the mock
        System.setOut(originalOut);
    }
}

The code invokes both the doOnCancel() and doFinally() operators. It’s important to note that the doOnCancel() operator executes only when the subscription is canceled. On the other hand, the doFinally() operator executes whenever the sequence terminates, whether it's canceled, completes successfully, or completes with an error.

Furthermore, the doFinally() operator accepts a consumer of SignalType, an enum that represents the possible types of signals, such as ON_COMPLETE, ON_ERROR, and CANCEL. In this case, the SignalType is CANCEL, and hence the “Flux Completed due to Cancelation” message is also captured.
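To show how a single hook can observe all three outcomes, here is a minimal sketch of a doFinally-style wrapper. It is built on the JDK's java.util.concurrent.Flow interfaces so that it runs without Reactor, and everything in it (CleanupHooksSketch, the Signal enum, withFinally(), upTo(), run()) is a hypothetical illustration rather than Reactor's implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

final class CleanupHooksSketch {

    // Hypothetical stand-in for the three signals discussed above
    enum Signal { ON_COMPLETE, ON_ERROR, CANCEL }

    // Wraps a subscriber so that `hook` is told how the sequence ended
    static <T> Flow.Subscriber<T> withFinally(Flow.Subscriber<T> downstream, Consumer<Signal> hook) {
        return new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription upstream) {
                // Hand downstream a subscription that reports cancelation to the hook
                downstream.onSubscribe(new Flow.Subscription() {
                    @Override
                    public void request(long n) { upstream.request(n); }

                    @Override
                    public void cancel() {
                        upstream.cancel();
                        hook.accept(Signal.CANCEL);
                    }
                });
            }

            @Override
            public void onNext(T item) { downstream.onNext(item); }

            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
                hook.accept(Signal.ON_ERROR);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
                hook.accept(Signal.ON_COMPLETE);
            }
        };
    }

    // Minimal synchronous publisher emitting 1..count, for the demo only
    static Flow.Publisher<Integer> upTo(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean cancelled;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && !cancelled && next <= count; i++) {
                    subscriber.onNext(next++);
                }
                if (!cancelled && !done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { cancelled = true; }
        });
    }

    // Subscribes, cancels when `cancelAt` is received (0 means never), and returns the hook log
    static List<String> run(int count, int cancelAt) {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<Integer> consumer = new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                if (item == cancelAt) {
                    subscription.cancel();
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        };
        upTo(count).subscribe(withFinally(consumer, signal -> log.add("finally: " + signal)));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 3)); // prints [finally: CANCEL]
        System.out.println(run(5, 0)); // prints [finally: ON_COMPLETE]
    }
}
```

In a real application, the hook is where we'd close a connection or reset state, in the same way the test above prints its messages.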

7. Conclusion

In this tutorial, we covered various ways provided by WebFlux to cancel an ongoing Flux. We had a quick recap of Flux in the context of Reactive programming. We examined the reasons why a subscription might need to be canceled. Then, we discussed various methods to facilitate the cancelation. In addition, we also looked into post-cancelation cleanup.

As always, the code can be found over on GitHub.
