1. Introduction

In this tutorial, we’ll explore how to dynamically route messages in Kafka Streams. Dynamic routing is particularly useful when the destination topic for a message depends on its content, enabling us to direct messages based on specific conditions or attributes within the payload. This kind of conditional routing finds real-world applications in various domains like IoT event handling, user activity tracking, and fraud detection.

We’ll walk through the problem of consuming messages from a single Kafka topic and conditionally routing them to multiple destination topics. The primary focus will be on how to set this up in a Spring Boot application using the Kafka Streams library.

2. Kafka Streams Routing Techniques

Dynamic routing of messages in Kafka Streams isn’t confined to a single approach; it can be achieved using several techniques, each with its own advantages, challenges, and suitability for different scenarios:

  • KStream Conditional Branching: The KStream.split().branch() method is the conventional means to segregate a stream based on predicates. While this method is easy to implement, it has limitations when it comes to scaling the number of conditions and can become less manageable.
  • Branching with KafkaStreamBrancher: This feature appeared in Spring Kafka version 2.2.4. It offers a more elegant and readable way to create branches in a Kafka Stream, eliminating the need for ‘magic numbers’ and allowing more fluid chaining of stream operations.
  • Dynamic Routing with TopicNameExtractor: Another method for topic routing is to use a TopicNameExtractor. This allows for a more dynamic topic selection at runtime based on the message key, value, or even the entire record context. However, it requires topics to be created in advance. This method affords more granular control over topic selection and is more adaptive to complex use cases.
  • Custom Processors: For scenarios requiring complex routing logic or multiple chained operations, we can apply custom processor nodes in the Kafka Streams topology. This approach is the most flexible but also the most complex to implement.

Throughout this article, we’ll focus on implementing the first three approaches: KStream Conditional Branching, Branching with KafkaStreamBrancher, and Dynamic Routing with TopicNameExtractor. The custom processor approach gets only the brief sketch below.
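
Although custom processors are out of scope for the remainder of the article, the following hypothetical sketch hints at what that fourth approach might look like. It reuses the IotSensorData model, the iotTopicName field, and the iotSerde() bean that we’ll define in the next section: a processor normalizes missing sensor types, and an inline TopicNameExtractor then performs the actual routing:

@Bean
public KStream<String, IotSensorData> iotProcessorStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    stream.process(() -> new Processor<String, IotSensorData, String, IotSensorData>() {
        private ProcessorContext<String, IotSensorData> context;

        @Override
        public void init(ProcessorContext<String, IotSensorData> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, IotSensorData> record) {
            // arbitrary routing logic can live here; we simply normalize missing sensor types
            if (record.value().getSensorType() == null) {
                record.value().setSensorType("unknown");
            }
            context.forward(record);
        }
    }).to((key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType()),
      Produced.with(Serdes.String(), iotSerde()));
    return stream;
}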

3. Setting Up the Environment

In our scenario, we have a network of IoT sensors streaming various types of data, such as temperature, humidity, and motion, to a centralized Kafka topic named iot_sensor_data. Each incoming message contains a JSON object with a field named sensorType that indicates the type of data the sensor is sending. Our aim is to dynamically route these messages to a dedicated topic for each type of sensor data.
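
For example, a temperature reading published to iot_sensor_data might look like this (the field values are illustrative):

{
    "sensorId": "sensor-1",
    "sensorType": "temp",
    "value": "23.4"
}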

First, let’s establish a running Kafka instance. We can set up Kafka, Zookeeper, and Kafka UI using Docker, along with Docker Compose, by creating a docker-compose.yml file:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka_ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - 8082:8080
    environment:
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
  kafka-init-topics:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
               cub kafka-ready -b kafka:29092 1 30 && \
               kafka-topics --create --topic iot_sensor_data --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092'"

Here we set all the required environment variables and dependencies between services. Furthermore, the kafka-init-topics service creates the iot_sensor_data topic by waiting for the broker to become ready and then running kafka-topics --create.

Now we can run Kafka inside Docker by executing docker-compose up -d.
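
Optionally, we can verify that the containers are healthy and that the init service created the topic; these commands assume the service names from the compose file above:

docker-compose ps
docker-compose exec kafka kafka-topics --list --bootstrap-server kafka:29092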

Next, we have to add the Kafka Streams dependencies to the pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

The first dependency is the org.apache.kafka:kafka-streams artifact, which provides Kafka Streams functionality. The second, org.springframework.kafka:spring-kafka, facilitates the configuration and integration of Kafka with Spring Boot.

Another essential aspect is configuring the address of the Kafka broker. This is generally done by specifying the broker details in the application’s properties file. Let’s add this configuration along with other properties to our application.properties file:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=baeldung-streams
spring.kafka.consumer.group-id=baeldung-group
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
kafka.topics.iot=iot_sensor_data

Next, let’s define a sample data class IotSensorData:

public class IotSensorData {
    private String sensorType;
    private String value;
    private String sensorId;

    // standard getters and setters (required by Jackson for JSON (de)serialization)
}

Lastly, we need to configure a Serde for the serialization and deserialization of typed messages in Kafka:

@Bean
public Serde<IotSensorData> iotSerde() {
    return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(IotSensorData.class));
}
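
These beans need a home: a Spring configuration class with Kafka Streams support enabled. A minimal sketch of such a class might look as follows; the class name is illustrative, and the @Value annotation wires the kafka.topics.iot property into the iotTopicName field referenced by the upcoming examples:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    // injects the topic name defined in application.properties (kafka.topics.iot)
    @Value("${kafka.topics.iot}")
    private String iotTopicName;

    // the iotSerde() bean above and the stream-defining beans below live in this class
}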

4. Implementing Dynamic Routing in Kafka Streams

After setting up the environment and adding the required dependencies, let’s focus on implementing dynamic routing logic in Kafka Streams.

Dynamic message routing can be an essential part of an event-driven application, as it enables the system to adapt to various types of data flows and conditions without requiring code changes.

4.1. KStream Conditional Branching

Branching in Kafka Streams allows us to take a single stream of data and split it into multiple streams based on some conditions. These conditions are provided as predicates that evaluate each message as it passes through the stream.

In recent versions of Kafka Streams, the branch() method has been deprecated in favor of the newer split().branch() method, which is designed to improve the API’s overall usability and flexibility. Nevertheless, we can apply the newer method in the same way to split a KStream into multiple streams based on certain predicates.

Here we define the configuration that utilizes the split().branch() method for dynamic topic routing:

@Bean
public KStream<String, IotSensorData> iotStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    stream.split()
      .branch((key, value) -> "temp".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_temp")))
      .branch((key, value) -> "move".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_move")))
      .branch((key, value) -> "hum".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_hum")))
      .noDefaultBranch();
    return stream;
}

In the example above, we split the initial stream from the iot_sensor_data topic into multiple streams based on the sensorType property and route them to other topics accordingly.

If a target topic name can be generated from the message content, we can pass a lambda to the to() method for more dynamic topic routing:

@Bean
public KStream<String, IotSensorData> iotStreamDynamic(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    stream.split()
      .branch((key, value) -> value.getSensorType() != null, 
        Branched.withConsumer(ks -> ks.to((key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType()))))
      .noDefaultBranch();
    return stream;
}

This approach provides greater flexibility, as it routes messages dynamically whenever the target topic name can be derived from the message content.

4.2. Routing With KafkaStreamBrancher

The KafkaStreamBrancher class provides a builder-style API that allows easier chaining of branching conditions, making code more readable and maintainable.

The primary benefit is the removal of the complexities associated with managing an array of branched streams, which is how the original KStream.branch() method works. Instead, KafkaStreamBrancher lets us define each branch along with the operations that should happen on that branch, removing the need for magic numbers or complex indexing to identify the correct branch. Since the introduction of the split().branch() method, this approach closely resembles the one discussed in the previous section.

Let’s apply this approach to a stream:

@Bean
public KStream<String, IotSensorData> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    new KafkaStreamBrancher<String, IotSensorData>()
      .branch((key, value) -> "temp".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_temp"))
      .branch((key, value) -> "move".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_move"))
      .branch((key, value) -> "hum".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_hum"))
      .defaultBranch(ks -> ks.to("%s_unknown".formatted(iotTopicName)))
      .onTopOf(stream);
    return stream;
}

We’ve applied a fluent API to route each message to a specific topic. Similarly, we can use a single branch() call to route to multiple topics by using message content as part of the topic name:

@Bean
public KStream<String, IotSensorData> iotBrancherStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    new KafkaStreamBrancher<String, IotSensorData>()
      .branch((key, value) -> value.getSensorType() != null, (ks) ->
        ks.to((key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType())))
      .defaultBranch(ks -> ks.to("%s_unknown".formatted(iotTopicName)))
      .onTopOf(stream);
    return stream;
}

By providing a higher level of abstraction for branching logic, KafkaStreamBrancher not only makes the code cleaner but also enhances its manageability, especially for applications with complex routing requirements.

4.3. Dynamic Topic Routing With TopicNameExtractor

Another approach to manage conditional branching in Kafka Streams is to use a TopicNameExtractor, which, as the name suggests, extracts the topic name dynamically for each message in the stream. This method can be more straightforward for certain use cases than the previously discussed split().branch() and KafkaStreamBrancher approaches.

Here’s a sample configuration using TopicNameExtractor in a Spring Boot application:

@Bean
public KStream<String, IotSensorData> iotExtractorStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    TopicNameExtractor<String, IotSensorData> sensorTopicExtractor = (key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType());
    stream.to(sensorTopicExtractor);
    return stream;
}

While the TopicNameExtractor approach handles its primary task of routing records to specific topics well, it has some limitations compared to split().branch() and KafkaStreamBrancher. Specifically, a TopicNameExtractor doesn’t provide the option to perform additional transformations, such as mapping or filtering, within the same routing step.
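
For instance, if we want to drop records without a sensorType before routing, we can chain a separate filtering step ahead of the extractor; a minimal sketch reusing the stream from above:

// filter first, then route; the extractor itself can't perform this transformation
stream.filter((key, value) -> value.getSensorType() != null)
  .to((key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType()));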

5. Conclusion

In this article, we’ve seen different approaches for dynamic topic routing using Kafka Streams and Spring Boot.

We began by exploring the modern branching mechanisms like the split().branch() method and the KafkaStreamBrancher class. Furthermore, we examined the dynamic topic routing capabilities offered by TopicNameExtractor.

Each technique presents its own advantages and challenges. For instance, the split().branch() method can become cumbersome when handling numerous conditions, whereas the TopicNameExtractor provides a structured flow but restricts inline transformations such as mapping or filtering. As a result, grasping the subtleties of each approach is vital for creating an effective routing implementation.

The code backing this article is available on GitHub.