1. Introduction

It seems useless to advertise Kafka’s capabilities anymore. If we need a message queue and our setup is more than a couple of applications, Kafka is the industry go-to solution.

Let’s see how we can use Apache Kafka for sending and receiving messages with Kotlin. One way of doing it is to use Spring. It works in a similar way in Kotlin projects, and the usage of simple Kafka producers and consumers, as well as Kafka Streams, has already been covered on our site for Java.

In this tutorial, let’s consider using only Apache Kafka client libraries.

2. Apache Kafka Test Setup

First of all, we need a setup to run our experiments on. Deploying and supporting a Kafka installation is quite hard. Thankfully, there is a Testcontainers module for Kafka. That means we can launch a working Kafka installation in a Docker container with a single line of code. Let’s create a Gradle project for our experiments and add the necessary dependencies to our build.gradle.kts:

val testcontainersVersion = "1.19.3"
dependencies {
    testImplementation("org.testcontainers:testcontainers:$testcontainersVersion")
    testImplementation("org.testcontainers:junit-jupiter:$testcontainersVersion")
    testImplementation("org.testcontainers:kafka:$testcontainersVersion")
}

Then we just need to annotate our test class with @Testcontainers and then add @Container to the field that holds our Kafka container variable, created from the Confluent Kafka Docker image:

@Testcontainers
class AppTest {
    companion object {
        @JvmStatic
        @Container
        val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
    }
}

We also need the @JvmStatic annotation on this field so that the Kafka instance is shared among all tests in the class. After this, we can reach the broker through the kafka field. The downside of this approach is the rather long startup time of the recommended CP Kafka container. On the other hand, it provides better portability and only requires Docker on the user’s machine.

We noticed that the Docker start sometimes exceeds the default Testcontainers timeout, and the tests may fail:

Container startup failed
org.testcontainers.containers.ContainerLaunchException: Container startup failed
	at app//org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:349)
	at app//org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)

In that case, it may be necessary to restart the test case.
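
If restarts become tedious, one option is to give the container more time before Testcontainers gives up. The following is only a sketch: it assumes the failure really is a startup timeout, and the three-minute limit, the two attempts, and the name patientKafka are arbitrary choices made for illustration.

```kotlin
import java.time.Duration
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

// Assumption: the timeout and the number of attempts are illustrative values, not recommendations.
val patientKafka: KafkaContainer =
    KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
        .withStartupTimeout(Duration.ofMinutes(3)) // wait longer for the broker to become ready
        .withStartupAttempts(2)                    // retry the whole startup once if it fails
```

In the test class above, the same two calls could be chained onto the container held by the @Container field.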

The alternative is to run a docker-compose.yml with a Kafka + Zookeeper instance and connect to the address it exposes. In our example, we expose the traditional address localhost:9092.

3. Kafka Clients

Let’s add the basic Kafka library, which is necessary for sending messages to topics and receiving them:

val kafkaApiVersion = "3.3.1"
dependencies {
    implementation("org.apache.kafka:kafka-clients:$kafkaApiVersion")
    // Other dependencies
}

The Kafka clients library is the only library that is absolutely necessary to connect to Kafka. Everything else is decided by the business needs of our application.

3.1. Kafka Producer

Kafka clients require a set of properties as an argument for their constructors. For the producer, the minimal set of properties, assuming no authentication, is this:

val producerProps = mapOf<String, String>(
    "bootstrap.servers" to kafka.bootstrapServers,
    "key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" to "org.apache.kafka.common.serialization.ByteArraySerializer",
    "security.protocol" to "PLAINTEXT"
)

Once we have them, we can create our producer:

KafkaProducer<String, ByteArray>(producerProps)

Basically, these properties tell our producer where to find its Kafka broker, how to authenticate itself to the broker, and how to process the keys and values of our messages into bytes. This last one is needed because Kafka is protocol-agnostic and only operates with bytes and not any specific format, be it JSON, protobuf, or something else.
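
To make the "only bytes" point concrete, here is a small sketch that uses nothing but the Kotlin standard library. StringSerializer encodes text as UTF-8 by default, and ByteArraySerializer passes the bytes through unchanged, so the round trip below mirrors what happens to our key and value. The function names toWire and fromWire are made up for this illustration.

```kotlin
// Roughly what StringSerializer does by default: encode the text as UTF-8 bytes.
fun toWire(text: String): ByteArray = text.encodeToByteArray()

// The mirror operation on the consuming side.
fun fromWire(bytes: ByteArray): String = bytes.decodeToString()

fun main() {
    val payload = toWire("Hello, world!")
    println(payload.size)       // 13
    println(fromWire(payload))  // Hello, world!
}
```

Anything richer than a string, such as JSON or protobuf, would need its own pair of functions like these, wrapped in a Kafka Serializer and Deserializer.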

Sending messages to Kafka is simple. The send(record: ProducerRecord<K, V>) method returns almost immediately with a Future, while send(record: ProducerRecord<K, V>, callback: Callback) allows us to check whether there were any problems while sending the message without blocking the thread:

suspend fun <K, V> Producer<K, V>.asyncSend(record: ProducerRecord<K, V>) =
    suspendCoroutine<RecordMetadata> { continuation ->
        send(record) { metadata, exception ->
            exception?.let(continuation::resumeWithException)
                ?: continuation.resume(metadata)
        }
    }

producer.use {
    it.send(ProducerRecord("test", "1", "Hello, world!".encodeToByteArray())) // Or asyncSend
}

Let’s not forget that KafkaProducer is Closeable and requires a close() when we’re finished.
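
When coroutines are not available, the Future returned by the single-argument send() can simply be awaited. The sketch below assumes that blocking the calling thread is acceptable. The helper name blockingSend and the ten-second default timeout are illustrative choices and not part of the Kafka API.

```kotlin
import java.time.Duration
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata

// Hypothetical helper: blocks until the broker acknowledges the record.
// Future.get() throws ExecutionException if sending failed
// and TimeoutException if no answer arrived within the timeout.
fun <K, V> Producer<K, V>.blockingSend(
    record: ProducerRecord<K, V>,
    timeout: Duration = Duration.ofSeconds(10)
): RecordMetadata = send(record).get(timeout.toMillis(), TimeUnit.MILLISECONDS)
```

The returned RecordMetadata exposes topic(), partition(), and offset(), which is handy for logging where a message ended up.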

3.2. Kafka Consumer

Now that we have a producer, we can start to consume messages on the other end. Let’s create a consumer with a similar set of properties:

fun consumerProps(groupId: String) =
    mapOf(
        "bootstrap.servers" to KAFKA_BOOTSTRAP_SERVER,
        "auto.offset.reset" to "earliest",
        "key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" to "org.apache.kafka.common.serialization.ByteArrayDeserializer",
        "group.id" to groupId,
        "security.protocol" to "PLAINTEXT"
    )

The key and value deserializers have an obvious role. The intricacies of Kafka consumer groups and of the auto.offset.reset property are not specific to Kotlin. We use the earliest value for that property so that a consumer without committed offsets reads the topic from the beginning, even if it subscribes after the message was sent. This lets us keep the test in a single thread:

tailrec fun <T> repeatUntilSome(block: () -> T?): T = block() ?: repeatUntilSome(block)

KafkaConsumer<String, ByteArray>(consumerProps("baeldung-simple-test")).use {
    it.subscribe(listOf("test"))
    val message = repeatUntilSome {
        it.poll(400.milliseconds.toJavaDuration()).map { String(it.value()) }.firstOrNull()
    }
    // Do something with the message
}

A Kafka consumer also needs to be closed in the end. Usually, the poll() method is called in a loop for as long as the application is alive, and each call returns a ConsumerRecords batch that is either empty or contains one or more ConsumerRecord instances.
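
Such a long-running loop might look like the sketch below. The names pollWhile and consumeTopic, the running flag, and the 400 ms timeout are assumptions made for this illustration. A real application would flip the flag from its own shutdown logic.

```kotlin
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical helper: keeps polling until `running` is switched off
// and hands every received record to `handle`.
fun <K, V> Consumer<K, V>.pollWhile(
    running: AtomicBoolean,
    timeout: Duration = Duration.ofMillis(400),
    handle: (ConsumerRecord<K, V>) -> Unit
) {
    while (running.get()) {
        // poll() returns a ConsumerRecords batch, which may be empty
        for (record in poll(timeout)) {
            handle(record)
        }
    }
}

// Usage sketch: `props` is a consumer property map like the one shown earlier.
fun consumeTopic(props: Map<String, Any>, topic: String, running: AtomicBoolean) {
    KafkaConsumer<String, ByteArray>(props).use { consumer ->
        consumer.subscribe(listOf(topic))
        consumer.pollWhile(running) { println(String(it.value())) }
    } // use {} closes the consumer once the loop ends
}
```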

4. Kafka Streams

Kafka Streams is a library that allows us to operate on a higher level of abstraction than sending to and reading from a topic. Operating on a set of topics reliably and properly synchronizing offset commits is a hard task, and the Kafka Streams API abstracts that away. First, let’s add the necessary dependencies:

implementation("org.apache.kafka:kafka-streams:$kafkaApiVersion")
testImplementation("org.apache.kafka:kafka-streams-test-utils:$kafkaApiVersion")

Then, for the sake of the test, we are going to create the topics that we will be using:

fun createTopics(allProps: Map<String, Any>, topicConfig: TopicConfig) = AdminClient.create(allProps).use {
    it.createTopics(
        listOf(
            NewTopic(topicConfig.inStream1, 1, 1),
            NewTopic(topicConfig.inStream2, 1, 1),
            NewTopic(topicConfig.outStream, 1, 1)
        )
    )
}

In a production setting, the topics would already be there. The properties for creating a KafkaStreams instance are fairly straightforward:

val streamConfig = mapOf<String, Any>(
    StreamsConfig.APPLICATION_ID_CONFIG to "streams-example",
    StreamsConfig.CLIENT_ID_CONFIG to "streams-example-client",
    StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to 10L,
    StreamsConfig.POLL_MS_CONFIG to 10L,
    StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG to 500L,
    StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to KAFKA_BOOTSTRAP_SERVER,
    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name,
    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.ByteArray().javaClass.name
)

The properties containing *_MS_* in their names, however, should have other values in production. The values in the example are there to speed up the processing so that the tests will run faster.

Next, we have to establish the relationship between the topics:

fun getTopology(topicConfig: TopicConfig): Topology = StreamsBuilder().apply {
    val (inStream1, inStream2, outStream) = topicConfig
    stream(inStream1, Consumed.with(Serdes.String(), Serdes.ByteArray())).join(
        stream(inStream2, Consumed.with(Serdes.String(), Serdes.ByteArray())),
        { name, num -> "${String(name)} ${String(num)}".encodeToByteArray() },
        JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(3), Duration.ofSeconds(1))
    ).to(outStream)
}.build()
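
The join in this topology pairs records from the two input topics that share a key and whose timestamps are at most three seconds apart. To illustrate that rule without a broker, here is a plain-Kotlin simulation. It is only a sketch of the matching rule: the Event type, the function name, and the sample data are invented for this example, and the grace period for late records is ignored.

```kotlin
import kotlin.math.abs

// Hypothetical record type for this sketch: key, value, and event timestamp in milliseconds.
data class Event(val key: String, val value: String, val timestampMs: Long)

// Pairs every left event with every right event that has the same key and
// a timestamp at most windowMs away, then combines the two values.
fun windowedInnerJoin(
    left: List<Event>,
    right: List<Event>,
    windowMs: Long,
    joiner: (String, String) -> String
): List<String> =
    left.flatMap { l ->
        right
            .filter { r -> r.key == l.key && abs(r.timestampMs - l.timestampMs) <= windowMs }
            .map { r -> joiner(l.value, r.value) }
    }

fun main() {
    val names = listOf(
        Event("1", "Alien", 0),
        Event("2", "Rocky", 1_000)
    )
    val numbers = listOf(
        Event("1", "3", 2_000), // same key, 2 s apart: inside the 3 s window
        Event("2", "4", 9_000), // same key, 8 s apart: outside the window
        Event("3", "2", 1_000)  // no matching key on the left side
    )
    println(windowedInnerJoin(names, numbers, 3_000) { name, num -> "$name $num" }) // [Alien 3]
}
```

Only the first pair produces output, because the other two records either fall outside the window or have no partner with the same key.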

After the topology is created, we are ready to start our streams. Now, if we post some values into inStream1 and inStream2 with the same keys, we will receive their concatenated value out of the outStream:

private fun retrieveResultsFromOutputStream(topicConfig: TopicConfig): MutableList<String> {
    val results: MutableList<String> = mutableListOf()
    // Keys are decoded by StringDeserializer, so the key type is String
    KafkaConsumer<String, ByteArray>(consumerProps("baeldung-streams")).use {
        it.subscribe(listOf(topicConfig.outStream))
        // Keep polling until all four expected results have arrived
        while (results.size < 4) {
            readAvailable(it).let(results::addAll)
        }
    }
    return results
}

KafkaStreams(getTopology(topicConfig), StreamsConfig(streamConfig)).use {
    it.cleanUp()
    it.start()

    populateData(topicConfig)

    val results: MutableList<String> = retrieveResultsFromOutputStream(topicConfig)
    assertEquals(EXPECTED_MOVIES, results)
}

Like KafkaProducer and KafkaConsumer, KafkaStreams is also a resource and needs to be closed with close(), or wrapped in Kotlin’s use {} function.

4.1. Kafka Streams Testing

Above, we demonstrated that Kotlin code is indeed capable of using Kafka Streams. However, in a real project, we would prefer tests that are lighter and execute faster. Thankfully, Kafka provides a special TopologyTestDriver class so that we can ensure that our transformations work as we expect. Let’s create one for the topology we described in the previous section:

val testDriverConfig = mapOf<String, String>(
    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name,
    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.ByteArray().javaClass.name
).toProperties()
val testDriver = TopologyTestDriver(getTopology(topicConfig), testDriverConfig)

Then we can pipe some data into input topics:

private fun TopologyTestDriver.createStandardTestTopic(topicName: String): TestInputTopic<String, ByteArray> =
    createInputTopic(topicName, Serdes.String().serializer(), Serdes.ByteArray().serializer())
val inStream1 = testDriver.createStandardTestTopic(topicConfig.inStream1)
val inStream2 = testDriver.createStandardTestTopic(topicConfig.inStream2)
testData().forEachIndexed { i, (sequelNumber, franchiseName) ->
    inStream1.pipeInput(TestRecord((i + 1).toString(), franchiseName))
    inStream2.pipeInput(TestRecord((i + 1).toString(), sequelNumber))
}

And then it’s easy to see what comes on the other end:

testDriver.createOutputTopic(
    topicConfig.outStream,
    Serdes.String().deserializer(),
    Serdes.ByteArray().deserializer()
).readValuesToList().map(::String)
    .let { assertEquals(EXPECTED_MOVIES, it) }

In this case, there is no connection to a real Kafka instance and no network interaction, so we can check the stream logic separately from other concerns.
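
One detail worth adding is that TopologyTestDriver is Closeable, just like the other Kafka resources, so it should be closed after the test. A minimal sketch of how that could be wrapped follows. The helper name withTestDriver is an assumption made for this illustration.

```kotlin
import java.util.Properties
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.TopologyTestDriver

// Hypothetical helper: creates a driver, runs the test body against it,
// and closes the driver even if an assertion inside the body fails.
fun <T> withTestDriver(
    topology: Topology,
    config: Properties,
    block: (TopologyTestDriver) -> T
): T = TopologyTestDriver(topology, config).use(block)
```

With such a helper, the piping and assertion steps shown above would go inside the lambda passed to withTestDriver.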

5. Conclusion

In this article, we looked at how we can work with Apache Kafka using pure Kotlin. As it turns out, there’s little difference from the Java way, except that the syntax is much briefer. To create a KafkaProducer and a KafkaConsumer, we have to decide on the protocol our messages would follow. KafkaStreams are also easy to use right out of the box, even without Spring bootstrapping. They also provide a framework to make topology testing easy and lightweight.

As usual, all the code is in our repository over on GitHub.
