1. Introduction

Streams are ubiquitous in modern programming. For instance, we can use them to read files, process events, and model communication between sockets. Streams are so fundamental that the Reactive Streams initiative defines a standard for reactive streaming, that is, asynchronous stream processing with non-blocking back-pressure. Back-pressure is a feedback mechanism that lets the receiver slow the sender down when elements arrive faster than it can process them.

In this tutorial, we’ll take a high-level look at one implementation of that standard, the Akka Streams library. As we’ll see, it allows us to compose several data transformations, known as flows, and connect them to a source of data and a sink.

2. Importing the Library

To get started, we import the relevant Akka Streams libraries into our project:

val AkkaVersion = "2.8.2"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test
)

The first dependency, akka-stream, provides the Akka Streams API, whereas the second, akka-stream-testkit, provides the testing utilities. In this tutorial, we’ll see how to use both.

3. Building Akka Streams

Akka Streams is built around three main components:

  • Source, an operator with one output representing a source of data
  • Sink, an operator with one input, representing the end of the stream, i.e., a consumer of data
  • Flow, an operator with one input and one output. We can see it as a function operating on the stream, transforming the elements that flow through it.

3.1. Defining a Source

The Source object defines many ways to create a source of data, including, but not limited to, the following ones:

  • Source.empty, an empty source
  • Source.single(), a source with only one element
  • Source.future(Future), a source emitting the single value the Future completes with
  • Source.apply(Iterable), a source producing the elements contained in the given Iterable
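
As a minimal sketch of these factory methods, the snippet below builds one source of each kind and collects its elements with Sink.seq. The object name SourceExamples, the helper collect, the actor system name, and the five-second timeout are assumptions made for illustration only:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object SourceExamples {
  // Hypothetical system name; any name works
  implicit val system: ActorSystem = ActorSystem("source-examples")

  val empty: Source[Int, NotUsed] = Source.empty[Int]
  val single: Source[Int, NotUsed] = Source.single(42)
  val fromFuture: Source[Int, NotUsed] = Source.future(Future.successful(7))
  val fromIterable: Source[Int, NotUsed] = Source(Seq(1, 2, 3))

  // Runs a source to completion and blocks for the result.
  // Blocking keeps the demo short; it is not meant for production code.
  def collect[A](source: Source[A, NotUsed]): Seq[A] =
    Await.result(source.runWith(Sink.seq[A]), 5.seconds)
}
```

Calling SourceExamples.collect on each value shows that empty yields no elements, single and fromFuture yield exactly one, and fromIterable yields the elements of the Seq in order.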

For example, we’ll see how to create a Source producing comma-separated pairs of integral numbers. We can imagine the first component of the pair represents the result of a question in a quiz, whereas the second element represents the expected result.

In our running example, we’ll write a Source producing these pairs, some Flows parsing and comparing them, and a Sink counting how many correct answers the user gave. We’ll start with the Source:

val source: Source[String, NotUsed] = Source(
  Seq("5,10", "15,15", "78,79", "12,12", "0,0", "456,456")
)

Here, the example creates a Source out of a Seq of Strings, each containing two numbers. The type of the source value is Source[String, NotUsed]. This means that the source emits Strings, producing a value of type NotUsed, which is somewhat similar to Unit. This is the so-called “materialized value” and is, essentially, a value produced by the stream when it’s run.

The materialized value is distinct from the elements the stream emits, and each operator, whether a Source or a Sink, can materialize its own. For example, Source.tick, which emits an element at a fixed interval, materializes a Cancellable that we can use to stop emitting elements.
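
To illustrate a materialized value other than NotUsed, here is a small sketch based on Source.tick. The object name TickExample, the method runBriefly, and the chosen intervals are assumptions for the sake of the example:

```scala
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object TickExample {
  // Hypothetical system name
  implicit val system: ActorSystem = ActorSystem("tick-example")

  // Emits "tick" every 100 milliseconds and materializes a Cancellable
  val ticks: Source[String, Cancellable] =
    Source.tick(0.millis, 100.millis, "tick")

  // Runs the source for a short while, cancels it, and returns what was received
  def runBriefly(): Seq[String] = {
    // Keep.both retains the Cancellable of the source and the Future of the sink
    val (cancellable, received) =
      ticks.toMat(Sink.seq[String])(Keep.both).run()

    Thread.sleep(350)    // let a few ticks through (blocking only for the demo)
    cancellable.cancel() // stops the source, which completes the stream
    Await.result(received, 5.seconds)
  }
}
```

The elements ("tick") and the materialized value (the Cancellable) travel on separate paths: the former flow to the sink, whereas the latter is handed back to the code that runs the stream.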

3.2. Defining a Sink

Similarly to Source, the Sink object defines many ways to create a sink for a stream, including, but not limited to, the following ones:

  • Sink.fold(), a sink folding the elements of the stream and returning a Future with the final result
  • Sink.head, returning a Future with the first element of the stream. There’s also an alternative, Sink.headOption, to account for streams with no elements.
  • Sink.last and Sink.lastOption, similar to Sink.head and Sink.headOption, but returning the last element of the stream
  • Sink.foreach, to carry out some action for each element of the stream, returning a Future[Done] that completes when the stream ends
  • Sink.seq, to collect all the elements of the stream into a Seq, returned in a Future
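
The following sketch runs the same source against a few of these sinks. The object name SinkExamples, the system name, and the sample numbers are illustrative assumptions:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object SinkExamples {
  // Hypothetical system name
  implicit val system: ActorSystem = ActorSystem("sink-examples")

  // A source is a reusable blueprint, so we can run it several times
  val numbers: Source[Int, NotUsed] = Source(Seq(1, 2, 3, 4))

  val first: Future[Int] = numbers.runWith(Sink.head[Int])
  val last: Future[Int] = numbers.runWith(Sink.last[Int])
  val all: Future[Seq[Int]] = numbers.runWith(Sink.seq[Int])
  val sum: Future[Int] = numbers.runWith(Sink.fold[Int, Int](0)(_ + _))

  // headOption yields None rather than failing on an empty stream
  val firstOfEmpty: Future[Option[Int]] =
    Source.empty[Int].runWith(Sink.headOption[Int])
}
```

Every sink here materializes a Future, because the result is only known once the stream has run.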

For example, we’ll see how to create a Sink that, given a stream of Boolean values, counts both the number of true values and the total number of elements of the stream:

val sink: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
  case ((correctCount, total), wasCorrect) =>
    if (wasCorrect) (correctCount + 1, total + 1)
    else (correctCount, total + 1)
}

The sink above is based on Sink.fold(). For each stream element, it increments the correctCount counter if and only if the element of the stream is true.

The behavior here is similar to the usual fold operation on Iterables, with the difference that Sink.fold() produces a Future, since the stream is run asynchronously.
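
To make the analogy concrete, here is a sketch of the same counting logic written with foldLeft on a plain Seq, with no stream involved. The object name FoldComparison is an assumption; the Boolean values are what we’d get by comparing the six pairs of the running example:

```scala
object FoldComparison {
  // Outcome of comparing "5,10", "15,15", "78,79", "12,12", "0,0", "456,456"
  val results: Seq[Boolean] = Seq(false, true, false, true, true, true)

  // Same accumulator shape as the sink: (correct answers, elements seen)
  val (correctCount, total) = results.foldLeft((0, 0)) {
    case ((correct, seen), wasCorrect) =>
      if (wasCorrect) (correct + 1, seen + 1)
      else (correct, seen + 1)
  }
}
```

Here, the result is available immediately as a tuple, whereas the sink wraps the same tuple in a Future.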

3.3. Defining a Flow

We can create Flows using the apply method in the Flow object. Once we create it, we can configure its behavior using the methods of the Flow class, including, but not limited to, the following ones:

  • Flow::map(), to apply a function to every element of the stream
  • Flow::log(), to log the elements of the stream flowing through
  • Flow::take() and Flow::takeWhile(), to only take a substream, discarding the rest
  • Flow::filter() to filter the elements of the stream

For example, we’ll create two Flows, one to parse the pairs produced by the source above and one to compare the two numbers, producing a stream of Booleans that sink can consume:

val parse: Flow[String, (Int, Int), NotUsed] =
  Flow[String]
    .map { pair =>
      val parts = pair.split(",")
      (parts(0).toInt, parts(1).toInt)
    }

val compare: Flow[(Int, Int), Boolean, NotUsed] =
  Flow[(Int, Int)]
    .map { case (userAnswer, correctAnswer) => userAnswer == correctAnswer }

The parse flow takes elements of type String and turns them into pairs of Ints. It splits the original String using the comma as a separator. This implementation doesn’t include any error handling. If the input contained no comma, for example, either parts(0).toInt would throw a NumberFormatException or parts(1) would throw an ArrayIndexOutOfBoundsException, causing a failure in the stream.
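
One possible way to avoid these exceptions, sketched here under the assumption that silently rejecting malformed input is acceptable, is to move the parsing into a function that returns an Option. The names SafeParsing and parsePair are hypothetical and not part of the original example:

```scala
import scala.util.Try

object SafeParsing {
  // Returns None instead of throwing when the input
  // isn't exactly two comma-separated integers
  def parsePair(pair: String): Option[(Int, Int)] =
    pair.split(",") match {
      case Array(first, second) =>
        Try((first.toInt, second.toInt)).toOption
      case _ => None
    }
}
```

A flow could then apply it with Flow[String].map(SafeParsing.parsePair) and drop the None values with collect { case Some(p) => p }, at the cost of no longer reporting which inputs were rejected.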

The compare flow takes pairs of Ints, compares the two components, and emits the resulting Booleans downstream. The sink we defined above takes these elements and counts the number of trues, as we already saw.

3.4. Putting It All Together

Before looking at how we can test our streams using the test kit provided by Akka, we’ll see our stream in action. We’ll take a look at a simple application putting source, parse, compare, and sink together:

import akka.actor.ActorSystem

import scala.util.{Failure, Success}

object Main extends App {
  implicit val system: ActorSystem = ActorSystem("baeldung")

  source
    .via(parse)
    .via(compare)
    .runWith(sink)
    .andThen {
      case Failure(exception) => println(exception)
      case Success((correct, total)) =>
        println(s"$correct/$total correct answers")
    }(system.dispatcher)
    .onComplete(_ => system.terminate())(system.dispatcher)
}

First of all, our application creates an implicit ActorSystem. We need this to invoke the Source::runWith() method, which runs the stream with the given sink.

Other than this, the example specifies how to run the stream. With the via method, we can chain one or more Flows. As we saw above, running a stream with a sink created via Sink.fold returns a Future. In our example, we print a message indicating how many correct answers the user gave, or the exception if something went wrong while running the stream.

Notably, to use Future::andThen() and Future::onComplete(), we had to specify an execution context. In this case, we used that of the ActorSystem. The call to Future::onComplete() only serves the purpose of terminating the ActorSystem. Otherwise, the application would never terminate.

If we run the example above, it prints 4/6 correct answers.

On the other hand, if the source emitted a badly formatted element, such as 0;0 rather than 0,0, the stream processing would fail with an exception: java.lang.NumberFormatException: For input string: "0;0". This is because parse wouldn’t find a comma to split on and would then try to turn the whole string, 0;0, into a number. Akka Streams has built-in ways to handle errors like this without failing the whole stream.
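
One such mechanism is a supervision strategy attached to an operator. The sketch below, which is only one of several possible approaches, resumes the stream when parsing throws a NumberFormatException, dropping the offending element. The names ResumingParse, decider, and resumingParse, as well as the sample input, are assumptions for illustration:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorAttributes, Supervision}

import scala.concurrent.Future

object ResumingParse {
  // Hypothetical system name
  implicit val system: ActorSystem = ActorSystem("resuming-parse")

  // Drop the failing element on a NumberFormatException;
  // stop the stream on any other exception
  val decider: Supervision.Decider = {
    case _: NumberFormatException => Supervision.Resume
    case _                        => Supervision.Stop
  }

  // Same parsing logic as the parse flow, with the strategy attached
  val resumingParse: Flow[String, (Int, Int), NotUsed] =
    Flow[String]
      .map { pair =>
        val parts = pair.split(",")
        (parts(0).toInt, parts(1).toInt)
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))

  // "0;0" is skipped; the other two elements are parsed
  val parsed: Future[Seq[(Int, Int)]] =
    Source(Seq("5,10", "0;0", "12,12"))
      .via(resumingParse)
      .runWith(Sink.seq[(Int, Int)])
}
```

Whether to resume, restart, or stop depends on the application. Dropping elements silently, as this sketch does, may hide data-quality problems, so logging the failure inside the decider is often worthwhile.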

4. Testing Akka Streams

Akka Streams comes with a standalone module named akka-stream-testkit, providing tools designed for testing streams. This module defines two main components, TestSource and TestSink, with factory methods to generate test Sources and Sinks.

Generally, the way to test the Flow logic is to use TestSink to trigger the computation and assert the results. Similarly, we can use TestSource to generate test data. Depending on the type of test we’re writing (unit, integration, and so on) and on the design of our application, we can test one or more Flows simultaneously.

For instance, we’ll see how to test the parse flow from the examples above:

implicit val system: ActorSystem = ActorSystem("baeldung")

"The parse flow" should "parse pairs of integers" in {
  val (pub, sub) = TestSource[String]()
    .via(parse)
    .toMat(TestSink[(Int, Int)]())(Keep.both)
    .run()

  pub.sendNext("1,1")
  pub.sendNext("145,146")
  pub.sendComplete()

  sub.requestNext((1, 1))
  sub.requestNext((145, 146))
  sub.expectComplete()
}

Like before, to run a stream, we need an implicit ActorSystem. Then, to test a stream, we use Akka’s TestSource and TestSink. In this case, we run the stream keeping both materialized values (Keep.both). This returns two probes, pub and sub, which we can use to test the parse flow.

In particular, we can publish elements downstream using pub.sendNext(). Similarly, we can send a completion signal with pub.sendComplete(). The test kit defines many more methods allowing us, for instance, to send errors downstream as well. On the other hand, the sub probe lets us set expectations. In the example above, we used sub.requestNext() to request one element and assert that it matches the expected value, and sub.expectComplete() to assert that the stream has completed.

Unit-testing sources and sinks is similar, but requires only one of the two probes. To test a custom Source, we attach a TestSink and perform assertions on the elements it emits. To test a custom Sink, we feed it test data from a TestSource.
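
Both cases can be sketched as follows. The object name ProbeExamples and the methods checkSource and checkSink are hypothetical, the counting sink repeats the logic of the sink defined earlier, and the blocking Await is used only to keep the example short:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object ProbeExamples {
  // Hypothetical system name
  implicit val system: ActorSystem = ActorSystem("probe-examples")

  // Testing a Source: attach a TestSink and assert on the emitted elements
  def checkSource(): Unit = {
    Source(Seq("5,10", "15,15"))
      .runWith(TestSink[String]())
      .request(2)
      .expectNext("5,10", "15,15")
      .expectComplete()
    ()
  }

  // Testing a Sink: feed it from a TestSource and inspect its materialized value
  def checkSink(): (Int, Int) = {
    val countingSink: Sink[Boolean, Future[(Int, Int)]] =
      Sink.fold[(Int, Int), Boolean]((0, 0)) {
        case ((correct, total), wasCorrect) =>
          (if (wasCorrect) correct + 1 else correct, total + 1)
      }

    val (pub, result) =
      TestSource[Boolean]().toMat(countingSink)(Keep.both).run()

    pub.sendNext(true).sendNext(false).sendComplete()
    Await.result(result, 5.seconds)
  }
}
```

In checkSource, the probe fails the test if the source emits anything other than the two expected elements. In checkSink, one correct answer out of two elements should produce the pair (1, 2).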

5. A Note About Alpakka

Alpakka is an open-source project built on top of Akka Streams, aiming to define reactive, stream-aware integrations for Scala and Java.

Rather than a single library, Alpakka is a collection of integrations with several third-party services, such as selected AWS and Google Cloud ones, MQTT, ElasticSearch, Cassandra, Kafka, Slick (to interact with databases via JDBC), and many more.

The advantage of Alpakka is that it offers a Domain Specific Language (DSL) integrated with Akka Streams.

Similarly to what we did above, Alpakka connectors define Sources, Sinks, and Flows to help us interact with a given service. For example, the integration for AWS S3 offers Sources to download files and Sinks to upload them.

6. Conclusion

In this article, we learned about Akka Streams. First, we looked at the core concepts: Source, Sink, and Flow. Secondly, we put the knowledge into practice by writing a simple stream. Thirdly, we saw how we could use the test kit provided by Akka to test stream-based components and applications. Finally, we discussed Alpakka and how its connectors help us interact with third-party services.

As usual, the source code for the examples is available over on GitHub.
