I just announced the new Spring Boot 2 material, coming in REST With Spring:

>> CHECK OUT THE COURSE

1. Overview

With the introduction of Spring WebFlux, we got another powerful tool for writing reactive, non-blocking applications. While using this technology is now much easier than before, debugging reactive sequences in Spring WebFlux can still be quite cumbersome.

In this quick tutorial, we’ll see how to easily log events in asynchronous sequences and how to avoid some simple mistakes.

2. Maven Dependency

Let’s add the Spring WebFlux dependency to our project so we can create reactive streams:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

We can get the latest spring-boot-starter-webflux dependency from Maven Central.

3. Creating a Reactive Stream

To begin, let’s create a reactive stream using Flux and call its log() method to enable logging:

Flux<Integer> reactiveStream = Flux.range(1, 5).log();

Next, we will subscribe to it to consume generated values:

reactiveStream.subscribe();
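
For reference, the two snippets above can be combined into one small runnable class. This is only a minimal sketch: the class name LoggedRange and the helper consume() are hypothetical names chosen for illustration, and we pass a consumer to subscribe() so that the emitted values can be collected and inspected.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration.
public class LoggedRange {

    // Builds the logged stream, subscribes to it, and returns the values it emitted.
    static List<Integer> consume() {
        List<Integer> received = new ArrayList<>();

        Flux<Integer> reactiveStream = Flux.range(1, 5).log();

        // Flux.range() emits on the subscribing thread, so the list is
        // already filled when subscribe() returns.
        reactiveStream.subscribe(received::add);

        return received;
    }

    public static void main(String[] args) {
        System.out.println(consume());
    }
}
```

Without the subscribe() call nothing is emitted and nothing is logged, because the stream only starts producing values once it has a subscriber.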

4. Logging Reactive Stream

After running the above application, we see our logger in action:

2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:37:04 INFO | request(unbounded)
2018-11-11 22:37:04 INFO | onNext(1)
2018-11-11 22:37:04 INFO | onNext(2)
2018-11-11 22:37:04 INFO | onNext(3)
2018-11-11 22:37:04 INFO | onNext(4)
2018-11-11 22:37:04 INFO | onNext(5)
2018-11-11 22:37:04 INFO | onComplete()

We see every event that occurred on our stream: the subscription, the request for an unbounded number of elements, and the five emitted values. The stream then closed with an onComplete() event.
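
If logging every event is too noisy, log() also has overloads that accept a logger category, a java.util.logging.Level, and the signal types to report. The sketch below goes slightly beyond the original example; the class name FilteredLogging and the category "demo.range" are hypothetical, and we assume we only care about onNext and onComplete.

```java
import java.util.List;
import java.util.logging.Level;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// Hypothetical class name, used only for this illustration.
public class FilteredLogging {

    // Logs only onNext and onComplete under a custom category and
    // returns the values that passed through the stream.
    static List<Integer> consume() {
        return Flux.range(1, 5)
          // "demo.range" is an assumed category name; any logger name works.
          .log("demo.range", Level.INFO, SignalType.ON_NEXT, SignalType.ON_COMPLETE)
          .collectList()
          .block();
    }

    public static void main(String[] args) {
        System.out.println(consume());
    }
}
```

The values themselves are untouched; restricting the signal types only changes what gets written to the log.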

5. Advanced Logging Scenario

We can modify our application to see a more interesting scenario. Let’s add take() to our Flux, which instructs the stream to provide only a specific number of events:

Flux<Integer> reactiveStream = Flux.range(1, 5).log().take(3);

After executing the code, we’ll see the following output:

2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:45:35 INFO | request(unbounded)
2018-11-11 22:45:35 INFO | onNext(1)
2018-11-11 22:45:35 INFO | onNext(2)
2018-11-11 22:45:35 INFO | onNext(3)
2018-11-11 22:45:35 INFO | cancel()

As we can see, take() caused the stream to cancel after emitting three events.

The placement of log() in your stream is crucial. Let’s see how placing log() after take() will produce different output:

Flux<Integer> reactiveStream = Flux.range(1, 5).take(3).log();

And the output:

2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber)
2018-11-11 22:49:23 INFO | request(unbounded)
2018-11-11 22:49:23 INFO | onNext(1)
2018-11-11 22:49:23 INFO | onNext(2)
2018-11-11 22:49:23 INFO | onNext(3)
2018-11-11 22:49:23 INFO | onComplete()

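As we can see, changing the point of observation changed the output. The stream still produced three events, but instead of cancel(), we now see onComplete(). This is because log() now observes what take() emits downstream rather than what take() signals to its source: once it has passed on three elements, take() cancels the upstream subscription and completes the sequence for its own subscribers.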
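
To make the two observation points visible in a single run, we can record the terminal signals on both sides of take(). This is only a sketch: the class name TakeSignals and the "upstream"/"downstream" labels are hypothetical, and we use the doOnCancel() and doOnComplete() hooks in place of log() so that the signals can be collected in a list.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class name, used only for this illustration.
public class TakeSignals {

    // Records which terminal signal is seen before and after take(3).
    static List<String> record() {
        List<String> signals = new ArrayList<>();

        Flux.range(1, 5)
          // Hooks placed before take() see what take() sends to its source.
          .doOnCancel(() -> signals.add("upstream: cancel"))
          .doOnComplete(() -> signals.add("upstream: onComplete"))
          .take(3)
          // Hooks placed after take() see what take() emits to its subscribers.
          .doOnCancel(() -> signals.add("downstream: cancel"))
          .doOnComplete(() -> signals.add("downstream: onComplete"))
          .subscribe();

        return signals;
    }

    public static void main(String[] args) {
        System.out.println(record());
    }
}
```

In line with the two log outputs above, the recorded list should contain a cancel on the upstream side and an onComplete on the downstream side.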

6. Conclusion

In this quick article, we saw how to log reactive streams using the built-in log() method and how its position in the stream determines which events we see.

And as always, the source code for the above example can be found over on GitHub.
