Generic Top

I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE

1. Overview

Apache Kafka is a powerful, distributed, fault-tolerant stream processing system. In a previous tutorial, we learned how to work with Spring and Kafka.

In this tutorial, we'll build on the previous one and learn how to write reliable, self-contained integration tests that don't rely on an external Kafka server running.

First, we'll start but looking at how to use and configure an embedded instance of Kafka. Then we'll see how we can make use of the popular framework Testcontainers from our tests.

2. Dependencies

Of course, we'll need to add the standard spring-kafka dependency to our pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.3.RELEASE</version>
</dependency>

Then we'll need two more dependencies specifically for our tests. First, we'll add the spring-kafka-test artifact:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.6.3.RELEASE</version>
    <scope>test</scope>
</dependency>

And finally, we'll add the Testcontainers Kafka dependency, which is also available over on Maven Central:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.15.0</version>
    <scope>test</scope>
</dependency>

Now that we have all the necessary dependencies configured, we can write a simple Spring Boot application using Kafka.

3. A Simple Kafka Producer-Consumer Application

Throughout this tutorial, the focus of our tests will be a simple producer-consumer Spring Boot Kafka application.

Let's start by defining our application entry point:

@SpringBootApplication
@EnableAutoConfiguration
public class KafkaProducerConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerConsumerApplication.class, args);
    }
}

As we can see, this is a standard Spring Boot application. Where possible, we want to make use of default configuration values. With this in mind, we make use of the @EnableAutoConfiguration annotation to auto-config our application.

3.1. Producer Setup

Next, let's consider a producer bean that we'll use to send messages to a given Kafka topic:

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

Our KafkaProducer bean defined above is merely a wrapper around the KafkaTemplate class. This class provides high-level thread-safe operations, such as sending data to the provided topic, which is exactly what we do in our send method.

3.2. Consumer Setup

Likewise, we'll now define a simple consumer bean which will listen to a Kafka topic and receives messages:

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }
}

Our simple consumer uses the @KafkaListener annotation on the receive method to listen to messages on a given topic. We'll see later how we configure the test.topic from our tests.

Furthermore, the receive method stores the message content in our bean and decrements the count of the latch variable. This variable is a simple thread-safe counter field that we'll use later from our tests to ensure we successfully received a message.

Now that we have our simple Kafka application using Spring Boot implemented let's see how we can write integration tests.

4. A Word on Testing

In general, when writing clean integration tests, we shouldn't depend on external services that we might not be able to control or might suddenly stop working. This could have adverse effects on our test results.

Similarly, if we're dependent on an external service, in this case, a running Kafka broker, we likely won't be able to set it up, control it and tear it down in the way we want from our tests.

4.1. Application Properties

We're going to use a very light set of application configuration properties from our tests. We'll define these properties in our src/test/resources/application.yml file:

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic

This is the minimum set of properties that we need when working with an embedded instance of Kafka or a local broker.

Most of these are self-explanatory, but the one we should highlight of particular importance is the consumer property auto-offset-reset: earliest. This property ensures that our consumer group gets the messages we send because the container might start after the sends have completed.

Additionally, we configure a topic property with the value embedded-test-topic, which is the topic we'll use from our tests.

5. Testing Using Embedded Kafka

In this section, we'll take a look at how to use an in-memory Kafka instance to run our tests against. This is also known as Embedded Kafka.

The dependency spring-kafka-test we added previously contains some useful utilities to assist with testing our application. Most notably, it contains the EmbeddedKafkaBroker class.

With that in mind, let's go ahead and write our first integration test:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {
        producer.send(topic, "Sending with own simple KafkaProducer");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
    }
}

Let's walk through the key parts of our test. First, we start by decorating our test class with two pretty standard Spring annotations:

  • The @SpringBootTest annotation will ensure that our test bootstraps the Spring application context
  • We also use the @DirtiesContext annotation, which will make sure this context is cleaned and reset between different tests

Here comes the crucial part, we use the @EmbeddedKafka annotation to inject an instance of an EmbeddedKafkaBroker into our tests. Moreover, there are several properties available we can use to configure the embedded Kafka node:

  • partitions – this is the number of partitions used per topic. To keep things nice and simple, we only want one to be used from our tests
  • brokerProperties – additional properties for the Kafka broker. Again we keep things simple and specify a plain text listener and a port number

Next, we auto-wire our consumer and producer classes and configure a topic to use the value from our application.properties.

For the final piece of the jigsaw, we simply send a message to our test topic and verify that the message has been received and contains the name of our test topic.

When we run our test, we'll see amongst the verbose Spring output:

...
12:45:35.099 [main] INFO  c.b.kafka.embedded.KafkaProducer -
  sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
  INFO  c.b.kafka.embedded.KafkaConsumer - received payload=
  'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
  CreateTime = 1605267935099, serialized key size = -1, 
  serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
  key = null, value = Sending with our own simple KafkaProducer)'

This confirms that our test is working properly. Awesome! We now have a way to write self-contained, independent integration tests using an in-memory Kafka broker.

6. Testing Kafka With TestContainers

Sometimes we might see small differences between a real external service vs. an embedded in-memory instance of a service that has been specifically provided for testing purposes. Although unlikely, it could also be that the port used from our test might be occupied, causing a failure.

With that in mind, in this section, we'll see a variation on our previous approach to testing using the Testcontainers framework. We'll see how to instantiate and manage an external Apache Kafka broker hosted inside a Docker container from our integration test.

Let's define another integration test which will be quite similar to the one we saw in the previous section:

@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

    @ClassRule
    public static KafkaContainer kafka = 
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {
        producer.send(topic, "Sending with own controller");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
    }
}

Let's take a look at the differences this time around. We're declaring the kafka field, which is a standard JUnit @ClassRule. This field is an instance of the KafkaContainer class that will prepare and manage the lifecycle of our container running Kafka.

To avoid port clashes, Testcontainers allocates a port number dynamically when our docker container starts. For this reason, we provide a custom consumer and producer factory configuration using the class KafkaTestContainersConfiguration:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
    // more standard configuration
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    // more standard configuration
    return new DefaultKafkaProducerFactory<>(configProps);
}

We then reference this configuration via the @Import annotation at the beginning of our test.

The reason for this is that we need a way to inject the server address into our application, which as previously mentioned, is generated dynamically. We achieve this by calling the getBootstrapServers() method, which will return the bootstrap server location:

bootstrap.servers = [PLAINTEXT://localhost:32789]

Now when we run our test, we should see that Testcontainers does several things:

  • Checks our local Docker setup.
  • Pulls the confluentinc/cp-kafka:5.4.3 docker image if necessary
  • Starts a new container and waits for it to be ready
  • Finally, shuts down and deletes the container after our test finishes

Again, this is confirmed by inspecting the test output:

13:33:10.396 [main] INFO  🐳 [confluentinc/cp-kafka:5.4.3]
  - Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO  🐳 [confluentinc/cp-kafka:5.4.3]
  - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO  🐳 [confluentinc/cp-kafka:5.4.3]
  - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! A working integration test using a Kafka docker container.

7. Conclusion

In this article, we've learned about a couple of approaches for testing Kafka applications with Spring Boot. In the first approach, we saw how to configure and use a local in-memory Kafka broker.

Then we saw how to use Testcontainers to set up an external Kafka broker running inside a docker container from our tests.

As always, the full source code of the article is available over on GitHub.

Generic bottom

I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE
guest
0 Comments
Inline Feedbacks
View all comments