1. Introduction

In this tutorial, we’ll take a first look at RSocket and how it enables client-server communication.

2. What Is RSocket?

RSocket is a binary, point-to-point communication protocol intended for use in distributed applications. In that sense, it provides an alternative to other protocols like HTTP.

A full comparison between RSocket and other protocols is beyond the scope of this article. Instead, we’ll focus on a key feature of RSocket: its interaction models.

RSocket provides four interaction models. With that in mind, we’ll explore each one with an example.
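As a rough preview, the four models differ mainly in how many payloads travel in each direction. The enum below is only an illustrative summary: InteractionModel and its fields are hypothetical names, not part of the RSocket library, and the handler return types listed are the ones used in the examples later in this article.

```java
/** Hypothetical summary type for illustration; not part of the RSocket API. */
public enum InteractionModel {
    REQUEST_RESPONSE("one", "one", "Mono<Payload>"),
    FIRE_AND_FORGET("one", "none", "Mono<Void>"),
    REQUEST_STREAM("one", "many", "Flux<Payload>"),
    REQUEST_CHANNEL("many", "many", "Flux<Payload>");

    public final String requests;          // payloads sent by the requester
    public final String responses;         // payloads sent back by the responder
    public final String handlerReturnType; // return type of the server-side handler

    InteractionModel(String requests, String responses, String handlerReturnType) {
        this.requests = requests;
        this.responses = responses;
        this.handlerReturnType = handlerReturnType;
    }

    public String describe() {
        return name() + ": " + requests + " request payload(s), "
          + responses + " response payload(s), handler returns " + handlerReturnType;
    }

    public static void main(String[] args) {
        // print one summary line per interaction model
        for (InteractionModel model : values()) {
            System.out.println(model.describe());
        }
    }
}
```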

3. Maven Dependencies

RSocket needs only two direct dependencies for our examples:

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.11.13</version>
</dependency>
<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>0.11.13</version>
</dependency>

The rsocket-core and rsocket-transport-netty dependencies are available on Maven Central.

Note that the RSocket library makes frequent use of reactive streams. The Flux and Mono classes are used throughout this article, so a basic understanding of them will be helpful.

4. Server Setup

First, let’s create the Server class:

public class Server {
    private final Disposable server;

    public Server() {
        this.server = RSocketFactory.receive()
          .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
          .transport(TcpServerTransport.create("localhost", TCP_PORT))
          .start()
          .subscribe();
    }

    public void dispose() {
        this.server.dispose();
    }

    private class RSocketImpl extends AbstractRSocket {}
}

Here we use the RSocketFactory to set up the server and listen on a TCP socket. We pass in our custom RSocketImpl to handle requests from clients. We’ll add methods to the RSocketImpl as we go.

Next, to start the server we just need to instantiate it:

Server server = new Server();

A single server instance can handle multiple connections. As a result, just one server instance will support all of our examples.

When we’re finished, the dispose method will stop the server and release the TCP port.

4. Interaction Models

4.1. Request/Response

RSocket provides a request/response model – each request receives a single response.

For this model, we’ll create a simple service that returns a message back to the client.

Let’s start by adding a method to our extension of AbstractRSocket, RSocketImpl:

@Override
public Mono<Payload> requestResponse(Payload payload) {
    try {
        return Mono.just(payload); // reflect the payload back to the sender
    } catch (Exception x) {
        return Mono.error(x);
    }
}

The requestResponse method returns a single result for each request, as we can see by the Mono<Payload> response type.

Payload is the class that contains message content and metadata. It’s used by all of the interaction models. The content of the payload is binary, but there are convenience methods that support String-based content.
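To make the "binary content with String convenience" idea concrete, here is a minimal sketch that uses only JDK classes. It is not RSocket's implementation: Utf8PayloadSketch, encode, and decode are hypothetical names, and the sketch simply assumes that the String helpers amount to UTF-8 encoding and decoding of the underlying bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing a String round trip through binary data. */
public class Utf8PayloadSketch {

    // Encode text as UTF-8 bytes, the binary form that would travel on the wire
    static ByteBuffer encode(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decode UTF-8 bytes back to text; duplicate() leaves the caller's position untouched
    static String decode(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer data = encode("Hello RSocket");
        System.out.println(data.remaining()); // prints 13 (one byte per ASCII character)
        System.out.println(decode(data));     // prints Hello RSocket
    }
}
```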

Next, we can create our client class:

public class ReqResClient {

    private final RSocket socket;

    public ReqResClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public String callBlocking(String string) {
        return socket
          .requestResponse(DefaultPayload.create(string))
          .map(Payload::getDataUtf8)
          .block();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

The client uses the RSocketFactory.connect() method to initiate a socket connection with the server. We use the requestResponse method on the socket to send a payload to the server.

Our payload contains the String passed into the client. When the Mono<Payload> response arrives we can use the getDataUtf8() method to access the String content of the response.

Finally, we can run the integration test to see request/response in action. We’ll send a String to the server and verify that the same String is returned:

@Test
public void whenSendingAString_thenReceiveTheSameString() {
    ReqResClient client = new ReqResClient();
    String string = "Hello RSocket";

    assertEquals(string, client.callBlocking(string));

    client.dispose();
}

4.2. Fire-and-Forget

With the fire-and-forget model, the client will receive no response from the server.

In this example, the client will send simulated measurements to the server at 50 ms intervals. The server will publish the measurements.

Let’s add a fire-and-forget handler to our server in the RSocketImpl class:

@Override
public Mono<Void> fireAndForget(Payload payload) {
    try {
        dataPublisher.publish(payload); // forward the payload
        return Mono.empty();
    } catch (Exception x) {
        return Mono.error(x);
    }
}

This handler looks very similar to the request/response handler. However, fireAndForget returns Mono<Void> instead of Mono<Payload>.

The dataPublisher is an instance of org.reactivestreams.Publisher. Thus, it makes the payload available to subscribers. We’ll make use of that in the request/stream example.
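The article doesn't show how dataPublisher is implemented. One way to picture it, purely as an assumption, is a "hot" fan-out publisher that hands each published item to whoever is subscribed at that moment. The sketch below uses only JDK classes and deliberately skips the Subscription and back-pressure handshake that a real org.reactivestreams.Publisher must implement. FanOutPublisherSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Simplified fan-out publisher; NOT a compliant Reactive Streams Publisher. */
public class FanOutPublisherSketch<T> {

    // CopyOnWriteArrayList lets publish() iterate safely while other threads subscribe
    private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

    public void subscribe(Consumer<? super T> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver the item to every current subscriber; nothing is buffered for later ones
    public void publish(T item) {
        for (Consumer<? super T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    public static void main(String[] args) {
        FanOutPublisherSketch<Float> publisher = new FanOutPublisherSketch<>();
        List<Float> received = new ArrayList<>();

        publisher.publish(1.0f);            // no subscriber yet, so this value is dropped
        publisher.subscribe(received::add);
        publisher.publish(2.0f);
        publisher.publish(3.0f);

        System.out.println(received);       // prints [2.0, 3.0]
    }
}
```

With a publisher of this kind, a subscriber that arrives late misses earlier values, so consumers need to subscribe before the data starts flowing.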

Next, we’ll create the fire-and-forget client:

public class FireNForgetClient {
    private final RSocket socket;
    private final List<Float> data;

    public FireNForgetClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
        // a final field must be assigned in the constructor
        this.data = Collections.unmodifiableList(generateData());
    }

    /** Send binary velocity (float) every 50ms */
    public void sendData() {
        Flux.interval(Duration.ofMillis(50))
          .take(data.size())
          .map(this::createFloatPayload)
          .flatMap(socket::fireAndForget)
          .blockLast();
    }

    // ... 
}

The socket setup is exactly the same as before.

The sendData() method uses a Flux stream to send multiple messages. For each message, we invoke socket::fireAndForget.

We need to subscribe to the Mono<Void> response for each message. If we forget to subscribe, then socket::fireAndForget will not execute.

The flatMap operator subscribes to each Mono<Void> and merges the results into a single stream, while the blockLast operator subscribes to that stream and waits for it to complete.

We’re going to wait until the next section to run the fire-and-forget test. At that point, we’ll create a request/stream client to receive the data that was pushed by the fire-and-forget client.

4.3. Request/Stream

In the request/stream model, a single request may receive multiple responses. To see this in action we can build upon the fire-and-forget example. To do that, let’s request a stream to retrieve the measurements we sent in the previous section.

As before, let’s start by adding a new handler to the RSocketImpl on the server:

@Override
public Flux<Payload> requestStream(Payload payload) {
    return Flux.from(dataPublisher);
}

The requestStream handler returns a Flux<Payload> stream. As we recall from the previous section, the fireAndForget handler published incoming data to the dataPublisher. Now, we’ll create a Flux stream using that same dataPublisher as the event source. By doing this the measurement data will flow asynchronously from our fire-and-forget client to our request/stream client.

Let’s create the request/stream client next:

public class ReqStreamClient {

    private final RSocket socket;

    public ReqStreamClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public Flux<Float> getDataStream() {
        return socket
          .requestStream(DefaultPayload.create(DATA_STREAM_NAME))
          .map(Payload::getData)
          .map(buf -> buf.getFloat())
          // Reactive Streams forbids null elements, so end the stream on error instead
          .onErrorResume(err -> Flux.empty());
    }

    public void dispose() {
        this.socket.dispose();
    }
}

We connect to the server in the same way as our previous clients.

In getDataStream() we use socket.requestStream() to receive a Flux<Payload> stream from the server. From that stream, we extract the Float values from the binary data. Finally, the stream is returned to the caller, allowing the caller to subscribe to it and process the results.
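The createFloatPayload method is elided in this article, so the following is only a plausible sketch of the binary round trip: a float is packed into 4 bytes on the sending side and read back with ByteBuffer.getFloat(), the same call the client uses. It relies only on JDK classes, and FloatPayloadSketch, encode, and decode are hypothetical names.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper showing how a float can travel as binary data. */
public class FloatPayloadSketch {

    // Pack one float into a 4-byte buffer that is ready to be read
    static ByteBuffer encode(float value) {
        ByteBuffer buf = ByteBuffer.allocate(Float.BYTES);
        buf.putFloat(value);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Relative read: consumes 4 bytes starting at the buffer's current position
    static float decode(ByteBuffer buf) {
        return buf.getFloat();
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(12.5f);
        System.out.println(buf.remaining()); // prints 4
        System.out.println(decode(buf));     // prints 12.5
    }
}
```

Both sides use ByteBuffer's default big-endian byte order here. If the sender wrote the bytes in a different order, the reader would have to set the same order before calling getFloat().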

Now let’s test. We’ll verify the round trip from fire-and-forget to request/stream.

We can assert that each value is received in the same order as it was sent. Then, we can assert that we receive the same number of values that were sent:

@Test
public void whenSendingStream_thenReceiveTheSameStream() {
    FireNForgetClient fnfClient = new FireNForgetClient(); 
    ReqStreamClient streamClient = new ReqStreamClient();

    List<Float> data = fnfClient.getData();
    List<Float> dataReceived = new ArrayList<>();

    Disposable subscription = streamClient.getDataStream()
      .index()
      .subscribe(
        tuple -> {
            assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
            dataReceived.add(tuple.getT2());
        },
        err -> LOG.error(err.getMessage())
      );

    fnfClient.sendData();

    // ... dispose client & subscription

    assertEquals("Wrong data count received", data.size(), dataReceived.size());
}

4.4. Channel

The channel model provides bidirectional communication. In this model, message streams flow asynchronously in both directions.

Let’s create a simple game simulation to test this. In this game, each side of the channel will become a player. As the game runs, these players will send messages to the other side at random time intervals. The opposite side will react to the messages.

First, we’ll create the handler on the server. As before, we add it to the RSocketImpl:

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    Flux.from(payloads)
      .subscribe(gameController::processPayload);
    return Flux.from(gameController);
}

The requestChannel handler has Payload streams for both input and output. The Publisher<Payload> input parameter is a stream of payloads received from the client. As they arrive, these payloads are passed to the gameController::processPayload function.

In response, we return a different Flux stream back to the client. This stream is created from our gameController, which is also a Publisher.

Here is a summary of the GameController class:

public class GameController implements Publisher<Payload> {
    
    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {
        // send Payload messages to the subscriber at random intervals
    }

    public void processPayload(Payload payload) {
        // react to messages from the other player
    }
}

When the GameController receives a subscriber it begins sending messages to that subscriber.

Next, let’s create the client:

public class ChannelClient {

    private final RSocket socket;
    private final GameController gameController;

    public ChannelClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();

        this.gameController = new GameController("Client Player");
    }

    public void playGame() {
        socket.requestChannel(Flux.from(gameController))
          .doOnNext(gameController::processPayload)
          .blockLast();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

The client connects to the server in the same way as our previous clients.

The client creates its own instance of the GameController.

We use socket.requestChannel() to send our Payload stream to the server. The server responds with a Payload stream of its own.

As payloads are received from the server, we pass them to our gameController::processPayload handler.

In our game simulation, the client and server are mirror images of each other. That is, each side is sending a stream of Payload and receiving a stream of Payload from the other end.

The streams run independently, without synchronization.

Finally, let’s run the simulation in a test:

@Test
public void whenRunningChannelGame_thenLogTheResults() {
    ChannelClient client = new ChannelClient();
    client.playGame();
    client.dispose();
}

5. Conclusion

In this introductory article, we’ve explored the interaction models provided by RSocket. The full source code of the examples can be found in our GitHub repository.

Be sure to check out the RSocket website for a deeper discussion. In particular, the FAQ and Motivations documents provide a good background.
