1. Introduction

In this tutorial, we’ll show how to use PostgreSQL’s NOTIFY/LISTEN feature with Spring Integration-based applications.

2. Quick Recap

PostgreSQL offers a lightweight message notification mechanism that allows clients to send notifications to each other using regular database connections. This mechanism uses two non-standard SQL statements, NOTIFY and LISTEN, hence its name.

We’ve already covered this mechanism in more detail in a previous tutorial, so we’ll assume basic knowledge of how to use it. Here, we’ll cover a more specific use case: how to use this mechanism to implement a SubscribableChannel.

3. Dependencies

For this tutorial, we’ll need just the core Spring Integration library and the PostgreSQL JDBC driver:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.3.8</version>
</dependency>

The latest versions for spring-integration-core and postgresql are available on Maven Central.

4. What Is a SubscribableChannel?

Spring Integration’s SubscribableChannel interface is a MessageChannel extension that supports asynchronous message delivery to subscribers. It adds two additional methods to its parent:

  • subscribe(MessageHandler handler)
  • unsubscribe(MessageHandler handler)

Those methods allow a client to register/unregister a MessageHandler instance to process received messages.

Despite its similarities with reactive message dispatching, there’s a fundamental difference: here we have a push-based model, whereas the reactive world is pull-based. This means that there’s no implicit flow control, and it’s up to consumers to implement any buffering/discarding strategy to handle excessive traffic.
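To make the buffering/discarding idea concrete, here’s a minimal sketch that uses only JDK classes. It places a bounded queue between the code that receives pushed messages and the code that processes them, and discards new messages when the queue is full. The BoundedBuffer class and its method names are hypothetical and aren’t part of Spring Integration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: buffers pushed messages and drops the newest ones on overflow
class BoundedBuffer<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called from the push side; never blocks the caller
    boolean accept(T message) {
        boolean stored = queue.offer(message);
        if (!stored) {
            dropped.incrementAndGet();
        }
        return stored;
    }

    // Called from the processing side; returns null when the buffer is empty
    T poll() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedBuffer<String> buffer = new BoundedBuffer<>(2);
        buffer.accept("m1");
        buffer.accept("m2");
        buffer.accept("m3"); // the buffer is full, so this message is discarded
        System.out.println(buffer.poll() + ", dropped=" + buffer.droppedCount());
    }
}
```

Dropping the newest message is just one possible policy; dropping the oldest one, or blocking the caller for a bounded time, may suit other workloads better.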

Out of the box, Spring Integration comes with a simple implementation of this interface, PublishSubscribeChannel, but this implementation works only within a single JVM instance. Using the NOTIFY/LISTEN mechanism, we’ll no longer have this limitation.

5. SubscribableChannel Implementation

We’ll base our implementation on a readily available base class from Spring Integration’s core: AbstractSubscribableChannel. Although not strictly necessary, opting for this approach has some benefits:

  • Management: important for production, as it exposes key metrics for monitoring the system’s health and helps when troubleshooting performance issues
  • Interceptors: allows client code to add ChannelInterceptors that can inspect messages before/after they’re processed

The implementation itself consists of two main parts: message delivery and dispatch.

5.1. Message Delivery

This functionality corresponds to the producer side of a channel. A message producer would typically use the standard send() method available on the MessageChannel interface or use a MessagingGateway that wraps the channel with a user-friendly interface.

Since we’re leveraging AbstractSubscribableChannel, which in turn extends AbstractMessageChannel, all we must do is implement the doSend() method. Here, we’ll use NOTIFY to send the message to PostgreSQL, which will then deliver it to any client that has issued a LISTEN command for the same channel.

@Override
protected boolean doSend(Message<?> message, long timeout) {
    try {
        String msg = prepareNotifyPayload(message);
        // try-with-resources closes both the connection and the statement
        try (Connection c = ds.getConnection(); Statement st = c.createStatement()) {
            // Double any single quote so the JSON payload remains a valid SQL string literal
            st.execute("NOTIFY " + channelName + ", '" + msg.replace("'", "''") + "'");
        }
        return true;
    } catch (Exception ex) {
        throw new MessageDeliveryException(message, "Unable to deliver message: " + ex.getMessage(), ex);
    }
}

We use the DataSource and channel name passed through the constructor to get a database connection and as the notification’s channel name, respectively.
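Since both the channel name and the payload end up inside the statement text, it’s worth being careful about what goes into them. The sketch below shows one possible way to build the statement: it rejects channel names that aren’t plain identifiers and doubles single quotes in the payload. The NotifySql class is hypothetical, and the escaping assumes PostgreSQL’s default standard_conforming_strings setting, where backslashes in string literals are taken literally:

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the tutorial's channel implementation
class NotifySql {
    // Conservative rule: letters, digits and underscores, not starting with a digit
    private static final Pattern SAFE_CHANNEL = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static String build(String channel, String payload) {
        if (!SAFE_CHANNEL.matcher(channel).matches()) {
            throw new IllegalArgumentException("Unsafe channel name: " + channel);
        }
        // Double single quotes so the payload stays a single SQL string literal
        String escaped = payload.replace("'", "''");
        return "NOTIFY " + channel + ", '" + escaped + "'";
    }

    public static void main(String[] args) {
        System.out.println(build("orders", "{\"note\":\"it's a test\"}"));
    }
}
```

The resulting string can then be passed to Statement.execute() as in doSend().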

The prepareNotifyPayload() method converts the incoming Message object into a JSON string suitable for use as the notification payload:

protected String prepareNotifyPayload(Message<?> message) throws JsonProcessingException {
    Map<String, Object> rawMap = new HashMap<>();
    rawMap.putAll(message.getHeaders());
    JsonNode headerData = om.valueToTree(rawMap);
    JsonNode bodyData = om.valueToTree(message.getPayload());
    ObjectNode msg = om.getNodeFactory().objectNode();
    msg.set(HEADER_FIELD, headerData);
    msg.set(BODY_FIELD, bodyData);
    return om.writeValueAsString(msg);
}

This approach has an important limitation: in its default configuration, PostgreSQL requires the notification payload to be shorter than 8000 bytes. In general, this is enough for messages that simply signal an event but clearly insufficient in integration scenarios where a message may carry the full content of a file.

In those cases, a better approach is to store the “large” data part in some shared storage (e.g., a database table or an S3 bucket) and send a reference to it in the message.
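A rough sketch of this idea follows. It measures the payload’s UTF-8 size and, when it’s too large, stores it elsewhere and sends a short reference instead. Everything here is an assumption made for illustration: the PayloadRouter class, the "ref:" prefix, and the in-memory map that stands in for the shared storage. Since our payloads are JSON objects that start with "{", the prefix can’t clash with a regular payload:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper illustrating the "store large data, send a reference" approach
class PayloadRouter {
    // Largest size accepted by a default PostgreSQL configuration (shorter than 8000 bytes)
    static final int MAX_PAYLOAD_BYTES = 7999;
    static final String REF_PREFIX = "ref:";

    // Stand-in for shared storage such as a database table or an S3 bucket
    private final Map<String, String> store = new ConcurrentHashMap<>();

    static boolean fitsInNotification(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length <= MAX_PAYLOAD_BYTES;
    }

    // Returns the payload itself when it is small enough, otherwise a reference to it
    String toNotificationPayload(String payload) {
        if (fitsInNotification(payload)) {
            return payload;
        }
        String id = UUID.randomUUID().toString();
        store.put(id, payload);
        return REF_PREFIX + id;
    }

    // Receiving side: loads the stored data when the notification carries a reference
    String resolve(String received) {
        if (received.startsWith(REF_PREFIX)) {
            return store.get(received.substring(REF_PREFIX.length()));
        }
        return received;
    }

    public static void main(String[] args) {
        PayloadRouter router = new PayloadRouter();
        StringBuilder big = new StringBuilder("{\"data\":\"");
        for (int i = 0; i < 9000; i++) {
            big.append('x');
        }
        big.append("\"}");
        String sent = router.toNotificationPayload(big.toString());
        System.out.println(sent.startsWith(REF_PREFIX));
        System.out.println(router.resolve(sent).length() == big.length());
    }
}
```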

5.2. Message Dispatch

This code takes care of receiving asynchronous notifications from PostgreSQL and dispatching them to subscribers. Since it’s pointless to listen for notifications when there are no subscribers, the implementation will only start the background thread responsible for this task when someone calls subscribe():

@Override
public boolean subscribe(MessageHandler handler) {
    boolean r = super.subscribe(handler);
    if (r && super.getSubscriberCount() == 1) {
        startListenerThread();
    }
    return r;
}

Similarly, we’ll stop the listener when there are no more subscribers:

@Override
public boolean unsubscribe(MessageHandler handle) {
    boolean r = super.unsubscribe(handle);
    if (r && super.getSubscriberCount() == 0) {
        stopListenerThread();
    }
    return r;
}
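In both methods, the subscriber count check and the thread start/stop are separate steps. If handlers may subscribe or unsubscribe from several threads at once, one option is to make each whole method atomic. The sketch below illustrates the idea in isolation, using a hypothetical SubscriberTracker class with callbacks that stand in for startListenerThread() and stopListenerThread():

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: runs a callback on the first subscribe and on the last unsubscribe
class SubscriberTracker<H> {
    private final Set<H> handlers = new HashSet<>();
    private final Runnable onFirstSubscriber;
    private final Runnable onLastUnsubscribed;

    SubscriberTracker(Runnable onFirstSubscriber, Runnable onLastUnsubscribed) {
        this.onFirstSubscriber = onFirstSubscriber;
        this.onLastUnsubscribed = onLastUnsubscribed;
    }

    // synchronized makes "add, check the count, start" a single atomic step
    synchronized boolean subscribe(H handler) {
        boolean added = handlers.add(handler);
        if (added && handlers.size() == 1) {
            onFirstSubscriber.run();
        }
        return added;
    }

    // synchronized makes "remove, check the count, stop" a single atomic step
    synchronized boolean unsubscribe(H handler) {
        boolean removed = handlers.remove(handler);
        if (removed && handlers.isEmpty()) {
            onLastUnsubscribed.run();
        }
        return removed;
    }

    public static void main(String[] args) {
        SubscriberTracker<String> tracker = new SubscriberTracker<>(
          () -> System.out.println("listener started"),
          () -> System.out.println("listener stopped"));
        tracker.subscribe("h1");
        tracker.subscribe("h2");
        tracker.unsubscribe("h1");
        tracker.unsubscribe("h2");
    }
}
```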

The background listener thread will issue an initial LISTEN statement and then loop through the received notifications:

@Override
public void run() {
    startLatch.countDown();
    try (Statement st = conn.createStatement()) {
        st.execute("LISTEN " + channelName);
        PGConnection pgConn = conn.unwrap(PGConnection.class);

        while (!Thread.currentThread().isInterrupted()) {
            // Wait up to one second for notifications instead of polling in a tight loop
            PGNotification[] nts = pgConn.getNotifications(1000);
            if (nts == null) {
                continue;
            }
            for (PGNotification n : nts) {
                Message<?> msg = convertNotification(n);
                getDispatcher().dispatch(msg);
            }
        }
    } catch (SQLException ex) {
        // ... exception handling omitted
    } finally {
        stopLatch.countDown();
    }
}

For each received notification, we first convert it to a Message, and then we delegate the actual delivery to the configured dispatcher. This takes care of everything related to this task, like invoking interceptors, updating metrics, and so on.
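The run() method counts down startLatch when it begins and stopLatch when it ends, which lets the code that starts or stops the thread wait for those events. The startListenerThread() and stopListenerThread() methods aren’t shown here, so the following is only a sketch of how such a pair could use the latches. The ListenerLifecycle class is hypothetical, and a short sleep replaces the notification polling so that the example runs without a database:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the listener; a new instance is needed for each start
class ListenerLifecycle implements Runnable {
    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final CountDownLatch stopLatch = new CountDownLatch(1);
    private Thread thread;

    @Override
    public void run() {
        startLatch.countDown();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Placeholder for fetching and dispatching notifications
                Thread.sleep(50);
            }
        } catch (InterruptedException ex) {
            // Interrupted while waiting: restore the flag and leave the loop
            Thread.currentThread().interrupt();
        } finally {
            stopLatch.countDown();
        }
    }

    // Starts the thread and waits until run() has actually begun
    boolean start(long timeout, TimeUnit unit) throws InterruptedException {
        thread = new Thread(this, "pg-listener");
        thread.setDaemon(true);
        thread.start();
        return startLatch.await(timeout, unit);
    }

    // Interrupts the thread and waits until run() has finished
    boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        thread.interrupt();
        return stopLatch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ListenerLifecycle listener = new ListenerLifecycle();
        System.out.println("started: " + listener.start(5, TimeUnit.SECONDS));
        System.out.println("stopped: " + listener.stop(5, TimeUnit.SECONDS));
    }
}
```

Because a CountDownLatch can’t be reset, this sketch creates the latches together with the listener object, so a fresh instance is needed whenever the listener restarts.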

6. Integration Example

Now that we have our implementation in place, let’s test it with a simple integration scenario. We’ll use this channel to deliver BUY/SELL Order messages, and, on the receiving end, we’ll have a subscriber that receives those orders and keeps a running balance of those transactions for each symbol.

Firstly, let’s create a @Bean for our SubscribableChannel:

@Bean
static SubscribableChannel orders(@Value("${db.url}") String url, @Value("${db.username}") String username, 
  @Value("${db.password}") String password) {
    
    SingleConnectionDataSource ds = new SingleConnectionDataSource(url, username, password, true);      
    Supplier<Connection> connectionSupplier = () -> {
        try {
            return ds.getConnection();
        }
        catch(SQLException ex) {
            throw new RuntimeException(ex);
        }
    };
    
    PGSimpleDataSource pgds = new PGSimpleDataSource();
    pgds.setUrl(url);
    pgds.setUser(username);
    pgds.setPassword(password);
    
    return new PostgresSubscribableChannel("orders", connectionSupplier, pgds, new ObjectMapper());
}

Notice that we use the supplied database URL and credentials to create two DataSource objects. The first one is a SingleConnectionDataSource, which backs the connection supplier that the channel uses to receive notifications. The second one, used for sending notifications, is PostgreSQL’s native PGSimpleDataSource.

Secondly, we create a @ServiceActivator method to receive orders:

@ServiceActivator(inputChannel = "orderProcessor")
void processOrder(Order order){
    BigDecimal orderTotal = order.getQuantity().multiply(order.getPrice());
    if (order.getOrderType() == OrderType.SELL) {
        orderTotal = orderTotal.negate();
    }
    
    BigDecimal sum = orderSummary.get(order.getSymbol());
    if (sum == null) {
        sum = orderTotal;
    } else {
        sum = sum.add(orderTotal);
    }    
    orderSummary.put(order.getSymbol(), sum);
    orderSemaphore.release();    
}

We use a Semaphore to keep track of received messages. Its main purpose is to make testing a bit easier, as we can use it to synchronize the main thread running tests with messages received and processed in the background.
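The bookkeeping side of this can be sketched with JDK classes alone. In the hypothetical OrderTally class below, each recorded order releases one permit, and awaitNextMessage() waits for a permit with a timeout, which is how a test thread can wait for a message processed in the background. The method names mirror the ones used later in the test, but the class itself is only an illustration:

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the per-symbol balance plus the test synchronization helper
class OrderTally {
    private final Map<String, BigDecimal> orderSummary = new ConcurrentHashMap<>();
    private final Semaphore orderSemaphore = new Semaphore(0);

    // signedTotal is positive for BUY orders and negative for SELL orders
    void record(String symbol, BigDecimal signedTotal) {
        orderSummary.merge(symbol, signedTotal, BigDecimal::add);
        orderSemaphore.release(); // signal that one more message was processed
    }

    // Returns true if a message was processed before the timeout expired
    boolean awaitNextMessage(long timeout, TimeUnit unit) throws InterruptedException {
        return orderSemaphore.tryAcquire(timeout, unit);
    }

    BigDecimal getTotalBySymbol(String symbol) {
        return orderSummary.get(symbol);
    }

    public static void main(String[] args) throws InterruptedException {
        OrderTally tally = new OrderTally();
        // Simulates a message being processed on a background thread
        new Thread(() -> tally.record("BAEL", new BigDecimal("10"))).start();
        System.out.println(tally.awaitNextMessage(5, TimeUnit.SECONDS));
        System.out.println(tally.getTotalBySymbol("BAEL"));
    }
}
```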

Lastly, we also need a @Transformer to convert the received message payloads, which are JsonNode instances, into Order objects:

@Transformer(inputChannel = "orders", outputChannel = "orderProcessor" )
Order validatedOrders(Message<?> orderMessage)  throws JsonProcessingException {
    ObjectNode on = (ObjectNode)orderMessage.getPayload();
    Order order = om.treeToValue(on, Order.class);
    return order;
}

Notice that transformed messages go to the orderProcessor channel, which will be automatically created by Spring Integration. Unless we explicitly define a channel with this name, the actual channel will be a DirectChannel, which simply ties producers and consumers together.

Alternatively, we could define this channel as a QueueChannel or similar. This would provide a buffer to store messages, thus allowing our system to cope with any temporary message surges.

7. Testing

Finally, let’s write an integration test to see all this working together:

@SpringJUnitConfig(classes = {PostgresqlPubSubExample.class})
public class PostgresqlPubSubExampleLiveTest {
    
    @Autowired
    PostgresqlPubSubExample processor;
    
    @Autowired
    OrdersGateway ordersGateway;

    @Test
    void whenPublishOrder_thenSuccess() throws Exception {
        
        Order o = new Order(1L, "BAEL", OrderType.BUY, BigDecimal.valueOf(2.0), BigDecimal.valueOf(5.0));
        ordersGateway.publish(o);
        
        assertThat(processor.awaitNextMessage(10, TimeUnit.SECONDS)).isTrue();
        
        BigDecimal total = processor.getTotalBySymbol("BAEL");
        // Compare numerically, since the computed total may have a different scale
        assertThat(total).isEqualByComparingTo(BigDecimal.valueOf(10));
    }
}

We’re leveraging Spring’s test facilities to instantiate all required beans, so we can just send an order using the messaging gateway that wraps our channel. Once we send a message, the test calls the awaitNextMessage() helper method to wait for it to be processed before querying the total order value.
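One detail worth keeping in mind when asserting on BigDecimal totals is that equals() also compares the scale, while compareTo() compares only the numeric value. The small example below uses plain JDK arithmetic, outside of the messaging flow, with the same quantity and price as the test. The BigDecimalScaleDemo class is just an illustration:

```java
import java.math.BigDecimal;

// Illustration only: shows how scale affects BigDecimal equality
class BigDecimalScaleDemo {
    static BigDecimal orderTotal(BigDecimal quantity, BigDecimal price) {
        return quantity.multiply(price);
    }

    public static void main(String[] args) {
        BigDecimal total = orderTotal(BigDecimal.valueOf(2.0), BigDecimal.valueOf(5.0));
        BigDecimal expected = BigDecimal.valueOf(10);

        System.out.println(total);                     // prints 10.00
        System.out.println(total.equals(expected));    // prints false (scale 2 vs. scale 0)
        System.out.println(total.compareTo(expected)); // prints 0
    }
}
```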

Note that since this is an integration test, we must have a running PostgreSQL instance available and the required credentials to use it.

8. Note for Spring Integration 6 Users

Starting with version 6, Spring Integration comes with the PostgresSubscribableChannel class that implements SubscribableChannel. This version, however, requires Spring 6 and therefore implies using Java 17 as a baseline for developing our applications.

This new implementation doesn’t have the same limitation on payload size as the code in this tutorial, but it requires creating a database table to store the messages. However, since Java 8 and 11 still represent a large share of existing applications, the technique described here will remain applicable for some time.

9. Conclusion

In this tutorial, we’ve shown how to leverage the NOTIFY/LISTEN mechanism available on PostgreSQL to implement asynchronous message delivery in Spring Integration applications.

As usual, the full code is available over on GitHub.
