1. Introduction
When working with Kafka consumers, we often need to verify message flow through integration tests.
The EmbeddedKafkaBroker in Spring Kafka makes this easier by providing a reliable and self-contained in-memory Kafka broker.
However, as the test suite grows, each test class tends to start its own broker. This behavior slows down test execution and increases resource usage. In this article, we’ll learn how to reuse an embedded Kafka broker in multiple test classes to speed up integration tests.
2. Problem Statement
When an EmbeddedKafkaBroker is managed by the test’s application context, a new broker starts every time a test class loads a fresh context.
When a project contains several listeners or multiple test classes, each one ends up starting its own broker.
That approach leads to:
- Longer test startup time
- Extra overhead during CI builds
A shared broker, on the other hand, provides several clear advantages:
- Faster test execution since only one broker starts
- Consistent test behavior across multiple test classes
- Simpler resource cleanup
With this approach, we can reuse the embedded Kafka broker across multiple test classes to improve speed and consistency.
3. Reuse Embedded Kafka Broker in Multiple Test Classes
Let’s assume our application has two simple listeners, one for orders and one for payments. Each receives a message and logs its value along with the ID of the Kafka cluster, which it looks up through the AdminClient API.
To help verify message reception during tests, both listeners maintain a static CountDownLatch, which helps us track when a message has been consumed.
Let’s start by creating the OrderListener:
// Assumed count of 1, matching the single message the test sends
private static final CountDownLatch latch = new CountDownLatch(1);

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@KafkaListener(topics = "order")
public void receive(ConsumerRecord<String, String> consumerRecord) throws Exception {
    // Short-lived AdminClient, used only to look up the cluster ID for the log line
    try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
        LOG.info("Received customer order request [{}] from broker [{}]",
          consumerRecord.value(),
          admin.describeCluster().clusterId().get());
    }
    latch.countDown();
}

public static CountDownLatch getLatch() {
    return latch;
}
Next, let’s create the PaymentListener:
// Assumed count of 1, matching the single message the test sends
private static final CountDownLatch latch = new CountDownLatch(1);

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@KafkaListener(topics = "payment")
public void receive(ConsumerRecord<String, String> consumerRecord) throws Exception {
    // Short-lived AdminClient, used only to look up the cluster ID for the log line
    try (AdminClient admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
        LOG.info("Received payment request [{}] from broker [{}]",
          consumerRecord.value(),
          admin.describeCluster().clusterId().get());
    }
    latch.countDown();
}

public static CountDownLatch getLatch() {
    return latch;
}
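The latch mechanism that both listeners rely on can be tried out without Kafka or Spring. The following is only a minimal sketch: LatchedListener and LatchDemo are hypothetical names, and a plain thread plays the role of the listener container that would normally invoke receive():

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, framework-free stand-in for a listener class
final class LatchedListener {
    // A count of 1 means the latch opens after the first consumed message
    private static final CountDownLatch LATCH = new CountDownLatch(1);

    // Plays the role of the @KafkaListener method
    void receive(String message) {
        // ... handle the message ...
        LATCH.countDown();
    }

    static CountDownLatch getLatch() {
        return LATCH;
    }
}

final class LatchDemo {
    private LatchDemo() {
    }

    // Delivers one message on a separate thread and reports whether
    // the latch opened before the timeout expired
    static boolean deliverAndAwait(String message, long timeoutMillis) {
        Thread consumer = new Thread(() -> new LatchedListener().receive(message));
        consumer.start();
        try {
            return LatchedListener.getLatch().await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

One property worth keeping in mind is that a CountDownLatch can't be reset. With a static latch, only the first message consumed in a JVM moves the count, so a second test against the same listener would pass without waiting for a new message.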
Next, let’s define a simple utility class named EmbeddedKafkaHolder.
This singleton creates a single EmbeddedKafkaBroker instance, starts it lazily on first access, and exposes it to the tests:
public final class EmbeddedKafkaHolder {

    // One KRaft broker with one partition per topic; the address is published
    // under spring.kafka.bootstrap-servers once the broker starts
    private static final EmbeddedKafkaBroker embeddedKafka =
      new EmbeddedKafkaKraftBroker(1, 1, "order", "payment")
        .brokerListProperty("spring.kafka.bootstrap-servers");

    private static volatile boolean started;

    private EmbeddedKafkaHolder() {
    }

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            // Lock on the holder's own class rather than on EmbeddedKafkaBroker.class
            synchronized (EmbeddedKafkaHolder.class) {
                if (!started) {
                    try {
                        embeddedKafka.afterPropertiesSet();
                    } catch (Exception e) {
                        throw new KafkaException("Embedded broker failed to start", e);
                    }
                    started = true;
                }
            }
        }
        return embeddedKafka;
    }
}
This utility declares the test topics and guarantees that the embedded Kafka broker starts only once per JVM.
It achieves this with a volatile started flag and double-checked locking: the first caller invokes afterPropertiesSet(), which starts the broker with the configured properties, and every later caller receives the already-running instance.
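To see the start-once behaviour in isolation, we can apply the same double-checked locking shape to a stand-in object. This is only an illustrative sketch: FakeBroker and FakeBrokerHolder are hypothetical names that aren't part of Spring Kafka, and start() merely counts its invocations instead of starting a broker:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an embedded broker; it only records how often start() runs
final class FakeBroker {
    private final AtomicInteger starts = new AtomicInteger();

    void start() {
        starts.incrementAndGet();
    }

    int startCount() {
        return starts.get();
    }
}

// Same shape as the holder above: one shared instance, started at most once
final class FakeBrokerHolder {
    private static final FakeBroker BROKER = new FakeBroker();
    private static volatile boolean started;

    private FakeBrokerHolder() {
    }

    static FakeBroker getBroker() {
        if (!started) {                             // fast path: no locking once started
            synchronized (FakeBrokerHolder.class) {
                if (!started) {                     // re-check while holding the lock
                    BROKER.start();
                    started = true;
                }
            }
        }
        return BROKER;
    }
}
```

However many times getBroker() is called, start() runs once and every caller gets the same instance. The volatile flag lets threads that arrive after startup skip the synchronized block entirely.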
Next, let’s use this setup in our tests to reuse the same broker.
We’ll start with the OrderListenerTest:
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@DynamicPropertySource
static void kafkaProps(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString);
}

@Test
void givenKafkaBroker_whenOrderMessageIsSent_thenListenerConsumesMessages() {
    kafkaTemplate.send("order", "key", "{\"orderId\":%s}".formatted(UUID.randomUUID().toString()));
    Awaitility.await()
      .atMost(10, TimeUnit.SECONDS)
      .pollInterval(500, TimeUnit.MILLISECONDS)
      .until(() -> OrderListener.getLatch().getCount() == 0);
}
This test uses EmbeddedKafkaHolder.getEmbeddedKafka() to retrieve the shared broker instance.
It then injects the broker details into the spring.kafka.bootstrap-servers property so that the test application connects to it during startup.
The test sends a message to the order topic and waits until the countdown latch reaches zero.
This confirms that the listener has consumed the message and logged the broker details.
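The waiting step boils down to checking a condition repeatedly until it holds or a deadline passes. As a rough illustration of that idea, here is a plain-Java polling loop. It is not Awaitility’s implementation, and Polling and awaitCondition are hypothetical names used only for this sketch:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class Polling {
    private Polling() {
    }

    // Re-checks the condition every pollMillis until it holds or timeoutMillis has elapsed.
    // Returns true if the condition held in time, false on timeout or interruption.
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A call such as Polling.awaitCondition(() -> OrderListener.getLatch().getCount() == 0, 10_000, 500) would mirror the timings used in the test. The main difference is that this sketch returns false on timeout, whereas Awaitility fails the test by throwing a ConditionTimeoutException.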
Similarly, we can use the same broker in any other test that requires it.
For example, let’s look at the PaymentListenerTest:
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@DynamicPropertySource
static void kafkaProps(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", broker::getBrokersAsString);
}

@Test
void givenKafkaBroker_whenPaymentMessageIsSent_thenListenerConsumesMessages() {
    kafkaTemplate.send("payment", "key", "{\"paymentId\":%s}".formatted(UUID.randomUUID().toString()));
    Awaitility.await()
      .atMost(10, TimeUnit.SECONDS)
      .pollInterval(500, TimeUnit.MILLISECONDS)
      .until(() -> PaymentListener.getLatch().getCount() == 0);
}
In the logs, we can see the same cluster ID for both listeners, confirming that the broker is reused across test classes:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.b.kafka.sharedbroker.OrderListener - Received customer order request [{"orderId":c6e0c5b7-74c9-4dfb-8683-2c584148d21c}] from broker [PVU6g54OQ4GebLKOwjLVEw]
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO c.b.k.sharedbroker.PaymentListener - Received payment request [{"paymentId":49f88030-9a6f-4424-a2bd-b47ee658c8c0}] from broker [PVU6g54OQ4GebLKOwjLVEw]
Although this example focuses on EmbeddedKafkaBroker, we can also use Testcontainers for a more production-like setup.
4. Conclusion
In this article, we explored an efficient way to reuse the same embedded Kafka broker across multiple test classes.
The code for this article is available over on GitHub.