1. Introduction

In Kotlin coroutines, the Flow is a powerful construct for handling sequential data streams asynchronously. Sometimes we’ll work at once with many flows that we need to merge to process data efficiently. When we merge flows, we combine data from different sources into a single stream, facilitating concurrent processing and enhancing performance.

In this tutorial, we’ll explore various techniques for merging Kotlin flows, accompanied by code samples illustrating each approach.

2. Understanding Flows in Kotlin

Before diving into merging flows, let’s quickly recap Kotlin Flows. Flows are asynchronous streams of data that emit values sequentially. They handle potentially large datasets without blocking, making them ideal for asynchronous programming tasks.

To work with flows, we need to add the kotlinx-coroutines-core dependency to our Gradle build file:

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
}

If we’re using Maven, we need to add the dependency to our pom.xml:

<dependencies>
    <dependency>
        <groupId>org.jetbrains.kotlinx</groupId>
        <artifactId>kotlinx-coroutines-core</artifactId>
        <version>1.8.0</version>
    </dependency>
</dependencies>

3. The merge() Method

The merge() function joins multiple flows into a single flow without preserving the order of elements. This method accepts a variable number of flows as input arguments and concurrently collects elements from each of them emitting them into the merged flow as they become available:

@Test
fun `should merge using the merge function`() = runBlocking {
    val flow1 = flowOf(1, 2, 3, 4)
    val flow2 = flowOf(5, 6, 7, 8)
    val mergedFlow = merge(flow1, flow2)
    val result = mergedFlow.toList()
    assertThat(result).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8)
}

This code merges two flows flow1 and flow2 into a single flow with the merge() function. The toList() function adds all items from mergedFlow to the final result list.

4. The zip() Method

Another useful method for merging flows is the zip() function. The zip() function combines elements from multiple flows pairwise, emitting a single value containing elements from each flow:

@Test
fun `should merge using zip`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf("A", "B", "C")
    val result = flow1.zip(flow2) { num, letter ->
        "$num$letter"
    }.toList()
    assertEquals(listOf("1A", "2B", "3C"), result)
}

In this example, we create two flows to merge. The zip() function combines corresponding elements from each flow using the provided lambda function to produce a single string value. In this case, the lambda concatenates the integer and string values together.

5. The combine() Method

The combine() function is useful when merging flows and applying a transformation function to the combined elements. It combines the latest values from each flow whenever any of the flows emit a new value:

@Test
fun `should merge using combine`() = runBlocking {
    val flow1 = flowOf(0)
    val flow2 = flowOf(1, 2, 3)
    val result = flow1.combine(flow2) { num1, num2 ->
        num1 + num2
    }.toList()
    assertEquals(listOf(1, 2, 3), result)
}

We start with two flows emitting values 0 and 1, 2, and 3 respectively. Next, we call the combine() function on flow1 and pass in flow2 to merge their emitted values with a lambda function. Since combine() waits for both flows to emit values, the operation ensures the combining operation is complete before collecting the values into a list.

6. The flatMapConcat() Method

The flatMapConcat() function merges Flows, one after the other. This approach maintains the original order of each flow:

@Test
fun `should merge using flatmapconcat`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = flow1.flatMapConcat { value1 ->
        flow2.map { value2 -> value1 + value2 }
    }.toList()
    assertEquals(listOf(5, 6, 7, 6, 7, 8, 7, 8, 9), result)
}

Calling the flatMapConcat() on flow1 effectively combines each element of flow1 with every element of flow2. The resulting Flow will be the product of the input Flows, which in this case results in nine total elements.

The flatMapConcat() function operates sequentially, meaning it waits for each inner flow to complete before processing the next element from the outer flow. Thus, it first maps each element emitted by flow1 to a new flow by adding it to every element emitted by flow2 repeatedly.

7. The flatMapMerge() Method

If we want to merge flows concurrently, we can use the flatMapMerge() function. This function merges flows concurrently allowing elements to be emitted as soon as they’re available, which can improve performance for large datasets:

@Test
fun `should combine using flatmapmerge`() = runBlocking {
    val flow1 = flowOf(1, 2, 3)
    val flow2 = flowOf(4, 5, 6)
    val result = flow1.flatMapMerge { value1 ->
        flow2.map { value2 -> value1 + value2 }
    }.toList()
    assertEquals(listOf(5, 6, 7, 6, 7, 8, 7, 8, 9), result)
}

Calling flatMapMerge() on flow1 combines each element of flow1 with every element of flow2, similar to flatMapConcat(). However, unlike flatMapConcat(), flatMapMerge() operates concurrently, allowing the processing of elements from both flows concurrently.

This concurrent processing enables better utilization of system resources and potentially reduces overall processing time especially if the operations within the flows are IO-bound or involve asynchronous operations.

The result is a sequence of merged values, which may not be ordered the same way as with flatMapConcat() due to concurrent processing.

8. Conclusion

Merging flows in Kotlin is a powerful technique for efficiently combining data from multiple sources. By utilizing functions like merge(), combine(), zip(), flatMapConcat(), or flatMapMerge(), we can merge flows based on our requirements, enabling us to design more robust and efficient asynchronous data processing pipelines in Kotlin applications.

As always, the source code for the examples is available over on GitHub.

2 Comments
Oldest
Newest
Inline Feedbacks
View all comments
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.