1. Introduction

The Akka Streams library lets us compose flows, that is, data-transforming functions, and connect them to sources and consumers (the so-called sinks).

In this tutorial, we’ll take a look at how to test streams. In particular, we’ll see how to write unit tests for Akka Flows, Sources, and Sinks using Akka Stream Testkit.

2. Importing the Library

First, let’s add the Akka Streams libraries to our build:

val AkkaVersion = "2.8.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test
)

The akka-stream module provides the Akka Streams API, whereas akka-stream-testkit provides the testing components. In this article, we’ll see how to use both.

3. The Running Example

Our example will be a stream producing comma-separated pairs of integers. The first component of each pair represents the user’s answer to a quiz question, while the second is the expected (correct) answer. We’ll write a Source producing these pairs, some Flows parsing and comparing them, and a Sink counting how many correct answers the user gave. Let’s take a look at the code:

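import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Future

package object stream {
  // Emits six "userAnswer,correctAnswer" pairs
  val source: Source[String, NotUsed] = Source(
    Seq("5,10", "15,15", "78,79", "12,12", "0,0", "456,456")
  )

  // Splits each pair on the comma and converts both parts to Int
  val parse: Flow[String, (Int, Int), NotUsed] =
    Flow[String]
      .map { pair =>
        val parts = pair.split(",")
        (parts(0).toInt, parts(1).toInt)
      }

  // Emits true when the user's answer matches the correct one
  val compare: Flow[(Int, Int), Boolean, NotUsed] =
    Flow[(Int, Int)]
      .map { case (userAnswer, correctAnswer) => userAnswer == correctAnswer }

  // Accumulates (number of correct answers, total number of answers)
  val sink: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
    case ((correctCount, total), wasCorrect) =>
      if (wasCorrect) (correctCount + 1, total + 1)
      else (correctCount, total + 1)
  }
}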

In the example above, we use Akka’s NotUsed type, for example in Flow[String, (Int, Int), NotUsed]. This type says that the flow accepts Strings and emits values of type (Int, Int) and that, when run, it produces a value of type NotUsed, which is somewhat similar to Unit. This is what Akka calls the “materialized value”: a value produced by the stream when it is run.
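To make the idea of a materialized value more concrete, here is a minimal sketch that is separate from the quiz example. The names MaterializedValueSketch, numbers, and sum are hypothetical. The point is only that Keep.right selects the sink’s Future[Int] as the value returned by run(), while the source’s NotUsed is discarded:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical names, used only for illustration
object MaterializedValueSketch {
  def sumOfThree(): Int = {
    implicit val system: ActorSystem = ActorSystem("materialized-value-sketch")
    try {
      // The source materializes NotUsed: running it yields nothing useful
      val numbers: Source[Int, NotUsed] = Source(List(1, 2, 3))
      // The sink materializes a Future holding the folded result
      val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

      // Keep.right keeps the sink's materialized value and drops the source's
      val result: Future[Int] = numbers.toMat(sum)(Keep.right).run()
      Await.result(result, 3.seconds)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Using Keep.left here would return NotUsed instead, and Keep.both would return the pair of both values.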

3.1. Putting It All Together

Even if we’re focusing on testing stream components (sources, sinks, or flows) in isolation, it’s useful to get to know how we can compose them. Let’s see a simple application:

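import akka.actor.ActorSystem

import scala.util.{Failure, Success}

object Main extends App {
  // Provides the materializer needed to run the stream
  implicit val system: ActorSystem = ActorSystem("baeldung")

  source
    .via(parse)
    .via(compare)
    .runWith(sink)
    .andThen {
      case Failure(exception) => println(exception)
      case Success((correct, total)) =>
        println(s"$correct/$total correct answers")
    }(system.dispatcher)
    // Terminate the ActorSystem whether the stream succeeded or failed
    .onComplete(_ => system.terminate())(system.dispatcher)
}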

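The first thing our application does is create an implicit ActorSystem. This is necessary to invoke the Source::runWith() method, since the result of source.via(parse).via(compare) is still a Source and running it requires a materializer, which Akka derives from the ActorSystem.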

Then, the rest of the code shows how to run the stream. In particular, we either tell the user how many correct answers they provided or show an error message.

Lastly, we explicitly terminate the ActorSystem using Future::onComplete(). If we didn’t do that, the application would never terminate.

If we run the example above, it prints 4/6 correct answers.

4. The Akka Stream Testkit

Akka Streams comes with a standalone module named akka-stream-testkit, which provides tools designed for testing streams. There are two main components in this module, TestSource and TestSink. Furthermore, several factory methods let us instantiate test Sources and Sinks.

In this section, we’re going to see how to write unit tests for Sources, Sinks, and Flows. In all the subsequent code snippets, we’ll assume the definition of an implicit ActorSystem, used to run the stream components under test:

implicit val system: ActorSystem = ActorSystem("baeldung")

The test snippets in this section are not meant to be exhaustive and cover only one error scenario. When testing real code, we should test the behavior of our streams in the presence of other errors as well.
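The snippets in this section assume a surrounding ScalaTest spec. A minimal skeleton might look like the sketch below. It assumes ScalaTest 3.2.x is on the test classpath (it is not among the dependencies listed earlier) and uses the hypothetical class name QuizStreamSpec. It also shuts the ActorSystem down after the last test, which the individual snippets don’t show:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical spec name; the style mirrors the snippets in this section
class QuizStreamSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll {

  // Picked up implicitly by TestSource, TestSink, and runWith
  implicit val system: ActorSystem = ActorSystem("baeldung")

  // Stop the ActorSystem once all tests have run, so the test JVM can exit
  override protected def afterAll(): Unit = {
    Await.result(system.terminate(), 10.seconds)
    super.afterAll()
  }

  // Placeholder test, only here to show where the snippets would go
  "A trivial source" should "emit its single element" in {
    Source.single("5,10")
      .runWith(TestSink[String]())
      .request(1)
      .expectNext("5,10")
      .expectComplete()
  }
}
```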

4.1. Testing a Source

Let’s start by verifying that our source correctly emits the expected elements:

"The \"source\" source" should "emit pairs of integers" in {
  source
    .runWith(TestSink[String]())
    .request(6)
    .expectNext("5,10", "15,15", "78,79", "12,12", "0,0", "456,456")
    .expectComplete()
}

Let’s break down the snippet above. First, we run our source with a TestSink[String]. TestSink gives us a way to have manual control over the source under test. We can use it to request a given number of elements and set up expectations for stream completion or errors.

In the snippet above, we requested six elements (as source emits that many String values), verifying they were as expected. Lastly, we asserted that the stream was completed after emitting those six values.
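The same kind of probe can also check back-pressure and failure behavior. The sketch below uses two hypothetical sources rather than the quiz source. The first part requests elements one at a time and checks that nothing arrives without demand. The second part expects an error from a source whose map stage throws on its second element:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name, used only for illustration
object TestSinkSketch {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("test-sink-sketch")
    try {
      // Elements are emitted only when the probe signals demand
      Source(List("5,10", "15,15"))
        .runWith(TestSink[String]())
        .request(1)
        .expectNext("5,10")
        .expectNoMessage(100.millis) // no outstanding demand, so nothing arrives
        .request(1)
        .expectNext("15,15")
        .expectComplete()

      // The second element causes a division by zero inside map
      Source(List(1, 0))
        .map(10 / _)
        .runWith(TestSink[Int]())
        .request(2)
        .expectNext(10)
        .expectError() // returns the Throwable that failed the stream
        .getMessage
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

If any expectation is not met, the probe throws an AssertionError, so the same calls work unchanged inside a ScalaTest test body.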

4.2. Testing a Sink

Testing a Sink is very similar to testing a Source. Let’s see an example:

"The \"sink\" sink" should "count the number of trues" in {
  val (probe, result) = TestSource[Boolean]().toMat(sink)(Keep.both).run()

  probe.sendNext(true)
  probe.sendNext(false)
  probe.sendNext(false)
  probe.sendComplete()

  result.futureValue shouldBe (1, 3)
}

Recall that our sink counts the true values in the stream, along with the total number of elements. To test it, we create a TestSource[Boolean] and connect it to sink. Running the stream while keeping both materialized values (Keep.both) gives us access to a publisher probe (a TestPublisher.Probe). We use it to emit some Boolean values with TestPublisher.Probe::sendNext() and then send a completion signal with TestPublisher.Probe::sendComplete(). If we wanted to fail the stream, we could use TestPublisher.Probe::sendError() instead.

The second materialized value represents the result of the stream produced by our sink. The call to the futureValue method is actually a ScalaTest feature, made possible by the following import:

import org.scalatest.concurrent.ScalaFutures.convertScalaFuture

This turns an instance of Future into a FutureConcept, allowing us to call FutureConcept::futureValue(), which makes ScalaTest wait until the Future completes. It then returns the value of the Future, or fails the test if the Future failed or did not complete in time.

The assertion passes because we sent three Boolean values into the stream, only one of which was true, so the sink produces (1, 3).
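The failure path can be tested in much the same way. The following is a minimal sketch, assuming the same fold logic as our sink (repeated here so the block is self-contained) and a hypothetical object name. Instead of completing the stream, it sends an error through the publisher probe and inspects how the materialized Future finished:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.testkit.scaladsl.TestSource

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Try

// Hypothetical object name, used only for illustration
object FailingSinkSketch {
  // Same fold as the article's sink, repeated to keep the sketch self-contained
  val sink: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
    case ((correctCount, total), wasCorrect) =>
      if (wasCorrect) (correctCount + 1, total + 1)
      else (correctCount, total + 1)
  }

  def run(): Try[(Int, Int)] = {
    implicit val system: ActorSystem = ActorSystem("failing-sink-sketch")
    try {
      val (probe, result) = TestSource[Boolean]().toMat(sink)(Keep.both).run()

      probe.sendNext(true)
      // Fail the stream instead of completing it
      probe.sendError(new IllegalStateException("upstream failed"))

      // Wait for the Future to finish, then inspect how it finished
      Await.ready(result, 3.seconds).value.get
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

Inside a ScalaTest spec, an equivalent check could be written as result.failed.futureValue shouldBe an[IllegalStateException].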

4.3. Testing a Flow

Testing a Flow combines what we did to test Sources and Sinks. Hence, to test the Flow logic, we use TestSink to trigger the computation and assert the results. Similarly, we can use TestSource to generate test data. Depending on the type of test we’re writing (unit, integration, and so on) and on the design of our application, we can test one or more Flows simultaneously.

Let’s start by taking a look at a test for the parse Flow defined above:

"The \"parse\" flow" should "parse pairs of integers" in {
  val (pub, sub) = TestSource[String]()
    .via(parse)
    .toMat(TestSink[(Int, Int)]())(Keep.both)
    .run()

  pub.sendNext("1,1")
  pub.sendNext("145,146")
  pub.sendComplete()

  sub.requestNext((1, 1))
  sub.requestNext((145, 146))
  sub.expectComplete()
}

In the snippet above, we used Akka’s TestSource and TestSink, keeping both materialized values. This returns two probes, pub and sub, which we can use to test the parse flow.

In particular, we can publish elements downstream using pub.sendNext(). Similarly, we can send a completion signal with pub.sendComplete(). The test kit defines many more methods allowing us, for instance, to also send errors downstream, as we saw above. On the other hand, the sub probe lets us set expectations.

In the example above, we used sub.requestNext() to request one element and assert that it equals the expected value, and sub.expectComplete() to test for stream termination.

Similarly, we can test that the parse Flow rejects pairs of integers with an invalid delimiter:

"The \"parse\" flow" should "reject pairs with the wrong delimiter" in {
  val (pub, sub) = TestSource[String]()
    .via(parse)
    .toMat(TestSink[(Int, Int)]())(Keep.both)
    .run()

  pub.sendNext("2,3")
  pub.sendNext("1;1")
  pub.sendComplete()

  sub.request(2)
  sub.expectNext((2, 3))
  val exc = sub.expectError()
  exc.getMessage should be("For input string: \"1;1\"")
}

In this case, we first send a valid element (“2,3”) and then an invalid one (“1;1”). The Flow parses the first pair correctly, but it fails for the second, throwing a NumberFormatException since 1;1 isn’t a valid number. This happens because the code doesn’t split the incoming string, as there is no comma in it. With sub.expectError() we get an instance of the Throwable, on which we can make assertions.

Following the same pattern, we could test for other bad inputs, such as elements with only one number (e.g. “1”) or pairs with wrong types (e.g. “a,2”).
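As a sketch of what such tests could check, the block below pushes each of those two inputs through the parse logic and returns the resulting error. The object BadInputSketch and the helper errorFor are hypothetical, and parse is repeated so the block is self-contained:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name, used only for illustration
object BadInputSketch {
  // Same logic as the article's parse flow, repeated to keep the sketch self-contained
  val parse: Flow[String, (Int, Int), NotUsed] =
    Flow[String].map { pair =>
      val parts = pair.split(",")
      (parts(0).toInt, parts(1).toInt)
    }

  // Hypothetical helper: pushes one element through parse and returns the stream error
  def errorFor(input: String)(implicit system: ActorSystem): Throwable = {
    val (pub, sub) = TestSource[String]()
      .via(parse)
      .toMat(TestSink[(Int, Int)]())(Keep.both)
      .run()

    sub.request(1)
    pub.sendNext(input)
    sub.expectError()
  }

  // Returns the errors for a non-numeric component and for a missing component
  def run(): (Throwable, Throwable) = {
    implicit val system: ActorSystem = ActorSystem("bad-input-sketch")
    try (errorFor("a,2"), errorFor("1"))
    finally Await.result(system.terminate(), 10.seconds)
  }
}
```

With this parse implementation, “a,2” fails in toInt with a NumberFormatException, whereas “1” fails when accessing the second array element, with an ArrayIndexOutOfBoundsException.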

Testing the compare Flow is very similar.
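A minimal sketch of such a test follows. It repeats the compare logic so that the block is self-contained, uses a hypothetical object name, and relies only on probe expectations, so it doesn’t need ScalaTest:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name, used only for illustration
object CompareFlowSketch {
  // Same logic as the article's compare flow, repeated to keep the sketch self-contained
  val compare: Flow[(Int, Int), Boolean, NotUsed] =
    Flow[(Int, Int)]
      .map { case (userAnswer, correctAnswer) => userAnswer == correctAnswer }

  def run(): Seq[Boolean] = {
    implicit val system: ActorSystem = ActorSystem("compare-flow-sketch")
    try {
      val (pub, sub) = TestSource[(Int, Int)]()
        .via(compare)
        .toMat(TestSink[Boolean]())(Keep.both)
        .run()

      pub.sendNext((1, 1))
      pub.sendNext((145, 146))
      pub.sendComplete()

      // Request both results, collect them, then check the stream completed
      sub.request(2)
      val results = sub.expectNextN(2)
      sub.expectComplete()
      results
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

In a spec, the collection step could be replaced with sub.requestNext(true) followed by sub.requestNext(false), mirroring the parse test.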

4.4. A Note About Fuzzing

Fuzzing is a software testing technique based on providing random and unexpected data to a program, to cause crashes, memory leaks, race conditions, and similar issues. Fuzzing is also very useful for security testing, to find vulnerabilities in the software.

With Akka Stream Testkit, we can enable a fuzzing mode to run concurrent execution paths more aggressively. This helps us find race conditions in tests. All it takes is to set a configuration value:

akka.stream.materializer.debug.fuzzing-mode = on

Nonetheless, this additional test coverage comes at the cost of reduced performance (that is, lower stream throughput). Therefore, we shouldn’t enable this setting when running the application in production or for benchmarking purposes.
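One way to keep the setting confined to tests is to pass it programmatically when creating the test ActorSystem, rather than putting it in the application’s configuration file. The following is a sketch under that assumption. The names FuzzingConfigSketch and fuzzingSystem are hypothetical:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical object name, used only for illustration
object FuzzingConfigSketch {
  // Overrides only the fuzzing flag and falls back to the regular configuration
  val fuzzingConfig: Config = ConfigFactory
    .parseString("akka.stream.materializer.debug.fuzzing-mode = on")
    .withFallback(ConfigFactory.load())

  // Hypothetical helper: creates an ActorSystem for tests with fuzzing enabled
  def fuzzingSystem(name: String): ActorSystem = ActorSystem(name, fuzzingConfig)
}
```

A spec could then declare its implicit ActorSystem with FuzzingConfigSketch.fuzzingSystem("baeldung") and leave the production configuration untouched.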

5. Conclusion

In this article, we took a look at the testing strategies made possible by Akka Stream Testkit. In particular, we saw how to use TestSource and TestSink to send test elements downstream and make assertions on the behavior of our stream components. Lastly, we got to know how to enable fuzzing to find race conditions in our code more easily.

As usual, the source code for the examples is available over on GitHub.
