1. Introduction

In this tutorial, we’ll cover the basic setup for connecting a Spring Boot client to an Apache Kafka broker using SSL authentication.

Secure Sockets Layer (SSL) was superseded by Transport Layer Security (TLS) in 1999, and its final version, SSL 3.0, was formally deprecated in 2015. However, for historical reasons, Kafka (and Java) still refer to “SSL”, and we’ll follow this convention in this article as well.

2. SSL Overview

By default, Apache Kafka sends all data as clear text and without any authentication.

First of all, we can configure SSL for encryption between the broker and the client. This, by default, requires one-way authentication using public key encryption where the client authenticates the server certificate.

In addition, the server can also authenticate the client using a separate mechanism (such as SSL or SASL), thus enabling two-way authentication or mutual TLS (mTLS). Basically, two-way SSL authentication ensures that the client and the server both use SSL certificates to verify each other’s identities and trust each other in both directions.

In this article, the broker will use SSL to authenticate the client, and keystores and truststores will hold the certificates and keys.

Each broker requires its own keystore which contains the private key and the public certificate. The client uses its truststore to authenticate this certificate and trust the server. Similarly, each client also requires its own keystore which contains its private key and the public certificate. The server uses its truststore to authenticate and trust the client’s certificate and establish a secure connection.

The truststore can contain the certificate of a Certificate Authority (CA) that signs other certificates. In this case, the broker or the client trusts any certificate signed by a CA whose certificate is present in its truststore. This simplifies certificate authentication, as adding new clients or brokers doesn’t require a change to the truststore.
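
To make the keystore and truststore roles more concrete, here’s a minimal sketch that lists the entries of a store using only the JDK’s KeyStore API. The class name KeystoreInspector and the file path in the usage comment are assumptions made for illustration and aren’t part of the example project:

```java
import java.nio.file.Path;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class KeystoreInspector {

    // Returns one line per alias, labelling it as a key entry or a trusted certificate.
    public static List<String> describeEntries(KeyStore store) throws Exception {
        List<String> lines = new ArrayList<>();
        for (String alias : Collections.list(store.aliases())) {
            String kind;
            if (store.isKeyEntry(alias)) {
                kind = "key entry (typical for a keystore)";
            } else if (store.isCertificateEntry(alias)) {
                kind = "trusted certificate (typical for a truststore)";
            } else {
                kind = "other";
            }
            lines.add(alias + ": " + kind);
        }
        return lines;
    }

    // Loads a store from disk; the JDK detects the store type (for example JKS or PKCS12).
    public static KeyStore load(Path file, char[] password) throws Exception {
        return KeyStore.getInstance(file.toFile(), password);
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical usage: java KeystoreInspector certs/kafka.server.truststore.jks <password>
        if (args.length < 2) {
            System.out.println("usage: KeystoreInspector <store-file> <password>");
            return;
        }
        describeEntries(load(Path.of(args[0]), args[1].toCharArray()))
          .forEach(System.out::println);
    }
}
```

In a setup like the one described above, we’d expect a keystore to report a key entry and a truststore to report the CA certificate as a trusted certificate.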

3. Dependencies and Setup

Our example application will be a simple Spring Boot application.

In order to connect to Kafka, let’s add the spring-kafka dependency in our POM file:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

We’ll also be using a Docker Compose file to configure and test the Kafka server setup. Initially, let’s do this without any SSL configuration:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:6.2.0
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Now, let’s start the container:

docker-compose up

This should bring up the broker with the default configuration.

4. Broker Configuration

Let’s start by looking at the minimum configuration required for the broker in order to establish secure connections.

4.1. Standalone Broker

Although we’re not using a standalone instance of the broker in this example, it’s useful to know the configuration changes required in order to enable SSL authentication.

First, we need to configure the broker to listen for SSL connections on port 9093 in server.properties:

listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

Next, the keystore and truststore related properties need to be configured with the certificate locations and credentials:

ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password

Finally, the broker must be configured to authenticate clients in order to achieve two-way authentication:

ssl.client.auth=required
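
To illustrate what this setting means at the JSSE level, here’s a rough sketch that applies a Kafka-style ssl.client.auth value to a server-side SSLEngine. The class ClientAuthSketch and its method are hypothetical, and the engine uses the JVM’s default key and trust material rather than our certificates. It only approximates what a broker does internally:

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class ClientAuthSketch {

    // Creates a server-side engine and applies a Kafka-style ssl.client.auth value to it.
    public static SSLEngine serverEngine(String clientAuth) throws Exception {
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, null, null); // JVM defaults, for illustration only
        SSLEngine engine = context.createSSLEngine();
        engine.setUseClientMode(false);

        if ("required".equals(clientAuth)) {
            // two-way authentication: the handshake fails without a valid client certificate
            engine.setNeedClientAuth(true);
        } else if ("requested".equals(clientAuth)) {
            // the server asks for a client certificate but accepts clients without one
            engine.setWantClientAuth(true);
        } else if (!"none".equals(clientAuth)) {
            throw new IllegalArgumentException("Unknown ssl.client.auth value: " + clientAuth);
        }
        // "none" leaves both flags off: only the server is authenticated (one-way)
        return engine;
    }

    public static void main(String[] args) throws Exception {
        for (String mode : new String[] {"required", "requested", "none"}) {
            SSLEngine engine = serverEngine(mode);
            System.out.println(mode + " -> need=" + engine.getNeedClientAuth()
              + ", want=" + engine.getWantClientAuth());
        }
    }
}
```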

4.2. Docker Compose

As we’re using Compose to manage our broker environment, let’s add all of the above properties to our docker-compose.yml file:

kafka:
  image: confluentinc/cp-kafka:6.2.0
  depends_on:
    - zookeeper
  ports:
    - 9092:9092
    - 9093:9093
  environment:
    ...
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
    KAFKA_SSL_CLIENT_AUTH: 'required'
    KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
    KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
    KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
    KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
    KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
  volumes:
    - ./certs/:/etc/kafka/secrets/certs

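Here, we’ve exposed the SSL port (9093) in the ports section of the configuration. Additionally, we’ve mounted the certs project folder in the volumes section of the config. This folder contains the required certs and the associated credentials files. The keystore and truststore paths are resolved inside the container under /etc/kafka/secrets, which is where we’ve mounted the folder.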

Now, restarting the stack using Compose shows the relevant SSL details in the broker log:

...
kafka_1      | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1      | ===> Configuring ...
kafka_1      | SSL is enabled.
....
kafka_1      | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
kafka_1      |  advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1      |  ssl.client.auth = required
kafka_1      |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
kafka_1      |  ssl.endpoint.identification.algorithm = https
kafka_1      |  ssl.key.password = [hidden]
kafka_1      |  ssl.keymanager.algorithm = SunX509
kafka_1      |  ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks
kafka_1      |  ssl.keystore.password = [hidden]
kafka_1      |  ssl.keystore.type = JKS
kafka_1      |  ssl.principal.mapping.rules = DEFAULT
kafka_1      |  ssl.protocol = TLSv1.3
kafka_1      |  ssl.trustmanager.algorithm = PKIX
kafka_1      |  ssl.truststore.certificates = null
kafka_1      |  ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks
kafka_1      |  ssl.truststore.password = [hidden]
kafka_1      |  ssl.truststore.type = JKS
....

5. Spring Boot Client

Now that the server setup is complete, we’ll create the required Spring Boot components. These will interact with our broker which now requires SSL for two-way authentication.

5.1. Producer

First, let’s send a message to the specified topic using KafkaTemplate:

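import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message, String topic) {
        log.info("Producing message: {}", message);
        // In spring-kafka 3.x, send() returns a CompletableFuture, so we use whenComplete()
        kafkaTemplate.send(topic, "key", message)
          .whenComplete((result, ex) -> {
              if (ex == null) {
                  log.info("Message sent to topic: {}", topic);
              } else {
                  log.error("Failed to send message", ex);
              }
          });
    }
}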

The send method is an async operation. Therefore, we’ve attached a simple callback that just logs some information once the broker receives the message.
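
Since spring-kafka 3.x returns a CompletableFuture from KafkaTemplate.send, the callback behaviour can be sketched with the JDK alone. In the sketch below, fakeSend is a hypothetical stand-in for the template, and the sink list replaces the logger so that the outcome can be inspected. No broker is involved:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncSendSketch {

    // Hypothetical stand-in for KafkaTemplate.send(): succeeds with the topic name or fails.
    public static CompletableFuture<String> fakeSend(String topic, boolean brokerReachable) {
        return brokerReachable
          ? CompletableFuture.completedFuture(topic)
          : CompletableFuture.failedFuture(new IllegalStateException("broker unreachable"));
    }

    // Attaches a single callback that handles both the success and the failure case.
    public static void sendAndLog(String topic, boolean brokerReachable, List<String> sink) {
        fakeSend(topic, brokerReachable).whenComplete((result, ex) -> {
            if (ex == null) {
                sink.add("sent to " + result);
            } else {
                sink.add("failed: " + ex.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        sendAndLog("test-topic", true, sink);
        sendAndLog("test-topic", false, sink);
        sink.forEach(System.out::println);
    }
}
```

Exactly one of result and ex is meaningful in the callback, which is why a single whenComplete covers both outcomes.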

5.2. Consumer

Next, let’s create a simple consumer using @KafkaListener. It connects to the broker and consumes messages from the same topic that the producer writes to:

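import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    public static final String TOPIC = "test-topic";

    // Received payloads, kept in memory so that a test can inspect them
    public final List<String> messages = new ArrayList<>();

    @KafkaListener(topics = TOPIC)
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received payload: '{}'", consumerRecord);
        messages.add(consumerRecord.value());
    }
}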

In our demo application, we’ve kept things simple, and the consumer just stores the messages in a List. In a real-world system, the consumer would process each message according to the application’s business logic.
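
Because the listener runs on a different thread from any code that reads the stored messages, a thread-safe store is one variation worth considering. The following sketch is an assumption on our part rather than part of the demo application, and the MessageStore name is hypothetical:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MessageStore {

    // CopyOnWriteArrayList allows one thread to append while another reads safely.
    private final List<String> messages = new CopyOnWriteArrayList<>();

    public void add(String payload) {
        messages.add(payload);
    }

    // Returns an immutable copy, so callers never observe later modifications.
    public List<String> snapshot() {
        return List.copyOf(messages);
    }

    public static void main(String[] args) throws InterruptedException {
        MessageStore store = new MessageStore();
        Thread listener = new Thread(() -> store.add("payload from the listener thread"));
        listener.start();
        listener.join();
        System.out.println(store.snapshot());
    }
}
```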

5.3. Configuration

Finally, let’s add the necessary configuration to our application.yml:

spring:
  kafka:
    security:
      protocol: "SSL"
    bootstrap-servers: localhost:9093
    ssl:
      trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
      trust-store-password: <password>
      key-store-location: classpath:/client-certs/kafka.client.keystore.jks
      key-store-password: <password>

    # additional config for producer/consumer

Here, we’ve set the properties that Spring Boot requires to configure the producer and the consumer. As both components connect to the same broker, we can declare all the essential properties under the spring.kafka section. However, if the producer and consumer were connecting to different brokers, we would specify these under the spring.kafka.producer and spring.kafka.consumer sections, respectively.

In the ssl section of the configuration, we point to the JKS truststore that the client uses to authenticate the Kafka broker. It contains the certificate of the CA that signed the broker certificate. In addition, we’ve provided the path to the client keystore. It holds the client certificate, which is signed by a CA whose certificate must be present in the truststore on the broker side.
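
For reference, the Spring Boot properties above correspond to standard Kafka client settings. The sketch below builds the equivalent map with plain string keys, so it needs no Kafka dependency. The class name and the paths and passwords used in main are hypothetical placeholders. Note that the Kafka client itself expects file system paths for the store locations, not classpath: URLs:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ClientSslProperties {

    // Builds the Kafka client settings needed for a two-way SSL connection.
    public static Map<String, Object> sslClientProperties(String bootstrapServers,
      String truststorePath, String truststorePassword,
      String keystorePath, String keystorePassword) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SSL");
        // truststore: used by the client to verify the broker certificate
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        // keystore: holds the client certificate presented to the broker
        props.put("ssl.keystore.location", keystorePath);
        props.put("ssl.keystore.password", keystorePassword);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, for illustration only
        Map<String, Object> props = sslClientProperties("localhost:9093",
          "/path/to/kafka.client.truststore.jks", "changeit",
          "/path/to/kafka.client.keystore.jks", "changeit");
        props.keySet().forEach(System.out::println);
    }
}
```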

5.4. Testing

As we’re using a Compose file, let’s use the Testcontainers framework to create an end-to-end test with our Producer and Consumer:

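import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {

    private static final String KAFKA_SERVICE = "kafka";
    private static final int SSL_PORT = 9093;

    // KAFKA_COMPOSE_FILE (definition not shown) is expected to be the java.io.File of our Compose file
    @Container
    public DockerComposeContainer<?> container =
      new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
        .withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());

    @Autowired
    private KafkaProducer kafkaProducer;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    @Test
    void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
        String message = generateSampleMessage();
        kafkaProducer.sendMessage(message, KafkaConsumer.TOPIC);

        await().atMost(Duration.ofMinutes(2))
          .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
    }

    private static String generateSampleMessage() {
        return UUID.randomUUID().toString();
    }
}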

When we run the test, Testcontainers starts the Kafka broker using our Compose file, including the SSL configuration. The application also starts with its SSL configuration and connects to the broker over an encrypted and authenticated connection. As this is an asynchronous sequence of events, we’ve used Awaitility to poll for the expected message in the consumer message store. This verifies the configuration as a whole and the successful two-way authentication between the broker and the client.
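
To show the idea behind this polling, here’s a simplified JDK-only sketch of waiting for a condition with a timeout. It isn’t how Awaitility is implemented, and the PollingSketch class and pollUntil method are hypothetical names:

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;

public class PollingSketch {

    // Re-checks the condition at a fixed interval until it holds or the timeout elapses.
    public static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
    }

    public static void main(String[] args) {
        List<String> messages = new CopyOnWriteArrayList<>();
        // Simulates a consumer thread that receives a message at some later point
        new Thread(() -> messages.add("hello")).start();

        boolean received = pollUntil(() -> messages.contains("hello"),
          Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println("received = " + received);
    }
}
```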

6. Conclusion

In this article, we’ve covered the basics of the SSL authentication setup required between the Kafka broker and a Spring Boot client.

First, we looked at the broker setup required to enable two-way authentication. Then, we looked at the configuration required on the client side in order to connect to the broker over an encrypted and authenticated connection. Finally, we used an integration test to verify the secure connection between the broker and the client.

As always, the full source code is available over on GitHub.
