
1. Introduction

During the transmission of messages in Apache Kafka, the client and server agree on the use of a common syntactic format. Apache Kafka provides default converters (such as String and Long), but it also supports custom serializers for specific use cases. In this tutorial, we'll see how to implement them.

2. Serializers in Apache Kafka

Serialization is the process of converting objects into bytes, while deserialization is the inverse process of converting a stream of bytes back into an object. In a nutshell, serialization transforms the message content into a format that can be transmitted over the network and then reconstructed on the receiving side.

As we mentioned, Apache Kafka provides default serializers for several basic types, and it allows us to implement custom serializers:

 

[Figure: custom serializer and deserializer in the flow of a message from producer to Kafka topic to consumer]

The figure above shows the process of sending messages to a Kafka topic over the network. In this process, the custom serializer converts the object into bytes before the producer sends the message to the topic. Similarly, it shows how the deserializer transforms the bytes back into an object so the consumer can process it properly.

2.1. Custom Serializers

Apache Kafka provides pre-built serializers and deserializers for several basic types, such as String, Long, Double, Integer, and ByteArray.
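
For instance, here's a minimal round trip with the built-in StringSerializer and StringDeserializer from org.apache.kafka.common.serialization, just to illustrate the contract:

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();

// The topic name is passed to every (de)serialization call; the String implementations ignore it
byte[] bytes = stringSerializer.serialize("example-topic", "hello kafka");
String text = stringDeserializer.deserialize("example-topic", bytes);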

But it also offers the capability to implement custom (de)serializers. In order to serialize our own objects, we'll implement the Serializer interface. Similarly, to create a custom deserializer, we'll implement the Deserializer interface.

There are three methods available to override for both interfaces:

  • configure: used to implement configuration details
  • serialize/deserialize: these methods include the actual implementation of our custom serialization and deserialization
  • close: used to close the Kafka session
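
For reference, the Serializer interface looks roughly like this (simplified; in recent kafka-clients versions, configure and close have empty default implementations, and there's an additional serialize overload that also receives the record headers):

public interface Serializer<T> extends Closeable {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    void close();
}
// Deserializer<T> mirrors this shape, with T deserialize(String topic, byte[] data)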

3. Implementing Custom Serializers in Apache Kafka

Apache Kafka provides the capability of customizing the serializers. It's possible to implement specific converters not only for the message value but also for the key.

3.1. Dependencies

To implement the examples, we'll simply add the Kafka clients dependency to our pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

3.2. Custom Serializer

First, we'll use Lombok to specify the custom object to send through Kafka:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
    private String message;
    private String version;
}

Next, we'll implement the Serializer interface provided by Kafka for the producer to send the messages:

public class CustomSerializer implements Serializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, MessageDto data) {
        try {
            if (data == null) {
                System.out.println("Null received at serializing");
                return null;
            }
            System.out.println("Serializing...");
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }

    @Override
    public void close() {
    }
}

We'll override the serialize method of the interface. Therefore, in our implementation, we'll transform the custom object using a Jackson ObjectMapper. Then we'll return the stream of bytes to properly send the message to the network.
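
Before wiring the serializer into a producer, we can exercise it in isolation. Here's a minimal sketch; the topic name is only illustrative, since our implementation ignores it:

CustomSerializer serializer = new CustomSerializer();
MessageDto dto = MessageDto.builder().message("test").version("1.0").build();

// Produces the JSON bytes, e.g. {"message":"test","version":"1.0"}
byte[] bytes = serializer.serialize("example-topic", dto);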

3.3. Custom Deserializer

In the same way, we'll implement the Deserializer interface for the consumer:

@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public MessageDto deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                System.out.println("Null received at deserializing");
                return null;
            }
            System.out.println("Deserializing...");
            return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to MessageDto");
        }
    }

    @Override
    public void close() {
    }
}

As in the previous section, we'll override the deserialize method of the interface. Consequently, we'll convert the stream of bytes into the custom object using the same Jackson ObjectMapper.
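
As a quick sanity check, the deserializer should turn the bytes produced in the previous subsection back into an equal object. A minimal sketch, reusing the bytes from the serializer example above:

CustomDeserializer deserializer = new CustomDeserializer();

// With Lombok's @Data generating equals(), the round-tripped object equals the original dto
MessageDto roundTripped = deserializer.deserialize("example-topic", bytes);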

3.4. Consuming an Example Message

Let's see a working example sending and receiving an example message with the custom serializer and deserializer.

Firstly, we'll create and configure the Kafka Producer:

private static KafkaProducer<String, MessageDto> createKafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");

    return new KafkaProducer<>(props);
}

We'll configure the value serializer property with our custom class and the key serializer with the default StringSerializer.
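
As a side note, instead of passing the serializer class names through the properties, the KafkaProducer constructor can also take serializer instances directly. A short sketch:

// When instances are passed explicitly, the serializer class configs in props are not needed
KafkaProducer<String, MessageDto> producer =
    new KafkaProducer<>(props, new StringSerializer(), new CustomSerializer());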

Secondly, we'll create the Kafka Consumer:

private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");

    return new KafkaConsumer<>(props);
}

Besides the key and value deserializers, where the value uses our custom class, it is mandatory to include the consumer group id. Apart from that, we set the auto offset reset config to earliest to make sure the consumer reads the messages the producer sent to the topic before the consumer started.
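
Similarly to the producer, KafkaConsumer has a constructor that accepts deserializer instances directly, should we prefer that over configuring class names:

KafkaConsumer<String, MessageDto> consumer =
    new KafkaConsumer<>(props, new StringDeserializer(), new CustomDeserializer());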

Once we've created the producer and consumer clients, it's time to send an example message:

MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();

KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<>(TOPIC, "1", msgProd));
System.out.println("Message sent " + msgProd);
producer.close();

And we can receive the message with the consumer by subscribing to the topic:

AtomicReference<MessageDto> msgCons = new AtomicReference<>();

KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));

ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
    msgCons.set(record.value());
    System.out.println("Message received " + record.value());
});

consumer.close();

The result in the console is:

Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)

4. Conclusion

In this tutorial, we showed how producers use serializers in Apache Kafka to send messages through the network. In the same way, we also showed how consumers use deserializers to interpret the message received.

Furthermore, we learned the default serializers available and, most importantly, the capability of implementing custom serializers and deserializers.

As always, the code is available over on GitHub.
