1. Overview

Debugging reactive streams is probably one of the main challenges we'll have to face once we start using these data structures.

Since Reactive Streams have been gaining popularity in recent years, it's a good idea to know how to carry out this task efficiently.

Let's start by setting up a project using a reactive stack to see why this is often troublesome.

2. Scenario with Bugs

We want to simulate a real-world scenario in which several asynchronous processes are running, and in which we've introduced some defects that will eventually trigger exceptions.

To give the big picture, our application will consume and process streams of simple Foo objects, each containing only an id, a formattedName, and a quantity field.
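As a point of reference, a Foo like the one described could be sketched as follows. This is a minimal, hypothetical version: the field types, the immutability, and the toString format are assumptions made for illustration, not the project's actual class.

```java
// Hypothetical sketch of the Foo model described above.
// Field types and the toString format are assumptions.
final class Foo {
    private final int id;
    private final String formattedName;
    private final int quantity;

    public Foo(int id, String formattedName, int quantity) {
        this.id = id;
        this.formattedName = formattedName;
        this.quantity = quantity;
    }

    public int getId() {
        return id;
    }

    public String getFormattedName() {
        return formattedName;
    }

    public int getQuantity() {
        return quantity;
    }

    @Override
    public String toString() {
        return "Foo(id=" + id
          + ", formattedName=" + formattedName
          + ", quantity=" + quantity + ")";
    }
}
```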

2.1. Analyzing the Log Output

Now, let's examine a snippet and the output it generates when an unhandled error shows up:

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .map(FooReporter::reportResult)
      .subscribe();
}

public void processFooInAnotherScenario(Flux<Foo> flux) {
    flux.map(FooNameHelper::substringFooName)
      .map(FooQuantityHelper::divideFooQuantity)
      .subscribe();
}

After running our application for a few seconds, we'll notice that it's logging exceptions from time to time.

Taking a closer look at one of the errors, we'll find something similar to this:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at com.baeldung.debugging.consumer.service.FooNameHelper
      .lambda$1(FooNameHelper.java:38)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
    at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
    at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
    at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
    at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
    at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
    at j.u.c.FutureTask.run(FutureTask.java:266)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .access$201(ScheduledThreadPoolExecutor.java:180)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .run(ScheduledThreadPoolExecutor.java:293)
    at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at j.l.Thread.run(Thread.java:748)

Based on the root cause, and noticing the FooNameHelper class mentioned in the stack trace, we can imagine that on some occasions, our Foo objects are being processed with a formattedName value that is shorter than expected.

Of course, this is just a simplified case, and the solution seems rather obvious.
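To make the failure mode concrete, here is a small plain-Java sketch, independent of Reactor. The class name, the helper methods, and the cut-off length of 15 are hypothetical. It does not reproduce the FooNameHelper code; it only illustrates how a substring call can fail on a name that is shorter than expected, and what an obvious guard might look like.

```java
// Hypothetical stand-in for the failing step; not the article's FooNameHelper.
final class SubstringSketch {

    // Assumed cut-off, chosen only to mirror the index in the log above.
    static final int MAX_LENGTH = 15;

    /** Fails when the name has fewer than MAX_LENGTH characters. */
    static String unsafeTruncate(String name) {
        return name.substring(0, MAX_LENGTH);
    }

    /** Guards against short names by clamping the end index. */
    static String safeTruncate(String name) {
        return name.substring(0, Math.min(name.length(), MAX_LENGTH));
    }

    public static void main(String[] args) {
        System.out.println(safeTruncate("theFo"));
        try {
            unsafeTruncate("theFo");
        } catch (StringIndexOutOfBoundsException e) {
            // The exact message depends on the Java version.
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```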

But let's imagine this was a real-world scenario where the exception itself doesn't help us solve the issue without some context information.

Was the exception triggered as a part of the processFoo, or of the processFooInAnotherScenario method?

Did other previous steps affect the formattedName field before arriving at this stage?

The log entry wouldn't help us answer these questions.

To make things worse, sometimes the exception isn't even thrown from within our functionality.

For example, imagine we rely on a reactive repository to persist our Foo objects. If an error arises at that point, we might not even have a clue where to start debugging our code.

We need tools to debug reactive streams efficiently.

3. Using a Debug Session

One option to figure out what's going on with our application is to start a debugging session using our favorite IDE.

We'll have to set up a couple of conditional breakpoints and analyze the flow of data when each step in the stream gets executed.

Indeed, this might be a cumbersome task, especially when we've got a lot of reactive processes running and sharing resources.

Additionally, there are many circumstances where we can't start a debugging session for security reasons.

4. Logging Information With the doOnError Method or Using the Subscribe Parameter

Sometimes, we can add useful context information by providing a Consumer as the second parameter of the subscribe method:

public void processFoo(Flux<Foo> flux) {

    // ...

    flux.subscribe(foo -> {
        logger.debug("Finished processing Foo with Id {}", foo.getId());
    }, error -> {
        logger.error(
          "The following error happened on processFoo method!",
           error);
    });
}

Note: If we don't need to carry out further processing in the subscribe method, we can chain the doOnError operator on our publisher instead:

flux.doOnError(error -> {
    logger.error("The following error happened on processFoo method!", error);
}).subscribe();

Now we'll have some guidance on where the error might be coming from, even though we still don't have much information about the actual element that generated the exception.

5. Activating Reactor's Global Debug Configuration

The Reactor library provides a Hooks class that lets us configure the behavior of Flux and Mono operators.

By just adding the following statement, our application will instrument the calls to the publishers' methods, wrap the construction of the operator, and capture a stack trace:

Hooks.onOperatorDebug();

After the debug mode gets activated, our exception logs will include some helpful information:

16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
  - The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at c.b.d.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
    ...
    at j.l.Thread.run(Thread.java:748)
    Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    reactor.core.publisher.Flux.map(Flux.java:5653)
    c.b.d.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
    c.b.d.c.s.FooService.processFoo(FooService.java:24)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable
      .run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.map ⇢ c.b.d.c.s.FooNameHelper
            .substringFooName(FooNameHelper.java:32)
    |_    Flux.map ⇢ c.b.d.c.s.FooReporter.reportResult(FooReporter.java:15)

As we can see, the first section remains largely the same, but the following sections provide information about:

  1. The assembly trace of the publisher: here we can confirm that the error was first generated in the processFoo method.
  2. The operators that observed the error after it was first triggered, along with the user class where they were chained.

Note: In this example, we're chaining the operations in different classes, mainly to make this easier to see.

We can toggle the debug mode on or off at any time, but it won't affect Flux and Mono objects that have already been instantiated.
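To build some intuition for what an assembly trace adds, here is a plain-Java sketch of the general idea. It is not Reactor's actual implementation. A stack trace is captured where a step is declared and later attached to any error that step raises, even when the step runs on another thread. All names here (AssemblyTraceSketch, AssemblySite, traced, buildPipeline, runOnWorker) are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Plain-Java illustration of the "assembly trace" idea; NOT Reactor's implementation.
final class AssemblyTraceSketch {

    /** Marker whose stack trace records where a step was declared. */
    static final class AssemblySite extends RuntimeException {
        AssemblySite(String description) {
            super("Step assembled here: " + description);
        }
    }

    /** Wraps a step so that its failures carry the declaration site as a suppressed exception. */
    static <T, R> Function<T, R> traced(String description, Function<T, R> step) {
        // The stack trace is captured now, at assembly time, on the assembling thread.
        AssemblySite site = new AssemblySite(description);
        return value -> {
            try {
                return step.apply(value);
            } catch (RuntimeException e) {
                e.addSuppressed(site);
                throw e;
            }
        };
    }

    /** Declares the pipeline; this method appears in the assembly trace. */
    static Function<String, String> buildPipeline() {
        return traced("truncate name", name -> name.substring(0, 15));
    }

    /** Executes the pipeline on a worker thread; returns the failure, or null on success. */
    static Throwable runOnWorker(String input) {
        Function<String, String> pipeline = buildPipeline();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(() -> pipeline.apply(input)).get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        Throwable failure = runOnWorker("theFo");
        if (failure != null) {
            // Prints the worker thread's trace, plus a "Suppressed:" assembly trace.
            failure.printStackTrace();
        }
    }
}
```

Printing the failure shows the worker thread's own stack trace first, which contains no buildPipeline frame, followed by a Suppressed section that does. The sketch also hints at why this kind of instrumentation has a cost: a stack trace is captured for every wrapped step, whether or not that step ever fails.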

5.1. Executing Operators on Different Threads

One other aspect to keep in mind is that the assembly trace is generated properly even if there are different threads operating on the stream.

Let's have a look at the following example:

public void processFoo(Flux<Foo> flux) {
    flux.publishOn(Schedulers.newSingle("foo-thread"))
       // ...
      .publishOn(Schedulers.newSingle("bar-thread"))
      .map(FooReporter::reportResult)
      .subscribeOn(Schedulers.newSingle("starter-thread"))
      .subscribe();
}

Now, if we check the logs, we'll see that in this case the first section might change a little, but the last two remain largely the same.

The first part is the thread stack trace, therefore it'll show only the operations carried out by a particular thread.

As we've seen, that's not the most important section when we're debugging the application, so this change is acceptable.

6. Activating the Debug Output on a Single Process

Instrumenting and generating a stack trace in every single reactive process is costly.

Thus, we should use the previous approach only in critical cases.

However, Reactor provides a way to enable the debug mode on single crucial processes, which consumes less memory.

We're referring to the checkpoint operator:

public void processFoo(Flux<Foo> flux) {
    
    // ...

    flux.checkpoint("Observed error on processFoo", true)
      .subscribe();
}

Note that in this manner, the assembly trace will be logged at the checkpoint stage:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
	...
Assembly trace from producer [reactor.core.publisher.FluxMap],
  described as [Observed error on processFoo] :
    r.c.p.Flux.checkpoint(Flux.java:3096)
    c.b.d.c.s.FooService.processFoo(FooService.java:26)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)

We should place the checkpoint operator towards the end of the reactive chain.

Otherwise, it won't be able to observe errors occurring downstream of it.

Also, let's note that the checkpoint method is overloaded. We can avoid:

  • specifying a description for the observed error, if we use the no-args option
  • generating a filled stack trace (which is the most costly operation), by providing just the custom description

7. Logging a Sequence of Elements

Finally, Reactor publishers offer one more method that could potentially come in handy in some cases.

By calling the log method in our reactive chain, the application will log each element in the flow with the state that it has at that stage.

Let's try it out in our example:

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .log()
      .map(FooReporter::reportResult)
      .doOnError(error -> {
        logger.error("The following error happened on processFoo method!", error);
      })
      .subscribe();
}

And check the logs:

INFO  reactor.Flux.OnAssembly.1 - onSubscribe(FluxMap.MapSubscriber)
INFO  reactor.Flux.OnAssembly.1 - request(unbounded)
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - cancel()
ERROR c.b.d.consumer.service.FooService 
  - The following error happened on processFoo method!
...

We can easily see the state of each Foo object at this stage, and how the framework cancels the flow when an exception happens.

Of course, this approach is also costly, and we'll have to use it in moderation.

8. Conclusion

We can spend a lot of time and effort troubleshooting problems if we don't know the tools and mechanisms to debug our application properly.

This is especially true if we're not used to handling reactive and asynchronous data structures, and we need extra help to figure out how things work.

As always, the full example is available over on the GitHub repo.
