1. Introduction

A Flow is an asynchronous stream that emits sequences of values. Flow is similar to a cold Publisher from Reactive Streams, which means nothing executes until a terminal operator such as collect() or reduce() is called on the Flow.

Besides the terminal operators, there’s also a set of intermediate operators like map(), filter(), take(), zip(), etc. These functions allow us to perform one or more intermediary transformations on our Flow.

In this tutorial, we’re interested in a group of intermediate operators that can be used to flatten a flow.

2. Flows Flattening Operators

Based on the nature of Flows, it’s quite easy to get into a situation where each value triggers a request for another sequence of values. In this case, we end up with a flow of flows (Flow<Flow<Any>>). With flattening operators, we can flatten it into a single flow for further processing.

The flatten() and flatMap() operators are available for Collection and Sequence, but it’s a bit different for Flow as its asynchronous nature requires different modes of flattening. Therefore, a family of flattening operators on flows exists.

Let’s learn more about these operators, their usage, and the differences between them.

2.1. flatMapConcat()

The flatMapConcat() operator does the task of transforming, serial concatenating, and flattening, and is a shortcut for calling both map() and flattenConcat().

flattenConcat() and therefore flatMapConcat() wait for the inner flow to complete before starting to collect the next one, which means they transform and then merge the inner flow sequentially:

val result = flow { // flow builder
    for (i in 1..3) {
        delay(1) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}.flatMapConcat { it ->
    flow {
        emit(it * 2)
        delay(2)
        emit(it * 2 + 1)
    }
}.toList()

Let’s see the output of result:

[2, 3, 4, 5, 6, 7]

Here we can clearly see that the final flow is ordered, which shows that the flowMapConcat() block is processing each emit of the first flow through our concatenation transformation, before resuming the initial flow.

2.2. flatMapMerge()

The flatMapMerge() operator transforms and concurrently collects the incoming flows and then flattens them into a single flow.

This operator is a shortcut for map() and flattenMerge(). It has a concurrency parameter to control the number of concurrent flows that should be collected. The default value is 16 (DEFAULT_CONCURRENCY).

If we set the concurrency equal to 1, flatMapMerge() behaves like flatMapConcat().

Let’s try flatMapMerge():

val result = flow { // flow builder
    for (i in 1..3) {
        delay(1) //time consuming logic
        emit(i)
    }
}.flatMapMerge { it ->
    flow {
        emit(it * 2)
        delay(2) //time consuming logic
        emit(it * 2 + 1)
    }
}.toList()

Here is the output of result:

[2, 4, 3, 6, 5, 7]

The fact that we have an unordered result, shows that flatMapMerge() isn’t blocking the initial Flow to emit the new values, and the child flows are working concurrently.

2.3. flatMapLatest()

This flattening operator creates a new Flow by transforming the emitted value. flatMapLatest() cancels the previous Flow created by the transform block. This behavior makes flatMapLatest() the best option to cancel the ongoing flows in response to new events.

A use case for flatMapLatest() could be to show a user the result of their latest input.

Let’s see a code example:

val result = flow { // flow builder
    for (i in 1..3) {
        delay(1) // time consuming logic
        emit(i) // emit next value
    }
}.flatMapLatest { it ->
    flow {
        emit(it * 2)
        delay(2)
        emit(it * 2 + 1)
    }
}.toList()

And here’s the output of result:

[2, 4, 6, 7]

Looking at the result, we see that we have 2 as the first item, and then we have 4. It shows that the child flow was waiting for two milliseconds before emitting 3, while in the meantime the first Flow emitted 2 as the new value. This canceled the first child flow, and a new child flow is started that has emitted 4, and so on. The first Flow isn’t canceled after the emission of 3. Since this is the last number in our initial range, we see 7 in the result.

3. Conclusion

In this article, we looked at the flattening operators for Flow. Flow is similar to Sequence in the standard Kotlin library and Publisher in Reactive Streams. We can transform flows and compose them together, and by flattening we get a single Flow from a Flow of Flows.

Additionally, we saw that we can use flatMapConcat() when we want to collect the flows sequentially, and flatMapMerge() when we’re interested in concurrent collecting. On the other hand, if we have a use case in which we only care about processing the newest event, flatMapLatest() is the best choice.

As always, the source code for the examples is available over on GitHub.

Comments are closed on this article!