1. Introduction

In this tutorial, we’ll explore Kafka topics and partitions and how they relate to each other.

2. What Is a Kafka Topic

A topic is a storage mechanism for a sequence of events. Essentially, topics are durable log files that keep events in the same order as they occur in time. So, each new event is always added to the end of the log. Additionally, events are immutable. Thus, we can’t change them after they’ve been added to a topic.

An example use case for Kafka topics is recording a sequence of temperature measurements for a room. Once a temperature value has been recorded, like 25 C at 5:02 PM, it cannot be altered as it has already occurred. Furthermore, a temperature value at 5:06 PM cannot precede the one recorded at 5:02 PM. Hence, by treating each temperature measurement as an event, a Kafka topic would be a suitable option to store that data.
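To make the append-only idea concrete, here's a minimal in-memory sketch of such a log. The TemperatureLog and Measurement classes are hypothetical stand-ins for illustration and aren't part of any Kafka API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory stand-in for a topic; not a Kafka API.
class TemperatureLog {

    // An immutable event: the fields are final and there are no setters.
    static final class Measurement {
        final String time;
        final double celsius;

        Measurement(String time, double celsius) {
            this.time = time;
            this.celsius = celsius;
        }
    }

    private final List<Measurement> events = new ArrayList<>();

    // New events always go to the end of the log; the returned offset is their position.
    long append(Measurement event) {
        events.add(event);
        return events.size() - 1;
    }

    // Readers get a read-only view, so recorded events can't be replaced or reordered.
    List<Measurement> read() {
        return Collections.unmodifiableList(events);
    }

    public static void main(String[] args) {
        TemperatureLog log = new TemperatureLog();
        log.append(new Measurement("5:02 PM", 25.0));
        log.append(new Measurement("5:06 PM", 26.5));
        for (Measurement m : log.read()) {
            System.out.println(m.time + " -> " + m.celsius + " C");
        }
    }
}
```

A real topic also persists events to disk and replicates them, which this sketch leaves out.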

3. What Is a Kafka Partition

Kafka uses topic partitioning to improve scalability. In partitioning a topic, Kafka breaks it into fractions and stores each of them in different nodes of its distributed system. That number of fractions is determined by us or by the cluster default configurations.

Kafka guarantees the order of the events within the same topic partition. However, by default, it does not guarantee the order of events across all partitions.

For example, to improve performance, we can divide the topic into two different partitions and read from them on the consumer side. In that case, a consumer reads the events in the same order they arrived at the same partition. In contrast, if Kafka delivers two events to different partitions, we can’t guarantee that the consumer reads the events in the same order they were produced.

To control the ordering of related events, we can assign a key to each event. With the default partitioner, events with the same key are assigned to the same partition, which is ordered. Thus, events with the same key arrive at the consumer side in the same order they were produced.
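The key-to-partition mapping can be sketched as a hash of the key modulo the number of partitions. This is a simplified illustration: Kafka's default partitioner applies a murmur2 hash to the serialized key, whereas the sketch below uses Arrays.hashCode() as a stand-in, and the KeyPartitioningSketch class and partitionFor() method are hypothetical names:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitioningSketch {

    // Maps a key to a partition index in [0, numPartitions).
    // Assumption: Arrays.hashCode stands in for the hash that Kafka applies to the serialized key.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Math.floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"living-room", "kitchen", "living-room", "kitchen"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```

Because the mapping is deterministic, every event keyed with living-room lands in the same partition and is therefore read in the order it was produced. Different keys may still share a partition.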

4. Consumer Groups

A consumer group is a set of consumers that read from a topic. Kafka divides the topic's partitions among the consumers in a group, so that each partition is consumed by exactly one group member. However, that division might be uneven, which means that a single consumer can be assigned more than one partition.

For instance, let’s picture a topic with three partitions that a consumer group with two consumers should read. Hence, one possible division is that the first consumer gets partitions one and two, and the second consumer only gets partition three.

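Kafka manages this division through a broker-side group coordinator. As we add consumers to a group or remove consumers from it, the coordinator triggers a rebalance that redistributes the partitions among the current members. Thus, no partition is left without a consumer as long as the group has at least one active member. Note that KRaft, the consensus protocol introduced by KIP-500, replaces ZooKeeper for cluster metadata management and isn't responsible for consumer group rebalancing.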
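To illustrate how partitions can be divided and then redistributed when the group changes, here's a minimal sketch that splits partitions into contiguous ranges. It's an assumption-laden simplification: Kafka's real assignment strategies are configurable and more involved, partition numbers are zero-based here, and the PartitionAssignmentSketch class is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionAssignmentSketch {

    // Splits partitions 0..numPartitions-1 into contiguous ranges, one per consumer.
    // The first (numPartitions % consumers.size()) consumers get one extra partition.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(consumers.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions, two consumers: the division is uneven.
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 3));
        // A third consumer joins, so the assignment is recomputed from scratch.
        System.out.println(assign(List.of("consumer-1", "consumer-2", "consumer-3"), 3));
    }
}
```

With two consumers, the first one owns partitions 0 and 1 and the second owns partition 2. After the third consumer joins, each consumer owns exactly one partition.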

5. Configure the Application

In this section, we’ll create the classes to configure a topic, consumer, and producer service.

5.1. Topic Configuration

First, let’s create the configuration class for our topic:

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic celciusTopic() {
        return TopicBuilder.name("celcius-scale-topic")
                .partitions(2)
                .build();
    }
}

The KafkaTopicConfig class defines two Spring beans. The KafkaAdmin bean connects to the Kafka cluster at the configured bootstrap address, while the NewTopic bean creates a topic named celcius-scale-topic with two partitions.

5.2. Consumer and Producer Configuration

We need the necessary classes to inject the producer and consumer configurations for our topic.

First, let’s create the producer configuration class:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Double> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Double> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

The KafkaProducerConfig class defines two Spring beans. The ProducerFactory tells Kafka how to serialize events and which server the producer should connect to. The KafkaTemplate will be used in the producer service class to publish events.

5.3. Kafka Producer Service

Finally, after the initial configurations, we can create the driver application. Let’s first create the producer application:

@Service
public class ThermostatService {

    private final KafkaTemplate<String, Double> kafkaTemplate;

    public ThermostatService(KafkaTemplate<String, Double> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void measureCelsiusAndPublish(int numMeasurements) {
        new Random().doubles(25, 35)
                .limit(numMeasurements)
                .forEach(tmp -> {
                    kafkaTemplate.send("celcius-scale-topic", tmp);
                });
    }
}

The ThermostatService contains a single method named measureCelsiusAndPublish. This method produces random temperature measurements in the range [25, 35) and publishes them to the celcius-scale-topic Kafka topic. To achieve this, we use the doubles() method of the Random class to create a stream of random numbers. Then, we publish each event using the send() method of kafkaTemplate. Since we send the events without a key, Kafka is free to spread them across both partitions.
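To look at the measurement stream in isolation, here's a small sketch that collects the generated values into an array instead of publishing them. The MeasurementStreamSketch class and its measure() method are hypothetical names. Note that Random.doubles(origin, bound) treats the origin as inclusive and the bound as exclusive:

```java
import java.util.Random;

class MeasurementStreamSketch {

    // Generates numMeasurements random values, each >= 25.0 and < 35.0.
    static double[] measure(int numMeasurements) {
        return new Random().doubles(25, 35)
                .limit(numMeasurements)
                .toArray();
    }

    public static void main(String[] args) {
        for (double celsius : measure(5)) {
            System.out.println(celsius + " C");
        }
    }
}
```

The limit() call matters because doubles(25, 35) on its own returns an effectively unlimited stream.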

6. Producing and Consuming Events

In this section, we’ll see how to configure a Kafka consumer to read events from the topic using an embedded Kafka broker.

6.1. Create the Consumer Service

To consume events, we need one or more consumer classes. Let’s create one consumer of the celcius-scale-topic:

@Service
public class TemperatureConsumer {
    Map<String, Set<String>> consumedRecords = new ConcurrentHashMap<>();

    @KafkaListener(topics = "celcius-scale-topic", groupId = "group-1")
    public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-1", consumerRecord.partition());
    }

    private void trackConsumedPartitions(String consumerName, int partitionNumber) {
        consumedRecords.computeIfAbsent(consumerName, k -> new HashSet<>());
        consumedRecords.computeIfPresent(consumerName, (k, v) -> {
            v.add(String.valueOf(partitionNumber));
            return v;
        });
    }
}

Our consumer1() method uses the @KafkaListener annotation to initiate the consumer. The topics argument is a list of topics to consume, while the groupId argument identifies the consumer group to which the consumer belongs.

To visualize the results later, we've used a ConcurrentHashMap to record which partitions each consumer has read from. The key corresponds to the consumer's name, whereas the value contains the partitions that it consumed from.
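The tracking logic can also be written more compactly. The following sketch is one possible variant, not the version used in the consumer above: it chains add() onto computeIfAbsent() and uses ConcurrentHashMap.newKeySet() so that the inner sets are thread-safe as well. The PartitionTrackerSketch class is a hypothetical name:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class PartitionTrackerSketch {

    private final Map<String, Set<String>> consumedRecords = new ConcurrentHashMap<>();

    // Records that the given consumer has read from the given partition.
    void track(String consumerName, int partitionNumber) {
        consumedRecords
                .computeIfAbsent(consumerName, k -> ConcurrentHashMap.newKeySet())
                .add(String.valueOf(partitionNumber));
    }

    // Returns the partitions seen so far for each consumer.
    Map<String, Set<String>> consumedRecords() {
        return consumedRecords;
    }

    public static void main(String[] args) {
        PartitionTrackerSketch tracker = new PartitionTrackerSketch();
        tracker.track("consumer-1", 0);
        tracker.track("consumer-1", 1);
        tracker.track("consumer-1", 0); // duplicates are ignored by the set
        System.out.println(tracker.consumedRecords());
    }
}
```

Using a concurrent set matters once several listener threads report for the same consumer name, since a plain HashSet isn't safe for concurrent writes.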

6.2. Create the Test Class

Now, let’s create our integration test class:

@SpringBootTest(classes = ThermostatApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaTopicsAndPartitionsIntegrationTest {
    @ClassRule
    public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");

    @Autowired
    private ThermostatService service;

    @Autowired
    private TemperatureConsumer consumer;

    @Test
    public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception {
        service.measureCelsiusAndPublish(10000);
        Thread.sleep(1000);
        System.out.println(consumer.consumedRecords);
    }
}

We’re using an embedded Kafka broker to run the test with Kafka. The @EmbeddedKafka annotation uses the argument brokerProperties to configure the URL and port the broker will run on. Then, we start the embedded broker using a JUnit rule in the EmbeddedKafkaBroker field.

Finally, in the test method, we call our thermostat service to produce 10,000 events.

We’ll use Thread.sleep() to wait 1 second after the events are produced. This gives the consumer time to join its group and start processing messages, although a fixed delay can't guarantee that every event has been consumed by the time we print the result.
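A fixed sleep can be either too short or needlessly long. As a possible alternative, we could wait on a CountDownLatch with a timeout. The sketch below shows the idea with a plain background thread standing in for the consumer; the LatchWaitSketch class and awaitConsumption() method are hypothetical names and aren't wired to Kafka:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {

    // Starts a simulated consumer that handles expectedEvents events, then waits for it.
    // Returns true if all events were handled before the timeout elapsed.
    static boolean awaitConsumption(int expectedEvents, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expectedEvents);
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < expectedEvents; i++) {
                latch.countDown(); // one count per handled event
            }
        });
        consumer.start();
        try {
            // Returns as soon as the count reaches zero, or false once the timeout expires.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all events handled: " + awaitConsumption(10_000, 5_000));
    }
}
```

In a real test, the listener method would call countDown() for each record it receives, and the test would assert on the value returned by await().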

Let’s see an example of the output we’ll get when we run the test:

{consumer-1=[0, 1]}

That means the same consumer processed all events in partitions 0 and 1 since we have only one consumer in a single consumer group. This result may vary if we add more consumers to the same group, because Kafka then divides the partitions among them.

7. Conclusion

In this article, we’ve looked at the definitions of Kafka topics and partitions and how they relate to each other.

We’ve also illustrated a scenario of a consumer reading events from both partitions of a topic using an embedded Kafka broker.

As always, the example code is available over on GitHub.
