1. Overview

The reactive manifesto, the guideline for all reactive systems, states that a distributed application should be resilient. To achieve resiliency, we must develop systems that treat failure as a first-class citizen in the architecture.

The Akka library, through the mechanism of supervision, provides the primitives to implement resilient systems. In this tutorial, we’re going to show how to use Akka and supervision to improve the resiliency of distributed systems.

2. Akka Dependencies

As usual, to use the Akka Typed library, we need to import akka-actor-typed, and we’ll need akka-actor-testkit-typed for testing:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0",
libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.8.0" % Test

3. Scenario

First of all, to talk about supervision in Akka, we need a working example. Hence, we’re going to implement a simple service that mimics a web server serving files:

trait Resource
case class File(id: String, content: Array[Byte], mimeType: String) extends Resource

The web server handles only Get requests, and the responses it gives always contain the original path:

case class Get(path: String, replyTo: ActorRef[Response]) extends Request

case class Ok(path: String, resource: Resource) extends Response
case class NotFound(path: String) extends Response
case class BadRequest(path: String) extends Response
case class InternalServerError(path: String, error: String) extends Response

So, how does our web server serve the arriving requests?

It merely demands the Filesystem to search for the resources, and then it returns the file to the caller:

object WebServer {
  def apply(filesystem: ActorRef[Find]): Behavior[Request] =
    Behaviors.receive { (context, message) =>
      message match {
        case Get(path, replyTo) =>
          findInTheFilesystem(filesystem, context, replyTo, path)
        // More to come ;)
      }
      Behaviors.same
    }

For now, we’re leaving out many details. However, we’ll fill in the blanks during the rest of the article.

4. To Handle or Not to Handle

As we noted above, in distributed systems, we have to develop components with failure in mind. But, what kinds of errors do we need to handle in our code? Basically, there are two kinds: validation errors and failures.

Validation errors represent common errors, such as errors on inputs or anything our business logic can manage.

For example, suppose our web server expects a valid URI as a path, and the incoming message contains something that is not a URI. In this case, the webserver responds to the caller with a BadRequest message:

Behaviors.receive { (context, message) =>
  message match {
    case Get(path, replyTo) =>
      if (isNotValid(path)) {
        replyTo ! BadRequest(path)
      }

Hence, in the actors’ domain, validation errors should be part of the protocol used to interact with an actor.

On the other hand, a failure is some error condition that we cannot expect, such as a connection to an external resource that is no longer available, a filesystem that becomes full, and so on. Clearly, we cannot mix our business logic with the handling of all the possible unexpected failures. The resulting code will quickly become difficult to maintain.

So, the only possible way to handle failures is to delegate the responsibility of handling them to some external source, which we call a supervisor.

5. Supervision: Expecting the Unexpected

It’s time to improve our example. The web server serves incoming requests by searching for resources in a filesystem. We can model the filesystem as a dedicated actor together with its protocol:

object Filesystem {
  def apply(): Behavior[FsFind] = search

  private def search: Behavior[FsFind] = {
    Behaviors.receive { (context, message) =>
      context.log.info(s"Received a request for path ${message.path}")
      message.replyTo !
      if (Random.nextBoolean)
        FsFound(File("id", "{'result': 'ok'}".getBytes(), "application/json"))
      else
        FsMiss
      Behaviors.same
    }
  }
}

As we said, it’s not best practice to interleave the business logic with the code that handles unexpected failures. Hence, Akka implements supervision using decoration around an actor’s behavior:

object Filesystem {
  def apply(): Behavior[Find] = {
    Behaviors
      .supervise[Find](search)
      .onFailure[Exception](SupervisorStrategy.restart)
  }
 
  // ...

As we can see, the supervision defines both the exception and the strategy to use in case of failure.

In our example, the supervision catches every subtype of the Exception class, restarting the actor. It’s possible to specify different supervision strategies associated with different types of exceptions. To do so, just nest multiple calls to the supervise method:

Behaviors
  .supervise[Find](
    Behaviors
      .supervise(search)
      .onFailure[IOException(SupervisorStrategy.resume))
  .onFailure[Exception](SupervisorStrategy.restart)

Depending on the type of failure it encounters, a supervisor has three choices:

  • Resuming the failed actor
  • Restarting the actor
  • Stopping the actor

Let’s see how Akka addresses each of the above situations.

5.1. Trying to Save the World: Resuming the Actor

The luckiest case after a failure is when the error did not corrupt the state of the actor. Hence, the actor can continue its job, processing the next message. For example, let’s implement a cache: Instead of always having to read information from the filesystem, we need to model the cache state. For the sake of simplicity, we’ll use a Map[String, Resource]:

def cache(filesystem: ActorRef[FsFind],
          cacheMap: Map[String, Resource]): Behavior[Request] =
  Behaviors.receive { (ctx, message) =>
    message match {
      case Find(path, replyTo) =>
        val maybeAnHit = cacheMap.get(path)
        maybeAnHit match {
          case Some(r) => replyTo ! Hit(r)
          case None => askFilesystemForResource(filesystem, ctx, path, replyTo)
        }
        Behaviors.same
      case AdaptedFsFound(path, res, replyTo) =>
        replyTo ! Hit(res)
        cache(filesystem, cacheMap + (path -> res))
      case AdaptedFsMiss(_, replyTo) =>
        replyTo ! Miss
        Behaviors.same
    }
  }

We’ll use the supervision strategy SupervisionStrategy.resume to allow the actor to continue reading its mailbox after a failure:

object Cache {
  def apply(filesystem: ActorRef[FsFind]):
    Behaviors
      .supervise(cache(filesystem, Map[String, Resource]()))
      .onFailure(SupervisorStrategy.resume)
 
  // ...
}

In the above example, we use the functional style for defining the actor. We emulate a mutable state by passing information through the function that defines the behavior. Fortunately, it is sufficient to define the supervision on the first call of the method defining the actor’s behavior.

The Akka library will decorate each call of the cache method for us with the supervise function.

5.2. Just Restart the Actor

If the failure has corrupted the actor’s state, there’s nothing left to do but restart it, hoping that the failure state is due to a transitional condition.

So, we can use the SupervisorStrategy.restart to specify that the actor should be restarted following a failure:

Behaviors.supervise[Find](search).onFailure[Exception](SupervisorStrategy.restart)

We can specify a more adequate graded restarting policy, giving the maximum number of times the supervision can restart an actor in a given time interval:

SupervisorStrategy.restart.withLimit(maxNrOfRetries = 10, withinTimeRange = 5.minutes)

After a restart, all the children of an actor are stopped. Clearly, this behavior prevents resource leaks of creating new child actors every time the parent is restarted. However, it is possible to override this default behavior.

First of all, it is necessary to move the creation of child actors outside of the supervision, which must decorate only the code responsible for handling the reception of new messages. Last but not least, we invoke the withStopChildren method, telling Akka not to stop the child actors and instead to manage any dangling references among actors:

def apply(): Behavior[Request] = {
  Behaviors.setup { context =>
    val filesystem = context.spawn(Filesystem(), "filesystem")
    val cache = context.spawn(Cache(filesystem), "cache")
    Behaviors.supervise {
      serve(context, cache)
    }.onFailure[Exception](SupervisorStrategy.restart.withStopChildren(false))
  }
}

In contrast, if a child actor is restarted, Akka handles replacing the reference of the child actor with the parent.

As we said, failures are unpredictable events. When a loss happens, the actor likely has some resources open other than the one causing the error. Akka provides a mechanism to eventually clean up and close all of these pending resources.

Indeed, every actor ready to be restarted emits a PreRestart signal that we can listen to.

receiveSignal is a method of the Behavior type, and we can call it right after the definition of an actor’s behavior. In our example, just before the FileSystem actor is going to be restarted, the resources it owns must be released:

Behaviors.receive[FsFind] { (context, message) =>
  context.log.info(s"Received a request for path ${message.path}")
  message.replyTo !
  if (Random.nextBoolean)
    FsFound(File("id", "{'result': 'ok'}".getBytes(), "application/json"))
  else
    FsMiss
  Behaviors.same
}.receiveSignal {
  case (ctx, signal) if signal == PreRestart =>
    ctx.log.info("Releasing any dangling resource")
    Behaviors.same
}

5.3. There Is Nothing Left to Say: Stop the Actor

Stopping the actor is the default supervision strategy. The state of the application could be so compromised that resuming or restarting the actor is no longer worth it — when the conditions required for the actor to execute are no longer met. For example, think about a scheduled action or something similar.

When the actor system stops an actor, it sends the stopping actor a PostStop signal to clean up any handled resource.

Indeed, it’s quite common to handle the PostStop signal together with the PreRestart signal we saw in the previous example:

Behaviors.receive[Find] { (context, message) =>
  // ...
}.receiveSignal {
  case (ctx, signal) if signal == PreRestart || signal ==  PostStop =>
    ctx.log.info("Releasing any dangling resource")
    Behaviors.same
}

As we said before, Akka arranges actors in a hierarchy. What happens to the parent if a child actor stops? The default behavior for the parent actor is to fail itself, bubbling the failure up the hierarchy. However, what if a parent wants to remain alive in case one of its child actors stops? First of all, the parent actor must watch its child:

val webServer = context.spawn(WebServer(), message.id)
context.watch(webServer)

Watching an actor means to receive a ChildFailed signal when it stops. ChildFailed extends the more general signal Terminated:

object Main {
  case class Start(id: String)
  def apply: Behavior[Start] = {
    Behaviors.receive[Start] { (context, message) =>
      val webServer = context.spawn(WebServer(), message.id)
      context.watch(webServer)
      Behaviors.same
    }.receiveSignal {
      case (ctx, ChildFailed(ref, cause)) =>
        ctx.log.error(s"Child actor ${ref.path} failed with error ${cause.getMessage}")
        Behaviors.same
    }
  }
}

However, if the parent actor decides to not handle the ChildFailed signal, it will terminate itself, raising a DeathPactException, thereby bubbling up the failure.

6. What Happens to the Message?

Until now, we’ve discussed what happens to an actor in case of an exception. But, what happens to the message that generates the failure? The message will be lost, and the actor system discards it. Under no circumstances will the message be put back into the mailbox.

Conversely, nothing happens to the mailbox. If the supervision strategy defines that an actor must be resumed or restarted, it will continue to process the same mailbox containing all the remaining messages.

7. Conclusion

In this article, we started with the difference between a validation error and a failure. We looked at how the Akka library treats both of them, using supervision and the available supervision strategies.

As always, the code is available over on GitHub.

2 Comments
Oldest
Newest
Inline Feedbacks
View all comments
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.