1. Overview

In this tutorial, we’ll explore how to read a CSV file in our akka-streams Scala applications. We’ll first approach this problem using the standard akka-streams library.

Then, we’ll take advantage of an Alpakka library specifically created to handle CSVs, which comes with many useful features to help transform the stream into data structures that are much easier to work with within our code.

2. Using a String Literal

First, we can hold a CSV in a String literal within our code. We can use Source.single, passing in the CSV data wrapped in a ByteString, to access the data in an Akka stream:

Source
  .single(ByteString(""""Name","Age"
    |"Bob",24
    |"Jane",47""".stripMargin))

The Source.single call will then return a type of Source[ByteString, NotUsed]. This can be particularly useful for writing a script or test code on a short CSV. However, when working on a larger CSV for code running in production, it’s much better to read the file directly. Let’s see how to do just that.
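Before moving on, here’s a minimal sketch of how the literal-based Source could be run to check what it emits. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer; the names LiteralCsvExample and readLiteral are purely illustrative:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration
object LiteralCsvExample {
  // The same CSV as above, held as a String literal
  val csv: String =
    """"Name","Age"
      |"Bob",24
      |"Jane",47""".stripMargin

  // Runs the stream and returns its single element decoded as UTF-8
  def readLiteral(): String = {
    implicit val system: ActorSystem = ActorSystem("literal-csv")
    try {
      val result = Source
        .single(ByteString(csv))
        .map(_.utf8String)
        .runWith(Sink.head)
      // Blocking is acceptable in a script or test, but not in production code
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Since Source.single emits exactly one element, Sink.head is enough to collect the whole CSV here.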

3. Importing a File

Reading the file from the file system is usually better practice. The standard Akka Streams library offers an object called FileIO, which can read a file as a stream:

FileIO.fromPath(Paths.get("path/to/file"))

In this code, we’re calling FileIO.fromPath to create a new Source from the contents of the file at the path passed as an argument. To provide the path, we call Paths.get, passing in the path to our file as a String. This call returns a Source of type Source[ByteString, Future[IOResult]], which we can then use to access the CSV file’s contents.
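As a rough illustration, the sketch below runs such a Source and concatenates every ByteString chunk it emits. It assumes Akka 2.6 or later, and the names FileCsvExample and readFile are hypothetical. Folding a whole file into memory defeats the purpose of streaming, so this is only meant to show that the Source delivers the file’s bytes:

```scala
import java.nio.file.Path

import akka.actor.ActorSystem
import akka.stream.scaladsl.FileIO
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration
object FileCsvExample {
  // Reads the file at `path` by appending each emitted chunk to an accumulator
  def readFile(path: Path): String = {
    implicit val system: ActorSystem = ActorSystem("file-csv")
    try {
      val contents = FileIO
        .fromPath(path)
        .runFold(ByteString.empty)(_ ++ _)
      // Blocking is acceptable in a script or test, but not in production code
      Await.result(contents, 5.seconds).utf8String
    } finally system.terminate()
  }
}
```

A call such as FileCsvExample.readFile(Paths.get("path/to/file")) would then return the file’s contents as a single String.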

4. Framing

After creating a new Source for our CSV file, it’s time to transform the Source into something we can use. Currently, the stream emits raw ByteString chunks whose boundaries don’t necessarily line up with the rows of the file. We can use Framing.delimiter to split the bytes on newlines and then map over the result to get a stream with one String for each row of the CSV file:

FileIO
  .fromPath(Paths.get("path/to/file"))
  .via(Framing.delimiter(ByteString("\n"), 256, true))
  .map(_.utf8String)

There are several arguments here, so let’s break this down, starting with the call to Framing.delimiter(ByteString("\n"), 256, true):

  • ByteString("\n") – This first argument is the delimiter, which denotes where to break up each section in the file. Using \n signifies that a new line within the file should be used as the delimiter. This means the resulting stream will have a new ByteString element for each line in the file.
  • 256 – This number is the maximumFrameLength, which sets the maximum length, in bytes, of an individual frame between delimiters. When setting this value in our application code, we must ensure it is large enough for the longest line of the CSV file we’re importing. If this value is exceeded in a single frame, the whole Flow will fail.
  • true – This boolean value is allowTruncation, which states whether the final frame may end without a delimiter. If set to true, the final frame does not require the final delimiter. If set to false, a final delimiter is expected; otherwise, the Flow will fail. In our scenario, it’s best to set this to true, as we can’t guarantee the final line will end in a new line.

The final step in our code is to call .map(_.utf8String), which converts from the ByteString to a String.
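To make the framing step concrete, here’s a minimal sketch that applies the same Flow to an in-memory CSV so it can run without a file. It assumes Akka 2.6 or later; the names FramingExample, readLines, and maxFrameLength are hypothetical:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration
object FramingExample {
  // Splits `csv` on newlines and returns one String per line
  def readLines(csv: String, maxFrameLength: Int = 256): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("framing-csv")
    try {
      val lines = Source
        .single(ByteString(csv))
        .via(Framing.delimiter(ByteString("\n"), maxFrameLength, allowTruncation = true))
        .map(_.utf8String)
        .runWith(Sink.seq)
      // Throws if the stream failed, for example when a line exceeds maxFrameLength
      Await.result(lines, 5.seconds)
    } finally system.terminate()
  }
}
```

With the three-line CSV from the String literal section, readLines should return three Strings, one per row, with the quotes and commas still in place. Passing a maxFrameLength smaller than the longest line makes the stream fail instead.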

5. Using Alpakka

The code used in the previous section relies only on the standard Akka Streams library. Alternatively, we can use an Alpakka library, akka-stream-alpakka-csv, specifically designed for working with CSV files.

We can add this as a dependency to our projects:

libraryDependencies ++= 
  Seq("com.lightbend.akka" %% "akka-stream-alpakka-csv" % "5.0.0")

Using akka-stream-alpakka-csv, we can call a dedicated function in place of the framing step from the last section:

FileIO
  .fromPath(Paths.get(path))
  .via(CsvParsing.lineScanner())
  .map(_.map(_.utf8String))

Using a single call to CsvParsing.lineScanner(), we can convert the incoming ByteString data into a List[ByteString] for each line, with one entry per field. Then, we map over the stream using .map(_.map(_.utf8String)) to transform the stream into a Source[List[String], Future[IOResult]].

Each stream element is now a List of Strings containing all the values for one line in the CSV, which is much easier to work with in our code than raw bytes.
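The sketch below shows what this could look like end to end, using an in-memory CSV in place of a file so that it runs on its own. It assumes Akka 2.6 or later with akka-stream-alpakka-csv on the classpath; the names LineScannerExample and parseRows are hypothetical:

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration
object LineScannerExample {
  // Parses `csv` into one List[String] per line, one entry per field
  def parseRows(csv: String): Seq[List[String]] = {
    implicit val system: ActorSystem = ActorSystem("line-scanner-csv")
    try {
      val rows = Source
        .single(ByteString(csv))
        .via(CsvParsing.lineScanner()) // default delimiter is a comma, default quote is "
        .map(_.map(_.utf8String))
        .runWith(Sink.seq)
      Await.result(rows, 5.seconds)
    } finally system.terminate()
  }
}
```

Unlike the plain framing approach, the parser also splits each line into fields and removes the surrounding quotes, so "Bob",24 becomes List("Bob", "24").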

6. Using CsvToMap.toMapAsStrings

The akka-stream-alpakka-csv library contains many useful functions to help transform CSV ByteString into various data structures for our code. One to note is CsvToMap.toMapAsStrings:

FileIO
  .fromPath(Paths.get(path))
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMapAsStrings())

Calling this function converts each parsed line, a List[ByteString], into a Map[String, String]. It takes the first line of the CSV as the header and uses each column name as the key for the corresponding value in every following row, so the header line itself isn’t emitted as an element. This is a really powerful function that presents our CSV data in a familiar and workable data structure.
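One way to use these Maps is to convert each row into a domain type. Here’s a minimal sketch, assuming Akka 2.6 or later with akka-stream-alpakka-csv on the classpath; the Person case class and the names CsvToMapExample and parsePeople are hypothetical and exist only for this illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, used only for this illustration
object CsvToMapExample {
  // Hypothetical domain type matching the "Name" and "Age" columns
  final case class Person(name: String, age: Int)

  // Parses `csv`, keyed by its header line, into one Person per data row
  def parsePeople(csv: String): Seq[Person] = {
    implicit val system: ActorSystem = ActorSystem("csv-to-map")
    try {
      val people = Source
        .single(ByteString(csv))
        .via(CsvParsing.lineScanner())
        .via(CsvToMap.toMapAsStrings()) // the first line supplies the keys
        .map(row => Person(row("Name"), row("Age").toInt))
        .runWith(Sink.seq)
      // Throws if a column is missing or "Age" is not a number
      Await.result(people, 5.seconds)
    } finally system.terminate()
  }
}
```

For the sample CSV used throughout this article, this should yield two elements, one for Bob and one for Jane. A missing column or a non-numeric age fails the stream, so production code would likely want more careful handling.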

7. Conclusion

In this article, we’ve learned how to read in a CSV using the Akka Streams framework. We’ve explored both options, holding the CSV as a String literal and reading the file in from the file system directly, and we know when each option is most appropriate to use.

Moving on from reading in the CSV file, we’ve discussed the options available to transform the CSV data from a ByteString to a data structure we can most efficiently use in our code.

This can be done through the standard Akka library by Framing.delimiter or by using CsvParsing.lineScanner and CsvToMap.toMapAsStrings from the akka-stream-alpakka-csv library.

As always, the sample code used in this article is available over on GitHub.
