1. Introduction

In this tutorial, we’ll see how to execute a group of tasks concurrently and wait for them to finish. We’ll use coroutines rather than threads, since they’re the recommended way to achieve concurrency in Kotlin.

2. async-await

Kotlin’s async function starts a coroutine that runs concurrently and returns a Deferred<T>. Deferred is a non-blocking, cancellable future that acts as a proxy for a result that is initially unknown. For example, by calling the Deferred#await method, we suspend until the task is done and then receive its result.
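As a minimal sketch of a single async call followed by await, consider the snippet below. The slowAnswer function and the value 42 are hypothetical stand-ins for real work, and the kotlinx-coroutines-core library is assumed to be on the classpath:

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a slow computation; not part of the article's samples
suspend fun slowAnswer(): Int {
    delay(100) // suspends the coroutine without blocking the thread
    return 42
}

fun awaitSingleResult(): Int = runBlocking {
    // async returns immediately with a Deferred<Int>
    val deferred: Deferred<Int> = async { slowAnswer() }
    // await() suspends until the value is ready, then returns it
    deferred.await()
}
```

Calling awaitSingleResult() from a test or a main function should give back the value produced by slowAnswer.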

Moreover, if we have a collection of Deferred instances, we can use the awaitAll extension to wait until all of them finish:

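// JUnit 5 is assumed for @Test and Assertions
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

// longRunningTask() is a helper that isn't shown here; the assertion assumes each call returns 1
@Test
fun whenAwaitAsyncCoroutines_thenAllTerminated() {
    val count = AtomicInteger()
    runBlocking {
        val tasks = listOf(
            async(Dispatchers.IO) { count.addAndGet(longRunningTask()) },
            async(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
        )
        // Suspends until every Deferred in the list has completed
        tasks.awaitAll()
        Assertions.assertEquals(2, count.get())
    }
}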

Here, we passed Dispatchers.IO to the async function, but there are several other options as well. A dispatcher determines which thread (or threads) the coroutine runs on. If we don’t pass anything, the async block inherits the dispatcher of its parent scope.
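To make the inheritance rule concrete, here’s a small sketch that compares the dispatcher seen inside each child with the one the parent uses. It looks the dispatcher up through the ContinuationInterceptor key of the coroutine context; the function name dispatcherInheritance is hypothetical:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.ContinuationInterceptor

fun dispatcherInheritance(): Pair<Boolean, Boolean> = runBlocking {
    // The dispatcher is stored in the context under the ContinuationInterceptor key
    val parentDispatcher = coroutineContext[ContinuationInterceptor]

    // No dispatcher argument: the child inherits the parent's dispatcher
    val inherited = async { coroutineContext[ContinuationInterceptor] }

    // Explicit argument: the child uses Dispatchers.Default instead
    val explicit = async(Dispatchers.Default) { coroutineContext[ContinuationInterceptor] }

    Pair(
        inherited.await() == parentDispatcher,
        explicit.await() == Dispatchers.Default
    )
}
```

Both components of the returned pair are expected to be true: the first child shares the runBlocking dispatcher, while the second one is switched to the dispatcher we passed in.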

2.1. Structured Concurrency With async

However, there’s a better approach: taking advantage of Kotlin’s structured concurrency.

Structured concurrency means that the lifetime of a coroutine is bounded by its CoroutineScope, and a coroutine can only be launched within a scope. With structured concurrency, the outer scope doesn’t finish until all its child coroutines complete. As a result, launched coroutines are never lost and never leak. Furthermore, errors are reported properly: a failure in a child propagates to the parent scope instead of disappearing silently.
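The two guarantees described above can be sketched as follows. This is only an illustration using the coroutineScope function; the function names, delays, and the "boom" message are hypothetical:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// The scope does not finish until both children have completed
fun scopeWaitsForChildren(): Int = runBlocking {
    val finished = AtomicInteger()
    coroutineScope {
        launch { delay(50); finished.incrementAndGet() }
        launch { delay(100); finished.incrementAndGet() }
    }
    finished.get()
}

// A failing child cancels its sibling and the error surfaces at the scope
fun failurePropagates(): String = runBlocking {
    try {
        coroutineScope {
            launch { delay(1_000) } // cancelled when the sibling fails
            launch { throw IllegalStateException("boom") }
        }
        "no error"
    } catch (e: IllegalStateException) {
        "caught: ${e.message}"
    }
}
```

In the first function, the counter is read only after both children have incremented it. In the second, the exception thrown by one child is rethrown by coroutineScope, so the caller can handle it in one place.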

So, let’s run our concurrent tasks within a parent coroutine to benefit from structured concurrency. This way, we no longer need awaitAll, and we don’t have to manage the life cycles of the tasks explicitly:

@Test
fun whenParentCoroutineRunAsyncCoroutines_thenAllTerminated() {
    val count = AtomicInteger()
    runBlocking {
        withContext(coroutineContext) {
            async(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
            async(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
        }
        Assertions.assertEquals(2, count.get())
    }
}

The assertion executes after the withContext block. As the parent coroutine, the withContext block completes only once all its child coroutines have completed. Hence, we don’t have to call await or awaitAll explicitly.

In addition to withContext, there are other builders we can use for the parent, such as runBlocking or coroutineScope. The coroutineScope builder lets us declare a new scope, and that scope doesn’t complete until all the children launched inside it complete.

Although the runBlocking and coroutineScope builders may look similar, since they both wait for their body and all its children to complete, keep in mind that runBlocking blocks the current thread while waiting, whereas coroutineScope only suspends.
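As a rough illustration of that difference, the sketch below puts the concurrent work in a suspend function built on coroutineScope and uses runBlocking only at the boundary with regular, non-suspending code. The names fetchPart, fetchBoth, and fetchBothBlocking are hypothetical:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical task, standing in for the article's longRunningTask()
suspend fun fetchPart(value: Int): Int {
    delay(100)
    return value
}

// coroutineScope suspends the caller until both children finish;
// it can only be called from a coroutine or another suspend function
suspend fun fetchBoth(): Int = coroutineScope {
    val first = async { fetchPart(1) }
    val second = async { fetchPart(2) }
    first.await() + second.await()
}

// runBlocking blocks the calling thread, so it can be called from regular code
fun fetchBothBlocking(): Int = runBlocking { fetchBoth() }
```

A common design choice is to keep runBlocking at the edges of the application, such as main functions and tests, and to use coroutineScope inside suspend functions.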

3. launch-join

In cases where we don’t need the return value of the coroutine, we have the option to use the launch function. The launch function is an extension of CoroutineScope that returns a Job. We call the Job#join method to wait for the Job to complete. Additionally, if we have a collection of Jobs, then we call joinAll to wait until all of them complete:

@Test
fun whenJoinLaunchedCoroutines_thenAllTerminated() {
    val count = AtomicInteger()
    runBlocking {
        val tasks = listOf(
            launch(Dispatchers.IO) { count.addAndGet(longRunningTask()) },
            launch(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
        )
        tasks.joinAll()
        Assertions.assertEquals(2, count.get())
    }
}

Like async, here we have the structured concurrency option, so let’s create the tasks within a parent coroutine:

@Test
fun whenParentCoroutineLaunchCoroutines_thenAllTerminated() {
    val count = AtomicInteger()
    runBlocking {
        withContext(coroutineContext) {
            launch(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
            launch(Dispatchers.IO) { count.addAndGet(longRunningTask()) }
        }
        Assertions.assertEquals(2, count.get())
    }
}

Everything we mentioned before about the builder for the parent coroutine and the dispatcher also applies to the launch sample.
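For instance, a launch-based variant that uses coroutineScope as the parent and mixes an explicit dispatcher with an inherited one might look like the sketch below. The names runBothJobs and launchWithinCoroutineScope are hypothetical:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Returns only after both launched jobs have completed, without calling join
suspend fun runBothJobs(counter: AtomicInteger) {
    coroutineScope {
        launch(Dispatchers.IO) { counter.incrementAndGet() } // explicit dispatcher
        launch { counter.incrementAndGet() }                 // inherits the caller's dispatcher
    }
}

fun launchWithinCoroutineScope(): Int = runBlocking {
    val counter = AtomicInteger()
    runBothJobs(counter)
    counter.get()
}
```

Here, the counter is read only after coroutineScope has returned, so both increments are visible at that point.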

4. Conclusion

In this tutorial, we have learned about structured concurrency and shown samples of how to use async or launch for concurrent executions of coroutines.

As always, the complete code is available over on GitHub.
