1. Introduction

ZIO is a functional effect system and a zero-dependency library for asynchronous and concurrent programming in Scala. In this tutorial, we’ll learn the basics of ZIO Streams, a purely functional streaming library that uses the ZIO runtime.

The purpose of the streaming library is to let users separate the mechanics of reading and writing data from the processing logic. To allow such an abstraction, ZIO Streams introduces several data types that we’ll discuss.

2. What Is ZStream?

One of the core data types is ZStream[R, E, A]. Let’s understand the meaning of the type parameters:

  • R – describes the dependencies required by the stream
  • E – describes the error that the stream can fail with
  • A – describes the type of the values emitted by the stream. Every stream can emit zero or more values.

As we can see, the type signature of ZStream looks very similar to the ZIO type parameters. As we know, the ZIO data type represents an effect that requires some dependencies of type R, can fail with an error type E, and returns a single value of type A.

Similarly to ZIO, ZStream also requires dependencies of type R and can fail with an error of type E, but the key difference between ZIO and ZStream is that the streaming type can yield any number of values of type A, from zero to infinitely many.

ZStream doesn’t operate on individual values of type A for both practical and performance reasons. Instead, all operations are done on small collections of elements. Internally, those collections are represented by the Chunk[A] data type.

Let’s define a simple ZStream:

val simpleStream = ZStream.range(0, 1000)

When defining the workflow, ZStream represents a source of data for processing.
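To see what a stream actually emits, we need to run it. The following is a minimal sketch, assuming ZIO 2.x; the object name StreamBasics and the sample values are ours and serve only as an illustration. It collects all elements with runCollect and uses chunks to expose the Chunk structure discussed above:

```scala
import zio._
import zio.stream._

object StreamBasics extends ZIOAppDefault {
  // A finite stream of 0, 1, 2, 3, 4 (the upper bound of range is exclusive)
  val numbers: ZStream[Any, Nothing, Int] = ZStream.range(0, 5)

  // A stream built explicitly from two chunks
  val chunked: ZStream[Any, Nothing, Int] =
    ZStream.fromChunks(Chunk(1, 2, 3), Chunk(4, 5))

  // runCollect gathers every emitted element into a single Chunk
  val collected: ZIO[Any, Nothing, Chunk[Int]] = numbers.runCollect

  // chunks turns the stream of elements into a stream of its underlying chunks
  val chunkSizes: ZIO[Any, Nothing, Chunk[Int]] =
    chunked.chunks.map(_.size).runCollect

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    for {
      all   <- collected
      sizes <- chunkSizes
      _     <- Console.printLine(s"elements: $all, chunk sizes: $sizes")
    } yield ()
}
```

Nothing is evaluated when the stream is defined. The elements are only produced once the resulting effect is executed by the ZIO runtime.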

3. What Is ZSink?

Another important data type required for defining a workflow with ZIO Streams is ZSink. It has a fairly complex type signature – ZSink[R, E, I, L, Z].

Let’s take a closer look at each individual type parameter:

  • R – describes the dependencies required by the sink
  • E – describes the errors that the sink can fail with
  • I – describes the events that are being processed by the sink
  • L – describes the leftover values of the sink
  • Z – describes the return values of the sink

Let’s briefly discuss the difference between the return and leftover values. For some operations, the sink won’t process all elements of the stream. Some examples of such operations are take() and head(). Elements that weren’t processed by the given sink will be returned as leftover values.

We can create a sink that allows us to sum integers:

val sumIntsSink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.sum[Int]

To use the sink we can pass it to the run() method of the ZStream:

val processedStream: ZIO[Any, Nothing, Int] = ZStream.range(0, 1000).run(sumIntsSink)

The sum operation processes all elements of the stream. That’s why the leftover type of the sink is Nothing.

We can also create a sink that only processes some elements:

val processSomeSink: ZSink[Any, Nothing, Int, Int, Chunk[Int]] = ZSink.take[Int](100)

For the sink above, the leftover type L is not Nothing, because the sink will only process the first 100 elements of the stream. We can use the collectLeftover() function to return both leftover and return values for the given sink:

val processed: ZIO[Any, Nothing, (Chunk[Int], Chunk[Int])] = ZStream.range(0, 1000)
  .run(processSomeSink.collectLeftover)

The effect returned by run() yields a tuple containing the sink’s return value as the first field and the leftover values as the second.
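To make the difference between return and leftover values more tangible, here is a small sketch, assuming ZIO 2.x. The object name SinkLeftovers is ours and purely illustrative. It runs the head and take sinks mentioned earlier on a short stream. Which elements end up as leftovers may depend on how the stream is chunked, so we print them rather than rely on a particular value:

```scala
import zio._
import zio.stream._

object SinkLeftovers extends ZIOAppDefault {
  // ZSink.head consumes only the first element and returns it as an Option
  val first: ZIO[Any, Nothing, Option[Int]] =
    ZStream.range(0, 10).run(ZSink.head[Int])

  // take(3) consumes three elements; collectLeftover pairs the result
  // with the elements that the sink received but did not consume
  val takenAndLeftover: ZIO[Any, Nothing, (Chunk[Int], Chunk[Int])] =
    ZStream.range(0, 10).run(ZSink.take[Int](3).collectLeftover)

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    for {
      head <- first
      pair <- takenAndLeftover
      _    <- Console.printLine(s"head: $head, taken: ${pair._1}, leftover: ${pair._2}")
    } yield ()
}
```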

4. Building Workflows With ZPipeline

We’ve now covered the concepts of streams and sinks. But there’s one more important element in ZIO Streams that we’ll use when defining workflows: ZPipeline.

It serves as a stream transformer, taking one ZStream as input and returning another transformed ZStream as the output.

ZPipeline allows us to define a form of a blueprint for obtaining one stream from another. While, from a technical point of view, we could just call all the functions on the ZStream directly, using ZPipeline allows us to reduce code duplication and abstract stream transformations.

Just as with the previous data types, we can use one of the predefined functions to create a pipeline:

val pipeline: ZPipeline[Any, Nothing, String, Int] = ZPipeline.map[String, Int](string => string.toInt + 1)

The above pipeline takes a stream of strings and maps it into integers and then increases every mapped integer by one. Since we defined ZPipeline, we can reuse it to map multiple different streams:

val mappingPipeline: ZPipeline[Any, Nothing, String, Int] = ZPipeline.map[String, Int](string => string.toInt + 1)
val firstStreamMapped = ZStream("1", "2", "3", "4", "5").via(mappingPipeline)
val secondStreamMapped = ZStream("6", "7", "8", "9", "10").via(mappingPipeline)
val totalSum: ZIO[Any, Nothing, Int] = firstStreamMapped.concat(secondStreamMapped).runSum

Additionally, we can use the >>> operator or the andThen() function to compose different pipelines:

val mappingPipeline: ZPipeline[Any, Nothing, String, Int] = ZPipeline.map[String, Int](string => string.toInt + 1)
// The predicate keeps the even numbers and drops the odd ones
val filterEven: ZPipeline[Any, Nothing, Int, Int] = ZPipeline.filter[Int](_ % 2 == 0)
val evenStream = ZStream("1", "2", "3", "4", "5", "6").via(mappingPipeline >>> filterEven)
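Since the text mentions both >>> and andThen(), the sketch below (assuming ZIO 2.x; the object and value names are ours) composes the same two pipelines in both ways and collects the results, so that we can compare them:

```scala
import zio._
import zio.stream._

object PipelineComposition {
  // Parses each string into an integer
  val parse: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.map[String, Int](_.toInt)

  // Keeps only the even integers
  val keepEven: ZPipeline[Any, Nothing, Int, Int] =
    ZPipeline.filter[Int](_ % 2 == 0)

  val input: ZStream[Any, Nothing, String] =
    ZStream("1", "2", "3", "4", "5", "6")

  // Composition with the >>> operator
  val viaOperator: ZIO[Any, Nothing, Chunk[Int]] =
    input.via(parse >>> keepEven).runCollect

  // The same composition written with andThen
  val viaAndThen: ZIO[Any, Nothing, Chunk[Int]] =
    input.via(parse.andThen(keepEven)).runCollect
}
```

Both effects should produce Chunk(2, 4, 6), because the elements flow through parse first and keepEven second in either form.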

5. Error Handling

So far we’ve discussed some very simple cases, but in reality, streams may fail. Failures may occur for various reasons: the input data may have an incorrect format, the input file may not exist, or an HTTP connection may time out. ZIO Streams allows us to use different methods to handle a failing stream.

5.1. Recovering From Failures

If our stream fails, we can recover from the failure and run another stream instead. We can choose whether we want to recover from just specific errors or all possible exceptions:

val failingStream: ZStream[Any, Throwable, Int] = ZStream.range(0, 5) ++
  ZStream.fail(new RuntimeException("Failing!")) ++
  ZStream.range(6, 10)
val recoveryStream: ZStream[Any, Throwable, Int] = ZStream.range(10, 15)
  
override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
  failingStream.orElse(recoveryStream)
    .runSum

In this case, runSum() processes the numbers from 0 to 4 from the failing stream (the upper bound of range() is exclusive). Then, after the stream fails, it skips the rest of the elements from this stream and processes recoveryStream.
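The orElse() call above recovers from any error. To recover only from specific errors, we can use catchSome(), which takes a partial function over the error type. The following is a minimal sketch, assuming ZIO 2.x; the object name and the choice of IllegalStateException are ours, for illustration only:

```scala
import zio._
import zio.stream._

object RecoverFromSpecificError {
  val failingStream: ZStream[Any, Throwable, Int] =
    ZStream.range(0, 5) ++ ZStream.fail(new IllegalStateException("Failing!"))

  val fallback: ZStream[Any, Nothing, Int] = ZStream.range(10, 15)

  // Only IllegalStateException triggers the fallback;
  // any other error still fails the resulting stream
  val recovered: ZStream[Any, Throwable, Int] =
    failingStream.catchSome { case _: IllegalStateException => fallback }

  // Sums 0 to 4 from the original stream and 10 to 14 from the fallback
  val total: ZIO[Any, Throwable, Int] = recovered.runSum
}
```

Elements emitted before the failure are kept, so total evaluates to 70 here (10 from the first stream plus 60 from the fallback).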

5.2. Retrying on Failure

Sometimes the failure may just be temporary. For example, an HTTP request that failed due to network issues may succeed after some time. If we have a case like that, we can use retry():

object Generator {
  // Mutable counter, used only to simulate a failure on the second call
  var counter = 0

  def getNext(): Task[Int] = {
    counter += 1
    if (counter == 2) ZIO.fail(new RuntimeException("FAILED!"))
    else ZIO.succeed(counter)
  }
}

val failingStream: ZStream[Any, Throwable, Int] =
  ZStream.range(0, 5).flatMap(_ => ZStream.fromZIO(Generator.getNext()))

override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
  failingStream.retry(Schedule.once).runSum

As we can see, retry() takes a Schedule as an argument, which determines when and how many times the stream is retried. In our case above, we’re going to retry only once.

One more important thing to note is that retry() doesn’t keep any state: it re-runs the whole stream from the beginning. So in our case, getNext() is invoked seven times: twice during the first attempt, where the second call fails, and then five more times after the retry.
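Schedules can also be combined to describe richer retry policies. As a sketch, assuming ZIO 2.x, the example below retries up to five times with a short pause between attempts. The flaky() helper, the Ref-based attempt counter, and the concrete numbers are hypothetical and exist only for this illustration:

```scala
import zio._
import zio.stream._

object RetryWithPolicy {
  // Hypothetical flaky source: the first two runs of the stream fail,
  // the third one succeeds and emits three elements
  def flaky(attempts: Ref[Int]): ZStream[Any, Throwable, Int] =
    ZStream.fromZIO(attempts.updateAndGet(_ + 1)).flatMap { n =>
      if (n < 3) ZStream.fail(new RuntimeException(s"attempt $n failed"))
      else ZStream(1, 2, 3)
    }

  // Retry at most five times, waiting 100 milliseconds between attempts
  val policy = Schedule.recurs(5) && Schedule.spaced(100.millis)

  // Returns the number of attempts together with the collected elements
  val result: ZIO[Any, Throwable, (Int, Chunk[Int])] =
    for {
      attempts <- Ref.make(0)
      values   <- flaky(attempts).retry(policy).runCollect
      count    <- attempts.get
    } yield (count, values)
}
```

Keeping the counter in a Ref instead of a var makes the state change part of the effect, so every retry observes it in a well-defined way.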

6. Processing Files With ZIO Streams

So far, we’ve only processed artificially-generated streams, but with ZIO Streams, we can do much more. In this section, we’ll read and write data to files.

6.1. Reading Data From a File

ZStream has the fromFile() method, which allows us to create a stream of bytes from the selected file. We can also create a stream that emits whole lines instead of bytes, which is quite useful when working with text files. One way to do this is to wrap the iterator returned by getLines() using fromIterator():

val fileIterator = Source.fromFile("file.txt").getLines()
val fileInputStream: ZStream[Any, Throwable, String] =
  ZStream.fromIterator(fileIterator)
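An alternative that stays entirely within ZIO Streams is to read the bytes and decode them with pipelines. The sketch below assumes ZIO 2.x on the JVM and UTF-8 encoded input; the object name ReadLines and the helper lines() are ours. With this approach, the stream itself is responsible for opening and closing the file:

```scala
import zio._
import zio.stream._

object ReadLines {
  // Reads the file as bytes, decodes them as UTF-8 and splits the text into lines
  def lines(path: String): ZStream[Any, Throwable, String] =
    ZStream
      .fromFileName(path)
      .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)
}
```

For example, ReadLines.lines("file.txt").runCollect yields an effect that produces all lines of the file as a Chunk[String].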

6.2. Writing Data to a File

ZSink offers a method fromFile() that allows us to write a stream to a file. However, before we write the stream into the file, we first need to map our fileInputStream to bytes. Additionally, since getLines() removes newline characters, we need to add them back so that the output file looks the same as the input file:

val fileSink = ZSink.fromFile(new File("outputFile.txt"))
val fileOutputStream = fileInputStream
  .intersperse("\n")
  .flatMap(line => ZStream(line.getBytes.toList: _*))
  .run(fileSink)

We’ve added the newline characters using the intersperse() method.
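The conversion from strings to bytes can also be expressed with a pipeline. As a sketch, assuming ZIO 2.x on the JVM and UTF-8 output (the object name WriteLines and the helper writeLines() are ours), we can encode the lines with ZPipeline.utf8Encode before running the file sink:

```scala
import java.io.File
import zio._
import zio.stream._

object WriteLines {
  // Writes the given lines to the file, separated by newline characters,
  // and returns the number of bytes written
  def writeLines(lines: ZStream[Any, Throwable, String], path: String): ZIO[Any, Throwable, Long] =
    lines
      .intersperse("\n")
      .via(ZPipeline.utf8Encode)
      .run(ZSink.fromFile(new File(path)))
}
```

Compared to calling getBytes on every line, this makes the character encoding explicit instead of relying on the platform default.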

7. Conclusion

In this article, we learned the basic idea behind ZIO Streams, a streaming library in the ZIO ecosystem.

We also explored basic examples of creating sources and sinks, processing streams with ZPipeline, and error handling.

As always, all the code samples can be found over on GitHub.
