1. Introduction

If we need to represent a stream of values and process them asynchronously, Kotlin Coroutines library has us covered. The Flow class allows just that.

The Flow concept is similar to a Publisher from Reactive Streams. Or, we can think about it as an asynchronous Sequence, where instead of blocking on next() calls of Sequence iterator, we’ll suspend the coroutine and do something productive with our thread.

The Kotlin documentation also states that a Flow is cold, as opposed to a Channel, meaning that it holds no resources intrinsically and nothing executes until a collect() starts.

In this tutorial, let’s see how we can create a Flow and what its common use cases and properties are.

2. Constructing and Consuming a Flow

A Flow may be constructed from a basic builder flow {}, which is similar to a sequence {} builder. It expects that we call the emit() function one or more times:

val flowWithSingleValue = flow { emit(0) }

Alternatively, collections, ranges, and sequences may be converted to Flows using the .asFlow() extension function:

val flowFromList = listOf("mene", "mene", "tekel", "upharsin").asFlow()

Then we can consume the Flow:

val inscription = buildString {
    listOf("mene", "mene", "tekel", "upharsin")
      .asFlow()
      .collect { append(it).append(", ") }
}.removeSuffix(", ")
assertEquals(inscription, "mene, mene, tekel, upharsin")

Apart from a simple collect(), there are other terminal flow operators. Each of them is a suspend function that starts the collection of a Flow. Basically, we can do the same things with a Flow as with a Sequence, such as representing all the values as a List or a Set, reducing the flow, or taking only the first value:

val data = flow { "PEACE".asSequence().forEach { emit(it.toString()) } }
val charList = data.toList()
assertEquals(5, charList.size)

val symbols = data.toSet()
assertEquals(4, symbols.size)

val word = data.reduce { acc, c -> acc + c }
assertEquals("PEACE", word)

val firstLetter = data.first()
assertEquals("P", firstLetter)

3. Transforming Flows

Before calling a terminal operator, we can also change each Flow value with transformation operators. There are few surprises here — we have map {} and filter {} as our basic tools:

val symbols = (60..100).asFlow()
    .filter { it in secretCodes }
    .map { Char(it) }
    .toList()

The basis for all of the transformers is a transform {} function, which creates a new Flow from the parent one by calling emit():

val monthsOfSorrow = Month.values().asFlow()
    .transform {
        if (it <= Month.MAY)
            emit(it)
    }
    .toList()

Instead of doing it by hand, there are shortcuts to limit the size of the final collection:

Month.values().asFlow().take(5).toList()

This works in a slightly different way but still creates another Flow. It calls collect() within a try…catch block and throws an exception after the necessary number of items has been processed. The values that were collected are emitted downstream.

4. A Flow as a Stream of Updates

There are cases when the Flow is infinite and represents the current state of a parameter, say, the number of visitors on our site. In such cases, it doesn’t make sense to handle all the values, only the most recent ones.

To accomplish this, Flow features the conflate() method:

val buffer = mutableListOf<Int>()
(1..10).asFlow()
    .transform {
        delay(10)
        emit(it)
    }
    .conflate()
    .collect{
        delay(50)
        buffer.add(it)
    }
assertTrue { buffer.size < 10}

It skips values between two consecutive invocations of a collector function.

Or maybe only the very last value in the Flow means something. In that case, we can use the *Latest{} method family. These methods will apply the argument lambda to the latest accessible value and will cancel its execution if a more recent value arrives during that period:

var latest = -1
(1..10).asFlow()
    .collectLatest { latest = it }
assertEquals(10, latest)

5. Composing Flows

Sometimes, the transformation we need is a composition with another set of data. The simplest of such operations is zip():

val codes = listOf(80, 69, 65, 67).asFlow()
val symbols = listOf('P', 'A', 'C', 'E').asFlow()
val list = buildList {
    codes.zip(symbols) { code, symbol -> add("$code -> $symbol") }.collect()
}
assertEquals(listOf("80 -> P", "69 -> A", "65 -> C", "67 -> E"), list)

The zip() method will produce a new Flow that is a superposition of the two initial components.

If we need the most recent values of both of the flows, there is combine():

val arrowsI = listOf("v", "<", "^", ">").asFlow().map { delay(30); it }
val arrowsII = listOf("v", ">", "^", "<").asFlow().map { delay(20); it }
arrowsI.combine(arrowsII) { one, two -> "$one $two" }
    .collect { println(it) }

Each time one of the flows emits a new value, it will recalculate the result. The output will look like:

v v
v >
< >
< ^
^ <
> <

6. Handling Exceptions

The emitter code, transformer code, or collector code all can throw exceptions. A Flow must be transparent to exceptions, and it’s against the Flow idea to emit() from inside a try…catch block. This way, any exception that the collector throws can catch itself.

We can “catch” exceptions thrown during a transformation using the catch() operator. It can re-throw an exception, emit a value instead, or ignore the exception. The operation will catch only those exceptions that happened upstream, that is, in previous transformations. Any subsequent transformations and collections still may throw exceptions:

val result = (1..10).asFlow()
    .transform {
        if (it > 3) throw IllegalArgumentException("Too much")
        emit(it)
    }
    .catch {
        when (it) {
            is IllegalArgumentException -> emit(3)
            else -> throw it // rethrow unknown exceptions
        }
    }
    .toList()
assertEquals(listOf(1, 2, 3, 3), result)

7. Transparent Back-Pressure From Flows

Flow comes with an inherent back-pressure mechanism. Because emit() is a suspend function, it doesn’t calculate eagerly, but instead, continues when the collector is ready to take a new value. So, if the collector is expensive and the producer is more performant, it won’t overwhelm the collector:

val fastFlow = flow {
    (1..10).forEach {
        println("Before waiting: $it")
        delay(10)
        println("About to emit: $it")
        emit(Char(60 + it))
        println("After emitting: $it")
    }
}

fastFlow.collect {
    Thread.sleep(100) // Imitate hard CPU-bound operation
    println("Collected $it")
}

If we look at the output, we’ll see that the emitter loop continues only after the collector has finished the processing:

Before waiting: 1
About to emit: 1
Collected =
After emitting: 1
Before wating: 2
About to emit: 2
Collected >
After emitting: 2

8. Flow Context

Despite the fact that Flow doesn’t own any resources, it still has to run in some thread. The emit() function is a suspend function and, therefore, must run in a coroutine context along with the rest of the instructions inside the flow {} builder. By default, it’s the context of the collector.

If we want to run the emitter in a different context, we have to use the .flowOn() modifier:

flow {
    (1..10).forEach { emit(it) }
    assertTrue { "DefaultDispatcher-worker" in Thread.currentThread().name }
}.flowOn(Dispatchers.IO)
    .collect {
        println(it)
        assertTrue { "main" in Thread.currentThread().name }
    }

9. Conclusion

In this article, we looked closely at the Flow class. It’s a versatile and powerful concept that may be used for asynchronous communication between various parts of our application.

The Flow is “cold”, that is, it holds no coroutine contexts or threads by default. It’s similar to Sequence in the standard Kotlin library and Publisher in Reactive Streams. We can transform flows and compose them together. We can collect them all at once, putting their content into a conventional collection, element by element, or only in part – the first several elements, the latest element, and so on.

The Flow implicitly exerts back-pressure on the emitter. By default, it uses the context of a collector but might be configured otherwise. Also, we should avoid emitting values from inside the try…catch operator, as it may disrupt the collector.

As usual, all the code examples can be found over on GitHub.

Comments are closed on this article!