1. Overview

In this article, we will demonstrate how to build a simple server and its client using the Java 7 NIO.2 channel APIs.

We’ll look at the AsynchronousServerSocketChannel and AsynchronousSocketChannel classes, which are the key classes for implementing the server and the client, respectively.

If you are new to NIO.2 channel APIs, we have an introductory article on this site. You can read it by following this link.

All classes needed to use the NIO.2 channel APIs are bundled in the java.nio.channels package:

import java.nio.channels.*;

2. The Server with Future

An instance of AsynchronousServerSocketChannel is created by calling the static open API on its class:

AsynchronousServerSocketChannel server
  = AsynchronousServerSocketChannel.open();

A newly created asynchronous server socket channel is open but not yet bound, so we must bind it to a local address and optionally choose a port:

server.bind(new InetSocketAddress("127.0.0.1", 4555));

Alternatively, we can pass in null, in which case the socket is bound to an automatically assigned local address and an arbitrary port:

server.bind(null);
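
To find out which port was picked when we bind with null, we can ask the channel for its local address. The following is a minimal sketch rather than part of the original example; the class name BindSketch and the method bindToEphemeralPort are hypothetical names chosen for illustration:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;

class BindSketch {

    // Opens a server channel, binds it to an automatically assigned
    // address, and returns the port that was picked for it.
    static int bindToEphemeralPort() {
        try (AsynchronousServerSocketChannel server =
               AsynchronousServerSocketChannel.open()) {
            server.bind(null);
            InetSocketAddress local =
              (InetSocketAddress) server.getLocalAddress();
            return local.getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindToEphemeralPort());
    }
}
```

The channel is closed by the try-with-resources statement, so the port is released again as soon as the method returns.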

Once bound, the accept API is used to initiate the accepting of connections to the channel’s socket:

Future<AsynchronousSocketChannel> acceptFuture = server.accept();

As with all asynchronous channel operations, the above call returns right away and execution continues.

Next, we can use the get API to query for a response from the Future object:

AsynchronousSocketChannel worker = acceptFuture.get();

This call will block if necessary to wait for a connection request from a client. Optionally, we can specify a timeout if we don’t want to wait forever:

AsynchronousSocketChannel worker = acceptFuture.get(10, TimeUnit.SECONDS);
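
When the timeout elapses before any client connects, get throws a TimeoutException that we have to handle. Here is a small sketch of one way to do so; the class AcceptTimeoutSketch and the method acceptWithin are hypothetical, and binding to the loopback address on port 0 (any free port) is an assumption made to keep the example self-contained:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AcceptTimeoutSketch {

    // Returns true if a client connected within the timeout, false otherwise.
    static boolean acceptWithin(long timeout, TimeUnit unit) {
        try (AsynchronousServerSocketChannel server =
               AsynchronousServerSocketChannel.open()) {
            server.bind(
              new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // accept() returns immediately; only get() waits
            Future<AsynchronousSocketChannel> acceptFuture = server.accept();

            try (AsynchronousSocketChannel worker =
                   acceptFuture.get(timeout, unit)) {
                return worker.isOpen();
            } catch (TimeoutException e) {
                // nobody connected in time, so give up on this accept
                acceptFuture.cancel(true);
                return false;
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling acceptWithin(200, TimeUnit.MILLISECONDS) with no client around returns false after roughly 200 milliseconds instead of blocking forever.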

After the above call returns and the operation was successful, we can create a loop within which we listen for incoming messages and echo them back to the client.

Let’s create a method called runServer within which we will do the waiting and process any incoming messages:

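public void runServer() {
    try {
        // acceptResult is assumed to be the Future returned by serverChannel.accept()
        clientChannel = acceptResult.get();
        if ((clientChannel != null) && (clientChannel.isOpen())) {
            while (true) {
                ByteBuffer buffer = ByteBuffer.allocate(32);
                Future<Integer> readResult = clientChannel.read(buffer);

                // perform other computations

                // a result of -1 means the client has closed the connection
                if (readResult.get() == -1) {
                    break;
                }

                buffer.flip();
                Future<Integer> writeResult = clientChannel.write(buffer);

                // perform other computations

                writeResult.get();
                buffer.clear();
            }
            clientChannel.close();
            serverChannel.close();
        }
    } catch (InterruptedException | ExecutionException | IOException e) {
        e.printStackTrace();
    }
}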

Inside the loop, all we do is create a buffer to read from and write to depending on the operation.

Then, every time we do a read or a write, we can continue executing any other code and when we are ready to process the result, we call the get() API on the Future object.

To start the server, we call its constructor and then the runServer method inside main:

public static void main(String[] args) {
    AsyncEchoServer server = new AsyncEchoServer();
    server.runServer();
}
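
Because every operation hands back a Future, a complete round trip can even be driven from a single thread. The sketch below is an illustration under a few assumptions, not the article's server: the class FutureEchoSketch and the method echoOnce are hypothetical, the server binds to the loopback address on any free port, and the message is assumed to be small enough to fit in the socket buffers:

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

class FutureEchoSketch {

    // Sends one message through an in-process echo server and returns the echo.
    static String echoOnce(String message) {
        try (AsynchronousServerSocketChannel server =
               AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client =
               AsynchronousSocketChannel.open()) {

            server.bind(
              new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Both calls return immediately; nothing blocks until get()
            Future<AsynchronousSocketChannel> acceptFuture = server.accept();
            Future<Void> connectFuture =
              client.connect(server.getLocalAddress());
            connectFuture.get();

            try (AsynchronousSocketChannel worker = acceptFuture.get()) {
                // client side: send the whole message
                byte[] payload = message.getBytes(StandardCharsets.UTF_8);
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out).get();
                }

                // server side: read and echo until everything went back
                ByteBuffer serverBuffer = ByteBuffer.allocate(32);
                int echoed = 0;
                while (echoed < payload.length) {
                    if (worker.read(serverBuffer).get() < 0) {
                        break; // connection closed early
                    }
                    serverBuffer.flip();
                    while (serverBuffer.hasRemaining()) {
                        echoed += worker.write(serverBuffer).get();
                    }
                    serverBuffer.clear();
                }

                // client side: collect the echo
                ByteBuffer in = ByteBuffer.allocate(payload.length);
                while (in.hasRemaining() && client.read(in).get() >= 0) {
                    // keep reading until the buffer is full
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The inner loops account for the fact that a single read or write may transfer fewer bytes than requested, which the simpler runServer method above does not handle.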

3. The Server with CompletionHandler

In this section, we will see how to implement the same server using the CompletionHandler approach rather than a Future approach.

Inside the constructor, we create an AsynchronousServerSocketChannel and bind it to a local address the same way we did before:

serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
serverChannel.bind(hostAddress);

Next, still inside the constructor, we create a while loop within which we accept any incoming connection from a client. This while loop is used strictly to prevent the server from exiting before a connection with a client is established.

To prevent the loop from spinning endlessly, we call System.in.read() at its end, which blocks the thread until input arrives on the standard input stream:

while (true) {
    serverChannel.accept(
      null, new CompletionHandler<AsynchronousSocketChannel,Object>() {

        @Override
        public void completed(
          AsynchronousSocketChannel result, Object attachment) {
            if (serverChannel.isOpen()){
                serverChannel.accept(null, this);
            }

            clientChannel = result;
            if ((clientChannel != null) && (clientChannel.isOpen())) {
                ReadWriteHandler handler = new ReadWriteHandler();
                ByteBuffer buffer = ByteBuffer.allocate(32);

                Map<String, Object> readInfo = new HashMap<>();
                readInfo.put("action", "read");
                readInfo.put("buffer", buffer);

                clientChannel.read(buffer, readInfo, handler);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            // process error
        }
    });
    System.in.read();
}

When a connection is established, the completed callback method in the CompletionHandler of the accept operation is called.

Its result parameter is an instance of AsynchronousSocketChannel. If the server socket channel is still open, we call the accept API again to get ready for another incoming connection, reusing the same handler.

Next, we assign the accepted socket channel to an instance field. We then check that it is not null and that it is open before performing operations on it.

The point at which we can start read and write operations is inside the completed callback of the accept operation’s handler. This step replaces the previous approach, where we blocked on the Future with the get API.

Notice that the server will no longer exit after a connection has been established unless we explicitly close it.
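
Blocking on System.in.read() is only one way to keep the calling thread alive while the callbacks run. As an alternative that is not used in this article, we could bridge the callback to a CompletableFuture and wait on that. This is a sketch under stated assumptions: the class AcceptHandlerSketch and the method acceptViaHandler are hypothetical, and the client is created in the same process purely to trigger the callback:

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AcceptHandlerSketch {

    // Returns true if the accept handler was called with an open channel.
    static boolean acceptViaHandler() {
        try (AsynchronousServerSocketChannel server =
               AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client =
               AsynchronousSocketChannel.open()) {

            server.bind(
              new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            CompletableFuture<AsynchronousSocketChannel> accepted =
              new CompletableFuture<>();

            server.accept(null,
              new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(
                  AsynchronousSocketChannel result, Void attachment) {
                    // runs on a thread of the channel group, not the caller's
                    accepted.complete(result);
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    accepted.completeExceptionally(exc);
                }
            });

            // trigger the callback by connecting a client
            client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);

            try (AsynchronousSocketChannel worker =
                   accepted.get(5, TimeUnit.SECONDS)) {
                return worker.isOpen();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The handler itself stays free of blocking calls; only the calling thread waits, and it does so with a timeout.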

Notice also that we created a separate inner class, ReadWriteHandler, for handling read and write operations. We will see how the attachment object comes in handy at this point.

First, let’s look at the ReadWriteHandler class:

class ReadWriteHandler implements 
  CompletionHandler<Integer, Map<String, Object>> {
    
    @Override
    public void completed(
      Integer result, Map<String, Object> attachment) {
        Map<String, Object> actionInfo = attachment;
        String action = (String) actionInfo.get("action");

        if ("read".equals(action)) {
            ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
            buffer.flip();
            actionInfo.put("action", "write");

            // the buffer must stay untouched until the asynchronous write
            // completes, so we don't clear it here
            clientChannel.write(buffer, actionInfo, this);

        } else if ("write".equals(action)) {
            ByteBuffer buffer = ByteBuffer.allocate(32);

            actionInfo.put("action", "read");
            actionInfo.put("buffer", buffer);

            clientChannel.read(buffer, actionInfo, this);
        }
    }
    
    @Override
    public void failed(Throwable exc, Map<String, Object> attachment) {
        // process error
    }
}

The generic type of our attachment in the ReadWriteHandler class is a map. We specifically need to pass two important parameters through it: the type of operation (action) and the buffer.

Next, we will see how these parameters are used.

The first operation we perform is a read, since this is an echo server that only reacts to client messages. Inside the ReadWriteHandler’s completed callback method, we retrieve the attached data and decide what to do accordingly.

If it’s a read operation which has completed, we retrieve the buffer, change the action parameter of the attachment and perform a write operation right away to echo the message to the client.

If it’s a write operation which has just completed, we call the read API again to prepare the server to receive another incoming message.
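
The same read and write cycle can also be expressed with a small typed attachment instead of a map. The following sketch is one possible variation, not the article's implementation: the classes HandlerEchoSketch, Session and EchoHandler are hypothetical, and the Future-based client inside echoOnce exists only to exercise the handler:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

class HandlerEchoSketch {

    // Hypothetical typed attachment: the channel, its buffer, and the action
    static final class Session {
        final AsynchronousSocketChannel channel;
        final ByteBuffer buffer = ByteBuffer.allocate(32);
        boolean reading = true;
        Session(AsynchronousSocketChannel channel) { this.channel = channel; }
    }

    static final class EchoHandler implements CompletionHandler<Integer, Session> {
        @Override
        public void completed(Integer result, Session s) {
            if (s.reading) {
                if (result < 0) { close(s); return; } // client closed the connection
                s.buffer.flip();
                s.reading = false;
                s.channel.write(s.buffer, s, this);   // echo what was read
            } else if (s.buffer.hasRemaining()) {
                s.channel.write(s.buffer, s, this);   // finish a partial write
            } else {
                s.buffer.clear();                     // safe: the write is complete
                s.reading = true;
                s.channel.read(s.buffer, s, this);    // wait for the next message
            }
        }

        @Override
        public void failed(Throwable exc, Session s) { close(s); }

        private static void close(Session s) {
            try { s.channel.close(); } catch (IOException ignored) { }
        }
    }

    // Starts an in-process echo server and sends one message through it.
    static String echoOnce(String message) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void attachment) {
                    Session s = new Session(ch);
                    ch.read(s.buffer, s, new EchoHandler());
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    // process error
                }
            });
            client.connect(server.getLocalAddress()).get();

            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            ByteBuffer out = ByteBuffer.wrap(payload);
            while (out.hasRemaining()) {
                client.write(out).get();
            }
            ByteBuffer in = ByteBuffer.allocate(payload.length);
            while (in.hasRemaining() && client.read(in).get() >= 0) {
                // keep reading until the whole echo has arrived
            }
            in.flip();
            return StandardCharsets.UTF_8.decode(in).toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Keeping the channel inside the attachment means the handler no longer depends on a shared clientChannel field, so a single handler class could serve several connections.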

4. The Client

After setting up the server, we can now set up the client by calling the open API on the AsynchronousSocketChannel class. This call creates a new instance of the client socket channel, which we then use to make a connection to the server:

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
Future<Void> future = client.connect(hostAddress);

The connect operation returns nothing on success. However, we can still use the Future object to monitor the state of the asynchronous operation.

Let’s call the get API to await connection:

future.get();

After this step, we can start sending messages to the server and receiving their echoes. The sendMessage method looks like this:

public String sendMessage(String message) {
    byte[] byteMsg = message.getBytes();
    ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
    Future<Integer> writeResult = client.write(buffer);

    // do some computation

    try {
        writeResult.get();

        // rewind the buffer so that the echo can be read into it
        buffer.flip();
        Future<Integer> readResult = client.read(buffer);

        // do some computation

        readResult.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    String echo = new String(buffer.array()).trim();
    buffer.clear();
    return echo;
}
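
The reason the same buffer can be reused for reading comes down to how flip moves the position and limit. The sketch below illustrates this without any networking; the class BufferReuseSketch is hypothetical, and the completed write is only simulated by moving the position to the limit:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferReuseSketch {

    // Returns {position, limit} after a simulated full write followed by flip().
    static int[] stateAfterWriteAndFlip(String message) {
        ByteBuffer buffer =
          ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));

        // A channel write that sends every byte leaves position == limit;
        // we simulate that here instead of opening a socket.
        buffer.position(buffer.limit());

        // flip() sets limit to the current position and position to zero,
        // so a following read can fill exactly as many bytes as were sent.
        buffer.flip();

        return new int[] { buffer.position(), buffer.limit() };
    }
}
```

For the message "hello", the method returns a position of 0 and a limit of 5, so the read that follows in sendMessage can receive at most the five bytes of the echo.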

5. The Test

To confirm that our server and client applications are performing according to expectation, we can use a test:

@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
    String resp1 = client.sendMessage("hello");
    String resp2 = client.sendMessage("world");

    assertEquals("hello", resp1);
    assertEquals("world", resp2);
}

6. Conclusion

In this article, we have explored the Java NIO.2 asynchronous socket channel APIs. We have been able to step through the process of building a server and client with these new APIs.

You can access the full source code for this article in the GitHub project.
