Generic Top

Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Introduction

In this article, we'll see how to set up Kafka Streams using Spring Boot. Kafka Streams is a client-side library built on top of Apache Kafka. It enables the processing of an unbounded stream of events in a declarative manner.

Some real-life examples of streaming data could be sensor data, stock market event streams, and system logs. For this tutorial, we'll build a simple word-count streaming application. Let's start with an overview of Kafka Streams and then set up the example along with its tests in Spring Boot.

2. Overview

Kafka Streams provides a duality between Kafka topics and relational database tables. It enables us to do operations like joins, grouping, aggregation, and filtering of one or more streaming events.

An important concept of Kafka Streams is that of processor topology. Processor topology is the blueprint of Kafka Stream operations on one or more event streams. Essentially, the processor topology can be considered as a directed acyclic graph. In this graph, nodes are categorized into source, processor, and sink nodes, whereas the edges represent the flow of the stream events.

The source at the top of the topology receives streaming data from Kafka, passes it down to the processor nodes where custom operations are performed, and flows out through the sink nodes to a new Kafka topic. Alongside the core processing, the state of the stream is saved periodically using checkpoints for fault tolerance and resilience.

3. Dependencies

We'll start by adding the spring-kafka and kafka-streams dependencies to our POM:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId
    <artifactId>kafka-streams</artifactId>
    <version>2.7.1</version>
</dependency> 

4. Example

Our sample application reads streaming events from an input Kafka topic. Once the records are read, it processes them to split the text and counts the individual words. Subsequently, it sends the updated word count to the Kafka output. In addition to the output topic, we'll also create a simple REST service to expose this count over an HTTP endpoint.

Overall, the output topic will be continuously updated with the words extracted from the input events and their updated counts.

4.1. Configuration

First, let's define the Kafka stream configuration in a Java config class:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }

    // other config
}

Here, we've used the@EnableKafkaStreams annotation to autoconfigure the required components. This autoconfiguration requires a KafkaStreamsConfiguration bean with the name as specified by DEFAULT_STREAMS_CONFIG_BEAN_NAME. As a result, Spring Boot uses this configuration and creates a KafkaStreams client to manage our application lifecycle.

In our example, we've provided the application id, bootstrap server connection details, and SerDes (Serializer/Deserializer) for our configuration.

4.2. Topology

Now that we've set up the configuration, let's build the topology for our application to keep a count of the words from input messages:

@Component
public class WordCountProcessor {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
          .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

        KTable<String, Long> wordCounts = messageStream
          .mapValues((ValueMapper<String, String>) String::toLowerCase)
          .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
          .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
          .count();

        wordCounts.toStream().to("output-topic");
    }
}

Here, we've defined a configuration method and annotated it with @Autowired. Spring processes this annotation and wires a matching bean from the container into the StreamsBuilder argument. Alternately, we can also create a bean in the configuration class to generate the topology.

StreamsBuilder gives us access to all of the Kafka Streams APIs, and it becomes like a regular Kafka Streams application. In our example, we've used this high-level DSL to define the transformations for our application:

  • Create a KStream from the input topic using the specified key and value SerDes.
  • Create a KTable by transforming, splitting, grouping, and then counting the data.
  • Materialize the result to an output stream.

In essence, Spring Boot provides a very thin wrapper around Streams API while managing the lifecycle of our KStream instance. It creates and configures the required components for the topology and executes our Streams application. Importantly, this lets us focus on our core business logic while Spring manages the lifecycle.

4.3. REST Service

After defining our pipeline with the declarative steps, let's create the REST controller. This provides the endpoints in order to POST messages to the input topic and to GET the counts for the specified word. But importantly, the application retrieves data from the Kafka Streams state store rather than the output topic.

First, let's modify the KTable from earlier and materialize the aggregated counts as a local state store. This can then be queried from the REST controller:

KTable<String, Long> wordCounts = textStream
  .mapValues((ValueMapper<String, String>) String::toLowerCase)
  .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
  .groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
  .count(Materialized.as("counts"));

After this, we can update our controller to retrieve the count value from this counts state store:

@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
      StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
    );
    return counts.get(word);
}

Here, the factoryBean is an instance of StreamsBuilderFactoryBean that is wired into the controller. This provides the KafkaStreams instance managed by this factory bean. Therefore, we can obtain the counts key/value state store that we created earlier, represented by KTable. At this point, we can use this to get the current count for the requested word from the local state store.

5. Testing

Testing is a crucial part of developing and verifying our application topology. Spring Kafka test library and Testcontainers both provide excellent support to test our application at various levels.

5.1. Unit Testing

First, let's set up a unit test for our topology using the TopologyTestDriver. This is the main test tool for testing a Kafka Streams application:

@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    wordCountProcessor.buildPipeline(streamsBuilder);
    Topology topology = streamsBuilder.build();

    try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
        TestInputTopic<String, String> inputTopic = topologyTestDriver
          .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
        
        TestOutputTopic<String, Long> outputTopic = topologyTestDriver
          .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

        inputTopic.pipeInput("key", "hello world");
        inputTopic.pipeInput("key2", "hello");

        assertThat(outputTopic.readKeyValuesToList())
          .containsExactly(
            KeyValue.pair("hello", 1L),
            KeyValue.pair("world", 1L),
            KeyValue.pair("hello", 2L)
          );
    }
}

Here, the first thing we need is the Topology that encapsulates our business logic under test from the WordCountProcessor. Now, we can use this with the TopologyTestDriver to create the input and output topics for our testing. Crucially, this eliminates the need to have a broker running and still verify the pipeline behavior. In other words, it makes it fast and easy to verify our pipeline behavior without using a real Kafka broker.

5.2. Integration Testing

Finally, let's use the Testcontainers framework to test our application end-to-end. This uses a running Kafka broker and starts up our application for a complete test:

@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(
      DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    // other test setup

    @Test
    void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
        postMessage("test message");

        startOutputTopicConsumer();

        // assert correct counts on output topic
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

        // assert correct count from REST service
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
    }
}

Here, we've sent a POST to our REST controller, which, in turn, sends the message to the Kafka input topic. As part of the setup, we've also started a Kafka consumer. This listens asynchronously to the output Kafka topic and updates the BlockingQueue with the received word counts.

During the test execution, the application should process the input messages. Following on, we can verify the expected output both from the topic as well as the state store using the REST service.

6. Conclusion

In this tutorial, we've seen how to create a simple event-driven application to process messages with Kafka Streams and Spring Boot.

After a brief overview of core streaming concepts, we looked at the configuration and creation of a Streams topology. Then, we saw how to integrate this with the REST functionality provided by Spring Boot. Finally, we covered some approaches for effectively testing and verifying our topology and application behavior.

As always, the full source code is available over on GitHub.

Generic bottom

Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> CHECK OUT THE COURSE
Generic footer banner
Comments are closed on this article!