1. Overview

Fibers are a lightweight concurrency model that lets us run tens of thousands of concurrent computations. Several Scala libraries, such as ZIO and Cats Effect, implement this model.

In this tutorial, we’ll explore ZIO fibers and learn how to use them in our code.

2. Fibers vs Threads

To understand ZIO fibers, we first need to compare them with another widely used concurrency model: JVM threads. We can use both fibers and threads for multitasking, but they differ significantly.

2.1. JVM Threads

Traditional JVM threads map directly to OS threads. To create a new thread, our application needs to make system calls and reserve a fixed amount of memory for the thread’s stack.

JVM threads use preemptive multitasking, meaning that the OS decides when a thread will execute and for how long.

When the OS decides that another thread has to run, it saves the state of the current thread and passes control to the other one. With many threads, this context switching adds significant overhead and latency.

Threads don’t have a specific return type. Their work is defined in the run method, which returns void, so threads can’t be composed with one another. They also have no meaningful way to report errors.
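To make the missing return type concrete, here’s a minimal sketch that uses only the standard library. Because run returns nothing, the result has to leave the thread through shared mutable state. The names ThreadResultSketch and computeOnThread are hypothetical and chosen only for this illustration:

```scala
import java.util.concurrent.atomic.AtomicInteger

object ThreadResultSketch {
  // Hypothetical helper: runs a computation on a separate thread and returns its result
  def computeOnThread(): Int = {
    // run() returns void, so the result is passed out through a shared holder
    val result = new AtomicInteger(0)
    val worker = new Thread(() => result.set(21 * 2))
    worker.start()
    worker.join() // block the calling thread until the worker has finished
    result.get()
  }

  def main(args: Array[String]): Unit =
    println(computeOnThread()) // prints 42
}
```

Nothing in the types tells us what the thread computes or how it can fail, which is the gap that typed fibers close.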

It’s important to note that threads are isolated from one another. If one thread encounters an error or crashes, it doesn’t directly affect the others. Each thread has its own stack and local variables, which minimizes interference between threads.

2.2. ZIO Fibers

We can compare ZIO Fibers to green threads. They’re dynamic data structures that can change their memory consumption on demand, and the ZIO runtime is responsible for running them.

We can create as many fibers as we want as long as we have enough memory. They use cooperative multitasking: instead of being preempted by the OS, fibers yield control to one another, which avoids the overhead of OS-level context switching.
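As a rough illustration of how cheap fibers are, the following sketch forks 10,000 of them and joins them all. It assumes ZIO 2, and the object name ManyFibersSketch is hypothetical:

```scala
import zio._

object ManyFibersSketch extends ZIOAppDefault {
  // Forks 10,000 fibers that each sleep briefly and return 1, then joins them all
  val program: ZIO[Any, Nothing, Int] =
    for {
      fibers  <- ZIO.foreach(List.range(0, 10000))(_ => ZIO.sleep(10.millis).as(1).fork)
      results <- ZIO.foreach(fibers)(_.join)
    } yield results.sum

  def run = program.flatMap(total => Console.printLine(s"Joined $total fibers"))
}
```

Creating the same number of JVM threads would typically be far more expensive, since each one reserves its own stack.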

For example, a fiber may make a database call. While it waits for the database to respond, it passes control to another fiber, so no time is wasted.

Fibers are built on the concept of composition. They’re typed with specific return and error types, allowing us to create complex concurrent programs using a functional and composable approach. We can also interrupt fibers safely, whereas interrupting a thread may not succeed.
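The sketch below shows how the error and return types of an effect carry over to its fiber. It assumes ZIO 2, and the parse helper is a hypothetical effect introduced only for this example:

```scala
import zio._

object TypedFiberSketch extends ZIOAppDefault {
  // Hypothetical effect: succeeds with an Int or fails with a String error
  def parse(input: String): ZIO[Any, String, Int] =
    ZIO.attempt(input.toInt).mapError(_ => s"not a number: $input")

  // Forking yields a fiber typed with the same error and result types
  val program: ZIO[Any, String, Int] =
    for {
      first  <- parse("20").fork // a fiber with error type String and result type Int
      second <- parse("22").fork
      a      <- first.join       // fails with a String if parsing failed
      b      <- second.join
    } yield a + b

  def run = program.either.flatMap(result => Console.printLine(result))
}
```

Since both inputs are valid numbers here, program succeeds with 42. With an invalid input, the String error would surface in the type-checked error channel instead of being lost.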

In terms of isolation, fibers differ from JVM threads: if a fiber encounters an unrecoverable error or fails, the failure can propagate to other fibers and make them fail too. We need careful error-handling strategies to manage failures and prevent them from affecting unrelated parts of the application.

3. Creating Fibers

Whenever we run a ZIO effect, it executes on a fiber. Every ZIO app has a main fiber that executes all the effects of its main function. On its own, this uses only part of the power of fibers, as all the effects run sequentially on the main fiber.

Before exploring how to create fibers, we should note that ZIO documentation states that we should only use fibers manually if we’re experienced ZIO programmers, as it’s very easy to make mistakes and introduce performance problems.

ZIO provides many concurrency operators, such as raceWith, zipPar, and foreachPar, that use fibers under the hood and are safer to use.
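As a brief sketch of two of these operators, assuming ZIO 2, the example below runs effects concurrently without forking anything by hand. The step helper is hypothetical and only stands in for real work:

```scala
import zio._

object ParallelOperatorsSketch extends ZIOAppDefault {
  // Hypothetical step: takes some time, then returns its own name
  def step(name: String, time: Duration): ZIO[Any, Nothing, String] =
    ZIO.sleep(time).as(name)

  // zipPar runs both effects concurrently and pairs their results
  val both: ZIO[Any, Nothing, (String, String)] =
    step("water", 100.millis).zipPar(step("tomato", 300.millis))

  // foreachPar runs one effect per element concurrently and keeps the input order
  val all: ZIO[Any, Nothing, List[String]] =
    ZIO.foreachPar(List("tomato", "onion"))(step(_, 300.millis))

  def run =
    for {
      pair <- both
      list <- all
      _    <- Console.printLine(s"$pair, $list")
    } yield ()
}
```

In both cases, ZIO takes care of forking, joining, and interrupting the underlying fibers for us.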

Now, we’ll implement a simple application that uses fibers. In our example, we’ll implement all the steps of cooking pasta with a simple sauce. To make the dish, we must boil some water, put the pasta in the water, prepare the sauce ingredients, and cook the sauce.

Since all these steps require some time, we’ll mimic this by adding some delay and implementing a generic method:

private def delay(duration: Duration): ZIO[Any, Nothing, Unit] = ZIO.sleep(duration)

Now, let’s implement our methods:

private def boilWater(): ZIO[Any, Nothing, Unit] =
  for {
    _ <- ZIO.succeed(println("Water put on stove..."))
    _ <- delay(100.millis)
    _ <- ZIO.succeed(println("Water boiled!"))
  } yield ()

private def boilPasta(): ZIO[Any, Nothing, Unit] =
  for {
    _ <- ZIO.succeed(println("Put pasta in boiling water..."))
    _ <- delay(1.second)
    _ <- ZIO.succeed(println("Pasta ready!"))
  } yield ()

private def prepareIngredient(ingredient: String): ZIO[Any, Nothing, Unit] =
  for {
    _ <- ZIO.succeed(println(s"Preparing $ingredient..."))
    _ <- delay(300.millis)
    _ <- ZIO.succeed(println(s"$ingredient ready"))
  } yield ()

private def makeSauce(): ZIO[Any, Nothing, Unit] =
  for {
    _ <- ZIO.succeed(println(s"Preparing sauce..."))
    _ <- delay(1.second)
    _ <- ZIO.succeed(println(s"Sauce is ready"))
  } yield ()

For each action, we print something when it starts, wait some time for it to complete, and then print the completed action. We can add any ingredients that we want using a generic method. For now, we’ll only add some onions and tomatoes.

The only thing we have to do now is to call these effects from our main method:

def pastaApp: ZIO[Any, Nothing, Unit] =
  for {
    waterFiber <- boilWater().fork
    pastaFiber <- boilPasta().fork
    tomatoFiber <- prepareIngredient("tomato").fork
    onionFiber <- prepareIngredient("onion").fork
    sauceFiber <- makeSauce().fork
  } yield ()

We call the fork method to ensure a different fiber executes each step.
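To see that a forked effect really runs on a different fiber, we can compare fiber ids. This is a small sketch assuming ZIO 2, where ZIO.fiberId returns the id of the fiber that executes it. The object name is hypothetical:

```scala
import zio._

object ForkedFiberIdSketch extends ZIOAppDefault {
  // Compares the id of the current fiber with the id seen by a forked effect
  val program: ZIO[Any, Nothing, Boolean] =
    for {
      parentId <- ZIO.fiberId
      fiber    <- ZIO.fiberId.fork // the forked effect reports its own fiber's id
      childId  <- fiber.join
    } yield parentId != childId

  def run = program.flatMap(different => Console.printLine(s"Runs on a different fiber: $different"))
}
```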

When we run our code, we’ll notice a different result each time, with steps executing in the wrong order. We may put the pasta in the water before it starts boiling, and we prepare multiple ingredients simultaneously. This happens because every fiber runs asynchronously and doesn’t wait for any other fiber to complete.

To fix this, we need to make some fibers wait for other ones to complete:

for {
  waterFiber <- boilWater().fork
  pastaFiber <- waterFiber.await.zip(boilPasta()).fork
  tomatoFiber <- prepareIngredient("tomato").fork
  onionFiber <- tomatoFiber.await.zip(prepareIngredient("onion")).fork
  sauceFiber <- onionFiber.await.zip(makeSauce()).fork
} yield ()

We use the await and zip methods to make each step wait for the previous one before it starts. Now, we wait for the water to boil before putting in the pasta, and we finish preparing the tomato and the onion before making the sauce.

The await method waits for the given fiber to complete, and the zip method combines two effects sequentially into one, so the second effect starts only after the first one has finished.
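One detail worth knowing is that await never fails: it returns the fiber’s Exit value, so a step chained after await still runs even if the awaited fiber failed. The join method, in contrast, fails with the fiber’s error. The following sketch, assuming ZIO 2, contrasts the two. The burnt effect is a hypothetical step that always fails:

```scala
import zio._

object AwaitVsJoinSketch extends ZIOAppDefault {
  // Hypothetical step that always fails with a String error
  val burnt: ZIO[Any, String, Int] = ZIO.fail("burnt")

  // await never fails: it returns the fiber's Exit, which we can inspect
  val viaAwait: ZIO[Any, Nothing, String] =
    for {
      fiber <- burnt.fork
      exit  <- fiber.await
    } yield exit match {
      case Exit.Success(value) => s"succeeded with $value"
      case Exit.Failure(cause) => s"failed with ${cause.failureOption.getOrElse("no typed error")}"
    }

  // join fails with the fiber's error, so the failure reaches the caller
  val viaJoin: ZIO[Any, Nothing, Either[String, Int]] =
    burnt.fork.flatMap(_.join).either

  def run =
    for {
      awaited <- viaAwait
      joined  <- viaJoin
      _       <- Console.printLine(s"await: $awaited, join: $joined")
    } yield ()
}
```

If a later step must not run after a failure, join may be the better fit than await.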

4. Joining Fibers

We’ve managed to make our fibers dependent on each other, but we never wait for them to finish their work. The main fiber completes right after forking them, so the result doesn’t make a lot of sense yet:

Water put on stove...
Preparing tomato...

We see the independent steps starting but never finishing. To fix this, we zip the pasta and sauce fibers into one and wait for its result using the join method:

for {
  waterFiber <- boilWater().fork
  pastaFiber <- waterFiber.await.zip(boilPasta()).fork
  tomatoFiber <- prepareIngredient("tomato").fork
  onionFiber <- tomatoFiber.await.zip(prepareIngredient("onion")).fork
  sauceFiber <- onionFiber.await.zip(makeSauce()).fork
  _ <- pastaFiber.zip(sauceFiber).join
} yield ()

We can now see that the result is what we expected. The fibers that depend on others wait for them to finish, while the independent ones run concurrently:

Water put on stove...
Preparing tomato...
Water boiled!
Put pasta in boiling water...
tomato ready
Preparing onion...
onion ready
Preparing sauce...
Pasta ready!
Sauce is ready
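For comparison, the same dependency graph can also be expressed without forking manually. The sketch below assumes ZIO 2 and uses a simplified, hypothetical step helper instead of the individual cooking methods. Sequential steps are chained with *>, and the two chains run concurrently with zipPar:

```scala
import zio._

object PastaWithOperatorsSketch extends ZIOAppDefault {
  // Simplified stand-in for the cooking steps: print, wait, print
  def step(name: String, time: Duration): ZIO[Any, Nothing, Unit] =
    ZIO.succeed(println(s"$name started")) *>
      ZIO.sleep(time) *>
      ZIO.succeed(println(s"$name done"))

  // Each chain is sequential (*>)
  val pasta: ZIO[Any, Nothing, Unit] =
    step("water", 100.millis) *> step("pasta", 1.second)

  val sauce: ZIO[Any, Nothing, Unit] =
    step("tomato", 300.millis) *> step("onion", 300.millis) *> step("sauce", 1.second)

  // The two chains run concurrently, and the program completes when both are done
  val program: ZIO[Any, Nothing, Unit] =
    pasta.zipPar(sauce).unit

  def run = program
}
```

This variant leaves forking and joining to ZIO, in line with the advice to prefer the higher-level operators where possible.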

5. Interrupting Fibers

We sometimes want to stop the execution of a fiber that we’ve already started. In our example, suppose we decide that we don’t want to cook pasta after all and order something else instead. For this action, we’ll create another method:

private def orderFood(): ZIO[Any, Nothing, Unit] =
  for {
    _ <- ZIO.succeed(println("Ordering some food..."))
    _ <- delay(1.second)
    _ <- ZIO.succeed(println("Food arrived!"))
  } yield ()

Our main method will also include this method:

for {
  waterFiber <- boilWater().fork
  pastaFiber <- waterFiber.await.zip(boilPasta()).fork
  tomatoFiber <- prepareIngredient("tomato").fork
  onionFiber <- tomatoFiber.await.zip(prepareIngredient("onion")).fork
  sauceFiber <- onionFiber.await.zip(makeSauce()).fork
  _ <- pastaFiber.zip(sauceFiber).join
  orderFiber <- orderFood().fork
  _ <- orderFiber.join
} yield ()

The problem is that we still wait for the pasta and sauce fibers to finish, so we end up with both the pasta dish and the food we ordered. To end up with only the food we ordered, we use the interrupt method on these fibers instead of joining them:

for {
  waterFiber <- boilWater().fork
  pastaFiber <- waterFiber.await.zip(boilPasta()).fork
  tomatoFiber <- prepareIngredient("tomato").fork
  onionFiber <- tomatoFiber.await.zip(prepareIngredient("onion")).fork
  sauceFiber <- onionFiber.await.zip(makeSauce()).fork
  _ <- pastaFiber.zip(sauceFiber).interrupt
  orderFiber <- orderFood().fork
  _ <- orderFiber.join
} yield ()

And the result verifies that we stopped preparing the dish midway:

Water put on stove...
Preparing tomato...
Ordering some food...
Water boiled!
tomato ready
Preparing onion...
onion ready
Food arrived!

Some steps still execute after we’ve ordered food. This happens because we interrupted only the pasta and sauce fibers. The water, tomato, and onion fibers are separate fibers, so they keep running: interrupting a fiber doesn’t interrupt the fibers it was merely waiting for.
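If we want none of the remaining steps to run, one option is to interrupt every fiber we forked. The sketch below assumes ZIO 2 and uses Fiber.interruptAll for this. The step helper and the Ref that records completed steps are hypothetical and exist only to make the effect observable:

```scala
import zio._

object InterruptAllSketch extends ZIOAppDefault {
  // Hypothetical step: waits, then records its name as completed
  def step(name: String, time: Duration, done: Ref[List[String]]): ZIO[Any, Nothing, Unit] =
    ZIO.sleep(time) *> done.update(name :: _)

  val program: ZIO[Any, Nothing, List[String]] =
    for {
      done   <- Ref.make(List.empty[String])
      water  <- step("water", 100.millis, done).fork
      tomato <- step("tomato", 300.millis, done).fork
      _      <- Fiber.interruptAll(List(water, tomato)) // stop every forked step
      _      <- ZIO.sleep(500.millis)                   // long enough for both steps, had they survived
      result <- done.get
    } yield result

  def run = program.flatMap(steps => Console.printLine(s"Completed steps: $steps"))
}
```

Because both fibers are interrupted while they sleep, neither of them records a completed step.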

Unlike with JVM threads, where we can’t be sure that an interruption succeeded, the interrupt effect in ZIO completes only after the target fiber has actually terminated.
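We can observe this through the value that interrupt returns. In the following sketch, which assumes ZIO 2 and uses a hypothetical object name, interrupt hands back the Exit of the terminated fiber:

```scala
import zio._

object InterruptResultSketch extends ZIOAppDefault {
  val program: ZIO[Any, Nothing, Boolean] =
    for {
      fiber <- ZIO.never.fork  // a fiber that would otherwise never finish
      exit  <- fiber.interrupt // resumes only once the fiber has terminated
    } yield exit.isInterrupted

  def run = program.flatMap(interrupted => Console.printLine(s"Interrupted: $interrupted"))
}
```

Here, program yields true, since the only way the never-ending fiber can terminate is by being interrupted.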

6. Using Finalizers

After we have our meal, we’ll want to clean everything up. To do so, we can use a finalizer.

A finalizer is an effect that runs when the effect it’s attached to completes or gets interrupted. Let’s create a simple effect that cleans up everything: our kitchen and the packages we received:
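To illustrate that a finalizer also runs on interruption, here’s a small sketch assuming ZIO 2. The Ref acting as a log and the Promise that signals the start of the fiber are hypothetical helpers for this example:

```scala
import zio._

object FinalizerOnInterruptSketch extends ZIOAppDefault {
  val program: ZIO[Any, Nothing, List[String]] =
    for {
      log     <- Ref.make(List.empty[String])
      started <- Promise.make[Nothing, Unit]
      // The fiber signals that it has started, then runs forever unless interrupted
      fiber   <- (started.succeed(()) *> ZIO.never)
                   .ensuring(log.update("cleaned up" :: _))
                   .fork
      _       <- started.await   // make sure the fiber is running before we interrupt it
      _       <- fiber.interrupt // returns after the finalizer has run
      entries <- log.get
    } yield entries

  def run = program.flatMap(entries => Console.printLine(entries))
}
```

Even though the fiber never completes on its own, the log contains the "cleaned up" entry once interrupt returns.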

private val cleanup: ZIO[Any, Nothing, Unit] =
  for {
    _ <- ZIO.succeed(println(s"Cleaning up kitchen and delivery packages..."))
    _ <- delay(10.millis)
    _ <- ZIO.succeed(println("All clean"))
  } yield ()

We now need to call it in our code using the ensuring method:

for {
  waterFiber <- boilWater().fork
  pastaFiber <- waterFiber.await.zip(boilPasta()).fork
  tomatoFiber <- prepareIngredient("tomato").fork
  onionFiber <- tomatoFiber.await.zip(prepareIngredient("onion")).fork
  sauceFiber <- onionFiber.await.zip(makeSauce()).fork
  _ <- pastaFiber.zip(sauceFiber).interrupt
  orderFiber <- orderFood().fork
  _ <- orderFiber.join.ensuring(cleanup)
} yield ()

This will print the following result:

Water put on stove...
Preparing tomato...
Ordering some food...
Water boiled!
tomato ready
Preparing onion...
onion ready
Food arrived!
Cleaning up kitchen and delivery packages...
All clean

7. Conclusion

In this article, we discussed ZIO fibers and saw how they differ from JVM threads. We also learned how to use them to run concurrent tasks, how to make them depend on each other, and how to interrupt them.

As always, the code for the examples is available over on GitHub.
