1. Overview

In this tutorial, we’ll see how we can add MQTT messaging in a Java project using the libraries provided by the Eclipse Paho project.

2. MQTT Primer

MQTT (MQ Telemetry Transport) is a messaging protocol that was created to address the need for a simple and lightweight method to transfer data to/from low-powered devices, such as those used in industrial applications.

With the growing popularity of IoT (Internet of Things) devices, MQTT has seen increased use, leading to its standardization by OASIS and ISO.

The protocol supports a single messaging pattern, namely the Publish-Subscribe pattern: each message sent by a client contains an associated “topic”, which the broker uses to route it to subscribed clients. Topic names can be simple strings like “oiltemp” or path-like strings such as “motor/1/rpm”.

In order to receive messages, a client subscribes to one or more topics using either their exact names or a string containing one of the supported wildcards (“#” for multi-level topics and “+” for single-level).
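
To make the wildcard rules more concrete, here is a minimal sketch of how a topic filter could be matched against a topic name. The TopicFilterMatcher class and its matches() method are hypothetical helpers written for this illustration; they are not part of Paho, and the sketch does not validate filters or handle special cases such as topics starting with “$”:

```java
// Hypothetical helper for illustration only; not part of the Paho API.
final class TopicFilterMatcher {

    private TopicFilterMatcher() {
    }

    // Returns true if the topic name matches the filter, where "+" stands for
    // exactly one level and a trailing "#" stands for any number of levels.
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if (level.equals("#")) {
                // Multi-level wildcard: only accepted as the last filter level
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!level.equals("+") && !level.equals(topicLevels[i])) {
                return false; // a literal level does not match
            }
        }
        // No "#" was seen, so both sides need the same number of levels
        return filterLevels.length == topicLevels.length;
    }
}
```

With this sketch, the filters “motor/+/rpm” and “motor/#” both match the topic “motor/1/rpm”, while “motor/+” does not, because “+” covers only a single level.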

3. Project Setup

In order to include the Paho library in a Maven project, we have to add the following dependency:

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.0</version>
</dependency>

The latest version of the Eclipse Paho Java library module can be downloaded from Maven Central.

4. Client Setup

When using the Paho library, the first thing we need to do in order to send and/or receive messages from an MQTT broker is to obtain an implementation of the IMqttClient interface. This interface contains all the methods an application needs to establish a connection to the server and to send and receive messages.

Paho comes out of the box with two implementations of this interface, an asynchronous one (MqttAsyncClient) and a synchronous one (MqttClient). In our case, we’ll focus on the synchronous version, which has simpler semantics.

The setup itself is a two-step process: we first create an instance of the MqttClient class and then we connect it to our server. The following subsections detail those steps.

4.1. Creating a New IMqttClient Instance

The following code snippet shows how to create a new IMqttClient synchronous instance:

String publisherId = UUID.randomUUID().toString();
IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

In this case, we’re using the simplest constructor available, which takes the endpoint address of our MQTT broker and a client identifier, which uniquely identifies our client.

In our case, we used a random UUID, so a new client identifier will be generated on every run.

Paho also provides additional constructors that we can use in order to customize the persistence mechanism used to store unacknowledged messages and/or the ScheduledExecutorService used to run background tasks required by the protocol engine implementation.

The server endpoint we’re using is a public MQTT broker hosted by the Paho project, which allows anyone with an internet connection to test clients without the need for any authentication.

4.2. Connecting to the Server

Our newly created MqttClient instance is not yet connected to the server. We connect it by calling its connect() method, optionally passing an MqttConnectOptions instance that allows us to customize some aspects of the protocol.

In particular, we can use those options to pass additional information such as security credentials, session recovery mode, reconnection mode and so on.

The MqttConnectOptions class exposes those options as simple properties that we can set using normal setter methods. We only need to set the properties required for our scenario; the remaining ones will assume default values.

The code used to establish a connection to the server typically looks like this:

MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);

Here, we define our connection options so that:

  • The library will automatically try to reconnect to the server in the event of a network failure
  • It will discard unsent messages from a previous run
  • Connection timeout is set to 10 seconds

5. Sending Messages

Sending messages using an already connected MqttClient is very straightforward. We use one of the publish() method variants to send the payload, which is always a byte array, to a given topic, using one of the following quality-of-service options:

  • 0 – “at most once” semantics, also known as “fire-and-forget”. Use this option when message loss is acceptable, as it does not require any kind of acknowledgment or persistence
  • 1 – “at least once” semantics. Use this option when message loss is not acceptable and your subscribers can handle duplicates
  • 2 – “exactly once” semantics. Use this option when message loss is not acceptable and your subscribers cannot handle duplicates

In our sample project, the EngineTemperatureSensor class plays the role of a mock sensor that produces a new temperature reading every time we invoke its call() method.

This class implements the Callable interface so we can easily use it with one of the ExecutorService implementations available in the java.util.concurrent package:

public class EngineTemperatureSensor implements Callable<Void> {

    // ... private members omitted
    
    public EngineTemperatureSensor(IMqttClient client) {
        this.client = client;
    }

    @Override
    public Void call() throws Exception {        
        if ( !client.isConnected()) {
            return null;
        }           
        MqttMessage msg = readEngineTemp();
        msg.setQos(0);
        msg.setRetained(true);
        client.publish(TOPIC,msg);        
        return null;        
    }

    private MqttMessage readEngineTemp() {             
        double temp =  80 + rnd.nextDouble() * 20.0;        
        byte[] payload = String.format("T:%04.2f",temp)
          .getBytes();        
        return new MqttMessage(payload);
    }
}
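
As a rough sketch of how such a Callable could be driven by an ExecutorService, the example below submits a task several times and waits for each run to finish. CountingSensor and SensorRunner are hypothetical stand-ins introduced here so that the sketch runs without a broker; a real sensor would publish a message where the counter is incremented:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for EngineTemperatureSensor; it only counts calls.
class CountingSensor implements Callable<Void> {
    final AtomicInteger readings = new AtomicInteger();

    @Override
    public Void call() {
        readings.incrementAndGet(); // a real sensor would publish here
        return null;
    }
}

// Hypothetical helper that runs a sensor task on a single worker thread.
class SensorRunner {
    static void runTimes(Callable<Void> sensor, int times) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < times; i++) {
                // submit() returns a Future; get() blocks until the run ends
                executor.submit(sensor).get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }
}
```

Waiting on each Future keeps the sketch simple; periodic readings would more likely go through a ScheduledExecutorService instead.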

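The MqttMessage encapsulates the payload itself, the requested Quality-of-Service and also the retained flag for the message. This flag tells the broker to store the message as the last retained message for its topic and to deliver it to every client that subscribes to that topic later. The broker keeps it until another retained message on the same topic replaces it.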

We can use this feature to implement a “last known good” behavior, so when a new subscriber connects to the server, it will receive the retained message right away.
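
The “last known good” behavior can be illustrated with a toy in-memory model. ToyBroker below is a hypothetical class written only for this illustration: it is not a real broker, it is not part of Paho, and it supports exact topic names only. The topic name used in the usage note is likewise an assumption:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of retained-message handling; for illustration only.
class ToyBroker {
    private final Map<String, String> retained = new HashMap<>();
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    void publish(String topic, String payload, boolean retain) {
        if (retain) {
            // A newer retained message replaces the previous one
            retained.put(topic, payload);
        }
        List<Consumer<String>> current =
          listeners.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> listener : current) {
            listener.accept(payload);
        }
    }

    void subscribe(String topic, Consumer<String> listener) {
        listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
        String last = retained.get(topic);
        if (last != null) {
            // A late subscriber receives the last retained value right away
            listener.accept(last);
        }
    }
}
```

If two retained readings are published to “engine/temperature” before anyone subscribes, a client subscribing afterwards receives only the second one in this model.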

6. Receiving Messages

In order to receive messages from the MQTT broker, we need to use one of the subscribe() method variants, which allow us to specify:

  • One or more topic filters for messages we want to receive
  • The associated QoS
  • The callback handler to process received messages

In the following example, we show how to add a message listener to an existing IMqttClient instance to receive messages from a given topic. We use a CountDownLatch as a synchronization mechanism between our callback and the main execution thread, decrementing it every time a new message arrives.

In the sample code, we’ve used a different IMqttClient instance to receive messages. We did this just to make it clearer which client does what, but it is not a Paho limitation. If we want, we can use the same client for publishing and receiving messages:

CountDownLatch receivedSignal = new CountDownLatch(10);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
    byte[] payload = msg.getPayload();
    // ... payload handling omitted
    receivedSignal.countDown();
});    
receivedSignal.await(1, TimeUnit.MINUTES);

The subscribe() variant used above takes an IMqttMessageListener instance as its second argument.
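
The payload handling omitted from the listing could look something like the following sketch, which turns a payload shaped like “T:85.23” back into a number. TemperaturePayload is a hypothetical helper, and the sketch assumes the sender writes the value with a dot as the decimal separator and an ASCII-compatible charset, which the sensor’s default-locale String.format() call does not guarantee on every system:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the sample project.
class TemperaturePayload {

    // Parses payloads shaped like "T:85.23" into a double
    static double parse(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        if (!text.startsWith("T:")) {
            throw new IllegalArgumentException("Unexpected payload: " + text);
        }
        // Double.parseDouble expects a dot as the decimal separator
        return Double.parseDouble(text.substring(2));
    }
}
```

Inside the listener, this helper would be called with the result of msg.getPayload().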

In our case, we use a simple lambda function that processes the payload and decrements a counter. If not enough messages arrive in the specified time window (1 minute), the await() method returns false; it only throws an InterruptedException if the waiting thread is interrupted.
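
Since CountDownLatch.await(timeout, unit) reports a timeout through its boolean return value, it is worth checking that value. The sketch below shows one way to do so; LatchTimeoutDemo and waitFor() are hypothetical names used only for this example:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only.
class LatchTimeoutDemo {

    // Returns true if the latch reached zero before the timeout elapsed
    static boolean waitFor(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

A caller can then treat a false result as “not all expected messages arrived in time” and react accordingly.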

When using Paho, we don’t need to explicitly acknowledge message receipt. If the callback returns normally, Paho treats the message as successfully consumed and sends an acknowledgment to the server.

If the callback throws an Exception, the client will be shut down. Please note that this will result in loss of any messages sent with QoS level of 0.

Messages sent with QoS level 1 or 2 will be resent by the server once the client is reconnected and subscribes to the topic again.

7. Conclusion

In this article, we demonstrated how we can add support for the MQTT protocol in our Java applications using the library provided by the Eclipse Paho project.

This library handles all low-level protocol details, allowing us to focus on other aspects of our solution, while leaving plenty of room to customize important internal features, such as message persistence.

The code shown in this article is available over on GitHub.
