Generic Top

I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE

1. Overview

Kafka is a message processing system built around a distributed messaging queue. It provides a Java library so that applications can write data to, or read data from, a Kafka topic.

Now, since most of the business domain logic is validated through unit tests, applications generally mock all I/O operations in JUnit. Kafka also provides a MockProducer to mock a producer application.

In this tutorial, we'll first implement a Kafka producer application. Later, we'll implement a unit test to verify common producer operations with MockProducer.

2. Maven Dependencies

Before we implement a producer application, we'll add a Maven dependency for kafka-clients:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

3. MockProducer

The kafka-clients library contains a Java library for publishing and consuming messages in Kafka. Producer applications can use these API's to send key-value records to a Kafka topic:

public class KafkaProducer {

    private final Producer<String, String> producer;

    public KafkaProducer(Producer<String, String> producer) {
        this.producer = producer;
    }

    public Future<RecordMetadata> send(String key, String value) {
        ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
        return producer.send(record);
    }
}

Any Kafka producer must implement the Producer interface in the client's library. Kafka also provides a KafkaProducer class, which is a concrete implementation that performs the I/O operations towards a Kafka broker.

Furthermore, Kafka provides a MockProducer that implements the same Producer interface and mocks all I/O operations implemented in the KafkaProducer:

@Test
void givenKeyValue_whenSend_thenVerifyHistory() {

    MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("soccer", 
      "{\"site\" : \"baeldung\"}");

    assertTrue(mockProducer.history().size() == 1);
}

Although such I/O operations can also be mocked with Mockito, MockProducer gives us access to a lot of features that we would need to implement on top of our mock. One such feature is the history() method. MockProducer caches the records for which send() is called, thereby allowing us to validate the publish behavior of the producer.

Moreover, we can also validate the metadata like topic name, partition, record key, or value:

assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("data"));
assertTrue(recordMetadataFuture.get().partition() == 0);

4. Mocking a Kafka Cluster

In our mocked tests so far, we've assumed a topic with just one partition. However, for achieving maximum concurrency between producer and consumer threads, Kafka topics are usually split into multiple partitions.

This allows producers to write data into multiple partitions. This is usually achieved by partitioning the records based on key and mapping specific keys to a particular partition:

public class EvenOddPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, 
      byte[] valueBytes, Cluster cluster) {
        if (((String)key).length() % 2 == 0) {
            return 0;
        }
        return 1;
    }
}

Because of this, all even-length keys will be published to partition “0” and, likewise, odd-length keys to partition “1”.

MockProducer enables us to validate such partition assignment algorithms by mocking the Kafka cluster with multiple partitions:

@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() 
  throws ExecutionException, InterruptedException {
    PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
    List<PartitionInfo> list = new ArrayList<>();
    list.add(partitionInfo0);
    list.add(partitionInfo1);

    Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
    this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), 
      new StringSerializer(), new StringSerializer());

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition", 
      "{\"site\" : \"baeldung\"}");

    assertTrue(recordMetadataFuture.get().partition() == 1);
}

We mocked a Cluster with two partitions, 0 and 1. We can then verify that EvenOddPartitioner publishes the record to partition 1.

5. Mocking Errors with MockProducer

So far, we've only mocked the producer to send a record to a Kafka topic successfully. But, what happens if there's an exception when writing a record?

Applications usually handle such exceptions by retrying or throwing the exception to the client.

MockProducer allows us to mock exceptions during send() so that we can validate the exception-handling code:

@Test
void givenKeyValue_whenSend_thenReturnException() {
    MockProducer<String, String> mockProducer = new MockProducer<>(false, 
      new StringSerializer(), new StringSerializer())

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}");
    RuntimeException e = new RuntimeException();
    mockProducer.errorNext(e);

    try {
        record.get();
    } catch (ExecutionException | InterruptedException ex) {
        assertEquals(e, ex.getCause());
    }
    assertTrue(record.isDone());
}

There are two notable things in this code.

First, we called the MockProducer constructor with autoComplete as false. This tells the MockProducer to wait for input before completing the send() method.

Second, we'll call mockProducer.errorNext(e), so that MockProducer returns an exception for the last send() call.

6. Mocking Transactional Writes with MockProducer

Kafka 0.11 introduced transactions between Kafka brokers, producers, and consumers. This allowed the end-to-end Exactly-Once message delivery semantic in Kafka. In short, this means that transactional producers can only publish records to a broker with a two-phase commit protocol.

MockProducer also supports transactional writes and allows us to verify this behavior:

@Test
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
    MockProducer<String, String> mockProducer = new MockProducer<>(true, 
      new StringSerializer(), new StringSerializer())

    kafkaProducer = new KafkaProducer(mockProducer);
    kafkaProducer.initTransaction();
    kafkaProducer.beginTransaction();
    Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");

    assertTrue(mockProducer.history().isEmpty());
    kafkaProducer.commitTransaction();
    assertTrue(mockProducer.history().size() == 1);
}

Since MockProducer also supports the same APIs as the concrete KafkaProducer, it only updates the history once we commit the transaction. Such mocking behavior can help applications validate that commitTransaction() is invoked for every transaction.

7. Conclusion

In this article, we looked at the MockProducer class of the kafka-client library. We discussed that MockProducer implements the same hierarchy as the concrete KafkaProducer and, therefore, we can mock all I/O operations with a Kafka broker.

We also discussed some complex mocking scenarios and were able to test exceptions, partitioning, and transactions with the MockProducer.

As always, all code examples are available over on GitHub.

Generic bottom

I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE
Comments are closed on this article!