1. Overview

Kafka consumer group lag is a key performance indicator of any Kafka-based event-driven system.

In this tutorial, we'll build an analyzer application to monitor Kafka consumer lag.

2. Consumer Lag

Consumer lag is simply the delta between the consumer's last committed offset and the producer's end offset in the log. In other words, the consumer lag measures the delay between producing and consuming messages in any producer-consumer system.
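To make the definition concrete, here's a minimal sketch of the arithmetic for a single partition. The class name LagExample, the method lagOf(), and the offset values are illustrative assumptions and aren't part of the analyzer we build later:

```java
// Hypothetical helper, for illustration only
final class LagExample {

    // Lag is the distance between the partition's end offset and the group's committed offset
    static long lagOf(long endOffset, long committedOffset) {
        return endOffset - committedOffset;
    }

    public static void main(String[] args) {
        long endOffset = 1_500L;       // assumed end offset of the partition
        long committedOffset = 1_200L; // assumed last committed offset of the consumer group
        System.out.println("lag = " + lagOf(endOffset, committedOffset)); // prints "lag = 300"
    }
}
```

A lag of 0 means the consumer group has caught up with everything written to that partition.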

In this section, let's understand how we can determine the offset values.

2.1. Kafka AdminClient

To inspect the offset values of a consumer group, we'll need the administrative Kafka client. So, let's write a method in the LagAnalyzerService class to create an instance of the AdminClient class:

private AdminClient getAdminClient(String bootstrapServerConfig) {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    return AdminClient.create(config);
}

Note that the bootstrap server list passed in as bootstrapServerConfig is retrieved from the property file using the @Value annotation. In the same way, we'll use this annotation to get other values, such as groupId and topicName.

2.2. Consumer Group Offset

First, we can use the listConsumerGroupOffsets() method of the AdminClient class to fetch the offset information of a specific consumer group id.

Next, since we're mainly interested in the offset values, we can invoke the partitionsToOffsetAndMetadata() method to get a map of TopicPartition to OffsetAndMetadata values:

private Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId) 
  throws ExecutionException, InterruptedException {
    ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
    Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = info.partitionsToOffsetAndMetadata().get();

    Map<TopicPartition, Long> groupOffset = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
        TopicPartition key = entry.getKey();
        OffsetAndMetadata metadata = entry.getValue();
        groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset());
    }
    return groupOffset;
}

Lastly, note that we iterate over the topicPartitionOffsetAndMetadataMap to reduce the fetched results to a single offset value for each topic and partition.

2.3. Producer Offset

The only thing left for finding the consumer group lag is a way of getting the end offset values. For this, we can use the endOffsets() method of the KafkaConsumer class.

Let's start by creating an instance of the KafkaConsumer class in the LagAnalyzerService class:

private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(properties);
}

Next, let's aggregate all the relevant TopicPartition values from the consumer group offsets for which we need to compute the lag so that we provide it as an argument to the endOffsets() method:

private Map<TopicPartition, Long> getProducerOffsets(Map<TopicPartition, Long> consumerGrpOffset) {
    List<TopicPartition> topicPartitions = new LinkedList<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffset.entrySet()) {
        TopicPartition key = entry.getKey();
        topicPartitions.add(new TopicPartition(key.topic(), key.partition()));
    }
    return kafkaConsumer.endOffsets(topicPartitions);
}

Finally, let's write a method that uses the consumer offsets and the producer's end offsets to compute the lag for each TopicPartition:

private Map<TopicPartition, Long> computeLags(
  Map<TopicPartition, Long> consumerGrpOffsets,
  Map<TopicPartition, Long> producerOffsets) {
    Map<TopicPartition, Long> lags = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
        Long producerOffset = producerOffsets.get(entry.getKey());
        Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
        long lag = Math.abs(producerOffset - consumerOffset);
        lags.putIfAbsent(entry.getKey(), lag);
    }
    return lags;
}
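The method above assumes that an end offset is available for every partition in the consumer offsets map. As a hedged sketch of a more defensive variant, the following plain-JDK example skips partitions without an end offset and clamps the result at zero instead of taking the absolute value. The class name SafeLagCalculator is hypothetical, and String keys such as "baeldungTopic-0" stand in for TopicPartition so that the example runs without the Kafka client library:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative variant: String keys such as "baeldungTopic-0" stand in for TopicPartition
final class SafeLagCalculator {

    static Map<String, Long> computeLags(Map<String, Long> consumerOffsets, Map<String, Long> endOffsets) {
        Map<String, Long> lags = new HashMap<>();
        for (Map.Entry<String, Long> entry : consumerOffsets.entrySet()) {
            Long endOffset = endOffsets.get(entry.getKey());
            if (endOffset == null) {
                continue; // no end offset known for this partition, so skip it instead of failing
            }
            // Clamp at zero (an assumed policy) so an inconsistent pair of readings isn't reported as lag
            lags.put(entry.getKey(), Math.max(0L, endOffset - entry.getValue()));
        }
        return lags;
    }

    public static void main(String[] args) {
        Map<String, Long> consumerOffsets = new HashMap<>();
        consumerOffsets.put("baeldungTopic-0", 100L);
        consumerOffsets.put("baeldungTopic-1", 40L);

        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("baeldungTopic-0", 250L); // deliberately no entry for partition 1

        System.out.println(computeLags(consumerOffsets, endOffsets)); // prints {baeldungTopic-0=150}
    }
}
```

Whether to skip, log, or fail on a missing end offset is a design choice that depends on how the lag values are consumed downstream.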

3. Lag Analyzer

Now, let's orchestrate the lag analysis by writing the analyzeLag() method in the LagAnalyzerService class:

public void analyzeLag(String groupId) throws ExecutionException, InterruptedException {
    Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
    Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
    Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
    for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
        String topic = lagEntry.getKey().topic();
        int partition = lagEntry.getKey().partition();
        Long lag = lagEntry.getValue();
        System.out.printf("Time=%s | Lag for topic = %s, partition = %s is %d\n",
          MonitoringUtil.time(),
          topic,
          partition,
          lag);
    }
}

However, when it comes to monitoring the lag metric, we'd need an almost real-time value of the lag so that we can take any administrative action for recovering system performance.

One straightforward way of achieving this is by polling the lag value at a regular interval of time. So, let's create a LiveLagAnalyzerService service that will invoke the analyzeLag() method of the LagAnalyzerService:

@Scheduled(fixedDelay = 5000L)
public void liveLagAnalysis() throws ExecutionException, InterruptedException {
    lagAnalyzerService.analyzeLag(groupId);
}

For our purpose, we've set the polling interval to 5 seconds using the @Scheduled annotation. However, for real-time monitoring, we'd probably need to make this accessible via JMX.
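For readers who want to see the fixed-delay polling idea without Spring, here's a minimal plain-JDK sketch based on ScheduledExecutorService. The LagPoller class and the lagSource supplier are hypothetical stand-ins for the services above, and the short delay is chosen only to keep the example quick:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Plain-JDK sketch of fixed-delay polling; lagSource is a hypothetical stand-in for a real lag lookup
final class LagPoller {

    // Polls lagSource the given number of times, waiting delayMillis between runs, and returns the last value
    static long poll(LongSupplier lagSource, int times, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        long[] last = new long[1];
        scheduler.scheduleWithFixedDelay(() -> {
            if (remaining.getCount() > 0) {
                last[0] = lagSource.getAsLong();
                System.out.println("lag = " + last[0]);
                remaining.countDown();
            }
        }, 0L, delayMillis, TimeUnit.MILLISECONDS);
        try {
            remaining.await(); // block until the requested number of polls has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while polling", e);
        } finally {
            scheduler.shutdownNow();
        }
        return last[0];
    }

    public static void main(String[] args) {
        AtomicLong fakeLag = new AtomicLong();
        // Each poll reports a lag that is 100 higher than the previous one
        long last = poll(() -> fakeLag.addAndGet(100L), 3, 50L);
        System.out.println("last observed lag = " + last);
    }
}
```

As with fixedDelay in @Scheduled, scheduleWithFixedDelay() measures the delay from the end of one run to the start of the next, so a slow lag lookup never causes overlapping polls.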

4. Simulation

In this section, we'll simulate a Kafka producer and consumer for a local Kafka setup so that we can see LagAnalyzer in action without depending on an external Kafka producer and consumer.

4.1. Simulation Mode

Since simulation mode is required only for demonstration purposes, we should have a mechanism to turn it off when we want to run the Lag Analyzer application for a real scenario.

We can keep this as a configurable property in the application.properties resource file:

monitor.producer.simulate=true
monitor.consumer.simulate=true

We'll plug these properties into the Kafka producer and consumer and control their behavior.

Additionally, let's define producer startTime, endTime, and a helper method time() to get the current time during the monitoring:

public static final Date startTime = new Date();
public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);

public static String time() {
    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    LocalDateTime now = LocalDateTime.now();
    String date = dtf.format(now);
    return date;
}

4.2. Producer-Consumer Configurations

We'll need to define a few core configuration values to instantiate our Kafka consumer and producer simulators.

First, let's define the configuration for the consumer simulator in the KafkaConsumerConfig class:

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    if (enabled) {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    } else {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
    }
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    if (enabled) {
        factory.setConsumerFactory(consumerFactory(groupId));
    } else {
        factory.setConsumerFactory(consumerFactory(simulateGroupId));
    }
    return factory;
}

Next, we can define the configuration for the producer simulator in the KafkaProducerConfig class:

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Further, let's use the @KafkaListener annotation to specify the target listener, which is, of course, enabled only when monitor.consumer.simulate is set to true:

@KafkaListener(
  topics = "${monitor.topic.name}",
  containerFactory = "kafkaListenerContainerFactory",
  autoStartup = "${monitor.consumer.simulate}")
public void listen(String message) throws InterruptedException {
    Thread.sleep(10L);
}

Here, we've added a sleep of 10 milliseconds per message to create an artificial consumer lag.

Finally, let's write a sendMessage() method to simulate the producer:

@Scheduled(fixedDelay = 1L, initialDelay = 5L)
public void sendMessage() throws ExecutionException, InterruptedException {
    if (enabled) {
        if (endTime.after(new Date())) {
            String message = "msg-" + time();
            SendResult<String, String> result = kafkaTemplate.send(topicName, message).get();
        }
    }
}

Note that the producer generates messages at a rate of up to 1 message/ms, since fixedDelay is set to 1 millisecond and each send blocks until it completes. Moreover, it stops producing messages once endTime is reached, which is 30 seconds after the startTime of the simulation.

4.3. Live Monitoring

Now, let's run the main method in our LagAnalyzerApplication:

public static void main(String[] args) {
    SpringApplication.run(LagAnalyzerApplication.class, args);
    while (true) ;
}

We'll see the current lag on each partition of the topic every 5 seconds:

Time=2021/06/06 11:07:24 | Lag for topic = baeldungTopic, partition = 0 is 93
Time=2021/06/06 11:07:29 | Lag for topic = baeldungTopic, partition = 0 is 290
Time=2021/06/06 11:07:34 | Lag for topic = baeldungTopic, partition = 0 is 776
Time=2021/06/06 11:07:39 | Lag for topic = baeldungTopic, partition = 0 is 1159
Time=2021/06/06 11:07:44 | Lag for topic = baeldungTopic, partition = 0 is 1559
Time=2021/06/06 11:07:49 | Lag for topic = baeldungTopic, partition = 0 is 2015
Time=2021/06/06 11:07:54 | Lag for topic = baeldungTopic, partition = 0 is 1231
Time=2021/06/06 11:07:59 | Lag for topic = baeldungTopic, partition = 0 is 731
Time=2021/06/06 11:08:04 | Lag for topic = baeldungTopic, partition = 0 is 231
Time=2021/06/06 11:08:09 | Lag for topic = baeldungTopic, partition = 0 is 0

The producer is configured to produce up to 1 message/ms, which is faster than the consumer can process messages with its 10-millisecond sleep. So, the lag builds up for the first 30 seconds. After that, the producer stops producing, and the lag gradually declines to 0.
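The build-up and drain pattern can be reproduced with a back-of-the-envelope model. The sketch below is not part of the application; the LagModel class is hypothetical, and the rates (900 messages produced and 500 consumed per 5-second interval) are assumptions chosen for illustration rather than measurements:

```java
import java.util.Arrays;

// Back-of-the-envelope model; the rates used in main() are assumptions, not measurements
final class LagModel {

    // Returns the lag at the end of each interval, given per-interval produce and consume capacities
    static long[] simulate(long producedPerInterval, long consumedPerInterval,
                           int producingIntervals, int totalIntervals) {
        long[] lagPerInterval = new long[totalIntervals];
        long lag = 0L;
        for (int i = 0; i < totalIntervals; i++) {
            long produced = i < producingIntervals ? producedPerInterval : 0L;
            // The consumer can't consume more than what is available, so the lag never drops below zero
            lag = Math.max(0L, lag + produced - consumedPerInterval);
            lagPerInterval[i] = lag;
        }
        return lagPerInterval;
    }

    public static void main(String[] args) {
        // Assumed: producer active for 6 intervals (30 seconds), observed over 12 intervals
        long[] lags = simulate(900L, 500L, 6, 12);
        System.out.println(Arrays.toString(lags));
        // prints [400, 800, 1200, 1600, 2000, 2400, 1900, 1400, 900, 400, 0, 0]
    }
}
```

While the producer is active, the lag grows by the difference between the two rates in each interval. Once it stops, the lag shrinks by the consumer's rate until it reaches 0.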

5. Conclusion

In this tutorial, we developed an understanding of how to find the consumer lag on a Kafka topic. Additionally, we used that knowledge to create a LagAnalyzer application in Spring that shows the lag in almost real-time.

As always, the complete source code for the tutorial is available over on GitHub.
