Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we’ll use the Java Client for NATS to connect to a NATS Server and publish and receive messages.

NATS offers three primary modes of message exchange. Publish/Subscribe semantics deliver messages to all subscribers of a topic. Request/Reply messaging sends requests via topics and routes responses back to the requestor.

The third mode is message queueing. Subscribers can join a queue group when they subscribe to a topic, and each message sent to that topic is delivered to only one subscriber in the group.

2. Setup

2.1. Maven Dependency

First, we need to add the NATS library to our pom.xml:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>1.0</version>
</dependency>

The latest version of the library can be found here, and the GitHub project is here.

2.2. NATS Server

Second, we’ll need a NATS Server for exchanging messages. There are instructions for all major platforms here.

We assume that there’s a server running on localhost:4222.

3. Connect and Exchange Messages

3.1. Connect to NATS

The static connect() method in the Nats class creates Connections.

If we want a connection with default options to a server listening on localhost at port 4222, we can use the no-argument method:

Connection natsConnection = Nats.connect();

But Connections have many configurable options, a few of which we want to override.

We’ll create an Options object and pass it to Nats along with the server URI, held here in a uri field (for example, nats://localhost:4222):

private Connection initConnection() {
    Options options = new Options.Builder()
      .errorCb(ex -> log.error("Connection Exception: ", ex))
      .disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
      .reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
      .build();

    return Nats.connect(uri, options);
}

NATS Connections are durable. The API will attempt to reconnect a lost connection.

We’ve installed callbacks to notify us when a disconnect occurs and when the connection is restored. In this example, we’re using lambdas, but for applications that need to do more than simply log the event, we can install objects that implement the required interfaces.

We can run a quick test. Create a connection and add a sleep for 60 seconds to keep the process running:

Connection natsConnection = initConnection();
Thread.sleep(60000);

Run this. Then stop and start your NATS server:

[jnats-callbacks] ERROR com.baeldung.nats.NatsClient 
  - Channel disconnected: io.nats.client.ConnectionImpl@79428dc1
[reconnect] WARN io.nats.client.ConnectionImpl 
  - couldn't connect to nats://localhost:4222 (nats: connection read error)
[jnats-callbacks] ERROR com.baeldung.nats.NatsClient 
  - Reconnected to server: io.nats.client.ConnectionImpl@79428dc1

We can see the callbacks log the disconnection and the reconnection.

3.2. Subscribe to Messages

Now that we have a connection, we can work on message processing.

A NATS Message is a container for a byte array (byte[]). In addition to the expected setData(byte[]) and byte[] getData() methods, there are methods for setting and getting the message destination and reply-to topics.

We subscribe to topics, which are Strings.

NATS supports both synchronous and asynchronous subscriptions.

Let’s take a look at an asynchronous subscription:

AsyncSubscription subscription = natsConnection
  .subscribe(topic, msg -> log.info("Received message on {}", msg.getSubject()));

The API delivers Messages to our MessageHandler on a thread that the API manages.

Some applications may want to control the thread that processes messages instead:

SyncSubscription subscription = natsConnection.subscribeSync("foo.bar");
Message message = subscription.nextMessage(1000);

SyncSubscription has a nextMessage() method that blocks for up to the specified number of milliseconds. We’ll use synchronous subscriptions for our tests to keep the test cases simple.

AsyncSubscription and SyncSubscription both have an unsubscribe() method that we can use to close the subscription:

subscription.unsubscribe();

3.3. Publish Messages

Messages can be published in several ways.

The simplest method requires only a topic String and the message bytes:

natsConnection.publish("foo.bar", "Hi there!".getBytes());

If a publisher wants a response, or wants to provide specific information about the source of a message, it may also send a message with a reply-to topic:

natsConnection.publish("foo.bar", "bar.foo", "Hi there!".getBytes());

There are also overloads for a few other combinations such as passing in a Message instead of bytes.
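The snippets above call getBytes() with no arguments, which uses the JVM's default charset. When publisher and subscriber may run on differently configured machines, naming the charset explicitly on both sides avoids decoding mismatches. Here is a minimal sketch using only the JDK; the PayloadCodec class and its method names are hypothetical helpers, not part of the NATS client:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of jnats: converts between Strings and
// the byte[] payloads that publish() and Message.getData() work with.
final class PayloadCodec {

    private PayloadCodec() {
    }

    // Encode text as UTF-8 regardless of the platform default charset.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a payload that was produced by encode().
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hi there!");
        System.out.println(payload.length);  // 9 (one byte per ASCII character)
        System.out.println(decode(payload)); // Hi there!
    }
}
```

With such a helper, a publish call would pass PayloadCodec.encode("Hi there!") as the payload, and a subscriber would call PayloadCodec.decode(message.getData()).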

3.4. A Simple Message Exchange

Given a valid Connection, we can write a test that verifies message exchange:

SyncSubscription fooSubscription = natsConnection.subscribe("foo.bar");
SyncSubscription barSubscription = natsConnection.subscribe("bar.foo");
natsConnection.publish("foo.bar", "bar.foo", "hello there".getBytes());

Message message = fooSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

natsConnection
  .publish(message.getReplyTo(), message.getSubject(), "hello back".getBytes());

message = barSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello back", new String(message.getData()));

We start by subscribing to two topics with synchronous subscriptions since they work much better inside a JUnit test. Then we send a message to one of them, specifying the other as a replyTo address.

After reading the message from the first destination we “flip” the topics to send a response.

3.5. Wildcard Subscriptions

The NATS server supports topic wildcards.

Wildcards operate on topic tokens that are separated with the ‘.’ character. The asterisk character ‘*’ matches an individual token. The greater-than symbol ‘>’ is a wildcard match for the remainder of a topic, which may be more than one token.

For example:

  • foo.* matches foo.bar, foo.requests, but not foo.bar.requests
  • foo.> matches foo.bar, foo.requests, foo.bar.requests, foo.bar.baeldung, etc.
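To make the token rules concrete, here is a small client-side model of the matching described above. It is only a sketch: the real matching happens inside the NATS server, the SubjectMatcher class is a hypothetical name, and the code assumes well-formed topics with ‘>’ appearing only as the last token of a pattern.

```java
// Hypothetical client-side model of NATS wildcard matching; the NATS
// server performs the real matching, not this class.
final class SubjectMatcher {

    private SubjectMatcher() {
    }

    // Returns true if the subscription pattern matches the published topic.
    static boolean matches(String pattern, String subject) {
        String[] p = pattern.split("\\.", -1);
        String[] s = subject.split("\\.", -1);

        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // '>' matches the rest of the topic, which must be at least one token.
                return i < s.length;
            }
            if (i >= s.length) {
                return false; // the topic ran out of tokens
            }
            if (!p[i].equals("*") && !p[i].equals(s[i])) {
                return false; // literal token mismatch
            }
        }
        // No '>' was seen, so both sides must have the same number of tokens.
        return p.length == s.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("foo.*", "foo.bar"));          // true
        System.out.println(matches("foo.*", "foo.bar.requests")); // false
        System.out.println(matches("foo.>", "foo.bar.requests")); // true
    }
}
```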

Let’s try a few tests. These use client, the NatsClient wrapper from the sample project, rather than the raw Connection:

SyncSubscription fooSubscription = client.subscribeSync("foo.*");

client.publishMessage("foo.bar", "bar.foo", "hello there");

Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = fooSubscription.nextMessage(200);
assertNull("Got message!", message);

SyncSubscription barSubscription = client.subscribeSync("foo.>");

client.publishMessage("foo.bar.plop", "bar.foo", "hello there");

message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));

4. Request/Reply Messaging

Our message exchange test resembled a common idiom in pub/sub messaging systems: request/reply. NATS has explicit support for request/reply messaging.

Responders can install a handler for requests using the asynchronous subscription method we used above:

AsyncSubscription subscription = natsConnection
  .subscribe("foo.bar.requests", new MessageHandler() {
    @Override
    public void onMessage(Message msg) {
        // reply is a String prepared elsewhere
        natsConnection.publish(msg.getReplyTo(), reply.getBytes());
    }
});

Or they can respond to requests as they arrive.

For the requesting side, the API provides a request() method:

Message reply = natsConnection.request("foo.bar.requests", request.getBytes(), 100);

This method creates a temporary mailbox for the response, and writes the reply-to address for us.

The request() method returns the response, or null if the request times out. The last argument is the number of milliseconds to wait.
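The mailbox idea can be illustrated with a toy in-memory bus. This is not the jnats API and not how the client is implemented internally; ToyBus, its Handler interface, and the inbox naming are all assumptions made for illustration. The sketch shows only the shape of the pattern: create a unique reply topic, publish with it as the reply-to address, then wait for a reply up to a timeout.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy in-memory bus, NOT the jnats API: models only the inbox idea behind request().
final class ToyBus {

    // Hypothetical handler type for this sketch.
    interface Handler {
        void onMessage(String subject, String replyTo, byte[] data);
    }

    private final Map<String, Handler> handlers = new ConcurrentHashMap<>();
    private final Map<String, BlockingQueue<byte[]>> inboxes = new ConcurrentHashMap<>();

    void subscribe(String subject, Handler handler) {
        handlers.put(subject, handler);
    }

    void publish(String subject, String replyTo, byte[] data) {
        if (subject == null) {
            return; // nothing to route to
        }
        BlockingQueue<byte[]> inbox = inboxes.get(subject);
        if (inbox != null) {
            inbox.offer(data); // a reply addressed to a pending request
            return;
        }
        Handler handler = handlers.get(subject);
        if (handler != null) {
            handler.onMessage(subject, replyTo, data);
        }
        // With no subscriber, the message is dropped.
    }

    // Returns the reply, or null if none arrives within timeoutMillis.
    byte[] request(String subject, byte[] data, long timeoutMillis) {
        String inboxSubject = "_INBOX." + UUID.randomUUID(); // unique temporary topic
        BlockingQueue<byte[]> inbox = new LinkedBlockingQueue<>();
        inboxes.put(inboxSubject, inbox);
        try {
            publish(subject, inboxSubject, data);
            return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            inboxes.remove(inboxSubject); // the mailbox lives only for this request
        }
    }

    public static void main(String[] args) {
        ToyBus bus = new ToyBus();
        bus.subscribe("salary.requests", (subject, replyTo, data) ->
            bus.publish(replyTo, null, "denied!".getBytes(StandardCharsets.UTF_8)));

        byte[] reply = bus.request("salary.requests",
            "I need a raise.".getBytes(StandardCharsets.UTF_8), 100);
        System.out.println(new String(reply, StandardCharsets.UTF_8));           // denied!
        System.out.println(bus.request("nobody.listens", new byte[0], 50) == null); // true
    }
}
```

In this toy, handlers run on the caller's thread, which keeps the example deterministic; a real client receives replies asynchronously over the network.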

We can modify our test for request/reply:

natsConnection.subscribe("salary.requests", message -> {
    natsConnection.publish(message.getReplyTo(), "denied!".getBytes());
});
Message reply = natsConnection.request("salary.requests", "I need a raise.".getBytes(), 100);
assertNotNull("No message!", reply);
assertEquals("denied!", new String(reply.getData()));

5. Message Queues

Subscribers may specify queue groups at subscription time. When a message is published to the group's topic, NATS delivers it to one and only one subscriber in the group.

Queue groups do not persist messages. If no listeners are available, the message is discarded.

5.1. Subscribing to Queues

Subscribers specify a queue group name as a String:

SyncSubscription subscription = natsConnection.subscribe("topic", "queue name");

There is also an asynchronous version, of course:

AsyncSubscription subscription = natsConnection
  .subscribe("topic", "queue name", new MessageHandler() {
    @Override
    public void onMessage(Message msg) {
        log.info("Received message on {}", msg.getSubject());
    }
});

The subscription creates the queue on the NATS server.

5.2. Publishing to Queues

Publishing a message to a queue group simply requires publishing to the associated topic:

natsConnection.publish("foo", "queue message".getBytes());

The NATS server will route the message to the queue and select a message receiver.

We can verify this with a test:

SyncSubscription queue1 = natsConnection.subscribe("foo", "queue name");
SyncSubscription queue2 = natsConnection.subscribe("foo", "queue name");

natsConnection.publish("foo", "foobar".getBytes());

List<Message> messages = new ArrayList<>();

Message message = queue1.nextMessage(200);
if (message != null) messages.add(message);

message = queue2.nextMessage(200);
if (message != null) messages.add(message);

assertEquals(1, messages.size());

We only receive one message.

If we change the first two lines to a normal subscription:

SyncSubscription queue1 = natsConnection.subscribe("foo");
SyncSubscription queue2 = natsConnection.subscribe("foo");

The test fails because the message is delivered to both subscribers.
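The difference between the two tests can be modeled without a server. The following toy class is not the jnats API; QueueGroupModel and its methods are hypothetical, and the random choice of receiver is an assumption of this sketch. It contrasts plain subscriptions, where every subscriber gets a copy, with a single queue group, where exactly one member does:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Toy model, NOT the jnats API: contrasts plain subscriptions with one queue group.
final class QueueGroupModel {

    private final List<List<String>> plainSubscribers = new ArrayList<>();
    private final List<List<String>> queueMembers = new ArrayList<>();
    private final Random random = new Random();

    // Registers a plain subscriber and returns the list that collects its messages.
    List<String> subscribe() {
        List<String> received = new ArrayList<>();
        plainSubscribers.add(received);
        return received;
    }

    // Registers a member of the queue group and returns its message list.
    List<String> subscribeToQueue() {
        List<String> received = new ArrayList<>();
        queueMembers.add(received);
        return received;
    }

    void publish(String message) {
        // Every plain subscriber gets its own copy.
        for (List<String> subscriber : plainSubscribers) {
            subscriber.add(message);
        }
        // Exactly one queue member, picked at random here, gets the message.
        if (!queueMembers.isEmpty()) {
            queueMembers.get(random.nextInt(queueMembers.size())).add(message);
        }
        // With no subscribers at all, the message is simply dropped.
    }

    public static void main(String[] args) {
        QueueGroupModel model = new QueueGroupModel();
        List<String> queue1 = model.subscribeToQueue();
        List<String> queue2 = model.subscribeToQueue();
        List<String> plain1 = model.subscribe();
        List<String> plain2 = model.subscribe();

        model.publish("foobar");

        System.out.println(queue1.size() + queue2.size()); // 1
        System.out.println(plain1.size() + plain2.size()); // 2
    }
}
```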

6. Conclusion

In this brief introduction, we connected to a NATS server and sent both pub/sub messages and load-balanced queue messages. We looked at NATS support for wildcard subscriptions. We also used request/reply messaging.

Code samples, as always, can be found over on GitHub.
