1. Overview

Single in RxJava and Deferred in Kotlin Coroutines are mechanisms for performing asynchronous operations. They allow us to execute time-consuming tasks without blocking the main thread.

We might already be using Single in a project but wish to migrate to Deferred as an alternative solution. Conversion is one of the ways to achieve this without having to do a full rewrite.

In this article, we’ll experiment with various ways to convert Single to Deferred.

2. Comparing Single vs. Deferred

Single is one of the abstracts provided by RxJava that represents an Observable that will only produce one item or one error.

We can use Single when we only want to receive one result from an operation, such as an API call or reading data from a database.

Deferred is a data type used in Kotlin Coroutines to represent results that will be available in the future.

Meanwhile, we can also use Deferred for potentially long-running operations such as network or API calls, where the results will be available after a time delay.

Conceptually, Deferred and Single have a similar purpose – namely, to represent a single result of a potentially long-running operation:

Feature Single (RxJava) Deferred (Kotlin Coroutines)
Purpose Represents an Observable with one item or error Represents a result that will be available in the future
Use Case Receive one result from an operation Handle potentially long-running operations
Result Handling One item or error Result available after a time delay
Error Handling Handles errors through onError() Errors can be caught using try/catch or await()
Get results subscribe(), blockingGet() await()

 

The table above shows a brief overview of the comparison between Single and Deferred.

3. Convert Single to Deferred

For demonstration purposes, we have data in the form of a list of products:

data class Product(val id: Int, val name: String, val price: Double)

private val allProducts = listOf(
  Product(1, "Samsung", 1200.0),
  Product(2, "Oppo", 800.0),
  Product(3, "Nokia", 450.0),
  Product(4, "Lenovo", 550.0),
  Product(5, "ASUS", 400.0)
)

Next, we create a function that produces Single<List<Product>> that has been filtered by prices above 500 and sorted by price:

private fun getFilteredProducts(): Single<List<Product>> {
    return Single.just(
      allProducts
    ).map { products ->
      products.sortedBy { it.price }.filter { it.price > 500 }
    }.subscribeOn(Schedulers.io())
}

Let’s also prepare a function we can use later to validate that the results are as we expect:

private suspend fun Deferred<*>.assertOver500AndSorted() {
    assertThat(this.await() as List<*>).containsExactly(
      Product(4, "Lenovo", 550.0),
      Product(2, "Oppo", 800.0),
      Product(1, "Samsung", 1200.0)
    )
}

This function will validate that a result is truly a Deferred object that represents a filtered List<Product>:

deferred.assertOver500AndSorted()

Next, we’ll convert from Single to Deferred in a number of different ways. Let’s experiment!

3.1. Using async() and blockingGet()

The async function is used to start the execution of a coroutine asynchronously. When we use the async function, a coroutine will be started to execute the given task and return a Deferred representing the result of the task:

val futureResult: Deferred<T> = async { /* do something */ }

In this case, we use the RxJava blockingGet() method that waits in a blocking fashion until the current Single signals a success value (which is returned) or an exception (which is propagated).

So, we can use this method to convert Single to Deferred:

val deferred = async { getFilteredProducts().blockingGet() }

getFilteredProducts().blockingGet() is called within the coroutine and will block the coroutine until getFilteredProducts() completes.

3.2. Using CompletableDeferred

CompletableDeferred is an instance of Deferred that allows us to create Deferred objects whose results can be set manually at a later time. We can complete a CompletableDeferred by using complete() or completeExceptionally() methods:

public abstract fun complete(value: T): kotlin.Boolean

We can also use a completeExceptionally() method to handle operations that should return a value that has failed with an exception:

public abstract fun completeExceptionally(exception: kotlin.Throwable): kotlin.Boolean

So, we’ll use the complete() function to send the results of the subscribe() operation from Single and then store them in CompletableDeferred:

val deferred = CompletableDeferred<T>()
getFilteredProducts().subscribe(deferred::complete, deferred::completeExceptionally)

getFilteredProducts().subscribe(…) subscribes to the observable returned by getFilteredProducts() and has two callbacks.

The deferred::complete is called when the observable emits a value, completing the CompletableDeferred with that value. Meanwhile, deferred::completeExceptionally is called when the observable emits an error, completing the CompletableDeferred with that exception.

3.3. Using suspendCoroutine or suspendCancellableCoroutine

The suspendCoroutine is a function in Kotlin Coroutines to suspend the execution of a coroutine and wait for the results of a lambda:

suspend inline fun <T> suspendCoroutine(
  crossinline block: (Continuation<T>) -> kotlin.Unit
): T

Whereas, suspendCancellableCoroutine is like suspendCoroutine, but with the added ability to cancel the coroutine while it is waiting. This uses continuation.cancel() to cancel the routine:

public suspend inline fun <T> suspendCancellableCoroutine(
  crossinline block: (CancellableContinuation<T>) -> kotlin.Unit
): T

Let’s create Deferred with suspendCoroutine:

val deffered = async {
  suspendCoroutine { continuation ->
    getFilteredProducts().subscribe(continuation::resume, continuation::resumeWithException)
  }
}

When getFilteredProducts() emits a value, continuation::resume will be called, resuming the coroutine with that value.

But, if getFilteredProducts() emits an error, continuation::resumeWithException will be called, resuming the coroutine with that exception.

3.4. Using Kotlinx Coroutines Rx3

Kotlinx Coroutines Rx3 is part of the Kotlin Coroutines ecosystem that provides powerful integration between Kotlin Coroutines and RxJava 3. In situations where we have a combination of RxJava and Coroutine operations, Kotlin Coroutines Rx3 allows us to use both.

But, because this is not yet bundled in the default Kotlin Coroutines, we must first declare it as a dependency:

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-rx3</artifactId>
    <version>1.8.0</version>
</dependency>

This integration allows us to use RxJava 3 operators and flows within Kotlin core routines, as well as convert between RxJava 3 data types such as Completable, Single, or Observable with Kotlin core data types such as Deferred or Flow.

Kotlinx Coroutines Rx3 has an await() function to wait for the completion of a Single value response without blocking the thread. It returns the result value or throws an exception if an error occurs in the response:

suspend fun <T> SingleSource<T & Any>.await(): T

So, we create a Deferred with Single.await() and wrap it in async{}.

val deferred = async { getFilteredProducts().await() }

We can call await() on Single coroutines. If successful, await() will return the value emitted from the Single.

4. Conclusion

In this article, we’ve discussed various ways of converting Single to Deferred.

We can use async() and blockingGet() for a simple conversion.

If we need to resolve or redirect an exception, then we can use CompletableDeferred.

The suspendCoroutine function is an option if we need a suspendable function rather than a callback-style function. Another option is to use suspendCancellableCoroutine, which allows for coroutine cancellation while waiting, albeit requiring more coding.

However, if we need complete integration of RxJava 3 with Kotlin Coroutines including Observable, Single, Flowable, and others, then Kotlinx Coroutines Rx3 is a handy solution.

All the code examples, as usual, are in our repository over on GitHub.

Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments