1. Introduction
In this article, we’ll learn how to implement the SASL/PLAIN authentication mechanism in a Kafka service. We’ll also implement client-side authentication using the support provided by Spring Kafka.
Kafka supports multiple authentication options, providing enhanced security and compatibility. These include SASL, SSL, and delegation token authentication.
Simple Authentication and Security Layer (SASL) is an authentication framework that allows other authentication mechanisms such as GSSAPI, OAuthBearer, SCRAM, and PLAIN, to be easily integrated.
SASL/PLAIN authentication on its own is not secure, because user credentials travel over the network in plaintext. However, it's still useful for local development because it requires minimal configuration.
We should note that SASL/PLAIN authentication should not be used in production environments unless it's combined with SSL/TLS. This combination, referred to as SASL_SSL in Kafka, encrypts all traffic between the client and server, including the sensitive credentials.
2. Implement Kafka With SASL/PLAIN Authentication
Let’s imagine we need to build a Kafka service that supports SASL/PLAIN authentication in a Docker environment.
For that, we'll use a JAAS configuration to add the user credentials required by the SASL/PLAIN mechanism.
To configure user credentials in Kafka, we’ll use the PlainLoginModule security implementation.
2.1. Configure Kafka Broker Credentials
Let's include a kafka_server_jaas.conf file to configure the admin and user1 credentials:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_user1="user1-secret";
};
In the above code, we define the admin and user1 users, used for Kafka's inter-broker authentication and external client authentication, respectively. Each user is declared as a one-liner in the format user_<username>, with its secret as the value.
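Additional client users follow the same pattern. For instance, a hypothetical second client user (the user2 name and secret below are ours, not part of this setup) would be declared with one extra line before the closing semicolon:

```plaintext
user_user1="user1-secret"
user_user2="user2-secret";
```

Note that only the last user_<username> entry carries the terminating semicolon for the login module block.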
Now that we've secured the Kafka service itself, it's also good practice to secure the Zookeeper service it depends on.

2.2. Configure Zookeeper Credentials
Let's include a zookeeper_jaas.conf file to configure the zookeeper user credentials:
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zookeeper"
    password="zookeeper-secret"
    user_zookeeper="zookeeper-secret";
};
In the above configuration, we're using the Zookeeper-specific DigestLoginModule security implementation instead of Kafka's PlainLoginModule, since Zookeeper ships its own SASL login modules.
Additionally, we’ll include the zookeeper credentials in the previously created kafka_server_jaas.conf file:
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="zookeeper"
    password="zookeeper-secret";
};
The above Client credentials are used by the Kafka service to authenticate with the Zookeeper service.
2.3. Set Up Kafka Service With Zookeeper
We can set up our Kafka and Zookeeper services using a Docker Compose file.
First, we’ll implement a Zookeeper service and include the zookeeper_jaas.conf file:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.6
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
    volumes:
      - ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
    ports:
      - 2181
Next, we’ll implement a Kafka service with the SASL/PLAIN authentication:
  kafka:
    image: confluentinc/cp-kafka:7.6.6
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
    ports:
      - "9092:9092"
In the above code, we’ve included the previously created kafka_server_jaas.conf file to set up the SASL/PLAIN users.
We should note that the KAFKA_ADVERTISED_LISTENERS property defines the endpoint that Kafka clients use to connect to the broker and send messages.
Finally, we’ll run the entire Docker setup using the docker compose command:
docker compose up --build
We should see logs similar to the following in the Docker console:
kafka-1 | [2025-06-19 14:32:00,441] INFO Session establishment complete on server zookeeper/172.18.0.2:2181, session id = 0x10000004c150001, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
kafka-1 | [2025-06-19 14:32:00,445] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
zookeeper-1 | [2025-06-19 14:32:00,461] INFO Successfully authenticated client: authenticationID=zookeeper; authorizationID=zookeeper. (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
These logs confirm that the Kafka and Zookeeper services are integrated without any errors.
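Before wiring up the Spring client, we can optionally smoke-test the broker with Kafka's console tools. A minimal client properties file along these lines (a sketch; the credentials mirror the user1 entry from the JAAS setup above) lets the console clients authenticate:

```plaintext
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="user1" \
  password="user1-secret";
```

We could then pass this file to kafka-console-producer via its --producer.config option to confirm that authenticated produce requests succeed.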
3. Implement Kafka Client With Spring
We’ll implement the producer and consumer services using the Spring Kafka implementation.
3.1. Maven Dependencies
First, we’ll include the Spring Kafka dependency:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
Next, we’ll implement a producer service to send messages.
3.2. Kafka Producer
Let’s implement a Kafka producer service using the KafkaTemplate class:
public void sendMessage(String message, String topic) {
    LOGGER.info("Producing message: {}", message);
    kafkaTemplate.send(topic, "key", message)
      .whenComplete((result, ex) -> {
          if (ex == null) {
              LOGGER.info("Message sent to topic: {}", message);
          } else {
              LOGGER.error("Failed to send message", ex);
          }
      });
}
In the above code, we’re sending a message using the send method of KafkaTemplate.
3.3. Kafka Consumer
We’ll use Spring Kafka’s KafkaListener and ConsumerRecord classes to implement the consumer service.
Let’s implement a consumer method with the @KafkaListener annotation:
@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
    LOGGER.info("Received payload: '{}'", consumerRecord.toString());
    messages.add(consumerRecord.value());
}
In the above code, we receive a message and add it to the messages list.
Next, we’ll create an application.yml file and include a few Spring Kafka-related properties:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Now, let's run the application and check the broker logs:
kafka-1 | [2025-06-19 14:38:33,188] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /192.168.65.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
As expected, the client application is unable to authenticate with the Kafka server.
To resolve the above error, we’ll use the spring.kafka.properties configuration to provide the SASL/PLAIN settings.
Now, we’ll include a few additional configurations related to the user1 credentials and set the sasl.mechanism property to PLAIN:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    properties:
      sasl.mechanism: PLAIN
      sasl.jaas.config: >
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="user1"
        password="user1-secret";
    security:
      protocol: "SASL_PLAINTEXT"
In the above code, we’ve included the matching username and password as part of the sasl.jaas.config property.
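Spring passes everything under spring.kafka.properties straight through to the underlying Kafka client. As a rough plain-Java sketch (the class and method names below are ours, for illustration only), the equivalent client configuration map would look like this:

```java
import java.util.HashMap;
import java.util.Map;

public class SaslClientConfig {

    // Builds the same client settings that the Spring YAML above translates to.
    public static Map<String, String> saslProps() {
        Map<String, String> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"user1\" password=\"user1-secret\";");
        return props;
    }

    public static void main(String[] args) {
        // Print the effective configuration for inspection.
        saslProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map like this could be passed to a plain KafkaProducer or KafkaConsumer constructor when Spring isn't in the picture.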
We can sometimes encounter errors due to missing or incorrect SASL configuration. For example, we'll get the following error if the sasl.mechanism property is set to PLAINTEXT instead of PLAIN:
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
... 25 common frames omitted
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism PLAINTEXT
We’ll get a different error when the sasl.mechanism property is incorrectly named as security.mechanism:
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
Finally, let's verify the entire setup by testing the application.
4. Testing
We’ll use the Testcontainers framework to test the Kafka client application.
First, we’ll create a DockerComposeContainer object using the docker-compose.yml:
@Container
public DockerComposeContainer<?> container =
  new DockerComposeContainer<>(new File("src/test/resources/sasl-plaintext/docker-compose.yml"))
    .withExposedService("kafka", 9092, Wait.forListeningPort());
Next, let’s implement a test method to validate the consumer:
@Test
void givenSaslIsConfigured_whenProducerSendsMessageOverSasl_thenConsumerReceivesOverSasl() {
    String message = UUID.randomUUID().toString();
    kafkaProducer.sendMessage(message, "test-topic");
    await().atMost(Duration.ofMinutes(2))
      .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
}
Finally, we’ll run the test case and verify the output:
16:56:44.525 [kafka-producer-network-thread | producer-1] INFO c.b.saslplaintext.KafkaProducer - Message sent to topic: 82e8a804-0269-40a2-b8ed-c509e6951011
16:56:48.566 INFO c.b.saslplaintext.KafkaConsumer - Received payload: ConsumerRecord(topic = test-topic, ... key = key, value = 82e8a804-0269-40a2-b8ed-c509e6951011
From the above logs, we can see that the consumer service has successfully received the message.
5. Conclusion
In this tutorial, we’ve learned how to set up SASL/PLAIN authentication in a Kafka service using JAAS config in a Docker environment.
We've also implemented producer and consumer services and configured client authentication using a similar JAAS config. Finally, we tested the entire setup by sending and receiving a message using Testcontainers.
The code backing this article is available on GitHub.