
Learn through the super-clean Baeldung Pro experience:
>> Membership and Baeldung Pro.
No ads, dark-mode and 6 months free of IntelliJ Idea Ultimate to start with.
Last updated: December 29, 2024
Publish-subscribe (PubSub) is a messaging pattern in which publishers send messages to a channel and subscribers listen to messages on that channel. Redis is a popular in-memory data store that also supports messaging.
In this tutorial, we’ll explore implementing a publish-subscribe messaging system using Kotlin and Redis. To do this, we’ll build a small application in which a publisher sends messages to a Redis channel and a subscriber listens to messages on that channel.
Let’s start by setting up our Kotlin project and adding the building blocks for our messaging system.
We’ll use the lettuce-core library to interact with Redis in our Kotlin application:
<dependencies>
<dependency>
<groupId>io.lettuce.core</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.5.0.RELEASE</version>
</dependency>
</dependencies>
Additionally, we’ll use an embedded Redis server for testing purposes to avoid using an external Redis server:
<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
<version>1.4.3</version>
</dependency>
Next, we’ll create a Redis connection using Lettuce:
object RedisConnectionManager: AutoCloseable {
private val redisClient: RedisClient = RedisClient.create("redis://localhost:6379")
private val connection: StatefulRedisConnection<String, String> = redisClient.connect()
override fun close() {
connection.close()
redisClient.shutdown()
}
}
We create a singleton object RedisConnectionManager that manages the Redis connection. In this class, we instantiate a RedisClient instance using the create() method and then connect() to the embedded Redis server.
Additionally, we define a close() method to close the connection. If not called explicitly, the connection closes when the object is garbage collected as we implement the AutoCloseable interface.
Additionally, let’s define methods to expose Redis’ sync() and async() commands. Specifically, we’ll use the sync methods to publish messages to Redis. Conversely, async commands are required to create a subscriber and listen to a channel.
First, we’ll manage the sync commands:
fun redisSyncCommands(): RedisCommands<String, String>? {
return connection.sync()
}
Significantly, the redisSyncCommands() method gets the synchronous operations from the connection and returns an object encapsulating them.
Next, we’ll look at the async commands:
fun redisPubSubAsyncCommands(messageListener: MessageListener): RedisPubSubAsyncCommands<String, String> {
val pubSubConnection = redisClient.connectPubSub()
pubSubConnection.addListener(messageListener)
return pubSubConnection.async()
}
The redisPubSubAsyncCommands() method creates a pub-sub connection to the server, adds a listener, and finally returns the async commands for the connection.
To represent our message, let’s define a simple data class. This class holds the message content:
data class Message(val content: String)
Here, we define a Message data class with a single content property. This class encapsulates the messages that we’ll send through Redis.
Now we can create a consumer that listens to messages on a specific channel. This requires a subscription to the channel and a listener to handle incoming messages.
First, let’s create a class that handles incoming messages. One of the simplest ways to achieve this is by extending the RedisPubSubAdapter class provided by Lettuce. It’s an abstract class that implements the RedisMessageListener interface:
class MessageListener : RedisPubSubAdapter<String, String>() {
var latch: CountDownLatch = CountDownLatch(1)
var messagesReceived: List<String> = emptyList()
override fun message(channel: String?, message: String?) {
println("Received message: $message from channel: $channel")
messagesReceived = messagesReceived.plus(message!!)
latch.countDown()
}
}
The message() method is called whenever a message is received on the subscribed channel. In this method, we log the received message and the channel name.
For testing purposes, we also use a CountDownLatch to wait for messages. We’ll store any messages received by the listener in messagesReceived so that we can verify them later.
Furthermore, we need to connect our listener to a Redis channel. For this, we’ll also create a subscriber that subscribes to a specific channel and listens for messages.
Let’s create a method to subscribe to a channel:
class RedisSubscriber(private val messageListener: MessageListener) {
fun subscribeToChannel(channel: String) {
RedisConnectionManager.redisPubSubAsyncCommands(messageListener).subscribe(channel)
}
}
Here are a few key points about the RedisSubscriber class:
When a message is received on the channel, the message() method of the MessageListener class is called, and the message content is logged.
It’s important to note that Redis doesn’t guarantee message delivery. If the subscriber isn’t listening when the publisher publishes, it won’t receive the message.
Now that we have a consumer listening to messages, let’s create a publisher that sends messages to a Redis channel.
We’ll create a method that publishes a message to a specific channel:
class RedisPublisher {
fun publishMessage(channel: String, message: Message) {
RedisConnectionManager.redisCommands()?.publish(channel, message.content)
println("Message published: $message")
}
}
This method takes a channel name and a Message object as parameters. Using the commands from the RedisConnectionManager, it publishes the message content to the specified channel using the publish() method.
To test our messaging system, we’ll write a test case that publishes a message to a channel and verifies that the consumer receives the message.
To start, let’s create a test class and set up our connection:
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class RedisSubscriberUnitTest {
val messageListener = MessageListener()
val redisSubscriber = RedisSubscriber(messageListener)
val redisPublisher = RedisPublisher()
val channel = "channel"
val message = Message("Hello, Redis!")
val redisServer = RedisServer(6379)
@BeforeAll
fun setUp() {
redisServer.start()
}
@AfterAll
fun tearDown() {
RedisConnectionManager.close()
redisServer.stop()
}
}
In the setUp() method, we start an embedded Redis server using the RedisServer class of the embedded-redis library. Similarly, in the tearDown() method, we close the Redis connection and stop the embedded Redis server.
Now, let’s write a test case that publishes a message to a channel and verifies that the consumer receives the message:
@Test
fun givenMessageListener_whenMessagePublished_thenMessageReceived() {
redisSubscriber.subscribeToChannel(channel)
redisPublisher.publishMessage(channel, message)
messageListener.latch.await(500, TimeUnit.MILLISECONDS)
assertEquals(message.content, messageListener.messagesReceived.get(0))
}
First, we subscribe to the channel using the RedisSubscriber instance, or we’ll miss the published message. Then, we publish a message to the channel using the RedisPublisher instance.
The latch in the MessageListener class waits for the message to be received.
Finally, we assert that the message the listener receives is the same as the message published.
In this article, we explored implementing a publish-subscribe messaging system using Kotlin and Redis. We set up a Redis connection, created a consumer to listen to messages on a channel, and finally a publisher to send messages to the channel. We also tested our system by publishing and consuming messages.