1. Overview

In this article, we’ll discuss the need to publish messages within a @Transactional block and the associated performance challenges, such as holding database connections open for longer than necessary. To tackle this, we’ll use Spring Modulith’s features to listen to Spring application events and automatically publish them to a Kafka topic.

2. Transactional Operations and Message Brokers

For the code examples of this article, we’ll assume we’re writing the functionality responsible for saving an Article on Baeldung:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        validateArticle(article);
        article = addArticleTags(article);
        // ... other business logic
        
        articleRepository.save(article);
    }
}

Additionally, we’ll need to notify other parts of the system about this new Article. With this information, other modules or services will react accordingly, creating reports or sending newsletters to the website’s readers.

The easiest way to achieve this is to inject a dependency that knows how to publish this event. For our example, let’s use KafkaOperations to send a message to the “baeldung.articles.published” topic and use the Article’s slug() as the key:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        messageProducer.send(
          "baeldung.articles.published",
          article.slug(),
          new ArticlePublishedEvent(article.slug(), article.title())
        ).join();
    }
}

However, this approach is not ideal for a few different reasons. From a design point of view, we have coupled the domain service with the message producer. Moreover, the domain service directly depends on the lower-level component, breaking one of the fundamental Clean Architecture rules.

Furthermore, this approach will also have performance implications because everything is happening within a @Transactional method. As a result, the database connection acquired for saving the Article will be kept open until the message is successfully published.

Lastly, this solution also creates an error-prone relationship between persisting the data and publishing the message:

  • If the producer fails to publish the message, the transaction is rolled back
  • The transaction can still be rolled back after the message has already been published

3. Dependency Inversion Using Spring Events

We can leverage Spring Events to improve the design of our solution. Our goal is to avoid publishing the messages to Kafka directly from our domain service. Let’s remove the KafkaOperations dependency and publish an internal application event instead:

@Service
public class Baeldung {
    private final ApplicationEventPublisher applicationEvents;
    private final ArticleRepository articleRepository;

    // constructor

    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);

        applicationEvents.publishEvent(
          new ArticlePublishedEvent(article.slug(), article.title()));
    }
}

In addition to this, we’ll have a dedicated Kafka producer as part of our infrastructure layer. This component will listen to the ArticlePublishedEvents and delegate the publishing to the underlying KafkaOperations bean:

@Component
class ArticlePublishedKafkaProducer {
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;

    // constructor 

    @EventListener
    public void publish(ArticlePublishedEvent event) {
        Assert.notNull(event.slug(), "Article Slug must not be null!");
        messageProducer.send("baeldung.articles.published", event.slug(), event);
    }
}

With this abstraction, the infrastructure component now depends on the event produced by the domain service. In other words, we’ve managed to reduce the coupling and invert the source code dependency. Furthermore, if other modules are interested in the Article creation, they can now seamlessly listen to these application events and react accordingly.

On the other hand, the publish() method is still called within the same transaction as our business logic. The two operations therefore remain indirectly coupled: a failure in either one can cause the other to fail or roll back.
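To make this coupling concrete, here is a minimal plain-Java sketch. It is not Spring's implementation: SimpleEventBus, ArticleSaved, and ToyArticleService are hypothetical stand-ins, and the "rollback" is simulated with an in-memory list. The sketch only mimics the fact that a synchronous listener runs on the publisher's call stack, so its exception reaches the publishing method:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event type, standing in for ArticlePublishedEvent.
record ArticleSaved(String slug, String title) {}

// Hypothetical synchronous event bus: listeners run on the publisher's thread.
class SimpleEventBus<E> {
    private final List<Consumer<E>> listeners = new ArrayList<>();

    void register(Consumer<E> listener) {
        listeners.add(listener);
    }

    void publish(E event) {
        // Any exception thrown by a listener propagates to the caller of publish().
        listeners.forEach(listener -> listener.accept(event));
    }
}

// Toy service: the list plays the role of the database table.
class ToyArticleService {
    private final List<String> savedTitles = new ArrayList<>();
    private final SimpleEventBus<ArticleSaved> events;

    ToyArticleService(SimpleEventBus<ArticleSaved> events) {
        this.events = events;
    }

    // Mimics a transactional method: if anything fails, the save is undone.
    boolean createArticle(String slug, String title) {
        savedTitles.add(title);
        try {
            events.publish(new ArticleSaved(slug, title));
            return true;
        } catch (RuntimeException e) {
            savedTitles.remove(savedTitles.size() - 1); // simulated rollback
            return false;
        }
    }

    List<String> savedTitles() {
        return List.copyOf(savedTitles);
    }
}

class SyncListenerDemo {
    public static void main(String[] args) {
        SimpleEventBus<ArticleSaved> bus = new SimpleEventBus<>();
        // The listener rejects events without a slug, like the Kafka producer above.
        bus.register(event -> {
            if (event.slug() == null) {
                throw new IllegalArgumentException("Article slug must not be null");
            }
        });
        ToyArticleService service = new ToyArticleService(bus);
        service.createArticle("spring-boot-intro", "Introduction to Spring Boot");
        service.createArticle(null, "Untitled");
        System.out.println(service.savedTitles());
    }
}
```

In this toy model, the article without a slug never stays in the list, because the listener's failure unwinds the whole operation.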

4. Atomic vs. Non-atomic Operations

Now, let’s delve into the performance considerations. To begin, we must determine whether rolling back when the communication with the message broker fails is the desired behavior. This choice varies based on the specific context.

If we don’t need this atomicity, we should free the database connection and publish the events asynchronously. To simulate a publishing failure, we can create an article without a slug, which causes ArticlePublishedKafkaProducer::publish to fail:

@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
    var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");

    baeldung.createArticle(article);

    assertThat(repository.findAll())
      .hasSize(1).first()
      .extracting(Article::title, Article::author)
      .containsExactly("Introduction to Spring Boot", "John Doe");
}

If we run the test now, it will fail. This happens because ArticlePublishedKafkaProducer throws an exception that causes the domain service to roll back the transaction. However, we can make the event listener asynchronous by replacing the @EventListener annotation with @TransactionalEventListener, which by default invokes the listener only after the transaction commits, and @Async:

@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
    Assert.notNull(event.slug(), "Article Slug must not be null!");
    messageProducer.send("baeldung.articles.published", event.slug(), event);
}

If we re-run the test now, we’ll notice that the exception is logged, the event is not published, and the entity is saved to the database. Moreover, the database connection is released sooner, allowing other threads to use it.
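The behavior we just observed can be modeled in a few lines of plain Java. This is only a sketch under simplifying assumptions, not how Spring implements it: AfterCommitDispatcher is a hypothetical class that buffers events during a simulated transaction and hands them to listeners on another thread only when commit() is called, so a listener failure can no longer affect the caller:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical dispatcher: events are buffered until the simulated commit.
class AfterCommitDispatcher<E> {
    private final List<Consumer<E>> listeners = new ArrayList<>();
    private final List<E> pending = new ArrayList<>();
    private final ExecutorService executor;

    AfterCommitDispatcher(ExecutorService executor) {
        this.executor = executor;
    }

    void register(Consumer<E> listener) {
        listeners.add(listener);
    }

    // Called during the "transaction": nothing is delivered yet.
    void publish(E event) {
        pending.add(event);
    }

    // A rollback simply discards the buffered events.
    void rollback() {
        pending.clear();
    }

    // On commit, every event is delivered asynchronously. Listener failures are
    // captured in the returned futures instead of being thrown to the caller.
    List<CompletableFuture<Void>> commit() {
        List<CompletableFuture<Void>> results = new ArrayList<>();
        for (E event : pending) {
            for (Consumer<E> listener : listeners) {
                results.add(CompletableFuture.runAsync(() -> listener.accept(event), executor));
            }
        }
        pending.clear();
        return results;
    }
}

class AfterCommitDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AfterCommitDispatcher<String> dispatcher = new AfterCommitDispatcher<>(executor);
        dispatcher.register(slug -> {
            if (slug.isBlank()) {
                throw new IllegalArgumentException("Article slug must not be blank");
            }
            System.out.println("published " + slug);
        });

        dispatcher.publish("");                  // invalid event
        dispatcher.publish("spring-boot-intro"); // valid event
        List<CompletableFuture<Void>> results = dispatcher.commit();

        // The failure is visible on the future (and would typically be logged).
        results.forEach(result -> result
          .exceptionally(error -> { System.out.println("failed: " + error.getMessage()); return null; })
          .join());
        executor.shutdown();
    }
}
```

The design choice mirrors the trade-off discussed above: the caller returns as soon as the commit succeeds, at the price of having to deal with failed deliveries separately.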

5. Event Externalization With Spring Modulith

We successfully tackled the design and performance concerns of the original code example through a two-step approach:

  • Dependency inversion using Spring application events
  • Asynchronous publishing utilizing @TransactionalEventListener and @Async

Spring Modulith allows us to further simplify our code, providing built-in support for this pattern. Let’s start by adding the Maven dependency for spring-modulith-events-api to our pom.xml:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-events-api</artifactId>
    <version>1.1.3</version>
</dependency>

This module can be configured to listen to application events and automatically externalize them to various message systems. We’ll stick to our original example and focus on Kafka. For this integration, we’ll need to add the spring-modulith-events-kafka dependency:

<dependency> 
    <groupId>org.springframework.modulith</groupId> 
    <artifactId>spring-modulith-events-kafka</artifactId> 
    <version>1.1.3</version>
    <scope>runtime</scope> 
</dependency>

Now, we need to update the ArticlePublishedEvent and annotate it with @Externalized. This annotation requires the name and the key of the routing target, in other words, the Kafka topic and the message key. For the key, we’ll use a SpEL expression that invokes the event’s slug() accessor:

@Externalized("baeldung.articles.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}

6. The Event Publication Registry

As previously discussed, we still have an error-prone relationship between persisting the data and publishing the message: failing to publish the message causes the transaction to roll back. On the other hand, even if the message is successfully published, the transaction can still roll back later.

Spring Modulith’s event publication registry implements the “transactional outbox” pattern to tackle this problem, ensuring eventual consistency across the system. When a transactional operation happens, instead of immediately sending a message to an external system, the event is stored in an event publishing log within the same business transaction.
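The idea behind the transactional outbox can be sketched in plain Java. This is a simplified illustration, not Spring Modulith's implementation: InMemoryOutbox and OutboxEntry are hypothetical names, the "log" is an in-memory list rather than a database table, and the broker is just a Consumer. The business operation only records the event, and a separate step attempts delivery and marks an entry as completed only when delivery succeeds:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical log entry: completionDate stays null until delivery succeeds.
class OutboxEntry {
    final String payload;
    Instant completionDate;

    OutboxEntry(String payload) {
        this.payload = payload;
    }
}

// Hypothetical in-memory stand-in for the event publishing log.
class InMemoryOutbox {
    private final List<OutboxEntry> entries = new ArrayList<>();

    // Called as part of the business operation: record only, no broker call.
    OutboxEntry record(String payload) {
        OutboxEntry entry = new OutboxEntry(payload);
        entries.add(entry);
        return entry;
    }

    // Called after the business operation (and again on retries).
    // Returns how many entries were delivered in this attempt.
    int publishIncomplete(Consumer<String> broker) {
        int published = 0;
        for (OutboxEntry entry : entries) {
            if (entry.completionDate != null) {
                continue; // already delivered
            }
            try {
                broker.accept(entry.payload);
                entry.completionDate = Instant.now();
                published++;
            } catch (RuntimeException e) {
                // Delivery failed: the entry stays incomplete and is retried later.
            }
        }
        return published;
    }

    long incompleteCount() {
        return entries.stream().filter(entry -> entry.completionDate == null).count();
    }
}

class OutboxDemo {
    public static void main(String[] args) {
        InMemoryOutbox outbox = new InMemoryOutbox();
        outbox.record("spring-boot-intro");
        outbox.record("broken-event");

        // First attempt: the broker rejects one payload.
        outbox.publishIncomplete(payload -> {
            if (payload.equals("broken-event")) {
                throw new IllegalStateException("broker unavailable");
            }
        });
        System.out.println("incomplete after first attempt: " + outbox.incompleteCount());

        // Retry with a healthy broker: the remaining entry is delivered.
        outbox.publishIncomplete(payload -> { });
        System.out.println("incomplete after retry: " + outbox.incompleteCount());
    }
}
```

Because the event is recorded together with the business data, a failed delivery no longer loses the event: it simply remains in the log without a completion date until a later attempt succeeds.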

6.1. The Event Publishing Log

First, we’ll need to introduce the spring-modulith-starter dependency that corresponds to our persistence technology. We can consult the official documentation for a complete list of the supported starters. Since we use Spring Data JPA and a PostgreSQL database, we’ll add the spring-modulith-starter-jpa dependency:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-jpa</artifactId>
    <version>1.1.3</version>
</dependency>

Additionally, we’ll enable Spring Modulith to create the “event_publication” table. This table contains the relevant data about the externalized application events. Let’s add the following property to our application.yml:

spring.modulith:
  events.jdbc-schema-initialization.enabled: true

Our setup uses Testcontainers to spin up a Docker container with the PostgreSQL database. As a result, we can leverage the Testcontainers Desktop application to “freeze the container shutdown” and “open a terminal” attached to the container itself. Then, we can inspect the database using the following commands:

  • “psql -U test_user -d test_db” – to open the PostgreSQL interactive terminal
  • “\d” – to list the database objects

[Screenshot: Testcontainers Desktop]

As we can see, the “event_publication” table was successfully created. Let’s execute a query to see the events persisted by our tests:

[Screenshot: query results from the event_publication table]

On the first row, we can see the event created by our first test, which covered the happy flow. However, in the second test, we intentionally created an invalid event by omitting the slug, to simulate a failure during event publication. Since this Article was saved to the database but not successfully published, it appears in the event_publication table with a missing completion_date.

6.2. Resubmitting Events

We can enable Spring Modulith to automatically resubmit events upon application restart through the republish-outstanding-events-on-restart property:

spring.modulith:
  republish-outstanding-events-on-restart: true

Furthermore, we can use the IncompleteEventPublications bean to programmatically re-submit the failed events older than a given time:

@Component
class EventPublications {
    private final IncompleteEventPublications incompleteEvents;
    private final CompletedEventPublications completeEvents;

    // constructor

    void resubmitUnpublishedEvents() {
        incompleteEvents.resubmitIncompletePublicationsOlderThan(Duration.ofSeconds(60));
    }
}

Similarly, we can use the CompletedEventPublications bean to easily query or clear the event_publication table:

void clearPublishedEvents() {
    completeEvents.deletePublicationsOlderThan(Duration.ofSeconds(60));
}

7. Event Externalization Configuration

Even though the @Externalized annotation’s value is useful for concise SpEL expressions, there are situations where we might want to avoid using it:

  • In cases where the expression becomes overly complex
  • When we aim to separate information about the topic from the application event
  • If we want distinct models for the application event and the externalized event

For these use cases, we can configure the necessary routing and event mapping using EventExternalizationConfiguration’s builder. After that, we simply need to expose this configuration as a Spring bean:

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.slug(), it.title())
      )
      .build();
}

The EventExternalizationConfiguration enables us to define the routing and mapping of application events in a declarative way. Moreover, it lets us handle various types of application events. For example, if we need to handle an additional event such as WeeklySummaryPublishedEvent, we can easily do it by adding one more type-specific routing and mapping:

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.slug(), it.title())
      )
      .route(
        WeeklySummaryPublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.handle())
      )
      .mapping(
        WeeklySummaryPublishedEvent.class,
        it -> new PostPublishedKafkaEvent(it.handle(), it.heading())
      )
      .build();
}

As we observe, the mappings and routings need two things: the type itself and a function to resolve the Kafka topic and payload. In our example, both application events will be mapped to a common type and sent to the same topic.
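To illustrate the idea of type-specific routing and mapping functions, here is a small plain-Java model. It is not the Spring Modulith API: ExternalizationRegistry, Route, Outgoing, and the event and payload records are all hypothetical names invented for this sketch. Each event type is associated with one function that resolves the topic and key, and optionally one that converts the event into the payload to send:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical types used only in this sketch.
record Route(String topic, String key) {}
record Outgoing(Route route, Object payload) {}
record ArticleCreated(String slug, String title) {}
record SummaryCreated(String handle, String heading) {}
record PostPayload(String id, String title) {}

// Hypothetical registry: one routing and one mapping function per event type.
class ExternalizationRegistry {
    private final Map<Class<?>, Function<Object, Route>> routes = new HashMap<>();
    private final Map<Class<?>, Function<Object, Object>> mappings = new HashMap<>();

    <T> ExternalizationRegistry route(Class<T> type, Function<T, Route> router) {
        routes.put(type, event -> router.apply(type.cast(event)));
        return this;
    }

    <T> ExternalizationRegistry mapping(Class<T> type, Function<T, Object> mapper) {
        mappings.put(type, event -> mapper.apply(type.cast(event)));
        return this;
    }

    // Resolves the destination and the payload for a given event.
    Outgoing externalize(Object event) {
        Function<Object, Route> router = routes.get(event.getClass());
        if (router == null) {
            throw new IllegalArgumentException("No route for " + event.getClass().getName());
        }
        // Without a registered mapping, the event itself is used as the payload.
        Object payload = mappings
          .getOrDefault(event.getClass(), Function.identity())
          .apply(event);
        return new Outgoing(router.apply(event), payload);
    }
}

class RoutingDemo {
    public static void main(String[] args) {
        ExternalizationRegistry registry = new ExternalizationRegistry()
          .route(ArticleCreated.class, it -> new Route("articles", it.slug()))
          .mapping(ArticleCreated.class, it -> new PostPayload(it.slug(), it.title()))
          .route(SummaryCreated.class, it -> new Route("articles", it.handle()))
          .mapping(SummaryCreated.class, it -> new PostPayload(it.handle(), it.heading()));

        System.out.println(registry.externalize(new ArticleCreated("intro", "Intro to Spring Boot")));
        System.out.println(registry.externalize(new SummaryCreated("week-12", "Weekly Summary")));
    }
}
```

As in the configuration above, both event types in this model end up as the same payload type on the same topic, and only the functions differ per type.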

Additionally, since we now declare the routing in the configuration, we can remove this information from the event itself. Consequently, the event will only have the @Externalized annotation, with no value:

@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}

@Externalized
public record WeeklySummaryPublishedEvent(String handle, String heading) {
}

8. Conclusion

In this article, we discussed the scenarios that require us to publish a message from within a transactional block. We discovered that this pattern can hurt performance because it keeps the database connection open for longer than necessary.

After that, we used Spring Modulith’s features to listen to Spring application events and automatically publish them to a Kafka topic. This approach allowed us to externalize the events asynchronously and free the database connection sooner.

The complete source code can be found over on GitHub.
