1. Overview

Apache Pulsar is a distributed messaging and queuing platform. Its cloud-native architecture is designed to scale to high event volumes while keeping delivery latency low.

In this tutorial, we’ll explore how to integrate Pulsar into our codebase using the Scala library pulsar4s.

2. The pulsar4s Library

The pulsar4s library supports various scenarios: the producer and consumer can be part of the same application or run in entirely different processes, e.g., in a microservices architecture. The library itself is straightforward to use.

The only thing we have to do to get started is to add the following dependency in our build.sbt file:

libraryDependencies += "com.clever-cloud.pulsar4s" %% "pulsar4s-core" % "2.9.0"

2.1. Creating a Client

After we’ve loaded the required dependency, we create a client that connects to our Pulsar instance. Let’s create a class to handle this:

class PulsarClient(pulsarServiceUrl: String)

The class accepts as an argument a String that represents the URL where Pulsar can be found. Assuming that our Pulsar instance runs on the same machine as our code, the URL has the following format:

pulsar://localhost:6650

6650 is the default port of Pulsar’s binary protocol (8080 is the default port of its HTTP admin interface), and we can always configure the broker to use a different one.
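Since the service URL usually differs between environments, it can help to avoid hard-coding it. The following is a minimal sketch that reads the URL from an environment variable and falls back to a local broker on 6650, the default port of Pulsar’s binary protocol. The variable name PULSAR_SERVICE_URL and the helper serviceUrl are our own assumptions, not something pulsar4s defines:

```scala
object ServiceUrlSketch {
  // PULSAR_SERVICE_URL is a hypothetical variable name chosen for this example.
  // The env parameter defaults to the process environment but can be replaced in tests.
  def serviceUrl(env: Map[String, String] = sys.env): String =
    env.getOrElse("PULSAR_SERVICE_URL", "pulsar://localhost:6650")
}
```

We could then pass ServiceUrlSketch.serviceUrl() to our PulsarClient class instead of a literal String.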

Now that we have our connection URL, we’ll create a configuration instance that holds the connection settings, and a client that our producers and consumers will later use to connect:

private val config: PulsarClientConfig = PulsarClientConfig(pulsarServiceUrl)
private val client: PulsarAsyncClient = PulsarClient(config)

For our example, the connection URL is enough to create a client, but we can use the PulsarClientConfig for more complex needs like authentication.

2.2. Creating a Producer

Next, we need to create a producer to generate messages and then post them to our Pulsar instance. For that, we’ll add a new method to our PulsarClient class:

def producer[T: Schema](producerConfig: ProducerConfig): Producer[T] =
  client.producer[T](producerConfig)

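The method accepts an argument of type ProducerConfig and creates a producer of messages of type T. The context bound T: Schema doesn’t mean that T is a subtype of Schema; it requires an implicit Schema[T] instance to be in scope wherever the method is called.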

A Schema is actually a typeclass that allows the library to know how to encode and decode the message. The Schema is implicitly passed to the client.
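To make the typeclass mechanics concrete, here’s a minimal sketch that uses a hypothetical Codec typeclass of our own as a stand-in for Schema (it isn’t part of pulsar4s). It shows that a context bound such as T: Codec is shorthand for an additional implicit parameter:

```scala
object ContextBoundSketch {

  // Hypothetical typeclass standing in for pulsar4s's Schema; not the library's API.
  trait Codec[T] {
    def encode(value: T): Array[Byte]
    def decode(bytes: Array[Byte]): T
  }

  object Codec {
    // Instance for String, analogous in spirit to a built-in string schema.
    implicit val stringCodec: Codec[String] = new Codec[String] {
      def encode(value: String): Array[Byte] = value.getBytes("UTF-8")
      def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
    }
  }

  // `T: Codec` asks the compiler to find an implicit Codec[T] at the call site.
  def roundTrip[T: Codec](value: T): T = {
    val codec = implicitly[Codec[T]]
    codec.decode(codec.encode(value))
  }

  // Equivalent signature with the implicit parameter written out.
  def roundTripExplicit[T](value: T)(implicit codec: Codec[T]): T =
    codec.decode(codec.encode(value))
}
```

Calling ContextBoundSketch.roundTrip("hello") compiles because a Codec[String] is available implicitly, whereas calling it with a type that has no instance fails at compile time.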

There are built-in schemas for simple types, like strings, bytes, boolean, date and number types, but more complex ones require custom schemas. For our example, we will use the built-in schema for strings.

Now that we have a way to create producers, let’s define a class that sends messages to our Pulsar instance:

class PulsarProducer(pulsarClient: PulsarClient) {

  implicit val schema: Schema[String] = Schema.STRING

  val topic: Topic = Topic("pulsar4s-topic")
  val producerConfig: ProducerConfig = ProducerConfig(topic)
  val producer: Producer[String] = pulsarClient.producer[String](producerConfig)

  def sendMessage(key: String, message: String): Try[MessageId] =
    producer.send(
      DefaultProducerMessage[String](Some(key), message)
    )
}

Within our producer class, we’ve defined the implicit schema we’ll use to encode our messages. Then, we create a ProducerConfig that targets the topic pulsar4s-topic and create a producer instance with the method we defined in our PulsarClient class.

After that, we define a method that accepts a key and a message and posts it to Pulsar. We wrap these values in a DefaultProducerMessage class and send it using our producer. We should keep in mind that there’s no need to define a key, but it’s useful for topic compaction.
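To see why keys matter for compaction, here’s a simplified sketch of the idea: conceptually, a compacted view of a topic retains only the latest message for each key. The Keyed type and the compact function are hypothetical and purely illustrative; real compaction is performed by the Pulsar broker, not by client code:

```scala
object CompactionSketch {
  // Simplified stand-in for a keyed message; not a pulsar4s type.
  final case class Keyed(key: String, value: String)

  // Walk the log in order and keep only the most recent value for each key.
  def compact(log: Seq[Keyed]): Map[String, String] =
    log.foldLeft(Map.empty[String, String]) { (latest, msg) =>
      latest.updated(msg.key, msg.value)
    }
}
```

For a log containing the keys a, b and then a again, only the second value for a and the single value for b survive.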

The method we defined is synchronous and returns a Try that either contains a MessageId with useful information about our sent message, or the exception thrown, e.g., that the Pulsar instance isn’t available.
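Callers of a method that returns a Try typically pattern match on the result. The sketch below illustrates this with a hypothetical fakeSend function that stands in for our sendMessage method and uses a plain String in place of MessageId, so that it runs without a broker:

```scala
import scala.util.{Failure, Success, Try}

object SendResultSketch {
  // Hypothetical stand-in for sendMessage: succeeds with an id or fails with an exception.
  def fakeSend(message: String, brokerUp: Boolean): Try[String] =
    if (brokerUp) Success(s"id-for-$message")
    else Failure(new RuntimeException("broker unavailable"))

  // Handle both outcomes explicitly instead of letting the exception escape.
  def describe(result: Try[String]): String = result match {
    case Success(id) => s"sent, message id: $id"
    case Failure(e)  => s"send failed: ${e.getMessage}"
  }
}
```

The same match works on the Try[MessageId] returned by our producer class, for example, to log failures or trigger a retry.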

2.3. Creating a Consumer

We’re now able to produce messages and the next step is to consume them. For this we’ll create a new method in our PulsarClient class to generate a consumer:

def consumer[T: Schema](consumerConfig: ConsumerConfig): Consumer[T] =
  client.consumer[T](consumerConfig)

The method does pretty much the same as before. It accepts a ConsumerConfig as an argument and generates a consumer that consumes messages of type T.

So now, we can implement a class to do the actual consumption of our messages:

class PulsarConsumer(pulsarClient: PulsarClient)(implicit
  executionContext: ExecutionContext
) {

  implicit val schema: Schema[String] = Schema.STRING

  val topic: Topic = Topic("pulsar4s-topic")
  val consumerConfig: ConsumerConfig =
    ConsumerConfig(Subscription.generate, Seq(topic))
  val consumer: Consumer[String] = pulsarClient.consumer[String](consumerConfig)

  def consume(): Try[ConsumerMessage[String]] =
    consumer.receive.map(message => {
      consumer.acknowledge(message.messageId)
      message
    })

  def consumeAsync(): Future[ConsumerMessage[String]] =
    consumer.receiveAsync.map(message => {
      consumer.acknowledge(message.messageId)
      message
    })
}

We’ve again defined our implicit schema and created a ConsumerConfig. Our consumer needs a subscription name, and the helper method Subscription.generate creates one from a random UUID.

The consumer can also connect to multiple topics, so we pass a list of Topics in the configuration. In our example, the list contains only one Topic. Then, creating our consumer is as simple as calling the method we defined in our PulsarClient class.

We can now consume messages using two different approaches, synchronous and asynchronous.

The synchronous method is very simple. We only need to call the receive() method of our consumer that returns a Try containing our message. If we successfully consume the message, it’s very important to acknowledge the consumption using the acknowledge() method and passing the message’s id.

Asynchronous consumption looks almost identical in code, but it’s invoked with the receiveAsync() method and returns a Future. That’s why our class takes an implicit ExecutionContext argument. Again, it’s important to acknowledge the message after consuming it.
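The following sketch isolates the asynchronous pattern so that it can run without a broker. FakeConsumer is a hypothetical stand-in for the library’s consumer, with simplified receiveAsync and acknowledge methods. The point it illustrates is that the acknowledgment sits inside map, so it only runs if the receive succeeded:

```scala
import scala.concurrent.{ExecutionContext, Future}

object AsyncConsumeSketch {
  // Hypothetical stand-in for a consumer; no broker or pulsar4s types involved.
  final class FakeConsumer(payload: String) {
    @volatile var acknowledged: Boolean = false

    def receiveAsync(implicit ec: ExecutionContext): Future[String] = Future(payload)

    def acknowledge(): Unit = { acknowledged = true }
  }

  // A failed receive skips the function passed to map, so nothing is acknowledged.
  def consumeAsync(consumer: FakeConsumer)(implicit ec: ExecutionContext): Future[String] =
    consumer.receiveAsync.map { message =>
      consumer.acknowledge()
      message
    }
}
```

A caller can attach further processing with map or flatMap, or block on the result with Await.result in a test.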

3. Schema Extensions

As stated above, pulsar4s provides built-in schemas for simple types, but more complex ones need different handling.

For JSON and Avro encoding, pulsar4s provides extensions built on well-known libraries, such as Circe, Jackson, Spray Json and Play Json. To see this, let’s create a custom message class that we’ll encode using JSON format:

case class PulsarMessage(id: Long, message: String, createdAt: Long)

Our class is pretty simple. It only contains an id, our actual message and its creation time as an epoch timestamp.
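As a quick illustration, here’s one way we might construct such a message. The helper newMessage is hypothetical, and we assume that createdAt holds epoch milliseconds, which the class itself doesn’t enforce:

```scala
import java.time.Instant

object MessageSketch {
  final case class PulsarMessage(id: Long, message: String, createdAt: Long)

  // Build a message stamped with the given instant, converted to epoch milliseconds.
  // Passing the instant as a parameter keeps the function deterministic in tests.
  def newMessage(id: Long, text: String, now: Instant = Instant.now()): PulsarMessage =
    PulsarMessage(id, text, now.toEpochMilli)
}
```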

So now, we need to add our JSON extension of preference. We’ll use the Jackson extension, by adding the following line in our build.sbt file:

libraryDependencies += "com.clever-cloud.pulsar4s" %% "pulsar4s-jackson" % "2.9.0"
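With both modules in place, the relevant part of our build.sbt might look like the following fragment. Sharing the version through a val is only a suggestion, and the name pulsar4sVersion is our own choice:

```scala
// build.sbt fragment; pulsar4sVersion is a name we picked for this example
val pulsar4sVersion = "2.9.0"

libraryDependencies ++= Seq(
  "com.clever-cloud.pulsar4s" %% "pulsar4s-core"    % pulsar4sVersion,
  "com.clever-cloud.pulsar4s" %% "pulsar4s-jackson" % pulsar4sVersion
)
```

Keeping the two modules on the same version avoids mixing incompatible releases of the library.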

3.1. The JSON Producer

Let’s define our producer that will produce messages to the ‘pulsar4s-json-topic’ topic:

import com.sksamuel.pulsar4s.jackson._

class JsonPulsarProducer(pulsarClient: PulsarClient) {

  val topic: Topic = Topic("pulsar4s-json-topic")
  val producerConfig: ProducerConfig = ProducerConfig(topic)
  val producer: Producer[PulsarMessage] =
    pulsarClient.producer[PulsarMessage](producerConfig)

  def sendMessage(key: String, message: PulsarMessage): Try[MessageId] =
    producer.send(DefaultProducerMessage[PulsarMessage](Some(key), message))

}

Our producer is actually the same as the previous one that was using the string schema, but it now uses a PulsarMessage schema instead. We haven’t defined the implicit schema anywhere, but the import we added generates one for our message class.
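To illustrate how a single import can supply an implicit instance for any message class, here’s a simplified sketch. We assume the extension exposes a generic implicit definition; the names Codec, jsonSupport and jsonCodec below are hypothetical and only mimic that mechanism, they aren’t the pulsar4s API:

```scala
object ImportedSchemaSketch {
  // Hypothetical typeclass standing in for Schema.
  trait Codec[T] { def describe: String }

  // Plays the role of the extension's package: one generic implicit definition.
  object jsonSupport {
    implicit def jsonCodec[T](implicit tag: scala.reflect.ClassTag[T]): Codec[T] =
      new Codec[T] {
        def describe: String = s"JSON codec for ${tag.runtimeClass.getName}"
      }
  }

  final case class PulsarMessage(id: Long, message: String, createdAt: Long)

  // Requires an implicit Codec[T], just as our producer method requires a Schema[T].
  def codecName[T](implicit codec: Codec[T]): String = codec.describe

  // Without this import, the call below wouldn't compile.
  import jsonSupport._
  val resolved: String = codecName[PulsarMessage]
}
```

Because the implicit definition is generic, the compiler can build an instance for PulsarMessage on demand, without us declaring one.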

3.2. The JSON Consumer

The definition of our consumer is again simple:

import com.sksamuel.pulsar4s.jackson._

class JsonPulsarConsumer(pulsarClient: PulsarClient)(implicit
  executionContext: ExecutionContext
) {

  val topic: Topic = Topic("pulsar4s-json-topic")
  val consumerConfig: ConsumerConfig =
    ConsumerConfig(Subscription.generate, Seq(topic))
  val consumer: Consumer[PulsarMessage] =
    pulsarClient.consumer[PulsarMessage](consumerConfig)

  def consume(): Try[ConsumerMessage[PulsarMessage]] =
    consumer.receive.map(message => {
      consumer.acknowledge(message.messageId)
      message
    })

  def consumeAsync(): Future[ConsumerMessage[PulsarMessage]] =
    consumer.receiveAsync.map(message => {
      consumer.acknowledge(message.messageId)
      message
    })
}

Again, there are no big changes. Our import does all the work and we only have to define a consumer that will consume messages of type PulsarMessage from the ‘pulsar4s-json-topic’ topic.

4. Library Integrations

Pulsar4s has various extensions and can be integrated with powerful libraries. By adding the corresponding module in our dependencies, we can integrate seamlessly with Akka Streams, FS2, Monix, Scalaz, Cats and ZIO.

5. Conclusion

In this article, we learned how to create a simple producer/consumer pair using the pulsar4s library, as well as use JSON encoding in our messages with the library’s extensions.

As usual, the code is available over on GitHub.
