1. Introduction

In this tutorial, we’ll look into the RSocket library and how to use it with the Kotlin programming language.

2. What Is RSocket?

RSocket describes itself as an “application protocol providing Reactive Streams semantics”. It provides us with libraries for various languages that allow us to implement a variety of reactive network streams. This, in turn, lets us write networked applications that can communicate with each other in a variety of ways.

By defining RSocket as an application protocol first, we’re able to write our client and server applications in different programming languages and still interact correctly between them.

The Kotlin implementation that we’re going to explore in this article is built on top of Ktor and Kotlinx Coroutines.

3. Setting Up

Before we’re able to actively use RSocket, we need to do some setting up. This includes ensuring the correct dependencies are available and that we have the base components in our code.

3.1. Setting up a Server

For a server, we need to ensure that rsocket-core and rsocket-ktor-server-jvm are available, as well as ktor-server-netty-jvm and ktor-server-cio-jvm:

<dependency>
    <groupId>io.rsocket.kotlin</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.15.4</version>
</dependency>

<dependency>
    <groupId>io.ktor</groupId>
    <artifactId>ktor-server-netty-jvm</artifactId>
    <version>2.3.7</version>
</dependency>
<dependency>
    <groupId>io.ktor</groupId>
    <artifactId>ktor-server-cio-jvm</artifactId>
    <version>2.3.7</version>
</dependency>
<dependency>
    <groupId>io.rsocket.kotlin</groupId>
    <artifactId>rsocket-ktor-server-jvm</artifactId>
    <version>0.15.4</version>
</dependency>

Note that we’ve included the JVM dependencies for both RSocket and Ktor since this article targets the JVM as a runtime. Alternative dependencies are also available for JS and native runtimes.

We then need to create a network server. The Ktor integration for RSocket runs over Ktor’s WebSocket functionality. As such, we need to create a correctly configured Ktor server:

embeddedServer(Netty, port = 9000) {
    install(io.ktor.server.websocket.WebSockets)
    install(io.rsocket.kotlin.ktor.server.RSocketSupport)
    routing {
        // Add RSocket endpoints
    }
}.start(wait = true)

We must install the WebSockets extension before the RSocketSupport one for this to work correctly.

After completing this, we’ll have a server ready for RSocket integration.
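For reference, the same server can be written out as a complete entry point. This is a minimal sketch that assumes the Ktor 2.3.7 and rsocket-kotlin 0.15.4 dependencies listed above. The import paths are the standard Ktor 2.x ones rather than anything specific to this article:

```kotlin
import io.ktor.server.application.install
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.server.routing.routing
import io.ktor.server.websocket.WebSockets
import io.rsocket.kotlin.ktor.server.RSocketSupport

fun main() {
    embeddedServer(Netty, port = 9000) {
        install(WebSockets)     // has to be installed first
        install(RSocketSupport) // relies on the WebSockets plugin
        routing {
            // RSocket endpoints are registered here
        }
    }.start(wait = true) // blocks the main thread until the server stops
}
```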

3.2. Setting up a Client

For a client, we instead need rsocket-core and rsocket-ktor-client-jvm, as well as ktor-client-cio-jvm:

<dependency>
    <groupId>io.rsocket.kotlin</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.15.4</version>
</dependency>

<dependency>
    <groupId>io.ktor</groupId>
    <artifactId>ktor-client-cio-jvm</artifactId>
    <version>2.3.7</version>
</dependency>
<dependency>
    <groupId>io.rsocket.kotlin</groupId>
    <artifactId>rsocket-ktor-client-jvm</artifactId>
    <version>0.15.4</version>
</dependency>

We can now create an RSocket client. This is just a Ktor HttpClient, which has the WebSockets and RSocket extensions installed into it:

val client = HttpClient {
    install(io.ktor.client.plugins.websocket.WebSockets)
    install(io.rsocket.kotlin.ktor.client.RSocketSupport)
}

As before, the WebSockets extension must be installed before the RSocketSupport extension. Note, however, that these are from different packages and come from different dependencies than the server-side versions.

4. Fire and Forget

Once we’ve got a working server and client setup, we can actually do something with it. The most basic operation that we can implement is fire and forget. This is where the client sends a message to the server and doesn’t wait for any acknowledgment. There’s no expectation of a response, and the client can just carry on with whatever else it’s doing.

This can be useful for scenarios like emitting events – informing the server that something has happened but not needing any feedback from the server.

The first thing we need to do is set up the server to receive these messages. This means adding a new handler to our routing block:

rSocket("/rsocket/fireAndForget") {
    RSocketRequestHandler {
        fireAndForget { request: Payload ->
            val text = request.data.readText()
            println("Received request: $text")
        }
    }
}

Here, we’ve added a new handler of type fireAndForget – which means that there’s no response expected. We’ve then wired this up to the “/rsocket/fireAndForget” URL path. This is important since this is how the client will address requests to our server.

Our handler here simply gets the textual data from the received request and prints it out, but we can do anything we like with it.

Once we’ve done this, we can allow our client to send messages. This makes use of the HttpClient instance that we saw earlier:

val rSocket: RSocket = client.rSocket(host = "localhost", port = 9000, path = "/rsocket/fireAndForget")
rSocket.fireAndForget(buildPayload { data("Hello") })

We’ll notice here that the client.rSocket() call needs three pieces of information:

  • The host that the server is running on. This can be omitted if it will always be localhost, but is included here for completeness
  • The port that the server is running on
  • The URL path that our handler is expecting to be called on

The use of URL paths for handlers allows us to set up many different handlers for different functions on the same server.

We then send a single piece of data – the string “Hello” – to our server, never expecting a response. The client must send the message in the exact format expected by the server. Otherwise, we’ll encounter unexpected behavior.

At this point, we can now send whatever messages we want from the client to the server and act on them however we want.

5. Request/Response

Fire-and-forget messages are already very useful, but sometimes, we need to get a response back as well. For example, the client might need to know the outcome of processing the message. This can be achieved with request/response messages instead. The only real difference here is that the server will always send a message back as a result of any message it receives.

As before, we first need to set up our server to handle these:

rSocket("/rsocket/requestResponse") {
    RSocketRequestHandler {
        requestResponse { request: Payload ->
            val text = request.data.readText()
            println("Received request: $text")

            delay(Duration.ofSeconds(5))

            buildPayload { data(text.reversed()) }
        }
    }
}

This looks remarkably similar to our previous example, for good reason. The only notable differences are that we have a requestResponse handler instead of a fireAndForget one and that, in turn, our handler needs to return a message to send back to the client.

We now need a client that can talk to this:

val rSocket: RSocket = client.rSocket(port = 9000, path = "/rsocket/requestResponse")
val response = rSocket.requestResponse(buildPayload { data("Hello") })

val text = response.data.readText()
println("Received response: $text")

As before, we create our client connecting to our server with the correct URL path and we send a message down it. This time, however, we also wait for a response to come back.
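Since both client.rSocket() and requestResponse() are suspend functions, they have to run inside a coroutine. The following sketch shows one way the client pieces could fit together in a standalone program. It assumes the CIO engine from the ktor-client-cio-jvm dependency, and the use of runBlocking and the try/finally cleanup are illustrative choices rather than requirements:

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.utils.io.core.readText
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.ktor.client.RSocketSupport
import io.rsocket.kotlin.ktor.client.rSocket
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val client = HttpClient(CIO) {
        install(WebSockets)
        install(RSocketSupport)
    }
    try {
        // Connect to the request/response endpoint on the local server
        val rSocket: RSocket = client.rSocket(port = 9000, path = "/rsocket/requestResponse")

        // Suspends until the server sends its single response
        val response = rSocket.requestResponse(buildPayload { data("Hello") })
        println("Received response: ${response.data.readText()}")
    } finally {
        // Release the underlying connection and engine resources
        client.close()
    }
}
```

Run against the server handler shown earlier, this should print the reversed text once the handler’s five-second delay has passed.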

5.1. Incorrect Message Types

Since request/response is so similar to fire-and-forget, we might think that we can intermix them. For example, we might have a request/response handler on the server, but one specific client just doesn’t care about the response and so wants to use a fire-and-forget client instead.

Unfortunately, this isn’t an option. If we attempt this, the server will throw an exception, preventing the message from being processed:

Exception in thread "eventLoopGroupProxy-3-4" kotlin.NotImplementedError: Fire and Forget is not implemented.
  at io.rsocket.kotlin.RSocketKt.notImplemented(RSocket.kt:52)
  at io.rsocket.kotlin.RSocketKt.access$notImplemented(RSocket.kt:1)
  at io.rsocket.kotlin.RSocket$DefaultImpls.fireAndForget(RSocket.kt:33)
  at io.rsocket.kotlin.RSocketRequestHandler.fireAndForget(RSocketRequestHandler.kt:91)
  at io.rsocket.kotlin.internal.RSocketResponder$handleFireAndForget$1.invokeSuspend(RSocketResponder.kt:39)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
  at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
  at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
  at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
  at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
  at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
  at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda$1$lambda$0(NettyApplicationEngine.kt:296)
  at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
  at java.base/java.lang.Thread.run(Thread.java:1623)
  Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [CoroutineName(rSocket-responder),
  StandaloneCoroutine{Cancelling}@2fbf045b, io.ktor.server.netty.EventLoopGroupProxy@2c2d9176]

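This happens because the handler we registered on this path only implements requestResponse. As the stack trace shows, any interaction type that a handler doesn’t define falls back to a default that throws NotImplementedError. So, the client must use the interaction type that the server handler actually provides.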
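If a single endpoint does need to serve both kinds of client, one option is to define both handlers in the same RSocketRequestHandler block. The following is a minimal sketch of that idea. The function name registerReverseEndpoint is hypothetical, and the path is reused from the earlier example:

```kotlin
import io.ktor.server.routing.Route
import io.ktor.utils.io.core.readText
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.ktor.server.rSocket
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data

// Hypothetical helper, intended to be called from inside the routing { } block
fun Route.registerReverseEndpoint() {
    rSocket("/rsocket/requestResponse") {
        RSocketRequestHandler {
            // Clients that want an answer get the reversed text back
            requestResponse { request: Payload ->
                val text = request.data.readText()
                buildPayload { data(text.reversed()) }
            }
            // Clients that don't need an answer can use fire and forget on the same path
            fireAndForget { request: Payload ->
                val text = request.data.readText()
                println("Received fire-and-forget request: $text")
            }
        }
    }
}
```

Each interaction type is still dispatched to its own handler, so the two lambdas are free to process the message differently.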

6. Request Stream

So far, everything that we’ve looked at starts with a single message from the client to the server and then, optionally, a single response back to the client. However, we can also send a stream of data back to the client instead. This stream can either be bounded or unbounded as appropriate.

Our server will still look relatively similar to what we had before. We’ll have a requestStream handler instead, and this will return a Flow<T>, which represents our stream of data:

rSocket("/rsocket/requestStream") {
    RSocketRequestHandler {
        requestStream { request: Payload ->
            val text = request.data.readText()
            println("Received request: $text")

            flow {
                processData(text)
            }
        }
    }
}

Note that we don’t have any code here to produce our data – that’s a separate method. For example:

suspend fun FlowCollector<Payload>.processData(text: String) {
    for (i in 0..10) {
        val data = "data: ($text)$i"

        println("Emitting $data")
        emitOrClose(buildPayload { data(data) })

        delay(Duration.ofMillis(500))
    }
}

This simple suspend function will emit 11 values, since the range 0..10 includes both of its endpoints, with a 500 ms pause after each one. The emitOrClose() function is an extension function that RSocket provides on top of the kotlinx-coroutines FlowCollector class to be able to send messages back to the client.
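Kotlin’s .. operator builds a range that is inclusive at both ends, so the loop in processData() runs 11 times. If exactly ten values are wanted, until or repeat(10) would be possible alternatives. A quick standalone check, using only the standard library:

```kotlin
fun main() {
    println((0..10).count())      // prints 11: both 0 and 10 are included
    println((0 until 10).count()) // prints 10: the upper bound is excluded
}
```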

What about the client? Again, this starts very similar to before. However, in this case, we need to be able to handle a stream of messages coming back instead of just one:

val rSocket: RSocket = client.rSocket(port = 9000, path = "/rsocket/requestStream")
val stream = rSocket.requestStream(buildPayload { data("Hello") })

stream.onEach { frame ->
    val text = frame.data.readText()
    println("Received frame: $text")
}.launchIn(this)

Our requestStream() call returns a kotlinx-coroutines Flow<Payload> instance that represents the flow of messages from the server. We can then use this however we wish. In this case, we simply print out the values that we receive.
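Because the result is an ordinary Flow, the usual kotlinx-coroutines operators apply to it. As an illustration, the sketch below collects only the first three payloads as text and then stops. The function name firstThree is hypothetical, and it expects an RSocket that’s already connected to the request stream endpoint:

```kotlin
import io.ktor.utils.io.core.readText
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList

// Hypothetical helper: suspends until three payloads have arrived, then returns their text
suspend fun firstThree(rSocket: RSocket): List<String> =
    rSocket.requestStream(buildPayload { data("Hello") })
        .take(3)                     // stop collecting after three elements
        .map { it.data.readText() }  // convert each payload to its text content
        .toList()
```

Unlike the launchIn() version, this suspends the caller until the values are available, which can be convenient when the next step depends on them.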

7. Request Channels

Request streams allow us to have a data stream sent from the server to the client. However, the client only sends a single value to the server, triggering all of these responses.

Request channels enable a data stream to flow in both directions, allowing for bidirectional communication. These are independent streams, so it’s up to our implementation how we react to them on each end.

Our server will look slightly different this time. Not only does our handler receive the incoming request, but it also receives a Flow<Payload> representing the stream of data from client to server:

rSocket("/rsocket/requestChannel") {
    RSocketRequestHandler {
        requestChannel { request: Payload, payloads: Flow<Payload> ->
            val text = request.data.readText()
            println("Received request: $text")

            payloads.onEach { frame ->
                val payloadText = frame.data.readText()
                println("Received frame: $payloadText")
            }.launchIn(this)

            flow {
                processData(text)
            }
        }
    }
}

As before, our handler needs to return a Flow<Payload> representing the values sent to the client. However, we’re also processing a Flow<Payload> representing the values sent from the client to the server.

At the same time, our client needs to be slightly different to provide a Flow<Payload> of messages that are to be sent to the server:

val rSocket: RSocket = client.rSocket(port = 9000, path = "/rsocket/requestChannel")
val stream = rSocket.requestChannel(buildPayload { data("Hello") }, flow { produceData() })

stream.onEach { frame ->
    val text = frame.data.readText()
    println("Received frame: $text")
}.launchIn(this)

This Flow<Payload> can be anything we want – bounded or unbounded. This means that we’re able to have a generated flow of data in both directions as appropriate.
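The client snippet calls produceData() without showing its body. A hypothetical version, mirroring the server-side processData() function, could look like the sketch below. The frame text, the count of five, and the 200 ms pause are all arbitrary choices for illustration:

```kotlin
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.FlowCollector

// Hypothetical client-side producer: emits five payloads with a short pause after each
suspend fun FlowCollector<Payload>.produceData() {
    for (i in 1..5) {
        emit(buildPayload { data("client frame $i") })
        delay(200) // milliseconds
    }
}
```

Since this is a bounded flow, the client-to-server stream completes after the fifth frame, independently of whatever the server is still sending.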

8. Summary

In this article, we’ve seen how to use RSocket with Kotlin. Next time you need to do reactive network communications, why not give it a go?

As always, all of the code for this article is available over on GitHub.
