1. Introduction

When it comes to any approach or technology, the question is: what problem are we trying to solve?

RxJava and its Kotlin extension, RxKotlin, follow the principles of the Reactive Manifesto, which requires applications to be responsive, resilient, elastic, and message-driven. Software that implements these principles communicates asynchronously with external data sources, applies back-pressure to request producers, and degrades gracefully when parts of the system fail.

This should be the basis of our comparison between Kotlin Coroutines and the Rx libraries. If both are suitable, we should see which approach is more readable and behaves better in production.

However, the Kotlin Coroutines library is not a direct RxKotlin competitor. Its scope is much broader. The Kotlin Coroutines approach consists of two distinct concepts: the suspend modifier, which is just a Kotlin language keyword, and the default runtime support for that keyword provided by the kotlinx.coroutines library. The suspend keyword enables cooperative concurrency: every call to a suspend function is a potential suspension point, where control can be handed to other coroutines that are ready to continue.

And so, in their foundation, coroutines have nothing to do with messaging, back-pressure, or even asynchronicity. Their primary goal is to enable non-blocking waits between CPU-heavy parts of the program. This method allows other coroutines to utilize the CPU better. Certainly, by providing us with a way to create lightweight thread-like coroutines, this library offers an approach to asynchronous programming, which is novel in the JVM world.
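To illustrate that suspension itself needs no messaging or threading machinery, here is a minimal sketch that uses only the kotlin.coroutines package of the standard library and no kotlinx.coroutines at all. The names parked, awaitMessage, and runDemo are hypothetical and exist only for this illustration:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical holder for a parked continuation; not part of any library.
var parked: Continuation<String>? = null

// A suspension point: stores the continuation and hands control back to the caller.
suspend fun awaitMessage(): String = suspendCoroutine { cont -> parked = cont }

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Unit = {
        log += "coroutine started"
        val message = awaitMessage() // suspends here without blocking any thread
        log += "coroutine resumed with $message"
    }
    // Start the coroutine on the current thread; the completion callback runs when it ends.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { result ->
        result.getOrThrow()
        log += "coroutine completed"
    })
    log += "caller continues while the coroutine is suspended"
    parked?.resume("hello") // resumes the coroutine synchronously on this thread
    return log
}
```

Calling runDemo() returns the four log entries in the order: started, caller continues, resumed, completed. Everything runs on a single thread, so the only thing the language contributes here is the ability to pause and resume a computation.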

2. Reactive Systems Are Responsive

Let’s read the Reactive Manifesto and see how we can use RxKotlin and Kotlin Coroutines to achieve its principles. The first point is the responsiveness of the system. The responses must be “rapid” and with “consistent times”, and all “problems” must be “detected quickly”. The latter principle is usually known as “fail fast”. Can coroutines fail fast? Absolutely:

withTimeout(100.milliseconds) {
    delay(3.seconds)
    // Will throw TimeoutCancellationException
}

Another aspect of responsiveness is managing resources so that every request that can be processed actually is processed. At the same time, there should always be some capacity in reserve for the control circuit. Coroutines make this goal easy to achieve, since they don’t hold a thread while waiting for asynchronous IO. Nonetheless, it’s possible to starve a CoroutineDispatcher with blocking code:

repeat(3) {
    starvingContext.launch {
        Thread.sleep(30000) // this code blocks the dispatcher's threads
    }
}

runBlocking {
    withTimeout(100.milliseconds) {
        starvingContext.launch { println("A quick task which will never execute") }.join()
    }
}

Conversely, if we had used a non-blocking version of the waiting function, we would have seen the printed message:

repeat(3) {
    starvingContext.launch {
        delay(30000) // this function doesn't block
    }
}
runBlocking {
    withTimeout(100.milliseconds) {
        withContext(workingContext.coroutineContext) {
            println("This message gets printed")
        }
    }
}
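The two snippets above rely on contexts defined elsewhere. A self-contained sketch of the same experiment might look as follows, assuming a dispatcher backed by a fixed pool of three threads and much shorter waits so that it finishes quickly. The function name quickTaskRuns and the pool size are our assumptions, not part of the original examples:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Returns true if a quick task queued behind three one-second jobs finishes within 200 ms.
fun quickTaskRuns(blocking: Boolean): Boolean {
    // Assumption: a dispatcher with exactly three threads stands in for the starved context.
    val dispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
    val scope = CoroutineScope(dispatcher)
    try {
        repeat(3) {
            scope.launch {
                // Thread.sleep holds the thread; delay suspends and frees it.
                if (blocking) Thread.sleep(1_000) else delay(1_000)
            }
        }
        return runBlocking {
            withTimeoutOrNull(200L) {
                scope.launch { }.join() // the quick task
                true
            } ?: false
        }
    } finally {
        scope.cancel()
        dispatcher.close()
    }
}
```

With blocking = true, all three threads are occupied, so the quick task misses its deadline and the function returns false. With blocking = false, the waiting coroutines release their threads and the function returns true.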

Now let us see what the Rx library offers in terms of responsiveness:

Observable
    .fromCallable {
        Thread.sleep(30000)
        "result"
    }
    .subscribeOn(worker)
    .timeout(100, TimeUnit.MILLISECONDS)
    .subscribe({ msg -> println(msg) }, { worker.shutdown() })

Instead of wrapping the long operation in a timeout, we add the timeout after it as a separate operator. On the downside, the Rx library is less explicit about which operation happens where. For instance, nothing in the code above says where the onNext and onError lambdas will run or which thread is responsible for keeping track of our timeout. In contrast, Kotlin coroutines declare explicit concurrency as their main principle: we must mention even the default dispatchers by name to run coroutines on their threads.

When managing parallel execution, the Rx library falls back on the standard JVM thread model: we can choose the threads that run the Observable’s callable and those that execute the subscriber actions. Rx handles this through Schedulers:

val worker = Schedulers.computation() // or .io(), or .single()

This abstraction is simpler than the coroutine combination of scope, context, and dispatcher, but we still need to manage thread pools and balance their sizes against each other.
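If we want the threading to be visible in the Rx pipeline, we can name a Scheduler at each step. The sketch below assumes RxJava 3 package names (io.reactivex.rxjava3), which may differ from the version used in a given project, and the function name timedOutOn is hypothetical:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Reports which exception ended the stream and on which thread it was handled.
fun timedOutOn(): String =
    Observable.never<String>() // a source that never emits, standing in for a slow call
        .timeout(100, TimeUnit.MILLISECONDS, Schedulers.computation()) // the timer runs here
        .observeOn(Schedulers.single()) // downstream callbacks are delivered here
        .onErrorReturn { e ->
            "${e.javaClass.simpleName} handled on ${Thread.currentThread().name}"
        }
        .blockingFirst()
```

The returned string starts with "TimeoutException handled on", followed by the name of the thread that belongs to the single() scheduler. The explicit Scheduler arguments play a role similar to naming a dispatcher in coroutine code.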

3. Reactive Systems Are Resilient

The manifesto states: “Resilience is achieved by replication, containment, isolation, and delegation.” It also says: “Recovery of each component is delegated to another (external) component”.

Errors in Kotlin Coroutines are just ordinary exceptions. Without additional libraries, such as ArrowKt, it’s impossible to mandate error handling at a specific level, because all exceptions in Kotlin are unchecked. Rx handles this differently:

Observable.error<CustomException>(CustomException())
    .subscribe({ msg -> println(msg) }, { ex -> failed = ex !is CustomException })
    .dispose()

However, the error handler lambda is not mandatory, and the default behavior is to skip the errors and merely dump them to System.err. This might look like a disadvantage compared to the coroutine approach, where uncaught exceptions bubble up until they are handled or they bring down the whole application.

When it comes to component supervision, each coroutine runs in its own context, and these contexts form a hierarchy. Parent contexts are responsible for their children and the coroutines that run in them. Granted, by default, a failed child will poison and cancel the whole hierarchy. We must admit that the Rx approach to error handling makes more sense, or would if it were enforced more strictly, while Coroutines can benefit heavily from functional libraries.
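The default behavior of the hierarchy can be changed with supervisorScope from kotlinx.coroutines, which contains a failure within the child that caused it. The following sketch contrasts the two modes; the function names withSupervision and withoutSupervision are hypothetical:

```kotlin
import kotlinx.coroutines.*

// With supervision, the failure stays inside the failing child and its sibling finishes.
suspend fun withSupervision(): String = supervisorScope {
    val failing = async<String> { throw IllegalStateException("child failed") }
    val healthy = async { delay(50); "sibling survived" }
    val first = runCatching { failing.await() }.getOrElse { "recovered: ${it.message}" }
    "$first, ${healthy.await()}"
}

// Without supervision, the failed child cancels the scope and its sibling with it.
suspend fun withoutSupervision(): String = try {
    coroutineScope {
        async<String> { throw IllegalStateException("child failed") }
        val healthy = async { delay(50); "sibling survived" }
        healthy.await()
    }
} catch (e: IllegalStateException) {
    "whole scope failed: ${e.message}"
}
```

Called from runBlocking, the first function returns "recovered: child failed, sibling survived", while the second returns "whole scope failed: child failed". This is roughly the containment and delegation the manifesto asks for, but we have to opt into it.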

4. Reactive Systems Are Elastic

Until the Rx libraries are rewritten on top of Java Fibers (now known as virtual threads), Kotlin Coroutines will continue to be more cost-effective. Regarding scaling, Rx and Coroutines offer the same capabilities: a cached thread pool will scale up the number of threads if all of the existing ones are busy. It doesn’t matter if we use that thread pool in an Rx Scheduler or a Kotlin CoroutineDispatcher.

However, since a thread is a rather heavy resource, we can launch significantly fewer threads than coroutines.
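To get a feeling for the difference, we can launch a large number of coroutines on the single thread that runBlocking provides. This is only a rough sketch, and the function name launchMany is hypothetical:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Launches `count` coroutines that all wait concurrently; returns how many of them finished.
fun launchMany(count: Int): Int = runBlocking {
    val finished = AtomicInteger()
    coroutineScope {
        repeat(count) {
            launch {
                delay(100) // suspends without occupying a thread
                finished.incrementAndGet()
            }
        }
    } // returns once every child has completed
    finished.get()
}
```

A call such as launchMany(100_000) completes on an ordinary JVM, whereas starting the same number of platform threads would typically exhaust memory or hit operating system limits.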

5. Reactive Systems Are Message-Driven

Here we come again to the fact that the coroutines are a lower-level concept than the Observables and Flowables of the Rx libraries. We certainly have tools to set up a message exchange within our application, like Flow and Channel:

val pipeline = Channel<String>()
scopeA.launch {
    (1..10).forEach {
        pipeline.send(it.toString())
    }
    pipeline.close()
}
withContext(scopeB.coroutineContext) {
    pipeline.consumeAsFlow()
        .onEach { println("Received message: $it") }
        .toList()
}

However, Kotlin Coroutines don’t constrain us to use them, while it’s hard to use the Rx library without messaging: we start by creating an Observable, which produces one or more messages.

The manifesto also notes that the message-driven communication must be asynchronous, allow for back-pressure on the part of the receiver, and support non-blocking IO. Let’s see how coroutines compare with Rx on these topics.

5.1. Asynchronous Communication

It’s really easy to launch any process asynchronously with coroutines, but we have to be explicit about this:

val a = async { requestOverNetwork(0) }
val b = async { requestOverNetwork(1) }
val c = async { requestOverNetwork(2) }
a.await() + b.await() + c.await()

In this example, the three network requests run concurrently, and the coroutine suspends only when it needs their results. What about RxKotlin?

Observable.merge(
    listOf(
        observeOverNetwork(0),
        observeOverNetwork(1),
        observeOverNetwork(2)
    )
).reduce { t1, t2 -> t1 + t2 }
    .subscribe(testSubscriber)

Obviously, it does its job, but the coroutine syntax is clearer.
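For reference, the coroutine variant can be turned into a self-contained sketch. Here requestOverNetwork is a hypothetical stand-in that simulates a 200 ms network call with delay, and sumConcurrently is a name we made up for the illustration:

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a real network call.
suspend fun requestOverNetwork(id: Int): Int {
    delay(200)
    return id
}

// Returns the sum of the three responses and the elapsed time in milliseconds.
fun sumConcurrently(): Pair<Int, Long> = runBlocking {
    var sum = 0
    val elapsed = measureTimeMillis {
        val a = async { requestOverNetwork(0) }
        val b = async { requestOverNetwork(1) }
        val c = async { requestOverNetwork(2) }
        sum = a.await() + b.await() + c.await()
    }
    sum to elapsed
}
```

The sum is 3, and the elapsed time should stay close to 200 ms rather than 600 ms, because the three requests wait at the same time.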

5.2. Back-pressure

Again, we have to implement this principle in our software actively. We can use Flow, which has an inherent back-pressure mechanism, or employ a Channel, with or without a buffer. Sending to a Channel will suspend until there is space in its buffer for the new message.

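But it is our decision whether to use Flows and Channels or not. In Rx land, back-pressure is more sophisticated: the Flowable type makes it part of the contract, and operators such as onBackpressureBuffer, onBackpressureDrop, and onBackpressureLatest let us choose what happens when the consumer falls behind.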
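On the coroutine side, both mechanisms can be observed in a few lines. The sketch below is an illustration under our own assumptions: the function names backPressureLog and bufferFills are hypothetical, and trySend is used only to show that the buffer is full, which is the situation where send would suspend:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// A plain Flow: the producer cannot get ahead of a slow collector.
fun backPressureLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // returns only after the collector has processed the value
        }
    }.collect { value ->
        delay(50) // a slow consumer
        log += "done $value"
    }
    log
}

// A Channel with a buffer of one: the second message does not fit until someone receives.
fun bufferFills(): List<Boolean> {
    val channel = Channel<Int>(capacity = 1)
    val results = listOf(channel.trySend(1).isSuccess, channel.trySend(2).isSuccess)
    channel.close()
    return results
}
```

The first function returns the entries in strictly alternating order (emit 1, done 1, emit 2, done 2, emit 3, done 3), and the second returns [true, false].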

5.3. Non-blocking IO

Both coroutines and the Rx library depend on underlying IO implementations to actually be non-blocking. However, coroutines have a clear advantage, as they can express non-blocking statements directly with the language:

private suspend fun AsynchronousFileChannel.asyncRead(dst: ByteBuffer, position: Long = 0): Int =
    suspendCoroutine { continuation ->
        // The continuation travels as the attachment and is resumed by the completion handler
        read(dst, position, continuation, object : CompletionHandler<Int, Continuation<Int>> {
            override fun completed(result: Int, attachment: Continuation<Int>) = attachment.resume(result)
            override fun failed(exc: Throwable, attachment: Continuation<Int>) = attachment.resumeWithException(exc)
        })
    }
// then later in the application
fileChannel.asyncRead(buffer)

The Rx approach, of course, also supports non-blocking libraries, such as Netty. The code needed to use them is quite similar:

val buffer = ByteBuffer.allocate(13)
Observable.create { emitter ->
    fileChannel.read(buffer, 0, emitter, object : CompletionHandler<Int, ObservableEmitter<String>>{
        override fun completed(result: Int, attachment: ObservableEmitter<String>) {
            emitter.onNext(String(buffer.array()))
            emitter.onComplete()
        }
        override fun failed(exc: Throwable, attachment: ObservableEmitter<String>) {
            emitter.tryOnError(exc)
        }
    })
}.subscribe(testSubscriber)

6. Using the Rx Library With Coroutines

As we discovered, coroutines introduce concepts that are lower-level than those of Reactive libraries. We can use these concepts to achieve the same results. However, if we don’t actively follow the Reactive Manifesto principles as we build our application, it won’t be loosely coupled or asynchronous. The Rx libraries hold us to a much stricter discipline.

It’s a good thing, then, that we don’t have to choose. Kotlin extensions provide a set of adapter functions that transform Rx entities into Kotlin Coroutine ones and back:

val observable = Observable.just("apple")
val result = observable.awaitSingle()

flow {
    (1..5).forEach { emit(it) }
}.asObservable(this.coroutineContext).subscribe(testSubscriber)

One disadvantage some people see in the Rx libraries is their pervasiveness. If we use an Rx construct in one part of our application, it becomes very tedious to use other approaches elsewhere, as the handover between regular code and Rx operators is a bit awkward. Using Kotlin coroutines as glue between the blocking and non-blocking parts of the application might be a good idea.
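As a rough sketch of the glue idea, a blocking call can be wrapped in a suspend function that moves it to Dispatchers.IO, and regular code can enter the coroutine world through runBlocking. The functions blockingLookup, lookup, and lookupAll are hypothetical names used only for this illustration:

```kotlin
import kotlinx.coroutines.*

// Hypothetical legacy call that blocks its thread.
fun blockingLookup(id: Int): String {
    Thread.sleep(50)
    return "user-$id"
}

// The blocking call is shifted to the IO dispatcher, so the caller's thread stays free.
suspend fun lookup(id: Int): String = withContext(Dispatchers.IO) { blockingLookup(id) }

// Entry point for regular, non-suspending code.
fun lookupAll(ids: List<Int>): List<String> = runBlocking {
    ids.map { async { lookup(it) } }.awaitAll()
}
```

For example, lookupAll(listOf(1, 2, 3)) returns the three results in the original order, while the blocking calls run concurrently on IO threads.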

7. Conclusion

In this article, we compared Kotlin Coroutines with RxKotlin/RxJava libraries. It turns out that the coroutines cover the tenets of the Reactive Manifesto very well, with the possible exception of error handling. Additionally, the coroutines are better rooted in the language and make fewer assumptions about our coding style. For instance, we can be as functional as we choose without needing to convert the whole codebase into monads immediately.

On the other hand, for teams that are already familiar with the Rx approach, it might be easier to continue using the Reactive style. The good thing is that Kotlin extensions provide adapters that allow us to bridge the gap between Rx and Kotlin coroutines and use them both as needed.

All the code examples are, as usual, over on GitHub.
