
1. Overview

Kafka consumer group lag is a key performance indicator of any Kafka-based event-driven system.

In this tutorial, we’ll build an analyzer application to monitor Kafka consumer lag.

2. Consumer Lag

Consumer lag is simply the delta between the consumer's last committed offset and the producer's end offset in the log. In other words, consumer lag measures the delay between producing and consuming messages in any producer-consumer system. For example, if the producer has written messages up to offset 150 on a partition and the consumer group has committed offset 100, the lag on that partition is 50.

In this section, let’s understand how we can determine the offset values.

2.1. Kafka AdminClient

To inspect the offset values of a consumer group, we’ll need the administrative Kafka client. So, let’s write a method in the LagAnalyzerService class to create an instance of the AdminClient class:

private AdminClient getAdminClient(String bootstrapServerConfig) {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    return AdminClient.create(config);
}

We must note the use of the @Value annotation to retrieve the bootstrap server list from the property file. In the same way, we'll use this annotation to get other values, such as groupId and topicName.
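For reference, here's a minimal sketch of how these values might be injected. The monitor.topic.name key is the one used later in the listener configuration, while the other two keys are assumptions for illustration:

@Value("${monitor.kafka.bootstrap.config}") // hypothetical property key
private String bootstrapServerConfig;

@Value("${monitor.consumer.groupid}") // hypothetical property key
private String groupId;

@Value("${monitor.topic.name}")
private String topicName;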

2.2. Consumer Group Offset

First, we can use the listConsumerGroupOffsets() method of the AdminClient class to fetch the offset information of a specific consumer group id.

Next, since our focus is mainly on the offset values, we can invoke the partitionsToOffsetAndMetadata() method to get a map of TopicPartition to OffsetAndMetadata values:

private Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId) 
  throws ExecutionException, InterruptedException {
    ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
    Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = info.partitionsToOffsetAndMetadata().get();

    Map<TopicPartition, Long> groupOffset = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
        // the map key is already a TopicPartition, so we can use it directly
        groupOffset.put(entry.getKey(), entry.getValue().offset());
    }
    return groupOffset;
}

Lastly, notice the iteration over the topicPartitionOffsetAndMetadataMap, which narrows the fetched results down to the committed offset for each topic and partition.

2.3. Producer Offset

The only thing left for finding the consumer group lag is a way of getting the end offset values. For this, we can use the endOffsets() method of the KafkaConsumer class.

Let’s start by creating an instance of the KafkaConsumer class in the LagAnalyzerService class:

private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(properties);
}

Next, let's aggregate all the relevant TopicPartition values from the consumer group offsets for which we need to compute the lag, so that we can provide them as an argument to the endOffsets() method:

private Map<TopicPartition, Long> getProducerOffsets(Map<TopicPartition, Long> consumerGrpOffset) {
    // the consumer group offsets are already keyed by TopicPartition, so we can reuse the key set
    List<TopicPartition> topicPartitions = new ArrayList<>(consumerGrpOffset.keySet());
    return kafkaConsumer.endOffsets(topicPartitions);
}

Finally, let's write a method that uses the consumer offsets and the producer's end offsets to generate the lag for each TopicPartition:

private Map<TopicPartition, Long> computeLags(
  Map<TopicPartition, Long> consumerGrpOffsets,
  Map<TopicPartition, Long> producerOffsets) {
    Map<TopicPartition, Long> lags = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
        Long producerOffset = producerOffsets.get(entry.getKey());
        Long consumerOffset = entry.getValue();
        // the committed offset can't exceed the end offset, so we floor the lag at zero
        long lag = Math.max(0, producerOffset - consumerOffset);
        lags.put(entry.getKey(), lag);
    }
    return lags;
}

3. Lag Analyzer

Now, let’s orchestrate the lag analysis by writing the analyzeLag() method in the LagAnalyzerService class:

public void analyzeLag(String groupId) throws ExecutionException, InterruptedException {
    Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
    Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
    Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
    for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
        String topic = lagEntry.getKey().topic();
        int partition = lagEntry.getKey().partition();
        Long lag = lagEntry.getValue();
        LOGGER.info("Time={} | Lag for topic = {}, partition = {}, groupId = {} is {}",
          MonitoringUtil.time(),
          topic,
          partition,
          groupId,
          lag);
    }
}

However, when it comes to monitoring the lag metric, we'd need a near real-time value of the lag so that we can take administrative action to restore system performance.

One straightforward way of achieving this is by polling the lag value at a regular interval. So, let's create a LiveLagAnalyzerService that will invoke the analyzeLag() method of the LagAnalyzerService:

@Scheduled(fixedDelay = 5000L)
public void liveLagAnalysis() throws ExecutionException, InterruptedException {
    lagAnalyzerService.analyzeLag(groupId);
}

For our purposes, we've set the poll frequency to five seconds using the @Scheduled annotation. However, for real-time monitoring, we'd probably also want to make this accessible via JMX.
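Note that the @Scheduled annotation only takes effect when scheduling is enabled in the application. A minimal sketch, assuming we enable it on the main application class:

@EnableScheduling
@SpringBootApplication
public class LagAnalyzerApplication {
    // ...
}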

4. Simulation

In this section, we'll simulate a Kafka producer and consumer against a local Kafka setup so that we can see the LagAnalyzer in action without depending on an external producer and consumer.

4.1. Simulation Mode

Since simulation mode is required only for demonstration purposes, we should have a mechanism to turn it off when we want to run the Lag Analyzer application for a real scenario.

We can keep this as a configurable property in the application.properties resource file:

monitor.producer.simulate=true
monitor.consumer.simulate=true

We’ll plug these properties into the Kafka producer and consumer and control their behavior.
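As a sketch of this wiring, the producer simulator might read its flag as follows. The field name enabled matches the check in the sendMessage() method shown later, though the exact injection point is an assumption:

// assumed wiring for the simulation flag checked in sendMessage()
@Value("${monitor.producer.simulate}")
private Boolean enabled;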

Additionally, let’s define producer startTime, endTime, and a helper method time() to get the current time during the monitoring:

public static final Date startTime = new Date();
public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);

public static String time() {
    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    return dtf.format(LocalDateTime.now());
}

4.2. Producer-Consumer Configurations

We'll need to define a few core configuration values to instantiate our Kafka consumer and producer simulators.

First, let’s define the configuration for the consumer simulator in the KafkaConsumerConfig class:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
  @Qualifier("consumerFactory") ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory = 
      new ConcurrentKafkaListenerContainerFactory<>();
    listenerContainerFactory.setConsumerFactory(consumerFactory);
    return listenerContainerFactory;
}

Next, we can define the configuration for the producer simulator in the KafkaProducerConfig class:

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Further, let’s use the @KafkaListener annotation to specify the target listener, which is, of course, enabled only when monitor.consumer.simulate is set to true:

@KafkaListener(
  topics = "${monitor.topic.name}",
  containerFactory = "kafkaListenerContainerFactory",
  autoStartup = "${monitor.consumer.simulate}")
public void listen(String message) throws InterruptedException {
    // simulate slow processing to build up consumer lag
    Thread.sleep(10L);
}

Here, we added a sleep of 10 milliseconds per message to create an artificial consumer lag.

Finally, let’s write a sendMessage() method to simulate the producer:

@Scheduled(fixedDelay = 1L, initialDelay = 5L)
public void sendMessage() throws ExecutionException, InterruptedException {
    if (enabled) {
        if (endTime.after(new Date())) {
            String message = "msg-" + time();
            // block on get() so each send completes before the next is scheduled
            kafkaTemplate.send(topicName, message).get();
        }
    }
}

Notice that the producer generates messages at a rate of roughly one message per millisecond. Moreover, it stops producing once endTime, 30 seconds after the simulation's startTime, has passed.

4.3. Live Monitoring

Now, let’s run the main method in our LagAnalyzerApplication:

public static void main(String[] args) throws InterruptedException {
    SpringApplication.run(LagAnalyzerApplication.class, args);
    // keep the JVM alive without busy-waiting on an empty loop
    Thread.currentThread().join();
}

We'll see the current lag on each partition of the topic every five seconds:

Time=2021/06/06 11:07:24 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 93
Time=2021/06/06 11:07:29 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 290
Time=2021/06/06 11:07:34 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 776
Time=2021/06/06 11:07:39 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 1159
Time=2021/06/06 11:07:44 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 1559
Time=2021/06/06 11:07:49 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 2015
Time=2021/06/06 11:07:54 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 1231
Time=2021/06/06 11:07:59 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 731
Time=2021/06/06 11:08:04 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 231
Time=2021/06/06 11:08:09 | Lag for topic = baeldungTopic, partition = 0, groupId = baeldungGrp is 0

Since the producer publishes messages faster than the consumer can process them, the lag builds up for the first 30 seconds. Once the producer stops producing, the lag gradually declines to 0.

5. Monitoring Consumer Lag via Actuator Endpoint

For a Spring Boot application with Kafka consumers, we can get the consumer lag metrics using Micrometer and expose them to the actuator endpoint. Let’s see how we can do this.

5.1. Enabling Actuator Endpoint

First, we need to add the spring-boot-starter-actuator dependency in the project’s pom.xml file:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    <version>3.2.2</version>
</dependency>

Now, let’s also enable the /actuator endpoint by configuring the application.properties file:

management.endpoints.web.base-path=/actuator
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

Finally, let’s also set the port for our application to be different from 8080:

server.port=8081

We must note that the ZooKeeper process runs an admin web console on port 8080 by default. So, if we're running ZooKeeper on the local machine, we must use a different port for our Spring Boot application.
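Alternatively, if we'd rather keep our application on port 8080, we can move ZooKeeper's AdminServer to a different port via the admin.serverPort property in the zookeeper.properties file:

# move ZooKeeper's admin console off port 8080
admin.serverPort=9090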

5.2. Configuring Metrics Using Micrometer

We can get the Kafka consumer metrics using the Micrometer library. In this section, we’ll expose the consumer metrics for the Prometheus monitoring system.

First, we must add the micrometer-registry-prometheus dependency in the project’s pom.xml file:

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
    <version>1.12.2</version>
</dependency>

Next, let's enable the /actuator/prometheus endpoint and disable JMX for our application, since we don't need it here:

management.endpoint.prometheus.enabled=true
spring.jmx.enabled=false

Moving on, let's add an instance of the MeterRegistry class as a member of the KafkaConsumerConfig class:

@Autowired
private MeterRegistry meterRegistry;

Finally, we're ready to add the MicrometerConsumerListener to the consumerFactory bean, after moving the consumer properties we defined earlier into a getConsumerConfig() helper method:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    // getConsumerConfig() returns the consumer properties map we built earlier
    Map<String, Object> props = getConsumerConfig(this.groupId);
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
    consumerFactory.addListener(new MicrometerConsumerListener<>(this.meterRegistry));
    return consumerFactory;
}

That’s it! We’re ready to run the application and monitor the Kafka consumer metrics.

5.3. Monitoring Consumer Lag Metric

We can start the application and visit the /actuator/prometheus endpoint to see the kafka_consumer_* metrics. Amongst other metrics, the kafka_consumer_fetch_manager_records_lag metric shows the current lag information:

kafka_consumer_fetch_manager_records_lag{client_id="consumer-baeldungGrp-2",kafka_version="3.3.1",partition="0",spring_id="consumerFactory.consumer-baeldungGrp-2",topic="baeldung",} 21447.0

Further, let's write a script that fetches the lag periodically and shows the current lag in near real-time:

$ while true
do
curl --silent -XGET http://localhost:8081/actuator/prometheus | \
awk '/kafka_consumer_fetch_manager_records_lag{/{print "Current lag is:",$2}'; 
sleep 5;
done
Current lag is: 11375.0
Current lag is: 10875.0
Current lag is: 10375.0
Current lag is: 9875.0

Great! We’ve successfully integrated the Kafka consumer metrics and exposed them via the actuator endpoint.

6. Conclusion

In this tutorial, we learned how to find the consumer lag on a Kafka topic. Additionally, we used that knowledge to create a LagAnalyzer application in Spring that shows the lag in near real-time.

As always, the complete source code for the tutorial is available over on GitHub.
