
Last updated: March 18, 2024
ZIO is a functional effect system and a zero-dependency library for asynchronous and concurrent programming in Scala. In this tutorial, we’ll learn the basics of ZIO Streams, a purely functional streaming library that uses the ZIO runtime.
The purpose of a streaming library is to let us separate the mechanics of reading and writing data from the processing logic. To enable that separation, ZIO Streams introduces several data types that we’ll discuss.
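One of the core data types is ZStream[R, E, A]. Let’s understand the meaning of the type parameters:
R – the environment, i.e., the dependencies the stream requires
E – the type of error the stream can fail with
A – the type of the elements the stream emits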
As we can see, the type signature of ZStream looks very similar to that of ZIO. As we know, the ZIO data type represents an effect that requires some dependencies of type R, can fail with an error of type E, and succeeds with a single value of type A.
Similarly to ZIO, ZStream requires dependencies of type R and can fail with an error of type E. The key difference is that a ZStream can emit any number of values of type A, from zero to infinitely many.
Internally, ZStream doesn’t process values of type A one at a time, for both practical and performance reasons. Instead, it works on small collections of elements, represented by the Chunk[A] data type.
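To make the chunked representation visible, here’s a minimal sketch that builds a stream from two explicit chunks with ZStream.fromChunks. The object and value names are ours, and the example assumes ZIO 2.x. The chunks operator exposes the underlying Chunk values, while runCollect returns the flattened elements. Because map is applied chunk by chunk, the chunk boundaries should survive the transformation.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import zio._
import zio.stream._

object ChunkDemo {
  // A stream built from two explicit chunks: [1, 2, 3] and [4, 5]
  val stream: ZStream[Any, Nothing, Int] =
    ZStream.fromChunks(Chunk(1, 2, 3), Chunk(4, 5))

  // map works on whole chunks, so the chunk boundaries are kept;
  // chunks exposes each underlying Chunk as a single element
  val chunked: ZIO[Any, Nothing, Chunk[Chunk[Int]]] =
    stream.map(_ * 10).chunks.runCollect

  // runCollect flattens everything into one Chunk of elements
  val flat: ZIO[Any, Nothing, Chunk[Int]] =
    stream.map(_ * 10).runCollect
}
```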
Let’s define a simple ZStream:
import zio.stream._
val simpleStream: ZStream[Any, Nothing, Int] = ZStream.range(0, 1000)
When defining the workflow, ZStream represents a source of data for processing.
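ZStream.range() is only one of several constructors for a source. The sketch below contrasts it with ZStream.apply() and ZStream.fromIterable(). The object and value names are ours, and the example assumes ZIO 2.x. Note that range() excludes its upper bound, so ZStream.range(0, 1000) emits 0 through 999. A stream is only a description, so nothing is emitted until we run it with a sink, here via runCollect.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import zio._
import zio.stream._

object SourceDemo {
  // Explicitly listed elements
  val fromValues: ZStream[Any, Nothing, Int] = ZStream(1, 2, 3)

  // Any Scala Iterable can become a stream
  val fromList: ZStream[Any, Nothing, Int] = ZStream.fromIterable(List(1, 2, 3))

  // The upper bound is exclusive: emits 0, 1, 2, 3, 4
  val fromRange: ZStream[Any, Nothing, Int] = ZStream.range(0, 5)

  // Running the stream with runCollect gathers every element into a Chunk
  val collected: ZIO[Any, Nothing, Chunk[Int]] = fromRange.runCollect
}
```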
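Another important data type required for defining workflows with ZIO Streams is ZSink. It has a fairly complex type signature – ZSink[R, E, I, L, Z].
Let’s take a closer look at each individual type parameter:
R – the environment, i.e., the dependencies the sink requires
E – the type of error the sink can fail with
I – the type of the input elements the sink consumes
L – the type of the leftover elements the sink didn’t consume
Z – the type of the value the sink returns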
Let’s briefly discuss the difference between the return and leftover values. For some operations, the sink won’t process all elements of the stream. Some examples of such operations are take() and head(). Elements that weren’t processed by the given sink will be returned as leftover values.
We can create a sink that allows us to sum integers:
val sumIntsSink: ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.sum[Int]
To use the sink, we can pass it to the run() method of the ZStream:
val processedStream: ZIO[Any, Nothing, Int] = ZStream.range(0, 1000).run(sumIntsSink)
The sum operation processes all elements of the stream. That’s why the leftover type of the sink is Nothing.
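Besides predefined sinks like ZSink.sum, we can build our own from a fold. The sketch below reproduces the sum with ZSink.foldLeft. The names are ours, and the example assumes ZIO 2.x. Since this sink also consumes every input element, its leftover type is Nothing as well.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import zio._
import zio.stream._

object FoldSinkDemo {
  // Folds every incoming Int into a running total that starts at 0
  val handRolledSum: ZSink[Any, Nothing, Int, Nothing, Int] =
    ZSink.foldLeft[Int, Int](0)(_ + _)

  // Both sinks should agree on the sum of 0 until 1000
  val viaBuiltIn: ZIO[Any, Nothing, Int]  = ZStream.range(0, 1000).run(ZSink.sum[Int])
  val viaFoldLeft: ZIO[Any, Nothing, Int] = ZStream.range(0, 1000).run(handRolledSum)
}
```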
We can also create a sink that only processes some elements:
val processSomeSink: ZSink[Any, Nothing, Int, Int, Chunk[Int]] = ZSink.take[Int](100)
For the sink above, the leftover type L is not Nothing, because the sink will only process the first 100 elements of the stream. We can use the collectLeftover() function to return both leftover and return values for the given sink:
val processed: ZIO[Any, Nothing, (Chunk[Int], Chunk[Int])] = ZStream.range(0, 1000)
  .run(processSomeSink.collectLeftover)
The result of run() is a tuple containing the sink’s return value as the first element and the leftover values as the second.
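Leftovers are easiest to see on a small stream. In this sketch, the input is a single chunk of five elements. After ZSink.take(2) consumes the first two, the remaining three come back as leftovers. The object and value names are ours, and the example assumes ZIO 2.x. As far as we can tell, leftovers only cover elements the sink has already pulled from upstream, so the exact leftovers can depend on how the stream is chunked.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import zio._
import zio.stream._

object LeftoverDemo {
  // A stream consisting of one chunk: 1, 2, 3, 4, 5
  val stream: ZStream[Any, Nothing, Int] = ZStream.fromChunk(Chunk(1, 2, 3, 4, 5))

  // take(2) returns the first two elements; the rest of the pulled chunk is leftover
  val takeTwo: ZSink[Any, Nothing, Int, Int, Chunk[Int]] = ZSink.take[Int](2)

  // collectLeftover turns the result into (returnValue, leftovers)
  val result: ZIO[Any, Nothing, (Chunk[Int], Chunk[Int])] =
    stream.run(takeTwo.collectLeftover)
}
```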
Now that we understand streams and sinks, let’s look at one more important element of ZIO Streams that we’ll use when defining workflows: ZPipeline.
It serves as a stream transformer, taking one ZStream as input and returning another transformed ZStream as the output.
ZPipeline lets us define a reusable blueprint for deriving one stream from another. Technically, we could call all the transformation functions directly on a ZStream, but ZPipeline lets us abstract stream transformations and reduce code duplication.
Just as with the previous data types, we can use one of the predefined functions to create a pipeline:
val pipeline: ZPipeline[Any, Nothing, String, Int] = ZPipeline.map[String, Int](string => string.toInt + 1)
The pipeline above parses each string into an integer and then increments it by one. Since the transformation is defined as a ZPipeline value, we can reuse it to transform multiple different streams:
val mappingPipeline: ZPipeline[Any, Nothing, String, Int] = ZPipeline.map[String, Int](string => string.toInt + 1)
val firstStreamMapped = ZStream("1", "2", "3", "4", "5").via(mappingPipeline)
val secondStreamMapped = ZStream("6", "7", "8", "9", "10").via(mappingPipeline)
val totalSum: ZIO[Any, Nothing, Int] = firstStreamMapped.concat(secondStreamMapped).runSum
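One caveat is that string.toInt throws a NumberFormatException on input such as "abc". Because the pipeline’s error type is Nothing, that exception would surface as a defect rather than as a typed failure. A sketch of a failure-aware variant uses ZPipeline.mapZIO with ZIO.attempt, which moves the exception into the error channel as a Throwable. The name safeParse is ours, and the example assumes ZIO 2.x.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import zio._
import zio.stream._

object SafeParseDemo {
  // Parses and increments each string; a NumberFormatException becomes a typed failure
  val safeParse: ZPipeline[Any, Throwable, String, Int] =
    ZPipeline.mapZIO((s: String) => ZIO.attempt(s.toInt + 1))

  // 2 + 3 + 4
  val good: ZIO[Any, Throwable, Int] =
    ZStream("1", "2", "3").via(safeParse).runSum

  // The stream fails at "abc"; either exposes the failure as a value
  val bad: ZIO[Any, Nothing, Either[Throwable, Int]] =
    ZStream("1", "abc", "3").via(safeParse).runSum.either
}
```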
Additionally, we can use the >>> operator or the andThen() function to compose different pipelines:
val mappingPipeline: ZPipeline[Any, Nothing, String, Int] = ZPipeline.map[String, Int](string => string.toInt + 1)
// Keeps even numbers and drops odd ones
val keepEven: ZPipeline[Any, Nothing, Int, Int] = ZPipeline.filter[Int](_ % 2 == 0)
val evenStream = ZStream("1", "2", "3", "4", "5", "6").via(mappingPipeline >>> keepEven)
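Composition with >>> and composition with andThen() should produce the same pipeline. The sketch below rebuilds the parse-and-increment step and the even-number filter, composes them both ways, and collects the results. The names are ours, and the example assumes ZIO 2.x. Starting from "1" through "6", the incremented values are 2 through 7, and of those, 2, 4, and 6 are even.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import zio._
import zio.stream._

object ComposeDemo {
  // Parses each string and adds one
  val increment: ZPipeline[Any, Nothing, String, Int] =
    ZPipeline.map[String, Int](s => s.toInt + 1)

  // Keeps even numbers, i.e. drops odd ones
  val keepEven: ZPipeline[Any, Nothing, Int, Int] =
    ZPipeline.filter[Int](_ % 2 == 0)

  val input: ZStream[Any, Nothing, String] = ZStream("1", "2", "3", "4", "5", "6")

  val viaOperator: ZIO[Any, Nothing, Chunk[Int]] = input.via(increment >>> keepEven).runCollect
  val viaAndThen: ZIO[Any, Nothing, Chunk[Int]]  = input.via(increment.andThen(keepEven)).runCollect
}
```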
So far we’ve discussed some very simple cases, but in reality, streams may fail. Failures can occur for various reasons: the input data may have an incorrect format, the input file may not exist, or an HTTP connection may time out. ZIO Streams offers several methods for handling a failing stream.
If our stream fails, we can recover from the failure and run another stream instead. We can choose whether to recover from all errors or only from specific ones:
val failingStream: ZStream[Any, Throwable, Int] = ZStream.range(0, 5) ++
  ZStream.fail(new RuntimeException("Failing!")) ++
  ZStream.range(6, 10)
val recoveryStream: ZStream[Any, Throwable, Int] = ZStream.range(10, 15)
// run is defined inside an object extending ZIOAppDefault
override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
  failingStream.orElse(recoveryStream)
    .runSum
In this case, runSum() processes the numbers 0 to 4 from the failing stream. Once the failure occurs, the remaining elements of that stream (6 to 9) are never emitted, and runSum() continues with recoveryStream (10 to 14), so the result is 10 + 60 = 70.
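orElse() recovers from any failure. To recover only from specific errors, we can use catchSome(), which handles just the errors matched by a partial function and lets every other error fail the stream. In this sketch, ParseError, failingWith, and fallback are hypothetical names defined only for illustration, and the example assumes ZIO 2.x.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import zio._
import zio.stream._

object RecoveryDemo {
  // Hypothetical domain error, defined only for this example
  final case class ParseError(message: String) extends Exception(message)

  // Emits 0..4, then fails with the given error (6..9 are never reached)
  def failingWith(error: Throwable): ZStream[Any, Throwable, Int] =
    ZStream.range(0, 5) ++ ZStream.fail(error) ++ ZStream.range(6, 10)

  val fallback: ZStream[Any, Nothing, Int] = ZStream.range(10, 15)

  // orElse switches to the fallback after any failure: 0..4, then 10..14
  val recoverAll: ZIO[Any, Throwable, Int] =
    failingWith(new RuntimeException("boom")).orElse(fallback).runSum

  // catchSome only handles ParseError; any other error still fails the stream
  def recoverParseErrors(error: Throwable): ZIO[Any, Throwable, Int] =
    failingWith(error).catchSome { case ParseError(_) => fallback }.runSum
}
```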
Sometimes the failure may just be temporary. For example, an HTTP request that failed due to network issues may succeed after some time. If we have a case like that, we can use retry():
object Generator {
  var counter = 0
  def getNext(): Task[Int] = {
    counter += 1
    if (counter == 2) ZIO.fail(new RuntimeException("FAILED!")) else ZIO.succeed(counter)
  }
}
val failingStream: ZStream[Any, Throwable, Int] =
  ZStream.range(0, 5).flatMap(_ => ZStream.fromZIO(Generator.getNext()))
override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
  failingStream.retry(Schedule.once).runSum
As we can see, retry() takes a Schedule that determines when and how many times the stream is retried. Here, Schedule.once retries it only once.
One more important thing to note is that retry() doesn’t resume the stream from the point of failure. Instead, it restarts the whole stream from the beginning. In our case, the sink receives six elements: the value 1, emitted before the second call to getNext() fails, and then the five values 3, 4, 5, 6, and 7, emitted after retrying. As a result, runSum() returns 26.
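Because Generator mutates a global var, its output depends on how often the object has already been used. A sketch of the same idea keeps the attempt counter in a Ref instead, which makes the restart visible. The first attempt emits 1 and then fails, and the retried attempt starts over and emits 1, 2, 3, so the element 1 appears twice in the output. The names flaky and RetryDemo are ours, and the example assumes ZIO 2.x.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import zio._
import zio.stream._

object RetryDemo {
  // The Ref counts how many times the stream has been started.
  // Attempt 1 emits 1 and then fails; later attempts emit 1, 2, 3.
  def flaky(attempts: Ref[Int]): ZStream[Any, Throwable, Int] =
    ZStream.fromZIO(attempts.updateAndGet(_ + 1)).flatMap { attempt =>
      if (attempt == 1) ZStream(1) ++ ZStream.fail(new RuntimeException("transient"))
      else ZStream(1, 2, 3)
    }

  // Returns every element the sink received, plus the number of attempts
  val program: ZIO[Any, Throwable, (Chunk[Int], Int)] =
    for {
      attempts <- Ref.make(0)
      result   <- flaky(attempts).retry(Schedule.once).runCollect
      count    <- attempts.get
    } yield (result, count)
}
```

Since retry() re-emits from the start, a sink that must not see duplicates needs the stream itself to be idempotent or to track its own progress.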
So far, we’ve only processed artificially generated streams, but ZIO Streams can do much more. In this section, we’ll read data from a file and write it to another file.
ZStream provides a fromFile() method that creates a stream of bytes from a given file. We can also create a stream of whole lines instead of bytes, which is quite useful when working with text files. Here, we wrap the line iterator of a scala.io.Source with ZStream.fromIterator():
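import scala.io.Source
import zio.stream.ZStream
// Note: the stream doesn't manage this Source, so the file is never closed here
val fileIterator = Source.fromFile("file.txt").getLines()
val fileInputStream: ZStream[Any, Throwable, String] =
  ZStream.fromIterator(fileIterator)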
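An alternative that stays within ZIO Streams is to read the file as bytes and decode them with pipelines. The sketch below uses ZStream.fromPath together with ZPipeline.utf8Decode and ZPipeline.splitLines. The readLines helper is ours, and the example assumes ZIO 2.x and a UTF-8 encoded file. As far as we know, unlike the Source-based version, this stream opens and closes the file itself.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath) and UTF-8 input
import java.nio.file.Path
import zio._
import zio.stream._

object ReadLinesDemo {
  // Streams the file's bytes, decodes them as UTF-8 text, and splits the text into lines
  def readLines(path: Path): ZStream[Any, Throwable, String] =
    ZStream.fromPath(path).via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)
}
```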
ZSink offers a fromFile() method that writes a stream of bytes to a file. However, before we write the stream to the file, we first need to convert the lines of fileInputStream into bytes. Additionally, since getLines() strips newline characters, we need to add them back so that the output file looks the same as the input file:
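import java.io.File
import zio.stream.{ZSink, ZStream}

val fileSink = ZSink.fromFile(new File("outputFile.txt"))
val fileOutputStream = fileInputStream
  .intersperse("\n")
  // Split each line into its individual bytes
  .flatMap(line => ZStream(line.getBytes.toList: _*))
  .run(fileSink)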
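We’ve added the newline characters using the intersperse() method, which inserts the separator between consecutive lines. It doesn’t add one after the last line, so a trailing newline in the input file won’t be reproduced in the output.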
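In the same spirit, a sketch of the write side can replace the manual byte conversion with ZPipeline.utf8Encode and write to a java.nio.file.Path with ZSink.fromPath. The result of that sink is the number of bytes written. The writeLines helper is ours, and the example assumes ZIO 2.x. As with intersperse() above, no newline is added after the last line.

```scala
// Sketch assuming ZIO 2.x (zio and zio-streams on the classpath)
import java.nio.file.Path
import zio._
import zio.stream._

object WriteLinesDemo {
  // Joins the lines with "\n", encodes them as UTF-8 bytes, and writes them to path.
  // The effect succeeds with the number of bytes written.
  def writeLines(lines: ZStream[Any, Throwable, String], path: Path): ZIO[Any, Throwable, Long] =
    lines
      .intersperse("\n")
      .via(ZPipeline.utf8Encode)
      .run(ZSink.fromPath(path))
}
```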
In this article, we learned the basic idea behind ZIO Streams, a streaming library in the ZIO ecosystem.
We also explored basic examples of creating sources and sinks, processing streams with ZPipeline, and error handling.