1. Overview

In this article, we will look at the akka-streams library, which is built on top of the Akka actor framework and adheres to the reactive streams manifesto. The Akka Streams API allows us to easily compose data transformation flows from independent steps.

Moreover, all processing is done in a reactive, non-blocking, and asynchronous way.

2. Maven Dependencies

To get started, we need to add the akka-stream and akka-stream-testkit libraries to our pom.xml:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.11</artifactId>
    <version>2.5.2</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-testkit_2.11</artifactId>
    <version>2.5.2</version>
</dependency>

3. Akka Streams API

To work with Akka Streams, we need to be aware of the core API concepts:

  • Source – the entry point to processing in the akka-stream library – we can create an instance of this class from multiple sources; for example, we can use the single() method if we want to create a Source from a single String, or we can create a Source from an Iterable of elements
  • Flow – the main processing building block – every Flow instance has one input and one output value
  • Materializer – the component that actually runs a stream; running a stream can also produce a materialized value, such as a completion status that we can use for logging or for reacting to saved results. Most commonly, we will use NotUsed as the materialized value type to denote that our Flow does not produce a meaningful materialized value
  • Sink operation – when we are building a Flow, it is not executed until we register a Sink operation on it and run it – it is a terminal operation that triggers all computations in the entire Flow
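
The laziness described in the last bullet is similar to what we know from the Java 8 Stream API, where intermediate operations do nothing until a terminal operation is called. The following is only a plain-Java analogy, not Akka Streams code; the LazySketch class and its run() method are names made up for this illustration:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazySketch {

    // Returns {calls before the terminal operation, calls after it, sum}.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();

        // Building the pipeline does not invoke the mapping function yet.
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
          .map(v -> {
              calls.incrementAndGet();
              return v * 2;
          });
        int before = calls.get();

        // The terminal operation triggers the whole pipeline,
        // much like running a Flow with a Sink does.
        int sum = pipeline.mapToInt(Integer::intValue).sum();
        int after = calls.get();

        return new int[] {before, after, sum};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println(result[0] + " " + result[1] + " " + result[2]); // 0 3 12
    }
}
```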

4. Creating Flows in Akka Streams

Let’s start by building a simple example, where we’ll show how to create and combine multiple Flows to process a stream of integers and calculate the average of each consecutive pair of integers from the stream.

We’ll parse a semicolon-delimited String of integers as input to create our akka-stream Source for the example.

4.1. Using a Flow to Parse Input

First, let’s create a DataImporter class that will take an instance of the ActorSystem that we will use later to create our Flow:

public class DataImporter {
    private ActorSystem actorSystem;

    // standard constructors, getters...
}

Next, let’s create a parseLine method that will generate a List of Integer from our delimited input String. Keep in mind that we are using Java Stream API here only for parsing:

private List<Integer> parseLine(String line) {
    String[] fields = line.split(";");
    return Arrays.stream(fields)
      .map(Integer::parseInt)
      .collect(Collectors.toList());
}

Our initial Flow will apply parseLine to our input to create a Flow with input type String and output type Integer:

private Flow<String, Integer, NotUsed> parseContent() {
    return Flow.of(String.class)
      .mapConcat(this::parseLine);
}

Because we pass the parseLine() method reference to mapConcat(), the compiler knows that its argument will be a String, the same as the input type of our Flow.

Note that we are using the mapConcat() method – equivalent to the Java 8 flatMap() method – because we want to flatten the List of Integer returned by parseLine() into a Flow of Integer so that subsequent steps in our processing do not need to deal with the List.
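
To make the flattening concrete, here is a minimal plain-Java sketch of the same idea using flatMap(). It does not use Akka Streams; the FlattenSketch class and the flatten() helper are hypothetical names introduced only for this illustration, while parseLine() repeats the parsing logic shown above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlattenSketch {

    // Same parsing logic as the parseLine() method above.
    static List<Integer> parseLine(String line) {
        return Arrays.stream(line.split(";"))
          .map(Integer::parseInt)
          .collect(Collectors.toList());
    }

    // flatMap() parses every line into a list and merges those lists
    // into one stream of integers, the role mapConcat() plays in the Flow.
    static List<Integer> flatten(List<String> lines) {
        return lines.stream()
          .flatMap(line -> parseLine(line).stream())
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(flatten(Arrays.asList("1;9", "11;0"))); // [1, 9, 11, 0]
    }
}
```

Downstream steps see individual Integer elements and never have to deal with the intermediate List.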

4.2. Using a Flow to Perform Calculations

At this point, we have our Flow of parsed integers. Now, we need to implement logic that will group all input elements into pairs and calculate an average of those pairs.

Now, we’ll create a Flow of Integers and group them using the grouped() method.

Next, we want to calculate an average.

Since we are not interested in the order in which those averages will be processed, we can have averages calculated in parallel by using the mapAsyncUnordered() method, passing the maximum number of computations that may run concurrently as the first argument.

The action that will be passed as the lambda to the Flow needs to return a CompletionStage (here, a CompletableFuture) because that action will be calculated asynchronously in a separate thread:

private Flow<Integer, Double, NotUsed> computeAverage() {
    return Flow.of(Integer.class)
      .grouped(2)
      .mapAsyncUnordered(8, integers ->
        CompletableFuture.supplyAsync(() -> integers.stream()
          .mapToDouble(v -> v)
          .average()
          .orElse(-1.0)));
}

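We are calculating up to eight averages in parallel. Because supplyAsync() is called without an explicit executor, the computations run by default on the common ForkJoinPool, so the actual number of threads depends on that pool. Note that we are using the Java 8 Stream API for calculating an average.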
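
To see what this step computes, we can sketch the same grouping and averaging in plain Java, without Akka Streams. The PairAverageSketch class and its helper methods are hypothetical names used only here. Unlike mapAsyncUnordered(), this sketch joins the futures in order, so its output order is deterministic:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class PairAverageSketch {

    // Splits the values into consecutive, non-overlapping groups,
    // mirroring grouped(size); the last group may be shorter.
    static List<List<Integer>> grouped(List<Integer> values, int size) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < values.size(); i += size) {
            int end = Math.min(i + size, values.size());
            groups.add(new ArrayList<>(values.subList(i, end)));
        }
        return groups;
    }

    // Same asynchronous average computation as in computeAverage().
    static CompletableFuture<Double> averageAsync(List<Integer> group) {
        return CompletableFuture.supplyAsync(() -> group.stream()
          .mapToDouble(v -> v)
          .average()
          .orElse(-1.0));
    }

    static List<Double> averages(List<Integer> values) {
        // Start all computations first, then wait for each result in order.
        List<CompletableFuture<Double>> futures = grouped(values, 2).stream()
          .map(PairAverageSketch::averageAsync)
          .collect(Collectors.toList());
        return futures.stream()
          .map(CompletableFuture::join)
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(averages(Arrays.asList(1, 9, 11, 0))); // [5.0, 5.5]
    }
}
```

Since grouped(2) produces non-overlapping pairs, the input 1, 9, 11, 0 yields exactly two averages: 5.0 and 5.5.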

4.3. Composing Multiple Flows into a Single Flow

The Flow API is a fluent abstraction that allows us to compose multiple Flow instances to achieve our final processing goal. We can have granular flows where one, for example, is parsing JSON, another is doing some transformation, and another is gathering some statistics.

Such granularity will help us create more testable code because we can test each processing step independently.

We created two flows above that can work independently of each other. Now, we want to compose them together.

First, we want to parse our input String, and next, we want to calculate an average on a stream of elements.

We can compose our flows using the via() method:

Flow<String, Double, NotUsed> calculateAverage() {
    return Flow.of(String.class)
      .via(parseContent())
      .via(computeAverage());
}

We created a Flow with input type String and attached two other flows to it. The parseContent() Flow takes a String as input and emits Integers as output. The computeAverage() Flow takes those Integers and calculates their averages, emitting Double as the output type.
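
As a loose analogy in plain Java, chaining flows with via() resembles chaining functions with Function.andThen(): the output type of one step must match the input type of the next. The sketch below is not Akka Streams code and works on a whole List rather than on a stream of elements; ComposeSketch and its constants are hypothetical names introduced for this illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class ComposeSketch {

    // Step 1: String -> List<Integer>
    static final Function<String, List<Integer>> PARSE = line ->
      Arrays.stream(line.split(";"))
        .map(Integer::parseInt)
        .collect(Collectors.toList());

    // Step 2: List<Integer> -> Double
    static final Function<List<Integer>, Double> AVERAGE = values ->
      values.stream()
        .mapToDouble(v -> v)
        .average()
        .orElse(-1.0);

    // The composed step has the input type of the first function
    // and the output type of the second one.
    static final Function<String, Double> PARSE_THEN_AVERAGE = PARSE.andThen(AVERAGE);

    public static void main(String[] args) {
        System.out.println(PARSE_THEN_AVERAGE.apply("1;9")); // 5.0
    }
}
```

Each function can be tested on its own, which is the same benefit we get from keeping our Flows granular.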

5. Adding Sink to the Flow

As we mentioned, up to this point the whole Flow has not been executed because it is lazy. To start execution of the Flow, we need to define a Sink. The Sink operation can, for example, save data into a database or send results to some external web service.

Suppose we have an AverageRepository class with the following save() method that writes results to our database:

CompletionStage<Double> save(Double average) {
    return CompletableFuture.supplyAsync(() -> {
        // write to database
        return average;
    });
}

Now, we want to create a Sink operation that uses this method to save the results of our Flow processing. To create our Sink, we first need to create a Flow that takes a result of our processing as the input type. Next, we want to save all our results to the database.

Again, we do not care about ordering of the elements, so we can perform the save() operations in parallel using the mapAsyncUnordered() method.

To create a Sink from the Flow, we need to call the toMat() method with Sink.ignore() as the first argument and Keep.right() as the second. Keep.right() keeps the materialized value of Sink.ignore(), a CompletionStage<Done>, which we want to return as the status of the processing:

private Sink<Double, CompletionStage<Done>> storeAverages() {
    return Flow.of(Double.class)
      .mapAsyncUnordered(4, averageRepository::save)
      .toMat(Sink.ignore(), Keep.right());
}

6. Defining a Source for Flow

The last thing that we need to do is to create a Source from the input String. We can apply the calculateAverage() Flow to this source using the via() method.

Then, to add the Sink to the processing, we need to call the runWith() method and pass the storeAverages() Sink that we just created:

CompletionStage<Done> calculateAverageForContent(String content) {
    return Source.single(content)
      .via(calculateAverage())
      .runWith(storeAverages(), ActorMaterializer.create(actorSystem))
      .whenComplete((d, e) -> {
          if (d != null) {
              System.out.println("Import finished ");
          } else {
              e.printStackTrace();
          }
      });
}

Note that we are adding the whenComplete() callback, which is invoked once the processing is finished and in which we can perform some action depending on the outcome of the processing.
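
The callback receives two arguments: the result and the exception, and only one of them describes the outcome. Here is a minimal plain-Java sketch of that contract using CompletableFuture directly, without Akka Streams. The WhenCompleteSketch class, the outcomeOf() helper, and the messages are made up for this illustration, and the sketch checks the exception for null rather than the result:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteSketch {

    // Registers a whenComplete() callback and reports which branch ran.
    // The futures used here are already completed, so the callback
    // runs immediately on the calling thread.
    static String outcomeOf(CompletableFuture<String> future) {
        AtomicReference<String> outcome = new AtomicReference<>("pending");
        future.whenComplete((result, error) -> {
            if (error == null) {
                outcome.set("Import finished: " + result);
            } else {
                outcome.set("Import failed: " + error.getMessage());
            }
        });
        return outcome.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("Done");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(outcomeOf(ok));     // Import finished: Done
        System.out.println(outcomeOf(failed)); // Import failed: boom
    }
}
```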

7. Testing Akka Streams

We can test our processing using the akka-stream-testkit. 

The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.

In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:

@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
    // given
    Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
    String input = "1;9;11;0";

    // when
    Source<Double, NotUsed> flow = Source.single(input).via(tested);

    // then
    flow
      .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
      .request(4)
      .expectNextUnordered(5d, 5.5);
}

With request(4), we signal demand for up to four elements, and then we expect the two resulting averages. They can arrive in any order because our processing is done in an asynchronous and parallel way.

8. Conclusion

In this article, we looked at the akka-stream library.

We defined a process that combines multiple Flows to calculate the average of each pair of elements. Then, we defined a Source that is an entry point of the stream processing and a Sink that triggers the actual processing.

Finally, we wrote a test for our processing using the akka-stream-testkit.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.
