1. Introduction

In this article, we’re going to have a look at JeroMQ, a pure Java implementation of ZeroMQ. We’ll see what it is and what it can do for us in our applications.

2. What Is ZeroMQ?

ZeroMQ is a messaging infrastructure that doesn’t need any actual infrastructure services to be set up. We don’t need a separate message broker as we would with other implementations such as ActiveMQ or Kafka. Instead, the ZeroMQ dependency within our application does all of this for us.

So, what can we do with this? We can achieve all of the standard messaging patterns that we’d normally want:

  • Request/Response
  • Publish/Subscribe
  • Synchronous vs. Asynchronous
  • And others

2.1. Sockets

ZeroMQ works with the concept of sockets. These are very similar in concept to the sockets that we’d use in low-level network programming.

All sockets have a type, some of which we’ll see in this article. Each socket either listens for connections from other sockets or opens connections to them. Once a pair of sockets is connected, we’re ready to send messages between them. Note that only certain combinations of socket types can be used together, depending on exactly what we want to achieve.

JeroMQ also supports several different transport mechanisms between sockets. For example, common ones include:

  • tcp://<host>:<port> – This uses TCP/IP networking to send messages between sockets. This allows the sockets to be in different processes and on different hosts but does bring some of the reliability concerns that networking has.
  • ipc://<endpoint> – This uses system-dependent mechanisms to send messages between sockets. This allows the sockets to be in different processes, but they must be on the same host, and there may be other system restrictions on which processes can communicate.
  • inproc://<name> – This allows communication between sockets that are in the same process. Specifically, they must be within the same JeroMQ context.

The exact choice of transport will depend on our needs. Depending on the exact transport and socket types, we can also use this to communicate with other ZeroMQ implementations, including in other languages.

3. Getting Started

JeroMQ is a pure Java implementation of ZeroMQ. So let’s have a quick look at using it in our application.

3.1. Dependencies

The first thing we need is to add the dependency:

<dependency>
    <groupId>org.zeromq</groupId>
    <artifactId>jeromq</artifactId>
    <version>0.5.3</version>
</dependency>

We can find the latest version in the Maven Central Repository.

3.2. JeroMQ Context

Before we can do anything with JeroMQ, we need to have a context set up. This is an instance of the ZContext class and is responsible for managing everything.

There’s nothing special about creating our context – we can simply use new ZContext(). We must also ensure that we close it correctly – using the close() method. This ensures we release any networking resources correctly.

The instance that we’re using must live at least as long as anything we’re doing, so we need to ensure that it’s created at the start of our application and not closed until the end.

If we’re writing a standard Java application, we can simply use the try-with-resources pattern. If we’re using something like Spring, then we could set it up as a bean with a destroy method configured. Other frameworks will have their own equivalent lifecycle hooks that we can use instead.

3.3. Creating Sockets

Once we have a context, we can use it to create sockets. These sockets are then the basis for all of our messaging.

We create a socket using the ZContext.createSocket() method, providing the type of socket that we want to use. Once this is done, we typically need to call either ZMQ.Socket.bind() to listen for connections, or ZMQ.Socket.connect() to open a connection to another socket.

At this point, we’re now ready to use our socket. Messages are sent using methods such as send() and received using methods such as recv(). 

We can close our sockets to disconnect when we’re finished. We can do this either by calling Socket.close() explicitly or by closing the ZContext, at which point all sockets created from it are automatically closed.

Note that sockets aren’t thread-safe. We can pass them between threads, but it’s important that only one thread at a time ever accesses them.
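
To see these pieces together, here's a minimal sketch that creates a context, opens two sockets, and passes a single message between them. It uses the PAIR socket type and the inproc transport purely so that everything can run inside one process. The class name, method name, and endpoint name are our own choices for illustration, and the receive timeout is only there so that the sketch can't hang:

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class InprocRoundTrip {

    // Sends one message between two sockets that share a context.
    public static String roundTrip(String text) {
        // try-with-resources closes the context, which also closes both sockets.
        try (ZContext context = new ZContext()) {
            ZMQ.Socket listener = context.createSocket(SocketType.PAIR);
            listener.setReceiveTimeOut(2000);           // give up after two seconds
            listener.bind("inproc://round-trip");       // hypothetical endpoint name

            ZMQ.Socket connector = context.createSocket(SocketType.PAIR);
            connector.connect("inproc://round-trip");

            connector.send(text.getBytes(ZMQ.CHARSET), 0);

            byte[] received = listener.recv(0);         // null if the timeout expires
            return received == null ? null : new String(received, ZMQ.CHARSET);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

Both sockets are only ever touched by the calling thread here, which keeps us within the threading rule described above.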

4. Request/Response Messaging

Let’s start with a simple Request/Response setup. The first thing we need is a server. This is the part that listens for incoming connections:

try (ZContext context = new ZContext()) {
    ZMQ.Socket socket = context.createSocket(SocketType.REP);
    socket.bind("tcp://*:5555");

    // Serve requests one at a time until the thread is interrupted.
    while (!Thread.currentThread().isInterrupted()) {
        byte[] request = socket.recv();
        // Do something here.

        String response = "world";
        socket.send(response.getBytes(ZMQ.CHARSET), 0);
    }
}

Here we created a new socket of type REP – short for Reply. We then instruct it to start listening on a given address before entering a loop in which we receive the next message from our socket, do something with it, and then send a response back.

Next, we need a client. This is the side that opens a connection out to the server. It’s also the side that must send the initial request – our server is only capable of replying to requests that it received:

try (ZContext context = new ZContext()) {
    ZMQ.Socket socket = context.createSocket(SocketType.REQ);
    socket.connect("tcp://localhost:5555");

    String request = "Hello";
    socket.send(request.getBytes(ZMQ.CHARSET), 0);

    byte[] reply = socket.recv();
}

As before, we create a new socket. Only this time, it’s of type REQ – short for Request. We then instruct it to connect to another socket somewhere before sending messages and receiving responses.

The main difference between the REQ and REP sides is when they’re allowed to send messages. The REQ side sends first and must then wait for the reply before it can send again, whereas the REP side can only send a message in response to one that it has received, hence Request and Response.

4.1. Multiple Clients

We’ve seen here how to have a single client sending messages to a single server. But what if we want to have multiple clients instead?

The good news is, it just works. JeroMQ will allow an arbitrary number of clients to connect to the same server address, and it’ll handle all of the networking requirements for us.

However, how does this work? There’s nothing in our server that states which client to send the response to. This is because we don’t need it. JeroMQ tracks it all for us. When the server calls send() the message is sent to the client that we last received a message from. This allows our code to not need to care about any of this.

The downside is that our processing must be entirely single-threaded. Because of how this works, we’re required to do all of the processing of one message and send the reply before we receive the next message. For some scenarios, this is fine, but often this will be a big bottleneck.
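
As a rough sketch of this behaviour, the following example runs one REP server and several REQ clients in a single process. It assumes the inproc transport and a shared context so that no network ports are needed; the class name, endpoint name, and message texts are hypothetical. Every client sends its request before any of them reads a reply, yet each one should still get back the reply that belongs to it:

```java
import java.util.ArrayList;
import java.util.List;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class MultiClientReqRep {

    private static final String ADDRESS = "inproc://req-rep-demo"; // hypothetical endpoint

    public static List<String> run(int clientCount) {
        List<String> replies = new ArrayList<>();
        try (ZContext context = new ZContext()) {
            // Bind before the clients connect, then hand the socket to the server thread.
            ZMQ.Socket server = context.createSocket(SocketType.REP);
            server.bind(ADDRESS);

            Thread serverThread = new Thread(() -> {
                // Strictly one request at a time: receive, process, reply, repeat.
                for (int i = 0; i < clientCount; i++) {
                    String request = server.recvStr();
                    server.send("reply to " + request);
                }
            });
            serverThread.start();

            // Every client sends its request before any of them reads a reply.
            List<ZMQ.Socket> clients = new ArrayList<>();
            for (int i = 0; i < clientCount; i++) {
                ZMQ.Socket client = context.createSocket(SocketType.REQ);
                client.connect(ADDRESS);
                client.send("client-" + i);
                clients.add(client);
            }

            for (ZMQ.Socket client : clients) {
                replies.add(client.recvStr());
            }
            serverThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The server socket is created on the main thread and then used only by the server thread, so it's passed between threads but never shared.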

4.2. Asynchronous Processing

What if, instead, we want to be able to process incoming requests asynchronously and send responses out-of-order? We can’t easily do that with the REQ/REP setup since every response goes directly to the last received request.

Instead, we can do this with a different type of socket – ROUTER. This works very similarly to REP, except that it becomes our responsibility to indicate the recipient of each message.

Let’s have a look at a server component:

try (ZContext context = new ZContext()) {
    ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
    broker.bind("tcp://*:5555");

    String identity = broker.recvStr();
    broker.recv(); //  Envelope delimiter
    String message = broker.recvStr(0);
    // Do something here.

    broker.sendMore(identity);
    broker.sendMore("");
    broker.send("Hello back");
}

This looks very similar, but not quite the same. We set our socket type to ROUTER instead of REP. This socket type allows the server to route messages to a specific client by knowing their identity.

When we receive messages here, we actually receive three different pieces of data. First, we receive the client’s identity, followed by an envelope delimiter and then the actual message.

Equally, when we send messages, we need to do the same. We send the identity of the client the message is for, followed by the envelope delimiter (an empty frame, which is what a REQ client expects) and then the actual message.

Let’s take a look at the client:

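try (ZContext context = new ZContext()) {
    ZMQ.Socket worker = context.createSocket(SocketType.REQ);
    // The identity must be set before connecting.
    worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));

    worker.connect("tcp://localhost:5555");
    // Assumption: the request includes the thread name that we used as the identity.
    worker.send("Hello " + Thread.currentThread().getName());
    String workload = worker.recvStr();
    // Do something with the response.
}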

This is almost identical to our client from before. The only difference is that we’ve given our client an identity so that the server knows which client is which. If we don’t set one, the ROUTER socket assigns the connection a generated identity instead, so setting it explicitly gives each client a stable, readable name.

Because our server can now indicate which client the message is for, we can suddenly process multiple requests at a time – for example, using an executor service. The only requirement is that we never have multiple threads accessing the socket simultaneously.
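
One way this could look is sketched below. It's an illustration under a few assumptions rather than a production design: everything runs in one process over the inproc transport, the class, endpoint, and client names are hypothetical, and the "processing" is just upper-casing the request. The worker threads only compute replies, while a single thread does all of the receiving and sending on the ROUTER socket:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class RouterWithExecutor {

    private static final String ADDRESS = "inproc://router-demo"; // hypothetical endpoint

    public static Map<String, String> run(String... clientNames) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Map<String, String> repliesByClient = new TreeMap<>();
        try (ZContext context = new ZContext()) {
            ZMQ.Socket router = context.createSocket(SocketType.ROUTER);
            router.bind(ADDRESS);

            List<ZMQ.Socket> clients = new ArrayList<>();
            for (String name : clientNames) {
                ZMQ.Socket client = context.createSocket(SocketType.REQ);
                client.setIdentity(name.getBytes(ZMQ.CHARSET)); // must precede connect()
                client.connect(ADDRESS);
                client.send("hello from " + name);
                clients.add(client);
            }

            // Only this thread touches the router; the pool just computes replies.
            List<byte[]> identities = new ArrayList<>();
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < clientNames.length; i++) {
                identities.add(router.recv());   // frame 1: client identity
                router.recv();                   // frame 2: empty delimiter
                String body = router.recvStr();  // frame 3: payload
                Callable<String> job = () -> body.toUpperCase(Locale.ROOT);
                results.add(pool.submit(job));
            }

            // Reply in reverse arrival order to show that ordering no longer matters.
            for (int i = identities.size() - 1; i >= 0; i--) {
                router.sendMore(identities.get(i));
                router.sendMore("");
                router.send(results.get(i).get());
            }

            for (int i = 0; i < clientNames.length; i++) {
                repliesByClient.put(clientNames[i], clients.get(i).recvStr());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return repliesByClient;
    }

    public static void main(String[] args) {
        System.out.println(run("alice", "bob", "carol"));
    }
}
```

Keeping the identity as raw bytes, rather than converting it to a String, means the sketch would also cope with identities that aren't valid text.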

5. Pub/Sub Messaging

So far, we’ve seen cases where the client sends an initial request, and then the server sends back a response. What if, instead, we want to have the server just broadcast out events that clients can consume?

We can do this using the Pub/Sub pattern. A server will publish messages that a subscriber will then consume. So, how does this look?

First, we need to have our publisher:

try (ZContext context = new ZContext()) {
    ZMQ.Socket pub = context.createSocket(SocketType.PUB);
    pub.bind("tcp://*:5555");

    // Wait until something happens.
    pub.send("Hello");
}

This seems remarkably simple, but that’s because JeroMQ manages most of the complexity for us. All we’re doing is creating a socket of type PUB (short for Publish), listening for connections, and then sending a message down it.

Next, we need a subscriber:

try (ZContext context = new ZContext()) {
    ZMQ.Socket sub = context.createSocket(SocketType.SUB);
    sub.connect("tcp://localhost:5555");

    sub.subscribe("".getBytes());

    String message = sub.recvStr();
}

This is slightly more complicated, but still not by much. Here we create a socket of type SUB – short for Subscribe – and connect it to our publisher. We then need to subscribe to messages. This takes a set of bytes to consider as the prefix for all incoming messages – or the empty set of bytes to subscribe to all messages.

Once we’ve done this, we can receive messages. We receive any appropriate messages that were sent by the publisher. Note that we can only receive messages that were sent after we subscribed – anything sent before that will have been lost.

5.1. Multiple Clients

As before, if we want to have multiple clients, then we can do so. Every connected subscriber will receive all appropriate messages sent by the publisher, meaning that this acts as a multicast – e.g. similar to a JMS Topic as opposed to a JMS Queue.

We can also have different clients with different subscriptions. This means that they each get only an appropriate subset of the broadcast messages. All of this works exactly as we’d expect, with no additional effort on our part.
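
The sketch below illustrates this with one publisher and two subscribers that use different prefixes. It assumes the inproc transport within a single process, and the class name, endpoint name, topics, and message texts are all made up for the example. Because anything published before a subscription takes effect is lost, the publisher simply repeats its messages for a bounded number of rounds, and each subscriber uses a short receive timeout so that nothing blocks forever:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class TopicSubscribers {

    private static final String ADDRESS = "inproc://pub-sub-demo"; // hypothetical endpoint

    // Returns the first message seen by each subscriber, keyed by its prefix.
    public static Map<String, String> run() {
        Map<String, String> received = new LinkedHashMap<>();
        try (ZContext context = new ZContext()) {
            ZMQ.Socket pub = context.createSocket(SocketType.PUB);
            pub.bind(ADDRESS);

            ZMQ.Socket weather = subscriber(context, "weather");
            ZMQ.Socket sports = subscriber(context, "sports");

            // Repeat publishing in case the early messages are dropped.
            for (int round = 0; round < 20 && received.size() < 2; round++) {
                pub.send("weather: sunny");
                pub.send("sports: 2-1");
                collect(received, "weather", weather);
                collect(received, "sports", sports);
            }
        }
        return received;
    }

    private static ZMQ.Socket subscriber(ZContext context, String prefix) {
        ZMQ.Socket sub = context.createSocket(SocketType.SUB);
        sub.setReceiveTimeOut(100); // recvStr() returns null after 100 ms of silence
        sub.connect(ADDRESS);
        sub.subscribe(prefix.getBytes(ZMQ.CHARSET));
        return sub;
    }

    private static void collect(Map<String, String> received, String topic, ZMQ.Socket sub) {
        if (received.containsKey(topic)) {
            return;
        }
        String message = sub.recvStr();
        if (message != null) {
            received.put(topic, message);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each subscriber only ever sees messages that start with its own prefix, even though the publisher sends both kinds to the same address.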

5.2. Asynchronous Processing

One issue that we have here is that the recv() method blocks until a message is available. If our subscriber is only ever waiting on messages from this socket and then reacting to them, then that’s fine. However, if we want our subscriber to be doing other things – e.g. waiting on multiple sockets – then this doesn’t work.

The recv() and recvStr() methods that we’ve used have an alternative signature that accepts flags. If we provide the ZMQ.DONTWAIT flag, the method returns immediately instead of blocking. If no message is ready to be read, it returns null instead.

This will allow us to poll the socket to see if there is a message waiting, process it if so, and if not, then do other things in the interim.
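
A minimal sketch of such a polling loop might look like the following. It again assumes a single process using the inproc transport, with a hypothetical class name and endpoint. The short pause stands in for whatever other work the subscriber would really be doing between polls:

```java
import java.util.ArrayList;
import java.util.List;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class NonBlockingSubscriber {

    private static final String ADDRESS = "inproc://polling-demo"; // hypothetical endpoint

    // Records what each poll saw: "idle" when nothing was waiting, else the message.
    public static List<String> run() {
        List<String> polls = new ArrayList<>();
        try (ZContext context = new ZContext()) {
            ZMQ.Socket pub = context.createSocket(SocketType.PUB);
            pub.bind(ADDRESS);

            ZMQ.Socket sub = context.createSocket(SocketType.SUB);
            sub.connect(ADDRESS);
            sub.subscribe("".getBytes(ZMQ.CHARSET));

            // Nothing has been published yet, so this returns null straight away.
            String message = sub.recvStr(ZMQ.DONTWAIT);
            polls.add(message == null ? "idle" : message);

            for (int attempt = 0; attempt < 50 && message == null; attempt++) {
                pub.send("Hello");
                message = sub.recvStr(ZMQ.DONTWAIT);
                polls.add(message == null ? "idle" : message);
                if (message == null) {
                    pause(); // stands in for other useful work
                }
            }
        }
        return polls;
    }

    private static void pause() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The null check is the important part: with ZMQ.DONTWAIT, a null result only means that nothing was waiting at that moment, so we carry on and try again later.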

6. Conclusion

Here we’ve seen a very brief introduction to what we can achieve with JeroMQ. However, we can do much more with it than we’ve covered here. Why not try it out next time you need to do any form of messaging in your application?

As always, we can find all code from this article over on GitHub.
