Course – LS – All
announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

Spring Cloud AWS is a project that aims to simplify interacting with AWS services. Simple Queue Service (SQS) is an AWS solution for sending and receiving asynchronous messages in a scalable way.

In this tutorial, we’ll reintroduce the Spring Cloud AWS SQS integration, which has been completely rewritten for Spring Cloud AWS 3.0.

The framework provides familiar Spring abstractions for handling SQS queues, such as SqsTemplate and the @SqsListener annotation.

We’ll walk through an event-driven scenario with examples for sending and receiving messages, and show strategies for setting up integration tests with Testcontainers, a tool for managing disposable docker containers, and LocalStack, which simulates an AWS-like environment locally for testing our logic.

2. Dependencies

The Spring Cloud AWS Bill of Materials (BOM) ensures compatible versions between projects. It declares versions for many dependencies, including Spring Boot, and should be used instead of Spring Boot’s own BOM.

Here’s how to import it in our pom.xml file:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>3.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

The main dependency we’ll need is SQS Starter, which contains all SQS-related classes for the project. The SQS integration has no dependency to Spring Boot and can be used standalone in any standard Java application:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>

For Spring Boot applications such as the one we’re building in this tutorial, we should add the project’s Core Starter, as it allows us to leverage Spring Boot’s auto-configuration for SQS, and AWS configuration such as credentials and region:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>

3. Setting up a Local Test Environment

In this section, we’ll walk through setting up LocalStack environment with Testcontainers to test our code in our local environment. Note that the examples in this tutorial can also be executed targeting AWS directly.

3.1. Dependencies

For running LocalStack and TestContainers with JUnit 5, we’ll need two additional dependencies:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>

Let’s also include the awaitility library to help us assert the asynchronous message consumption:

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

3.2. Configuration

We’ll now create a class with the logic for managing our containers, which can be inherited by our test suites. Let’s name it BaseSqsIntegrationTests. For each test suite that extends this class, Testcontainers will create and start a new container, which is essential for isolating each suite’s data from one another.

The @SpringBootTest annotation is necessary for the Spring Context to be initialized, and the @Testcontainers annotation associates our Testcontainers annotations with JUnit’s runtime so that containers start when the test suite runs and stop after tests are complete:

@SpringBootTest
@Testcontainers
public class BaseSqsIntegrationTest {
   // Our test configuration will be added here
}

Let’s now declare the LocalStackContainer. The @Container annotation is also necessary for the framework to automatically manage the container’s lifecycle:

private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

@Container
static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

Finally, we’ll bind the properties the Spring Cloud AWS framework uses for auto-configuration with LocalStack. We’ll fetch the container port and hosts at runtime since Testcontainers will provide us with a random port, which is great for parallel testing. We can use the @DynamicPropertySource annotation for that:

@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
    registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
    registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
    registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
      .toString());
    // ...other AWS services endpoints can be added here
}

This is all we need to implement a Spring Boot test with LocalStack, Testcontainers, and Spring Cloud AWS. We also need to make sure the Docker engine is running in our local environment before running the tests.

4. Setting up the Queue Names

We can set up the queue names by leveraging Spring Boot’s application.yml property mechanism.

For this tutorial, we’ll create three queues:

events:
  queues:
    user-created-by-name-queue: user_created_by_name_queue
    user-created-record-queue: user_created_record_queue
    user-created-event-type-queue: user_created_event_type_queue

Let’s create a POJO to represent these properties:

@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {

    private String userCreatedByNameQueue;
    private String userCreatedRecordQueue;
    private String userCreatedEventTypeQueue;

    // getters and setters
}

Finally, we need to use the @EnableConfigurationProperties annotation in a @Configuration annotated class, or the main Spring Application class, to let Spring Boot know we want to populate it with our application.yml properties:

@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudAwsApplication.class, args);
    }
}

Now, we’re ready to inject either the values themselves or the POJO when we need the queue names.

By default, Spring Cloud AWS SQS will create the queues if they’re not found, which helps us quickly set up dev environments. In production, the application should not have permission to create queues, so it will fail to start if a queue is not found. The framework can also be configured to explicitly fail if a queue is not found instead.

5. Sending and Receiving Messages

There are multiple ways of sending and receiving messages to and from SQS using Spring Cloud AWS. Here, we’ll cover the most common ones, using the SqsTemplate for sending messages and the @SqsListener annotation for receiving them.

5.1. Scenario

In our scenario, we’ll simulate an event-driven application that responds to UserCreatedEvent by saving relevant information in its local repository.

Let’s create a User entity:

public record User(String id, String name, String email) {
}

And let’s create a simple in-memory UserRepository:

@Repository
public class UserRepository {

    private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();

    public void save(User userToSave) {
        persistedUsers.put(userToSave.id(), userToSave);
    }

    public Optional<User> findById(String userId) {
        return Optional.ofNullable(persistedUsers.get(userId));
    }

    public Optional<User> findByName(String name) {
        return persistedUsers.values().stream()
          .filter(user -> user.name().equals(name))
          .findFirst();
    }
}

And finally, let’s create a UserCreatedEvent Java Record class:

public record UserCreatedEvent(String id, String username, String email) {
}

5.2. Setup

To test our scenarios, we’ll create a SpringCloudAwsSQSLiveTest class that extends the BaseSqsIntegrationTest file we created earlier. We’ll autowire three dependencies: the SqsTemplate that’s auto-configured by the framework, the UserRepository so we can assert our message processing worked, and our EventQueuesProperties POJO with the queue names:

public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {

    private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private EventQueuesProperties eventQueuesProperties;

   // ...
}

To contain our listeners, let’s create a UserEventListeners class and declare it as a Spring @Component:

@Component
public class UserEventListeners {

    private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);

    public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";

    private final UserRepository userRepository;

    public UserEventListeners(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    // Our listeners will be added here 
}

5.3. String Payloads

In this first example, we’ll send a message with a String payload, receive it in our listener, and persist it to our repository. We’ll then poll the repository to make sure our application persists the data correctly.

First, let’s create a test for sending the message in our test class:

@Test
void givenAStringPayload_whenSend_shouldReceive() {
    // given
    var userName = "Albert";

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
      .payload(userName));
    logger.info("Message sent with payload {}", userName);

    // then
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findByName(userName)
        .isPresent());
}

We should see a log similar to:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert

And then, note that the test fails because we don’t have the listener for this queue yet.

Let’s set up our listener to consume the message from this queue in our listener class and make the test pass:

@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
    logger.info("Received message: {}", username);
    userRepository.save(new User(UUID.randomUUID()
      .toString(), username, null));
}

Now, when we run the test, we should see the result in the log:

INFO [ntContainer#0-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: Albert

And the test passes.

Note that we’re using Spring’s properties-resolving capabilities to fetch the queue name from the application.yml we created earlier.

5.4. POJO and Record Payloads

Now that we’ve sent and received a String payload, let’s set up a scenario with a Java Record, the UserCreatedEvent we created earlier.

First, let’s write our failing test:

@Test
void givenARecordPayload_whenSend_shouldReceive() {
    // given
    String userId = UUID.randomUUID()
      .toString();
    var payload = new UserCreatedEvent(userId, "John", "[email protected]");

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
      .payload(payload));

    // then
    logger.info("Message sent with payload: {}", payload);
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findById(userId)
        .isPresent());
}

We should see a log similar to this before the test fails:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, [email protected]]

Now, let’s create the corresponding listener to make the test pass:

@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
    logger.info("Received message: {}", event);
    userRepository.save(new User(event.id(), event.username(), event.email()));
}

We’ll see the output noting the message was received, and the test passes:

INFO [ntContainer#1-1] c.b.s.cloud.aws.sqs.UserEventListeners   : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, [email protected]]

The framework will auto-configure any ObjectMapper bean available in the Spring Context to handle the serialization and deserialization of messages. We can configure our own ObjectMapper and customize serialization in several ways, but that’s beyond the scope of this tutorial.

5.5. Spring Message and Headers

In this last scenario, we’ll send a Record with a custom header and receive the message as a Spring Message instance, as well as both the custom header we added and a standard SQS header in the method signature. The framework automatically converts all SQS message attributes to message headers, including any provided by the user.

Let’s create the failing test first:

@Test
void givenCustomHeaders_whenSend_shouldReceive() {
    // given
    String userId = UUID.randomUUID()
      .toString();
    var payload = new UserCreatedEvent(userId, "John", "[email protected]");
    var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");

    // when
    sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
      .payload(payload)
      .headers(headers));

    // then
    logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
    await().atMost(Duration.ofSeconds(3))
      .until(() -> userRepository.findById(userId)
        .isPresent());
}

The test should generate a log similar to this before failing:

INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest  : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, [email protected]] and custom headers: {eventType=UserCreatedEvent}

Now, let’s add the corresponding listener to make the test pass:

@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
    @Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
    logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
    UserCreatedEvent payload = message.getPayload();
    userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}

And we’ll see the output when we re-run our test, indicating success:

INFO [ntContainer#2-1] c.b.s.cloud.aws.sqs.UserEventListeners   : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, [email protected]], headers=...

In this example, we’re receiving a Message with the deserialized UserCreatedEvent record as the payload and two headers. To ensure consistency throughout the project, we should use the SqsHeader class constants for retrieving SQS standard headers.

6. Conclusion

In this article, we used an event-driven scenario to go through different examples for sending and receiving messages with Spring Cloud AWS SQS 3.0.

We set up a local environment with LocalStack and TestContainers and configured the framework to use the proper local configuration for our integration tests.

As usual, the complete code used in this tutorial is available over on GitHub.

Course – LS – All
announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

res – Microservices (eBook) (cat=Cloud/Spring Cloud)
1 Comment
Oldest
Newest
Inline Feedbacks
View all comments
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.