Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Introduction

In this tutorial, we’ll learn how to use PostgreSQL’s LISTEN/NOTIFY commands to implement a simple message brokering mechanism.

2. A Quick Intro to PostgreSQL’s LISTEN/NOTIFY Mechanism

Simply put, those commands allow connected clients to exchange messages over a regular PostgreSQL connection. A client uses the NOTIFY command to send a notification to a channel along with an optional string payload.

A channel can be any valid SQL identifier, and it works like a topic in traditional messaging systems. This means that the payload will be sent to all active listeners of that particular channel. When there’s no payload, listeners receive just an empty notification.

To start receiving notifications, a client uses the LISTEN command, which takes the channel name as its single parameter. This command returns immediately, thus allowing the client to carry on with other tasks using the same connection.

The notification mechanism has some important properties:

  • Channel names are unique within a database
  • Clients require no special grants to use LISTEN/NOTIFY
  • When NOTIFY is used within a transaction, clients receive the notification only if that transaction commits successfully

Also, clients will receive a single notification if multiple NOTIFY commands are sent to the same channel using the same payload within a transaction.
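To make these delivery rules concrete, here is a toy in-memory model of them. It is not PostgreSQL code and doesn’t talk to a database; the NotifyModel class and its method names are hypothetical, chosen only for this illustration. It assumes a single open transaction at a time and treats a missing payload as an empty string:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the NOTIFY delivery rules described above (hypothetical, not PostgreSQL code). */
class NotifyModel {
    // Notifications already visible to listeners, in delivery order
    private final List<Map.Entry<String, String>> delivered = new ArrayList<>();
    // Notifications queued by the open transaction; the set collapses identical channel/payload pairs
    private final Set<Map.Entry<String, String>> pending = new LinkedHashSet<>();

    /** Queues a notification; nothing reaches listeners until commit(). */
    void enqueue(String channel, String payload) {
        pending.add(Map.entry(channel, payload == null ? "" : payload));
    }

    /** Delivers everything queued by the transaction, then starts a new one. */
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    /** Discards everything queued by the transaction. */
    void rollback() {
        pending.clear();
    }

    /** Returns a read-only snapshot of what listeners have received so far. */
    List<Map.Entry<String, String>> delivered() {
        return List.copyOf(delivered);
    }
}
```

In this model, calling enqueue("orders", "30") twice followed by commit() results in a single delivered entry, while anything queued before rollback() is never delivered.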

3. The Case for PostgreSQL as Message Broker

Given the properties of PostgreSQL’s notifications, we could wonder when it would be a viable choice to use it instead of a full-fledged message broker such as RabbitMQ or similar. As usual, there are some tradeoffs. In general, going for the latter means:

  • More complexity – a message broker is another component that must be monitored, upgraded and so on
  • Handling failure modes that come with distributed transactions

The notification mechanism does not suffer from those issues:

  • The functionality is already in place, assuming we’re using PostgreSQL as the main database
  • No distributed transactions

Of course, there are limitations:

  • It is a PostgreSQL-specific mechanism, which ties the application to PostgreSQL for good (or, at least, until a major refactoring)
  • No direct support for persistent subscribers. Notifications sent before a client starts listening to messages will be lost

Even with those constraints, there are some potential applications for this mechanism:

  • A notification bus in “modular monolith”-style applications
  • Distributed cache invalidation
  • Lightweight message broker, using plain database tables as queues
  • Event sourcing architectures

4. Using LISTEN/NOTIFY in Spring Boot Applications

Now that we have a basic understanding of the LISTEN/NOTIFY mechanism, let’s move on and build a simple Spring Boot test application using it. We’ll create a simple API that allows us to submit buy/sell orders. The payload consists of the instrument symbol, price, and quantity we’re willing to buy or sell. We’ll also add an API that allows us to query an order, given its identifier.

So far, nothing special. But here’s the catch: we want to start serving order queries from the cache right after inserting them into the database. Sure, we could do a cache write-through, but in a distributed scenario where we’d need to scale up the service, we’d also need a distributed cache.

This is where the notification mechanism comes in handy: we’ll send a NOTIFY on every insert, and clients will use LISTEN to preload the order into their respective local caches.

4.1. Project Dependencies

Our sample application requires the regular set of dependencies of a Spring Boot Web MVC application, along with the PostgreSQL driver:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.7.12</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jdbc</artifactId>
    <version>2.7.12</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.6.0</version>
</dependency>

The latest versions for spring-boot-starter-web, spring-boot-starter-data-jdbc, and postgresql are available on Maven Central.

4.2. Notification Service

Since the notification mechanism is specific to PostgreSQL, we’ll encapsulate its general behavior in a single class: NotifierService. By doing so, we avoid those details leaking into other parts of the application. This also simplifies unit tests, as we can replace this service with a mock version to implement different scenarios.

The NotifierService has two responsibilities. Firstly, it offers a façade to send order-related notifications:

public class NotifierService {
    private static final String ORDERS_CHANNEL = "orders";
    private final JdbcTemplate tpl;

    public NotifierService(JdbcTemplate tpl) {
        this.tpl = tpl;
    }

    // Sends the order identifier as the payload on the orders channel
    @Transactional
    public void notifyOrderCreated(Order order) {
        tpl.execute("NOTIFY " + ORDERS_CHANNEL + ", '" + order.getId() + "'");
    }
    // ... other methods omitted
}
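Building the NOTIFY statement by concatenation is fine here because the channel is a constant and the payload is a numeric identifier. For free-form payloads, one option is PostgreSQL’s pg_notify() function, which accepts the channel and payload as regular bind parameters. The sketch below is a minimal illustration using plain JDBC; the SafeNotify class is hypothetical and not part of the sample project, and its lowercase-only channel rule is an assumption made to keep the example conservative:

```java
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.regex.Pattern;

/** Hypothetical helper; names are illustrative and not part of the article's project. */
final class SafeNotify {
    // Assumption of this sketch: accept only lowercase, unquoted-style identifiers
    private static final Pattern CHANNEL = Pattern.compile("[a-z_][a-z0-9_]*");
    // In PostgreSQL's default configuration, a payload must be shorter than 8000 bytes
    static final int MAX_PAYLOAD_BYTES = 7999;

    private SafeNotify() {
    }

    /** Rejects channel names and payloads that this sketch does not support. */
    static void validate(String channel, String payload) {
        if (channel == null || !CHANNEL.matcher(channel).matches()) {
            throw new IllegalArgumentException("Unsupported channel name: " + channel);
        }
        if (payload == null) {
            throw new IllegalArgumentException("Payload must not be null");
        }
        if (payload.getBytes(StandardCharsets.UTF_8).length > MAX_PAYLOAD_BYTES) {
            throw new IllegalArgumentException("Payload too long");
        }
    }

    /** Sends a notification through pg_notify(), binding both values as parameters. */
    static void send(Connection conn, String channel, String payload) throws SQLException {
        validate(channel, payload);
        try (PreparedStatement ps = conn.prepareStatement("SELECT pg_notify(?, ?)")) {
            ps.setString(1, channel);
            ps.setString(2, payload);
            ps.execute();
        }
    }
}
```

With a JdbcTemplate at hand, send() could be invoked from a connection callback passed to execute(), in the same way the listener code in this section obtains its Connection.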

Secondly, it has a factory method for Runnable instances that the application uses to receive notifications. This factory takes a Consumer of PGNotification objects, which has methods to retrieve both the channel and payload associated with the notification:

public Runnable createNotificationHandler(Consumer<PGNotification> consumer) {
    return () -> {
        tpl.execute((Connection c) -> {
            // Subscribe on this connection; close the Statement once LISTEN has run
            try (Statement st = c.createStatement()) {
                st.execute("LISTEN " + ORDERS_CHANNEL);
            }
            PGConnection pgconn = c.unwrap(PGConnection.class);
            while (!Thread.currentThread().isInterrupted()) {
                // Wait up to 10 seconds for new notifications
                PGNotification[] nts = pgconn.getNotifications(10000);
                if (nts == null || nts.length == 0) {
                    continue;
                }
                for (PGNotification nt : nts) {
                    consumer.accept(nt);
                }
            }
            return 0;
        });
    };
}

Here, we’ve opted to deliver raw PGNotification for simplicity. In a real-world scenario, where we’d usually be dealing with multiple domain entities, we could extend this class using generics or similar techniques to avoid code duplication.

A few notes about the created Runnable:

  • Database-related logic uses the execute() method of the supplied JdbcTemplate. This ensures proper connection handling/cleanup and simplifies error handling
  • The callback runs until the current thread is interrupted or some runtime error causes it to return.

Notice the use of PGConnection instead of the standard JDBC Connection. We need this to have direct access to the getNotifications() method, which returns one or more queued notifications.

getNotifications() has two variants. When called with no arguments, it returns any pending notifications without waiting; if there aren’t any, it returns null. The second variant accepts an integer with the maximum time, in milliseconds, to wait for notifications before returning null. As a special case, passing 0 (zero) as the timeout makes getNotifications() block until new notifications arrive.

During application initialization, we use a CommandLineRunner bean in a @Configuration class to spawn a new Thread that actually starts receiving notifications:

@Configuration
public class ListenerConfiguration {
    
    @Bean
    CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) {
        return (args) -> {
            Runnable listener = notifier.createNotificationHandler(handler);            
            Thread t = new Thread(listener, "order-listener");
            t.start();
        };
    }
}
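The thread started this way runs until it is interrupted, so an application that needs an orderly shutdown would keep a reference to it and call interrupt(). The sketch below illustrates that start/stop pattern in isolation. It is a simplified stand-in, not the sample project’s code: the hypothetical PollingListener class reads from an in-memory BlockingQueue instead of a database connection, and the 100 ms timeout is arbitrary. With the real driver, the loop would likely notice the interrupt only after the current getNotifications() call returns:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Sketch of an interruptible polling loop; the queue stands in for the database connection. */
final class PollingListener {
    private PollingListener() {
    }

    /** Creates a Runnable that forwards queued messages until its thread is interrupted. */
    static Runnable create(BlockingQueue<String> source, Consumer<String> consumer) {
        return () -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Wait briefly for the next message, much like a poll with a timeout
                    String msg = source.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        consumer.accept(msg);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees it and exits
                    Thread.currentThread().interrupt();
                }
            }
        };
    }
}
```

A caller would start it with new Thread(PollingListener.create(queue, handler), "order-listener"), and later stop it with interrupt() followed by join().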

4.3. Connection Handling

While technically possible, handling notifications and regular queries using the same connection is not convenient. One would have to scatter calls to getNotifications() throughout the control flow, leading to code that would be hard to read and maintain.

Instead, the standard practice is to run one or more dedicated threads to handle notifications. Each thread has its own connection, which will be kept open all the time. This may pose a problem if those connections come from a pool such as Hikari or DBCP, since a connection that is never returned permanently reduces the pool’s capacity and may be flagged as a leak.

To avoid those issues, our example creates a dedicated DriverDataSource that, in turn, we use to create the JdbcTemplate needed by NotifierService:

@Configuration
public class NotifierConfiguration {

    @Bean
    NotifierService notifier(DataSourceProperties props) {
        
        DriverDataSource ds = new DriverDataSource(
          props.determineUrl(), 
          props.determineDriverClassName(),
          new Properties(), 
          props.determineUsername(),
          props.determinePassword());
        
        JdbcTemplate tpl = new JdbcTemplate(ds);
        return new NotifierService(tpl);
    }
}

Notice that we’re reusing the same connection properties used to create the main Spring-managed DataSource. However, we don’t expose this dedicated DataSource as a bean, since doing so would disable Spring Boot’s DataSource auto-configuration.

4.4. Notification Handler

The last piece of the caching logic is the NotificationHandler class, which implements the Consumer<PGNotification> interface. The role of this class is to process a single notification and populate the configured Cache with Order instances:

@Component
public class NotificationHandler implements Consumer<PGNotification> {
    private final OrdersService orders;

    @Override
    public void accept(PGNotification t) {
        Optional<Order> order = orders.findById(Long.valueOf(t.getParameter()));
        // ... log messages omitted
    }
}

The implementation uses getName() and getParameter() to retrieve the channel name and order identifier from the notification. Here, we can assume that the notification will always be the expected one. This is not out of laziness but stems from the way NotifierService constructs the Runnable on which this handler will be invoked.
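While the channel is guaranteed, the payload format is not: no special grants are needed to send a NOTIFY, so any client connected to the database could publish a non-numeric payload on the orders channel. In that case, Long.valueOf() throws a NumberFormatException, and a runtime error like this ends the listener loop. A minimal sketch of a more defensive conversion follows; the OrderIdParser class is hypothetical and not part of the sample project:

```java
import java.util.OptionalLong;

/** Hypothetical helper that turns a notification payload into an order identifier. */
final class OrderIdParser {
    private OrderIdParser() {
    }

    /** Returns the parsed identifier, or an empty result for a missing or malformed payload. */
    static OptionalLong parse(String payload) {
        if (payload == null || payload.isBlank()) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(payload.trim()));
        } catch (NumberFormatException e) {
            // Malformed payload: report "no id" instead of propagating the error
            return OptionalLong.empty();
        }
    }
}
```

The handler could then skip, and perhaps log, any notification for which parse() returns an empty result instead of letting the exception propagate.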

The actual logic is straightforward: the handler calls OrdersService, which uses the OrdersRepository to fetch the Order from the database and add it to the cache:

@Service
public class OrdersService {
    private final OrdersRepository repo;
    // ... other private fields omitted
   
    @Transactional(readOnly = true)
    public Optional<Order> findById(Long id) {
        Optional<Order> o = Optional.ofNullable(ordersCache.get(id, Order.class));
        if (o.isPresent()) {
            log.info("findById: cache hit, id={}",id);
            return o;
        }        
        log.info("findById: cache miss, id={}",id);
        o = repo.findById(id);
        if ( o.isEmpty()) {
            return o;
        }        
        ordersCache.put(id, o.get());
        return o;
    }
}

5. Testing

To see the notification mechanism in action, the best way is to start two or more instances of the test application, each configured to listen on a different port. We also need a working PostgreSQL instance that both instances will connect to. Please refer to the application.properties file and modify it with your PostgreSQL instance connection details.

Next, to start our test environment, we’ll open two shells and use Maven to run the application. The project’s pom.xml contains an additional profile, instance1, that will start the application on a different port:

# On first shell:
$ mvn spring-boot:run
... many messages (omitted)
[  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
[  restartedMain] c.b.messaging.postgresql.Application     : Started Application in 2.615 seconds (JVM running for 2.944)
[  restartedMain] c.b.m.p.config.ListenerConfiguration     : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService          : notificationHandler: sending LISTEN command...

# On second shell:
$ mvn spring-boot:run -Pinstance1
... many messages (omitted)
[  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
[  restartedMain] c.b.messaging.postgresql.Application     : Started Application in 1.984 seconds (JVM running for 2.274)
[  restartedMain] c.b.m.p.config.ListenerConfiguration     : Starting order listener thread...
[ order-listener] c.b.m.p.service.NotifierService          : notificationHandler: sending LISTEN command...

After some time, we should see a log message on each one informing us that the application is ready to receive requests. Now, let’s create our first Order using curl on yet another shell:

$ curl --location 'http://localhost:8080/orders/buy' \
--form 'symbol="BAEL"' \
--form 'price="13.34"' \
--form 'quantity="500"'
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500}

The application instance running on port 8080 will print some messages. We’ll also see that the 8081 instance log shows that it received a notification:

[ order-listener] c.b.m.p.service.NotificationHandler    : Notification received: pid=5141, name=orders, param=30
[ order-listener] c.b.m.postgresql.service.OrdersService : findById: cache miss, id=30
[ order-listener] c.b.m.p.service.NotificationHandler    : order details: Order(id=30, symbol=BAEL, orderType=BUY, price=13.34, quantity=500.00)

This is evidence that the mechanism works as intended.

Finally, we can use curl again to query the created Order on instance1:

$ curl http://localhost:8081/orders/30
{"id":30,"symbol":"BAEL","orderType":"BUY","price":13.34,"quantity":500.00}

As expected, we get the Order details. Moreover, the application log also shows that this information came from the cache:

[nio-8081-exec-1] c.b.m.postgresql.service.OrdersService   : findById: cache hit, id=30

6. Conclusion

In this article, we’ve presented PostgreSQL’s LISTEN/NOTIFY mechanism and shown how we can use it to implement a lightweight message broker with no extra components.

As usual, all code is available over on GitHub.
