Baeldung Pro – Scala – NPI EA (cat = Baeldung on Scala)
announcement - icon

Learn through the super-clean Baeldung Pro experience:

>> Membership and Baeldung Pro.

No ads, dark-mode and 6 months free of IntelliJ Idea Ultimate to start with.

1. Introduction

In a previous article, we saw what Akka Streams are and how to test them. In this article, we’ll dive into error handling. Typically, when an operator in a stream fails, the entire stream gets torn down. In that case, all the downstream operators get informed about the failure and won’t get run, whereas upstream operators see a cancellation.

There are three main ways to avoid complete stream failure: Recovering from the failure, restarting the stream, or supervising it.

In this tutorial, we’ll briefly examine all three ways to handle errors in Akka Streams.

2. Recovering from Failures

Stream recovery allows us to emit one final element downstream instead of the error to avoid complete stream failure. We can do this with the recover operator.

Let’s see an example:

val parseWithRecover: Flow[String, Either[String, (Int, Int)], NotUsed] =
    Flow[String]
      .map { pair =>
        val parts = pair.split(",")
        Right((parts(0).toInt, parts(1).toInt))
      }
      .recover({ case e: ArrayIndexOutOfBoundsException =>
        Left(e.getMessage)
      })

In the code snippet above, we added recover to the parse Flow we saw in the Introduction to Akka-Streams in Scala article. In particular, we changed the return type of the Flow to be Either[String, (Int, Int)]. This way, we can react to ArrayIndexOutOfBoundsExceptions, returning an error message.

Let’s see a simple test for the new Flow:

"The \"parseWithRecover\" flow" should "parse recover from a parsing error" in {
    val (pub, sub) = TestSource[String]()
      .via(parseWithRecover)
      .toMat(TestSink[Either[String, (Int, Int)]]())(Keep.both)
      .run()

    pub.sendNext("1,1")
    pub.sendNext("145146")
    pub.sendComplete()

    sub.requestNext(Right(1, 1))
    sub.requestNext(Left("Index 1 out of bounds for length 1"))
    sub.expectComplete()
  }

2.1. Adding Retries

The recover operator only lets us emit one element instead of a failed one. recoverWithRetries does the same thing up to a specified number of times. This is useful when dealing with temporary errors, such as network hiccups.

3. Restarting the Stream After a Failure

In Akka Streams, we can restart a single stream operator: Source, Flow, or Sink. We can do that using the RestartSource, RestartFlow, and RestartSink constructs, respectively, to implement an exponential backoff restart strategy. This means that when the given operator fails, Akka will restart it, waiting an increasing amount of time between two subsequent restarts.

This pattern lets us add restart support on a very fine grade, enabling it for single-stream operators. It is also very useful for mitigating the effects of transient failures, such as resources not being available.

Let’s see an example, using the stream elements implemented in the “Introduction to Akka-Streams in Scala” article and adding backoff restarts to the source:

private val backoffSettings = RestartSettings(
  minBackoff = 3 seconds,
  maxBackoff = 30 seconds,
  randomFactor = 0.2
).withMaxRestarts(3, 5.minutes)

RestartSource
  .withBackoff(backoffSettings) { () => source }
  .via(parse)
  .via(compare)
  .runWith(sink)
  .andThen {
    case Failure(exception) => println(exception)
    case Success((correct, total)) =>
      println(s"$correct/$total correct answers")
  }(system.dispatcher)
  .onComplete(_ => system.terminate())(system.dispatcher)

In the example above, we defined a backoff strategy with the RestartSettings class. In particular, we set the initial restart delay to 3 seconds (minBackoff), allowing the delay to grow until 30 seconds (maxBackoff), with a randomFactor of 0.2. The random factor simply randomizes the backoff calculation; the higher the value, the more random the backoff delay.

Lastly, we allowed the stream to restart at most three times or for five minutes.

We then add backoff capabilities to source by wrapping it inside RestartSource#withBackoff, which inputs the backoff settings and a lazily-evaluated source. If we run the example above, we get the following output:

[baeldung-akka.actor.default-dispatcher-6] INFO akka.stream.scaladsl.RestartWithBackoffSource - Restarting stream due to completion [1]
[baeldung-akka.actor.default-dispatcher-7] INFO akka.stream.scaladsl.RestartWithBackoffSource - Restarting stream due to completion [2]
[baeldung-akka.actor.default-dispatcher-6] INFO akka.stream.scaladsl.RestartWithBackoffSource - Restarting stream due to completion [3]
16/24 correct answers

The output shows the stream restarted three times, as expected. This is because RestartSource#withBackoff restarts the stream on success and failure.

If we wanted to restart it only on failure, we’d have to use RestartSource#onFailuresWithBackoff.

A RestartSource will not complete or fail until maxRestarts is reached, as it handles completion and failure by restarting the underlying Source. For RestartFlow and RestartSink, the restart might cause some messages to get lost.

4. Supervising a Stream

Supervision is a more advanced error resolution strategy, inspired by the error handling of Akka actors.

However, it does not make sense in all scenarios. For instance, if an operator interacts with an external service over the network, a back-off strategy is more suited if the connection drops. Therefore, not all operators support stream supervision out of the box and Akka Stream does not supervise streams automatically.

On the contrary, streams handling user data might benefit from supervision, as it gives us more control over what to do in case of failure.

We can choose one of three supervision strategies:

  • Stop: Where the stream is completed with failure. This is the default
  • Resume: Where the element causing the failure is dropped without terminating the stream
  • Restart: Where the element causing the failure is dropped and the stream continues after restarting the operator, effectively creating a new operator instance. In this case, all the internal state accumulated in the stream gets lost, and the stream will not replay the elements that flowed in the past

Defining a supervision strategy requires accessing the stream’s underlying runnable graph. Let’s see an example:

val runnableGraph = TestSource[String]()
  .via(parseWithRecover)
  .toMat(TestSink[Either[String, (Int, Int)]]())(Keep.both)

val decider: Supervision.Decider = {
  case _: ArrayIndexOutOfBoundsException => Supervision.Resume
  case _                                 => Supervision.Stop
}

val graphWithResumeSupervision =
      runnableGraph.withAttributes(ActorAttributes.supervisionStrategy(decider))

val (pub, sub) = graphWithResumeSupervision.run()

pub.sendNext("1,1")
pub.sendNext("145146")
pub.sendNext("1,2")
pub.sendComplete()

sub.requestNext(Right(1, 1))
sub.requestNext(Right(1, 2))
sub.expectComplete()

In the example above, we first defined a supervision strategy using the Supervision.Decider class. In particular, we chose to resume the stream only in the case of ArrayIndexOutOfBoundsExceptions, stopping it for all the other errors. Secondly, we decorated the stream graph with the withAttributes method, and then we ran it.

In particular, we used the test kit to send three elements to the stream, with the second crafted to throw an ArrayIndexOutOfBoundsException. Nonetheless, the stream only emitted two elements before completion, proving the second was indeed dropped.

5. Logging Stream Elements

Logging is not a form of error handling per se, but it is certainly a good practice to signal that something went wrong during the execution of a stream. We can use the log operator in Akka Streams to that end:

val parseWithLogging: Flow[String, (Int, Int), NotUsed] =
    Flow[String]
      .map { pair =>
        val parts = pair.split(",")
        (parts(0).toInt, parts(1).toInt)
      }
      .log(name = "Baeldung stream")
      .addAttributes(
        Attributes.logLevels(
          onElement = Attributes.LogLevels.Info,
        )
      )

To have Akka Stream log something, we add a logging backend to our project. For example, adding slf4j-simple to the build.sbt file:

libraryDependencies ++= Seq(
  "org.slf4j" % "slf4j-api" % "2.0.16",
  "org.slf4j" % "slf4j-simple" % "2.0.16"
)

By default, the log operator logs the element of a stream at the debug level and any exceptions at the error level. However, logging levels are configurable via Attributes#logLevels(). If we run the example in the “Introduction to Akka-Streams in Scala” article, using parseWithLogging instead of parse, we’ll get the following output:

[baeldung-akka.actor.default-dispatcher-6] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[baeldung-akka.actor.default-dispatcher-7] INFO akka.stream.Materializer - [Baeldung stream] Element: (5,10)
[baeldung-akka.actor.default-dispatcher-7] INFO akka.stream.Materializer - [Baeldung stream] Element: (15,15)
[baeldung-akka.actor.default-dispatcher-7] INFO akka.stream.Materializer - [Baeldung stream] Element: (78,79)
[baeldung-akka.actor.default-dispatcher-7] INFO akka.stream.Materializer - [Baeldung stream] Element: (12,12)
[baeldung-akka.actor.default-dispatcher-7] INFO akka.stream.Materializer - [Baeldung stream] Element: (0,0)
[baeldung-akka.actor.default-dispatcher-7] INFO akka.stream.Materializer - [Baeldung stream] Element: (456,456)

Where each element is logged, along with the name of the stream.

Stream logging is a broad topic. For the scope of this tutorial, it is enough to know that Akka Streams lets us decide what to log and to what level.

6. Conclusion

In this article, we saw many ways to deal with errors in Akka Streams. First, we analyzed recoveries, allowing us to define a strategy to replace faulty elements. Secondly, we moved to restart the components of a stream, a strategy suitable to cope with temporary errors. Thirdly, we uncovered the power of supervision to define fine-grained error handling. Lastly, we took a quick look at logging individual stream elements or errors as they happen.

The code backing this article is available on GitHub. Once you're logged in as a Baeldung Pro Member, start learning and coding on the project.