1. Introduction

ElasticMQ is an in-memory message queue, written in Scala, that is compatible with AWS SQS (Simple Queue Service). It can run embedded in an application or as a standalone server.

In this tutorial, we’ll go through ElasticMQ orchestration, configuration, and use from a Scala client.

2. ElasticMQ Setup

We can run ElasticMQ standalone, embed it in an application, or use the official Docker image.

2.1. ElasticMQ Standalone

To set up a standalone ElasticMQ instance, we can download the ElasticMQ jar application with wget:

$ wget https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.6.5.jar

Then we can start it:

$ java -jar elasticmq-server-1.6.5.jar
11:56:36.506 [main] INFO  org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
11:56:39.109 [elasticmq-pekko.actor.default-dispatcher-4] INFO  o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
11:56:43.740 [elasticmq-pekko.actor.default-dispatcher-4] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
11:56:44.153 [main] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
11:56:44.326 [elasticmq-pekko.actor.default-dispatcher-4] INFO  o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9325
11:56:44.344 [main] INFO  org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 10093 ms ===

2.2. ElasticMQ Embedded

To create an embedded ElasticMQ instance, we add the elasticmq-rest-sqs library, which provides the SQS-compatible REST server:

libraryDependencies += "org.elasticmq" %% "elasticmq-rest-sqs" % "1.6.5"

We can check the latest version on the MVN Repository.

To start the embedded REST server, we use the SQSRestServerBuilder:

val server = SQSRestServerBuilder
  .withPort(9325)
  .withInterface("localhost")
  .start()

To stop the embedded REST server, we use the stopAndWait() method:

server.stopAndWait()
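
Because a running embedded server holds a port, it helps to make sure stopAndWait() is called even when the code using the server throws. The following is a minimal sketch of that idea using the loan pattern. FakeServer and withServer are hypothetical names introduced here, and FakeServer only stands in for the real server object so that the sketch runs without ElasticMQ on the classpath:

```scala
// Hypothetical stand-in for the object returned by SQSRestServerBuilder.start();
// it only mimics the stopAndWait() call used in this article.
final class FakeServer:
  @volatile var running: Boolean = true
  def stopAndWait(): Unit =
    running = false

// Loan pattern: start a server, lend it to `body`, and always stop it afterwards,
// even if `body` throws.
def withServer[S, A](start: => S)(stop: S => Unit)(body: S => A): A =
  val server = start
  try body(server)
  finally stop(server)

// Usage with the stand-in; with ElasticMQ, `start` would be the builder call
// and `stop` would still be _.stopAndWait().
val fake = new FakeServer
val answer: Int = withServer(fake)(_.stopAndWait())(_ => 42)
```

With the real library, the same helper could wrap the builder call shown earlier, which keeps the start and stop logic in one place for tests.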

The SQSRestServerBuilder is a default instance of TheSQSRestServerBuilder:

object SQSRestServerBuilder extends TheSQSRestServerBuilder(
  None,
  None,
  "",
  9324,
  NodeAddress(),
  true,
  StrictSQSLimits,
  "elasticmq",
  "000000000000",
  None
)

TheSQSRestServerBuilder has the following signature:

case class TheSQSRestServerBuilder(
  providedActorSystem: Option[ActorSystem],
  providedQueueManagerActor: Option[ActorRef],
  interface: String,
  port: Int,
  serverAddress: NodeAddress,
  generateServerAddress: Boolean,
  sqsLimits: Limits,
  _awsRegion: String,
  _awsAccountId: String,
  queueEventListener: Option[ActorRef]
)

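If we pass a providedActorSystem, the server uses it but doesn’t manage its lifecycle; otherwise, the server creates and manages its own actor system. The providedQueueManagerActor is the main ElasticMQ actor, and the queueEventListener is an actor that is notified of changes to queues and messages. The remaining parameters are the usual server and SQS settings: the bind interface and port, the externally visible address, the SQS limits mode, the region, and the account ID.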

2.3. ElasticMQ Docker

To run ElasticMQ in Docker, let’s use the softwaremill/elasticmq-native image and expose the necessary ports:

$ docker run -p 9324:9324 -p 9325:9325 softwaremill/elasticmq-native
18:09:32.103 [main] INFO  org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
18:09:32.756 [elasticmq-pekko.actor.default-dispatcher-7] INFO  o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
18:09:33.226 [elasticmq-pekko.actor.default-dispatcher-8] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9324, visible server address http://localhost:9324
18:09:33.228 [main] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
18:09:33.259 [elasticmq-pekko.actor.default-dispatcher-8] INFO  o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9325
18:09:33.259 [main] INFO  org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 1581 ms ===

The native image is compiled using GraalVM for faster startup time and smaller bundle size, but we can also use the JVM-based softwaremill/elasticmq image.

3. Configuration

To configure an ElasticMQ instance, we use HOCON (Human-Optimized Config Object Notation), a configuration format that is common across the Scala ecosystem.

Let’s consider a basic configuration file:

$ cat elasticmq.conf
# What is the outside visible address of this ElasticMQ node
# Used to create the queue URL (may be different from bind address!)
node-address {
  protocol = http
  host = localhost
  port = 9323
  context-path = ""
}

rest-sqs {
  enabled = true
  bind-port = 9323
  bind-hostname = "0.0.0.0"
  # Possible values: relaxed, strict
  sqs-limits = strict
}

rest-stats {
  enabled = true
  bind-port = 9324
  bind-hostname = "0.0.0.0"
}

# Should the node-address be generated from the bind port/hostname
# Set this to true e.g. when assigning port automatically by using port 0.
generate-node-address = false

When we start the server with this file, the logs show that the ports have changed:

$ java -Dconfig.file=elasticmq.conf -jar elasticmq-server-1.6.5.jar
12:08:52.665 [main] INFO  org.elasticmq.server.Main$ - Starting ElasticMQ server (1.6.5) ...
12:08:59.786 [elasticmq-pekko.actor.default-dispatcher-4] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address 0.0.0.0:9323, visible server address http://localhost:9323
12:09:00.075 [main] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
12:09:00.246 [elasticmq-pekko.actor.default-dispatcher-5] INFO  o.e.r.s.TheStatisticsRestServerBuilder - Started statistics rest server, bind address 0.0.0.0:9324
12:09:00.262 [main] INFO  org.elasticmq.server.Main$ - === ElasticMQ server (1.6.5) started in 9916 ms ===

We can also provide a separate logging configuration:

$ java -Dlogback.configurationFile=logback.xml -Dconfig.file=elasticmq.conf -jar elasticmq-server-1.6.5.jar

4. ElasticMQ Client

To create a client for an ElasticMQ instance, we can use the AWS SQS SDK library and create a thin wrapper for convenience.

First, we include the SQS SDK:

libraryDependencies += "software.amazon.awssdk" % "sqs" % "2.26.24"

We can check for the latest version on the MVN Repository page. Then we can create a client wrapper class:

class SQSAsyncClient(
  queueURL: String,
  region: String,
  endpoint: String
)(implicit ec: ExecutionContext):

Finally, we instantiate the SDK client with our configuration parameters:

private val sqsAsyncClient: SqsAsyncClient =
  SqsAsyncClient
    .builder()
    .region(Region.of(region))
    .credentialsProvider(
      AwsCredentialsProviderChain
        .builder()
        .credentialsProviders(
          StaticCredentialsProvider.create(
            AwsBasicCredentials.create(
              ElasticMQConfig.ELASTIC_MQ_ACCESS_KEY,
              ElasticMQConfig.ELASTIC_MQ_SECRET_ACCESS_KEY
            )
          )
        )
        .build()
    )
    .endpointOverride(URI.create(endpoint))
    .build()

4.1. Creating Queues

Queues can be created at the configuration stage, or we can use a client instance to create a queue through the SQS REST API. As in SQS, a queue is a standard queue by default, so message ordering is best-effort rather than guaranteed.
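
For the configuration route, a queue can be declared in the same HOCON file as the server settings. The fragment below is a sketch based on the queues section described in the ElasticMQ README; the key names and the values are assumptions that should be checked against the ElasticMQ version in use:

```
# Sketch only: declares one standard queue that exists as soon as the server starts.
queues {
  standardQueueForTest {
    defaultVisibilityTimeout = 10 seconds
    delay = 0 seconds
    receiveMessageWait = 0 seconds
    fifo = false
  }
}
```

A queue declared this way doesn't need a createQueue call from the client before messages are sent to it.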

We use the client to create a queue:

def createStandardQueue(queueName: String): Future[CreateQueueResponse] =
  val request = CreateQueueRequest.builder.queueName(queueName).build

  sqsAsyncClient.createQueue(request).asScala
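
The asScala call isn't part of the AWS SDK. The SDK's async client returns a java.util.concurrent.CompletableFuture, and the conversion to a Scala Future most likely comes from scala.jdk.FutureConverters in the standard library. The following minimal sketch shows that conversion on its own; fakeSdkCall is a hypothetical stand-in for an SDK call so that the example runs without the SDK:

```scala
import java.util.concurrent.CompletableFuture
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.*
import scala.jdk.FutureConverters.*

// Hypothetical stand-in for an SDK call such as sqsAsyncClient.createQueue(request),
// which returns a Java CompletableFuture.
def fakeSdkCall(queueName: String): CompletableFuture[String] =
  CompletableFuture.supplyAsync(() => s"created:$queueName")

// asScala is an extension method from scala.jdk.FutureConverters that adapts
// a Java CompletionStage to a scala.concurrent.Future.
val created: Future[String] = fakeSdkCall("standardQueueForTest").asScala

// Blocking here only to make the sketch observable; real code would compose the Future.
val createdResult: String = Await.result(created, 5.seconds)
```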

Additionally, we can set the FifoQueue attribute to create a FIFO (First-In First-Out) queue, which preserves message order. As in SQS, the name of a FIFO queue must end with the .fifo suffix:

final lazy val createFIFOQueueAttributes = Map(
  (QueueAttributeName.FIFO_QUEUE, "true")
).asJava

def createFIFOQueue(queueName: String): Future[CreateQueueResponse] =
  val createQueueRequest = CreateQueueRequest.builder
    .queueName(queueName)
    .attributes(createFIFOQueueAttributes)
    .build

  sqsAsyncClient.createQueue(createQueueRequest).asScala
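
Since a malformed queue name only fails once the request reaches the server, it can be useful to check the name first. The sketch below follows the SQS naming rules (up to 80 characters, letters, digits, hyphens, and underscores, with FIFO names ending in .fifo). isValidQueueName is a hypothetical helper, and how strictly ElasticMQ enforces these rules may depend on the sqs-limits setting:

```scala
// Standard queue names: 1 to 80 characters from letters, digits, '-' and '_'.
val StandardName = "^[A-Za-z0-9_-]{1,80}$".r

// FIFO queue names: same characters, but must end with ".fifo";
// the suffix counts towards the 80-character limit, leaving 75 for the rest.
val FifoName = "^[A-Za-z0-9_-]{1,75}\\.fifo$".r

// Returns true when `name` is acceptable for the requested queue type.
def isValidQueueName(name: String, fifo: Boolean): Boolean =
  if fifo then FifoName.matches(name) else StandardName.matches(name)
```

For example, isValidQueueName("fifoQueue.fifo", fifo = true) holds, while the same call with "fifoQueue" doesn't.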

4.2. Sending Messages

To send a message to a queue, we build a SendMessageRequest and pass it to the client's sendMessage() method:

def sendMessage(message: String): Future[SendMessageResponse] =
  val request = SendMessageRequest
    .builder()
    .messageBody(message)
    .queueUrl(queueURL)
    .build()

  sqsAsyncClient.sendMessage(request).asScala

We can also send messages in a batch by building a SendMessageBatchRequest, where each entry needs an ID that is unique within the batch:

def sendMessagesInBatch(
  messages: List[String]
): Future[SendMessageBatchResponse] =
  val batchRequestEntry = messages
    .map(
      SendMessageBatchRequestEntry
        .builder()
        .messageBody(_)
        .id(UUID.randomUUID().toString)
        .build()
    )
    .asJava
  val sendMessageBatchRequest = SendMessageBatchRequest
    .builder()
    .queueUrl(queueURL)
    .entries(batchRequestEntry)
    .build()

  sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest).asScala
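
SQS limits a single SendMessageBatch call to 10 entries, so a longer list has to be split before it's passed to sendMessagesInBatch. A minimal sketch of that split follows; toBatches and MaxBatchSize are hypothetical names introduced for illustration:

```scala
// SQS accepts at most 10 entries per SendMessageBatch call.
val MaxBatchSize = 10

// Splits the messages into consecutive chunks of at most `batchSize` elements,
// preserving their order.
def toBatches(
  messages: List[String],
  batchSize: Int = MaxBatchSize
): List[List[String]] =
  require(
    batchSize >= 1 && batchSize <= MaxBatchSize,
    s"batchSize must be between 1 and $MaxBatchSize"
  )
  messages.grouped(batchSize).toList
```

Each chunk could then be sent with its own sendMessagesInBatch call, for example by traversing the chunks with Future.traverse.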

4.3. Receiving Messages

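We can use our client to receive up to 10 messages at a time by building a ReceiveMessageRequest. Here, waitTimeSeconds(10) enables long polling, so the call waits up to 10 seconds for messages to arrive: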

def receiveMessages(
  maxNumberOfMessages: Int
): Future[ReceiveMessageResponse] =
  val receiveMessageRequest =
    ReceiveMessageRequest
      .builder()
      .maxNumberOfMessages(maxNumberOfMessages)
      .queueUrl(queueURL)
      .waitTimeSeconds(10)
      .build()

  sqsAsyncClient.receiveMessage(receiveMessageRequest).asScala

The value passed to maxNumberOfMessages must be between 1 and 10.
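
One way to catch an out-of-range value before the request is sent is to validate the parameters up front. The sketch below assumes the SQS limits of 1 to 10 messages per call and 0 to 20 seconds of long polling; ReceiveParams is a hypothetical type, not part of the SDK:

```scala
// Hypothetical parameter holder that rejects values outside the SQS limits
// at construction time, before any request is built.
final case class ReceiveParams(maxNumberOfMessages: Int, waitTimeSeconds: Int):
  require(
    maxNumberOfMessages >= 1 && maxNumberOfMessages <= 10,
    s"maxNumberOfMessages must be between 1 and 10, got $maxNumberOfMessages"
  )
  require(
    waitTimeSeconds >= 0 && waitTimeSeconds <= 20,
    s"waitTimeSeconds must be between 0 and 20, got $waitTimeSeconds"
  )

// Valid values construct normally; invalid ones throw IllegalArgumentException.
val params = ReceiveParams(maxNumberOfMessages = 5, waitTimeSeconds = 10)
```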

4.4. Deleting Messages

To delete a message with our client, we build a DeleteMessageRequest with the receipt handle that was returned when the message was received:

def deleteMessage(receiptHandle: String): Future[DeleteMessageResponse] =
  val deleteMessageRequest = DeleteMessageRequest
    .builder()
    .queueUrl(queueURL)
    .receiptHandle(receiptHandle)
    .build()

  sqsAsyncClient.deleteMessage(deleteMessageRequest).asScala
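
Receiving and deleting usually go together: a consumer receives a batch, handles each message, and deletes only the ones it handled successfully, so the rest become visible again later. The following is a minimal sketch of that flow. ReceivedMessage and consumeOnce are hypothetical, and the receive and delete functions are passed in as parameters so that the sketch runs without the SDK:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for the SDK's message type, keeping only the two fields used here.
final case class ReceivedMessage(body: String, receiptHandle: String)

// Receives one batch, handles each body, then deletes the message by its receipt handle.
// Returns how many messages were handled and deleted. A failure in the handler or in
// the delete call counts as 0 for that message and doesn't fail the whole batch.
def consumeOnce(
  receive: () => Future[List[ReceivedMessage]],
  delete: String => Future[Unit]
)(handle: String => Unit)(using ec: ExecutionContext): Future[Int] =
  receive().flatMap { messages =>
    Future
      .traverse(messages) { message =>
        Future(handle(message.body))
          .flatMap(_ => delete(message.receiptHandle))
          .map(_ => 1)
          .recover { case _ => 0 }
      }
      .map(_.sum)
  }
```

With the wrapper from this article, receive could map the result of receiveMessages to this type, and delete could call deleteMessage and discard the response.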

4.5. List Active Queues

To receive a list of active queues with our client, we use the listQueues() method:

def listQueues(): Future[ListQueuesResponse] =
  sqsAsyncClient.listQueues().asScala

4.6. Purge Active Queue

To purge an active queue with our client, which removes all of its messages, we build a PurgeQueueRequest:

final lazy val purgeQueueRequest =
  PurgeQueueRequest.builder().queueUrl(queueURL).build()
def purgeQueue(): Future[PurgeQueueResponse] =
  sqsAsyncClient.purgeQueue(purgeQueueRequest).asScala

4.7. Example Workflow

To test a workflow, we start an embedded ElasticMQ REST server:

val endpoint = "http://localhost:9325"
val region = "elasticmq"

val server = SQSRestServerBuilder
  .withPort(9325)
  .withInterface("localhost")
  .start()

Then we create a client and workflow:

val elasticMQClient = new SQSAsyncClient(ElasticMQ_URL, region, endpoint)

val uselessWorkflow =
  for
    _ <- elasticMQClient.createStandardQueue("standardQueueForTest")
    testQueueClient = new SQSAsyncClient(
      ElasticMQ_URL + "standardQueueForTest",
      region,
      endpoint
    )
    _ <- testQueueClient.createFIFOQueue("fifoQueue.fifo")
    _ <- testQueueClient.listQueues()
    _ <- testQueueClient.sendMessage("Hi")
    _ <- testQueueClient.sendMessagesInBatch(
      List("Follow", "Baeldung", "on", "LinkedIn")
    )
    _ <- testQueueClient.receiveMessages(5)
    _ <- testQueueClient.purgeQueue()
  yield ()
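
The workflow relies on an ElasticMQ_URL value whose definition isn't shown in these snippets. As a sketch, assuming queue URLs follow the layout endpoint/accountId/queueName with the default account ID 000000000000 from the builder defaults, a queue URL could be composed as shown below. buildQueueUrl is a hypothetical helper, and the authoritative value is the queueUrl() returned in the CreateQueueResponse:

```scala
// Assumed layout: <endpoint>/<accountId>/<queueName>.
// The default account ID matches the SQSRestServerBuilder defaults shown earlier.
def buildQueueUrl(
  endpoint: String,
  queueName: String,
  accountId: String = "000000000000"
): String =
  // stripSuffix avoids a double slash when the endpoint already ends with '/'.
  s"${endpoint.stripSuffix("/")}/$accountId/$queueName"

val standardQueueUrl: String =
  buildQueueUrl("http://localhost:9325", "standardQueueForTest")
```

Reading the URL from the create response instead of composing it avoids depending on this layout at all.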

Then we execute the workflow, logging any errors:

uselessWorkflow
  .andThen(_ => server.stopAndWait())
  .onComplete:
    case Success(_) => m_logger.info("queue created")
    case Failure(exception) =>
      m_logger.error(exception, "exception in uselessWorkflow")

Finally, the logs show that the workflow completed successfully:

[info] running com.baeldung.elasticmq.ElasticMQService 
13:42:30.524 [default-pekko.actor.default-dispatcher-5] INFO  o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
13:42:34.167 [elasticmq-pekko.actor.default-dispatcher-5] INFO  o.a.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
13:42:38.825 [elasticmq-pekko.actor.default-dispatcher-5] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Started SQS rest server, bind address localhost:9325, visible server address http://localhost:9325
13:42:38.992 [sbt-bg-threads-1] INFO  o.e.rest.sqs.TheSQSRestServerBuilder - Metrics MBean org.elasticmq:name=Queues successfully registered
13:42:44.536 [elasticmq-pekko.actor.default-dispatcher-5] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(standardQueueForTest,None,None,None,None,None,None,false,false,None,None,Map())
13:42:45.399 [elasticmq-pekko.actor.default-dispatcher-9] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue CreateQueueData(fifoQueue.fifo,None,None,None,None,None,None,true,false,None,None,Map())
13:42:47.753 [elasticmq-pekko.actor.default-dispatcher-13] INFO  o.elasticmq.actor.queue.QueueActor - standardQueueForTest: Clearing queue
13:42:47.987 [elasticmq-pekko.actor.default-dispatcher-11] INFO  o.a.pekko.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
13:42:48.085 [default-pekko.actor.default-dispatcher-5] INFO  o.apache.pekko.actor.ActorSystemImpl - queue created

5. Conclusion

In this article, we’ve had an overview of ElasticMQ and how to interact with it from a Scala client. First, we set up ElasticMQ as a standalone .jar application, then embedded it within a program, and then ran it with Docker. Next, we configured the server with a HOCON file. Finally, we set up a client to create queues, send, receive, and delete messages, and list and purge queues on the ElasticMQ server.

The code backing this article is available on GitHub.