1. Overview

In this article, we’ll discuss the need to publish messages within a @Transactional block and the associated performance challenges, such as holding a database connection open longer than necessary. To tackle this, we’ll use Spring Modulith’s features to listen to Spring application events and automatically publish them to a Kafka topic.

2. Transactional Operations and Message Brokers

For the code examples of this article, we’ll assume we’re writing the functionality responsible for saving an Article on Baeldung:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        validateArticle(article);
        article = addArticleTags(article);
        // ... other business logic
        
        articleRepository.save(article);
    }
}

Additionally, we’ll need to notify other parts of the system about this new Article. With this information, other modules or services will react accordingly, creating reports or sending newsletters to the website’s readers.

The easiest way to achieve this is to inject a dependency that knows how to publish this event. For our example, let’s use KafkaOperations to send a message to the “baeldung.articles.published” topic and use the Article’s slug() as the key:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        messageProducer.send(
          "baeldung.articles.published",
          article.slug(),
          new ArticlePublishedEvent(article.slug(), article.title())
        ).join();
    }
}

However, this approach is not ideal for a few different reasons. From a design point of view, we have coupled the domain service with the message producer. Moreover, the domain service directly depends on the lower-level component, breaking one of the fundamental Clean Architecture rules.

Furthermore, this approach will also have performance implications because everything is happening within a @Transactional method. As a result, the database connection acquired for saving the Article will be kept open until the message is successfully published.

Lastly, saving the entity and publishing the message will be done as an atomic operation. In other words, if the producer fails to publish the event, the database transaction will be rolled back.
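
To make this coupling concrete, here is a minimal plain-Java sketch. It is only a toy model, not Spring's transaction management: ToyArticleStore and its createArticle(String, Runnable) method are hypothetical names, and the "transaction" is just a pending copy of an in-memory list. It shows that, when the broker call runs before the commit, a publishing failure discards the saved row:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of "save and publish inside one transaction".
class ToyArticleStore {
    final List<String> committed = new ArrayList<>();

    // The publish step runs before the commit, so if it throws,
    // the pending row is never copied into the committed state.
    void createArticle(String title, Runnable publish) {
        List<String> pending = new ArrayList<>(committed);
        pending.add(title);        // "save" inside the open transaction
        publish.run();             // broker call while the transaction is still open
        committed.clear();         // "commit": only reached if publish succeeded
        committed.addAll(pending);
    }
}
```

In this model, the time spent inside publish.run() is also time during which the transaction stays open, which is the connection-holding problem described above.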

3. Dependency Inversion Using Spring Events

We can leverage Spring Events to improve the design of our solution. Our goal is to avoid publishing the messages to Kafka directly from our domain service. Let’s remove the KafkaOperations dependency and publish an internal application event instead:

@Service
public class Baeldung {
    private final ApplicationEventPublisher applicationEvents;
    private final ArticleRepository articleRepository;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        applicationEvents.publishEvent(
          new ArticlePublishedEvent(article.slug(), article.title()));
    }
}

In addition to this, we’ll have a dedicated Kafka producer as part of our infrastructure layer. This component will listen to the ArticlePublishedEvents and delegate the publishing to the underlying KafkaOperations bean:

@Component
class ArticlePublishedKafkaProducer {
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // constructor 

    @EventListener
    public void publish(ArticlePublishedEvent event) {
        Assert.notNull(event.slug(), "Article Slug must not be null!");
        messageProducer.send("baeldung.articles.published", event.slug(), event);
    }
}

With this abstraction, the infrastructure component now depends on the event produced by the domain service. In other words, we’ve managed to reduce the coupling and invert the source code dependency. Furthermore, if other modules are interested in the Article creation, they can now seamlessly listen to these application events and react accordingly.
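
The direction of the dependencies can also be illustrated without Spring. The following plain-Java sketch is a simplified model under stated assumptions: SimpleEventBus is a hypothetical stand-in for ApplicationEventPublisher and its listener dispatch, ArticleService stands in for the domain service, and RecordingProducer replaces the real Kafka producer with a list we can inspect:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Domain event: the only type shared between the domain and infrastructure layers.
record ArticlePublishedEvent(String slug, String title) {}

// Hypothetical stand-in for Spring's event publisher and listener registry.
class SimpleEventBus {
    private final List<Consumer<ArticlePublishedEvent>> listeners = new ArrayList<>();

    void register(Consumer<ArticlePublishedEvent> listener) {
        listeners.add(listener);
    }

    void publishEvent(ArticlePublishedEvent event) {
        // Synchronous dispatch, similar in spirit to a plain @EventListener.
        listeners.forEach(listener -> listener.accept(event));
    }
}

// Domain service: knows about the event bus, but nothing about Kafka.
class ArticleService {
    private final SimpleEventBus events;

    ArticleService(SimpleEventBus events) {
        this.events = events;
    }

    void createArticle(String slug, String title) {
        // ... validation and persistence would happen here
        events.publishEvent(new ArticlePublishedEvent(slug, title));
    }
}

// Infrastructure component: depends on the domain event, not the other way around.
class RecordingProducer implements Consumer<ArticlePublishedEvent> {
    final List<String> sentKeys = new ArrayList<>();

    @Override
    public void accept(ArticlePublishedEvent event) {
        // A real producer would delegate to KafkaOperations here.
        sentKeys.add(event.slug());
    }
}
```

Any number of additional listeners could be registered on the bus without touching ArticleService, which is the property we rely on in the rest of the article.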

4. Atomic Vs. Non-atomic Operations

Now, let’s delve into the performance considerations. To begin, we must determine whether rolling back when the communication with the message broker fails is the desired behavior. This choice varies based on the specific context.

If we don’t need this atomicity, we should free the database connection as early as possible and publish the events asynchronously. To simulate a publishing failure, we can try to create an article without a slug, causing ArticlePublishedKafkaProducer::publish to fail:

@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
    var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");

    baeldung.createArticle(article);

    assertThat(repository.findAll())
      .hasSize(1).first()
      .extracting(Article::title, Article::author)
      .containsExactly("Introduction to Spring Boot", "John Doe");
}

If we run the test now, it will fail. This happens because ArticlePublishedKafkaProducer throws an exception that will cause the domain service to roll back the transaction. However, we can make the event listener asynchronous by replacing the @EventListener annotation with @TransactionalEventListener and @Async:

@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
    Assert.notNull(event.slug(), "Article Slug must not be null!");
    messageProducer.send("baeldung.articles.published", event.slug(), event);
}

If we re-run the test now, we’ll notice that the exception is logged, the event is not published, and the entity is saved to the database. Moreover, the database connection is released sooner, allowing other threads to use it.
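
The reason the entity survives is that @TransactionalEventListener runs after the commit by default (its default phase is AFTER_COMMIT), and @Async moves the listener to another thread. The plain-Java sketch below models this ordering. It is not Spring's implementation: ToyTransaction and its methods are hypothetical, the "database" is an in-memory list, and CompletableFuture.runAsync stands in for the async executor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical model of a transaction that defers events until after the commit.
class ToyTransaction {
    private final List<String> committedRows;                  // the "database"
    private final List<String> pendingRows = new ArrayList<>();
    private final List<String> pendingEvents = new ArrayList<>();

    ToyTransaction(List<String> committedRows) {
        this.committedRows = committedRows;
    }

    void save(String row) {
        pendingRows.add(row);
    }

    void publishEvent(String event) {
        // Events are only collected here; nothing is sent yet.
        pendingEvents.add(event);
    }

    // Commits first, then hands each event to the listener on another thread.
    List<CompletableFuture<Void>> commit(Consumer<String> listener) {
        committedRows.addAll(pendingRows);                     // the "connection" is free after this
        List<CompletableFuture<Void>> dispatched = new ArrayList<>();
        for (String event : pendingEvents) {
            // A listener failure completes the future exceptionally;
            // it cannot undo the rows committed above.
            dispatched.add(CompletableFuture.runAsync(() -> listener.accept(event)));
        }
        return dispatched;
    }
}
```

In a real Spring application, @Async only takes effect when asynchronous method execution is enabled, for example by placing @EnableAsync on a configuration class.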

5. Event Externalization with Spring Modulith

We successfully tackled the design and performance concerns of the original code example through a two-step approach:

  • Dependency inversion using Spring application events
  • Asynchronous publishing utilizing @TransactionalEventListener and @Async

Spring Modulith allows us to further simplify our code, providing built-in support for this pattern. Let’s start by adding the Maven dependency for spring-modulith-events-api to our pom.xml:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-events-api</artifactId>
    <version>1.1.2</version>
</dependency>

This module can be configured to listen to application events and automatically externalize them to various message systems. We’ll stick to our original example and focus on Kafka. For this integration, we’ll need to add the spring-modulith-events-kafka dependency:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-events-kafka</artifactId>
    <version>1.1.2</version>
</dependency>

Now, we need to update the ArticlePublishedEvent and annotate it with @Externalized. This annotation requires the name and the key of the routing target, in other words, the Kafka topic and the message key. For the key, we’ll use a SpEL expression that will invoke ArticlePublishedEvent::slug():

@Externalized("baeldung.articles.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}

6. Event Externalization Configuration

Even though the @Externalized annotation’s value is useful for concise SpEL expressions, there are situations where we might want to avoid using it:

  • In cases where the expression becomes overly complex
  • When we aim to separate information about the topic from the application event
  • If we want distinct models for the application event and the externalized event

For these use cases, we can configure the necessary routing and event mapping using EventExternalizationConfiguration’s builder. After that, we simply need to expose this configuration as a Spring bean:

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new ArticlePublishedKafkaEvent(it.slug(), it.title())
      )
      .build();
}

In this case, we’ll remove the routing information from the ArticlePublishedEvent and keep the @Externalized annotation with no value:

@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}
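
To see what the route and mapping steps contribute, consider the following plain-Java sketch. It is a toy model and not Spring Modulith's implementation: ToyExternalizer and OutgoingMessage are hypothetical names, and the sketch assumes that both the routing target and the payload are derived from the original application event, as the configuration above suggests:

```java
import java.util.function.Function;

// The internal application event, as in the article.
record ArticlePublishedEvent(String slug, String title) {}

// Separate external model, mirroring the article's ArticlePublishedKafkaEvent.
record ArticlePublishedKafkaEvent(String slug, String title) {}

// What eventually reaches the broker: topic, key, and payload.
record OutgoingMessage(String topic, String key, Object payload) {}

// Hypothetical externalizer combining a routing function and a mapping function.
class ToyExternalizer {
    private final Function<ArticlePublishedEvent, String> topic;
    private final Function<ArticlePublishedEvent, String> key;
    private final Function<ArticlePublishedEvent, Object> mapping;

    ToyExternalizer(Function<ArticlePublishedEvent, String> topic,
                    Function<ArticlePublishedEvent, String> key,
                    Function<ArticlePublishedEvent, Object> mapping) {
        this.topic = topic;
        this.key = key;
        this.mapping = mapping;
    }

    OutgoingMessage externalize(ArticlePublishedEvent event) {
        // Routing decides where the message goes; mapping decides what is sent.
        return new OutgoingMessage(topic.apply(event), key.apply(event), mapping.apply(event));
    }
}
```

Keeping the two functions separate is what lets the internal event and the externalized model evolve independently.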

7. Conclusions

In this article, we discussed the scenarios that require us to publish a message from within a transactional block. We discovered that this pattern can have significant performance implications because it keeps the database connection open until the message has been published.

After that, we used Spring Modulith’s features to listen to Spring application events and automatically publish them to a Kafka topic. This approach allowed us to externalize the events asynchronously and free the database connection sooner.

The complete source code can be found over on GitHub.
