1. Overview

Kotlin Coroutines can often add readability to reactive, callback-heavy code.

In this tutorial, we'll find out how to leverage these coroutines for building non-blocking Spring Boot applications. We’ll also compare the reactive and coroutine approaches.

2. Coroutine Motivation

Nowadays, it is common for systems to serve thousands or even millions of requests. Consequently, the development world is moving towards non-blocking computation and request handling. Using system resources efficiently, by offloading I/O operations from core threads, makes it possible to handle many more requests than the traditional thread-per-request approach.

Asynchronous processing is not a trivial task and can be error-prone. Fortunately, we have tools for tackling this complexity, such as Java's CompletableFuture or reactive libraries like RxJava. Indeed, the Spring framework already supports reactive approaches with the Reactor and WebFlux frameworks.

Asynchronous code can be hard to read, but the Kotlin language provides the concept of Coroutines to allow writing concurrent and asynchronous code in a sequential style.

Coroutines are very flexible, so we have more control over the execution of tasks via Jobs and Scopes. Besides that, Kotlin coroutines work perfectly side by side with existing Java non-blocking frameworks.

Spring will support Kotlin Coroutines from version 5.2.

3. Project Setup

Let's start by adding in the dependencies we'll need.

As most of the dependencies used in this tutorial don't have stable releases yet, we'll need to include the snapshot and milestone repositories:

<pluginRepositories>
    <pluginRepository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </pluginRepository>
    <pluginRepository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </pluginRepository>
</pluginRepositories>

Let's use the Netty framework, an asynchronous, event-driven client-server framework. We'll use NettyWebServer as an embedded implementation of a reactive web server.

Additionally, since version 3.1 the Servlet Specification has supported non-blocking I/O. So, we could use a Servlet 3.1+ container like Jetty or Tomcat as well.

Let's use these versions, including Spring 5.2 via Spring Boot:

<properties>
    <kotlin.version>1.3.31</kotlin.version>
    <r2dbc.version>1.0.0.M1</r2dbc.version>
    <r2dbc-spi.version>1.0.0.M7</r2dbc-spi.version>
    <h2-r2dbc.version>1.0.0.BUILD-SNAPSHOT</h2-r2dbc.version>
    <kotlinx-coroutines.version>1.2.1</kotlinx-coroutines.version>
    <spring-boot.version>2.2.0.M2</spring-boot.version>
</properties>

Next, as we're relying on WebFlux for asynchronous processing, it's very important to use spring-boot-starter-webflux instead of spring-boot-starter-web. So we need to include this dependency in our pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>${spring-boot.version}</version>
</dependency>

Next, we'll add R2DBC dependencies to support reactive database access:

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>${h2-r2dbc.version}</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>${r2dbc-spi.version}</version>
</dependency>

Finally, we'll add the Kotlin core and coroutines dependencies:

<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>${kotlinx-coroutines.version}</version>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
    <version>${kotlinx-coroutines.version}</version>
</dependency>

4. Spring Data R2DBC with Coroutines

In this section, we'll focus on accessing databases in both the reactive and coroutine styles.

4.1. Reactive R2DBC

Let’s start with the reactive relational database client. Simply put, R2DBC is an API specification that declares a reactive API to be implemented by database vendors.

Our data store will be powered by the in-memory H2 database. Additionally, reactive relational drivers are available for PostgreSQL and Microsoft SQL Server.

First, let's implement a simple repository using the reactive approach:

@Repository
class ProductRepository(private val client: DatabaseClient) {

    fun getProductById(id: Int): Mono<Product> {
        return client.execute()
          .sql("SELECT * FROM products WHERE id = $1")
          .bind(0, id)
          .`as`(Product::class.java)
          .fetch()
          .one()
    }

    fun addNewProduct(name: String, price: Float): Mono<Void> {
        return client.execute()
          .sql("INSERT INTO products (name, price) VALUES($1, $2)")
          .bind(0, name)
          .bind(1, price)
          .then()
    }

    fun getAllProducts(): Flux<Product> {
        return client.select()
          .from("products")
          .`as`(Product::class.java)
          .fetch()
          .all()
    }
}

Here we're using the non-blocking DatabaseClient to execute queries against the database. Now, let’s rewrite our repository class using suspending functions and corresponding Kotlin types.

4.2. R2DBC with Coroutines

To convert a function from the reactive API to the coroutines API, we add the suspend modifier to the function definition:

fun noResultFunc(): Mono<Void>
suspend fun noResultFunc()

Furthermore, we can omit the Void return type. For a non-void result, we simply return a value of the declared type (nullable, since a Mono may complete empty) instead of wrapping it in a Mono:

fun singleItemResultFunc(): Mono<T>
suspend fun singleItemResultFunc(): T?

Next, if a source may emit more than a single item, we just change Flux to Flow as follows:

fun multiItemsResultFunc(): Flux<T>
fun multiItemsResultFunc(): Flow<T>
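As a rough illustration of these three shapes, here is a minimal sketch that uses only kotlinx-coroutines-core. It assumes a recent version of the library in which Flow is stable. The Item class and the InMemoryItems repository are hypothetical stand-ins for a real table, and delay() merely simulates non-blocking I/O:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical in-memory stand-in for a database table.
data class Item(val id: Int, val name: String)

class InMemoryItems {
    private val items = mutableMapOf(1 to Item(1, "pen"), 2 to Item(2, "ink"))

    // Mono<Void> becomes a suspend function with no declared return type (Unit).
    suspend fun add(item: Item) {
        delay(10) // simulates non-blocking I/O
        items[item.id] = item
    }

    // Mono<T> becomes a suspend function returning T?.
    suspend fun findById(id: Int): Item? {
        delay(10)
        return items[id]
    }

    // Flux<T> becomes a plain (non-suspending) function returning Flow<T>.
    fun findAll(): Flow<Item> = flow {
        for (item in items.values.toList()) {
            delay(10)
            emit(item)
        }
    }
}

fun main() = runBlocking {
    val repo = InMemoryItems()
    repo.add(Item(3, "pad"))
    println(repo.findById(3))                        // Item(id=3, name=pad)
    println(repo.findById(42))                       // null
    println(repo.findAll().toList().map { it.name }) // [pen, ink, pad]
}
```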

Let's apply these rules and refactor our repository:

@Repository
class ProductRepositoryCoroutines(private val client: DatabaseClient) {

    suspend fun getProductById(id: Int): Product? =
        client.execute()
          .sql("SELECT * FROM products WHERE id = $1")
          .bind(0, id)
          .`as`(Product::class.java)
          .fetch()
          .one()
          .awaitFirstOrNull()

    suspend fun addNewProduct(name: String, price: Float) =
        client.execute()
          .sql("INSERT INTO products (name, price) VALUES($1, $2)")
          .bind(0, name)
          .bind(1, price)
          .then()
          .awaitFirstOrNull()

    @FlowPreview
    fun getAllProducts(): Flow<Product> =
        client.select()
          .from("products")
          .`as`(Product::class.java)
          .fetch()
          .all()
          .asFlow()
}

In the snippet above, there are several points that require our attention. Where do these await* functions come from? They are defined as Kotlin extension functions in the kotlinx-coroutines-reactive library.
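To see these extensions in isolation, the following sketch applies them to plain Reactor publishers rather than to DatabaseClient results. It assumes that kotlinx-coroutines-reactive and reactor-core are on the classpath. The reactiveName(), reactiveNothing() and reactiveNames() functions are hypothetical sources invented for the example:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical reactive sources standing in for DatabaseClient results.
fun reactiveName(): Mono<String> = Mono.just("pen")
fun reactiveNothing(): Mono<String> = Mono.empty<String>()
fun reactiveNames(): Flux<String> = Flux.just("pen", "ink")

// Suspends until the publisher emits its first value, or returns null if it completes empty.
suspend fun name(): String? = reactiveName().awaitFirstOrNull()
suspend fun missingName(): String? = reactiveNothing().awaitFirstOrNull()

// Wraps the publisher in a Flow; the subscription happens when the flow is collected.
fun names(): Flow<String> = reactiveNames().asFlow()

fun main() = runBlocking {
    println(name())           // pen
    println(missingName())    // null
    println(names().toList()) // [pen, ink]
}
```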

Furthermore, there are more extensions available in the spring-data-r2dbc library.

5. Spring WebFlux Controllers

So far, we’ve seen how to implement repositories, but haven’t made any actual queries to the datastore yet. So, in this section, we'll figure out how to apply coroutines with the Spring WebFlux framework by creating non-blocking controllers.

5.1. Reactive Controllers

Let’s define two simple endpoints that in turn query the database through our repository.

Let's start with the more familiar reactive style:

@RestController
class ProductController {
    @Autowired
    lateinit var productRepository: ProductRepository

    @GetMapping("/{id}")
    fun findOne(@PathVariable id: Int): Mono<Product> {
        return productRepository.getProductById(id)
    }

    @GetMapping("/")
    fun findAll(): Flux<Product> {
        return productRepository.getAllProducts()
    }
}

This raises the question, which thread is responsible for executing actual I/O operations? By default, each query's operations run on a separate reactor NIO thread that's chosen by an underlying scheduler implementation.

5.2. Controllers with Coroutines

Let’s refactor the controller by leveraging suspending functions and using the corresponding repository class:

@RestController
class ProductControllerCoroutines {
    @Autowired
    lateinit var productRepository: ProductRepositoryCoroutines

    @GetMapping("/{id}")
    suspend fun findOne(@PathVariable id: Int): Product? {
        return productRepository.getProductById(id)
    }

    @FlowPreview
    @GetMapping("/")
    fun findAll(): Flow<Product> {
        return productRepository.getAllProducts()
    }
}

First, note that findAll() is not a suspending function. However, because it returns a Flow, suspending functions are called internally when the flow is collected.

For this version, database queries will run on the same reactor thread as in the reactive example.
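To illustrate why a function that returns a Flow doesn't need the suspend modifier, here is a small sketch using only kotlinx-coroutines-core (assuming a version in which Flow is stable). The numbers() function and the events list are hypothetical names. Calling numbers() only describes the stream, and the suspending body runs once the flow is collected:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records when the flow body actually runs.
val events = mutableListOf<String>()

// Not a suspending function: it only builds a description of the stream.
fun numbers(): Flow<Int> = flow {
    events += "started"
    for (i in 1..3) {
        delay(10) // suspension happens here, during collection
        emit(i)
    }
}

fun main() = runBlocking {
    val flow = numbers()   // nothing has run yet
    println(events)        // []
    println(flow.toList()) // [1, 2, 3]
    println(events)        // [started]
}
```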

6. Spring WebFlux WebClient

Next, imagine we have microservices in our system.

In order to complete a request, we need to query another service to get additional data. So, in our case, a good example would be getting a product stock quantity. To call another service via the API, we'll use WebClient from the WebFlux framework.

6.1. Reactive WebClient

To start with, let's see how to make a simple request:

val htmlResponse = webClient.get()
  .uri("https://www.baeldung.com/")
  .retrieve().bodyToMono<String>()

The next step is to call the external stock service to get the stock quantity and then return a combined result to the client. First, we'll get a product from the repository, and then we'll query the stock service:

@GetMapping("/{id}/stock")
fun findOneInStock(@PathVariable id: Int): Mono<ProductStockView> {
   val product = productRepository.getProductById(id)
   
   val stockQuantity = webClient.get()
     .uri("/stock-service/product/$id/quantity")
     .accept(MediaType.APPLICATION_JSON)
     .retrieve()
     .bodyToMono<Int>()
   return product.zipWith(stockQuantity) { 
       productInStock, stockQty ->
         ProductStockView(productInStock, stockQty)
   }
}

Notice that the repository returns a Mono<Product> and the WebClient returns a Mono<Int>. The zipWith() method only composes the two; the actual subscription happens when WebFlux subscribes to the Mono we return from the controller. At that point, both requests run, and once both have completed, their results are combined into a new object.
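The combining step can be tried outside of a controller with a sketch like the one below, which assumes only reactor-core on the classpath. The Book and BookStockView classes and the two find* functions are hypothetical stand-ins for the repository and the stock service:

```kotlin
import reactor.core.publisher.Mono

// Hypothetical types and sources standing in for the repository and the stock service.
data class Book(val id: Int, val name: String)
data class BookStockView(val book: Book, val quantity: Int)

fun findBook(id: Int): Mono<Book> = Mono.just(Book(id, "Kotlin"))
fun findQuantity(id: Int): Mono<Int> = Mono.just(7)

// Combines the two results into one view once both publishers have emitted.
fun findBookInStock(id: Int): Mono<BookStockView> =
    findBook(id).zipWith(findQuantity(id)) { book, qty -> BookStockView(book, qty) }

fun main() {
    // block() is used only to print the result in this demo; a controller would return the Mono.
    println(findBookInStock(1).block()) // BookStockView(book=Book(id=1, name=Kotlin), quantity=7)
}
```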

6.2. WebClient with Coroutines

Now, let's see how to use the WebClient with coroutines.

To perform a GET request, we apply the awaitExchange() and awaitBody() suspending extension functions:

val htmlResponse = webClient.get()
  .uri("https://www.baeldung.com/")
  .awaitExchange()
  .awaitBody<String>()

Let's get back to our microservice example. We can execute a request against the stock service:

@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView {
    val product = productRepository.getProductById(id)
    val quantity = webClient.get()
      .uri("/stock-service/product/$id/quantity")
      .accept(APPLICATION_JSON)
      .awaitExchange()
      .awaitBody<Int>()
    return ProductStockView(product!!, quantity)
}

We should note that this looks like blocking code. One of the main benefits of using coroutines is the ability to write asynchronous code in a fluent and readable way.

In the example above, the database query and the web request will be executed one after another. This is because coroutines are sequential by default.

Can we run suspending functions in parallel? Absolutely! Let's modify our endpoint method to run the queries in parallel:

@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView = coroutineScope {
    val product: Deferred<Product?> = async {
        productRepository.getProductById(id)
    }
    val quantity: Deferred<Int> = async {
        webClient.get()
          .uri("/stock-service/product/$id/quantity")
          .accept(APPLICATION_JSON)
          .awaitExchange().awaitBody<Int>()
    }
    ProductStockView(product.await()!!, quantity.await())
}

Here, by wrapping each suspending call in an async {} block, we get an object of the Deferred<> type. By default, a coroutine created with async is scheduled for execution immediately, so both calls are already in flight by the time we reach the last line. We could defer the start by passing CoroutineStart.LAZY as the optional start parameter, but a lazy coroutine only starts when start() or await() is called on it, so awaiting two lazy coroutines one after another would run them sequentially.

Finally, we call await() on each Deferred to suspend until its result is available. Because the two coroutines run concurrently, the total time is roughly that of the slower call. This technique is also known as parallel decomposition.

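It's interesting to note that which threads run the async blocks depends on the dispatcher in the coroutine context; with Dispatchers.Default, for example, they are dispatched to separate worker threads. The actual I/O operations still happen on threads from the Reactor NIO pool.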

To enforce structured concurrency, we've used the coroutineScope {} scoping function to create our own scope. It waits for all the coroutines inside the block to complete before completing itself. However, unlike runBlocking, coroutineScope {} doesn't block the current thread while it waits.
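The difference between sequential calls and async inside coroutineScope {} can be observed with a small timing sketch that needs only kotlinx-coroutines-core. The loadName() and loadQuantity() functions are hypothetical, with delay() standing in for the database query and the HTTP request. Exact timings will vary from run to run:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical suspending calls; delay() stands in for a database query and an HTTP request.
suspend fun loadName(): String { delay(200); return "pen" }
suspend fun loadQuantity(): Int { delay(200); return 7 }

// One call after the other: total time is roughly the sum of both delays.
suspend fun sequential(): Pair<String, Int> = Pair(loadName(), loadQuantity())

// Both calls start right away as children of the scope: total time is roughly the longer delay.
suspend fun concurrent(): Pair<String, Int> = coroutineScope {
    val name = async { loadName() }
    val quantity = async { loadQuantity() }
    Pair(name.await(), quantity.await())
}

fun main() = runBlocking {
    val sequentialMs = measureTimeMillis { sequential() }
    val concurrentMs = measureTimeMillis { concurrent() }
    println("sequential: $sequentialMs ms, concurrent: $concurrentMs ms")
}
```

Because coroutineScope {} returns only after both children have finished, no coroutine outlives the call to concurrent().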

7. WebFlux.fn DSL Routes

Finally, let's see how to use coroutines with DSL Routes definitions.

The WebFlux Functional Framework, powered by Kotlin, provides a concise and fluent way to define endpoints. The coRouter {} DSL supports Kotlin Coroutines for defining router functions.

First, let's define router endpoints in the DSL fashion:

@Configuration
class RouterConfiguration {
    @FlowPreview
    @Bean
    fun productRoutes(productsHandler: ProductsHandler) = coRouter {
        GET("/", productsHandler::findAll)
        GET("/{id}", productsHandler::findOne)
        GET("/{id}/stock", productsHandler::findOneInStock)
    }
}

Now that we have our route definitions, let's implement the ProductsHandler with the same functionality as the ProductController had:

@Component
class ProductsHandler(
  @Autowired var webClient: WebClient, 
  @Autowired var productRepository: ProductRepositoryCoroutines) {
    
    @FlowPreview
    suspend fun findAll(request: ServerRequest): ServerResponse =
        ServerResponse.ok().json().bodyAndAwait(productRepository.getAllProducts())

    // coroutineScope keeps both requests tied to the handler call instead of GlobalScope
    suspend fun findOneInStock(request: ServerRequest): ServerResponse = coroutineScope {
        val id = request.pathVariable("id").toInt()
        val product: Deferred<Product?> = async {
            productRepository.getProductById(id)
        }
        val quantity: Deferred<Int> = async {
            webClient.get()
              .uri("/stock-service/product/$id/quantity")
              .accept(MediaType.APPLICATION_JSON)
              .awaitExchange().awaitBody<Int>()
        }
        ServerResponse.ok()
          .json()
          .bodyAndAwait(ProductStockView(product.await()!!, quantity.await()))
    }

    suspend fun findOne(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toInt()
        return ServerResponse.ok()
          .json()
          .bodyAndAwait(productRepository.getProductById(id)!!)
    }
}

We should note that we've used suspending functions for defining the ProductsHandler class. Not much changed compared to the controller except request and response types.

That's all we need to set up simple REST endpoints. Thanks to the Routes DSL and Kotlin coroutines, the endpoint definitions are fluent and concise.

8. Conclusion

In this article, we've explored Kotlin coroutines and found out how to integrate them with Spring frameworks, R2DBC, and WebFlux in particular.

Applying non-blocking approaches in a project may improve application performance and scalability. In addition, we've seen how using Kotlin coroutines can make asynchronous code more readable.

We should note that the mid-development versions of the above libraries may change a lot before they reach stable releases and that minor version differences may be incompatible with each other.

The code of the examples is available, as always, over on GitHub.
