1. Overview

In a previous article, we had a quick introduction to Kafka Connect, including the different types of connectors, basic features of Connect, as well as the REST API.

In this tutorial, we’ll use Kafka connectors to build a more “real world” example.

We’ll use a connector to collect data via MQTT, and we’ll write the gathered data to MongoDB.

2. Setup Using Docker

We’ll use Docker Compose to set up the infrastructure. That includes an MQTT broker as the source, Zookeeper, one Kafka broker, and Kafka Connect as middleware, and finally a MongoDB instance, including a GUI tool, as the sink.

2.1. Connector Installation

The connectors required for our example, an MQTT source as well as a MongoDB sink connector, are not included in plain Kafka or the Confluent Platform.

As we discussed in the previous article, we can download the connectors (MQTT as well as MongoDB) from the Confluent Hub. After that, we have to unpack the jars into a folder, which we’ll mount into the Kafka Connect container in the following section.

Let’s use the folder /tmp/custom/jars for that. We have to move the jars there before starting the compose stack in the following section, because Kafka Connect only scans its plugin path for connectors during startup.
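A quick pre-flight check can catch an empty plugin folder before we start the stack. The following is a minimal sketch, assuming the unpacked connector jars sit somewhere below /tmp/custom/jars; the helper name count_jars and the PLUGIN_DIR variable are our own and not part of any tool:

```shell
#!/usr/bin/env bash

# count_jars is a hypothetical helper: it prints how many .jar files
# exist (at any depth) below the given directory.
count_jars() {
    local dir="$1"
    find "$dir" -type f -name '*.jar' | wc -l | tr -d ' '
}

# Assumed location, taken from the text above; override via PLUGIN_DIR.
PLUGIN_DIR="${PLUGIN_DIR:-/tmp/custom/jars}"

if [ -d "$PLUGIN_DIR" ] && [ "$(count_jars "$PLUGIN_DIR")" -gt 0 ]; then
    echo "Found $(count_jars "$PLUGIN_DIR") jar(s) in $PLUGIN_DIR"
else
    echo "No connector jars found in $PLUGIN_DIR yet"
fi
```

If the script reports no jars, the connectors won't be available to Kafka Connect after startup.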

2.2. Docker Compose File

We describe our setup as a simple Docker Compose file, which consists of six containers:

version: '3.3'

services:
  mosquitto:
    image: eclipse-mosquitto:1.5.5
    hostname: mosquitto
    container_name: mosquitto
    expose:
      - "1883"
    ports:
      - "1883:1883"
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /tmp/custom/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - mosquitto
  mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"

The mosquitto container provides a simple MQTT broker based on Eclipse Mosquitto.

The containers zookeeper and kafka define a single-node Kafka cluster.

kafka-connect defines our Connect application in distributed mode.

And finally, mongo-db defines our sink database, as well as the web-based mongoclient, which helps us to verify whether the sent data arrived correctly in the database.

We can start the stack using the following command:

docker-compose up
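Kafka Connect can take a while to become reachable after the containers start. As a sketch, we can poll the REST API until it answers; retry is a hypothetical helper of our own, and the attempt count and delay in the usage line are arbitrary assumptions:

```shell
#!/usr/bin/env bash

# retry is a hypothetical helper: it runs the given command up to
# <attempts> times, sleeping <delay> seconds between attempts, and
# returns success as soon as the command succeeds.
retry() {
    local attempts="$1" delay="$2" i=1
    shift 2
    while [ "$i" -le "$attempts" ]; do
        if "$@"; then
            return 0
        fi
        if [ "$i" -lt "$attempts" ]; then
            sleep "$delay"
        fi
        i=$((i + 1))
    done
    return 1
}

# Example usage (needs the running stack, so it is left commented out).
# GET /connector-plugins lists the connector classes the worker found:
# retry 30 2 curl -sf http://localhost:8083/connector-plugins
```

Once the call succeeds, the response should list the MQTT source and MongoDB sink connector classes if the jars were picked up from the mounted folder.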

3. Connector Configuration

As Kafka Connect is now up and running, we can configure the connectors.

3.1. Configure Source Connector

Let’s configure the source connector using the REST API:

curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Our connect-mqtt-source.json file looks like this:

{
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": 1,
        "mqtt.server.uri": "tcp://mosquitto:1883",
        "mqtt.topics": "baeldung",
        "kafka.topic": "connect-custom",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": 1
    }
}

There are a few properties that we haven’t used before:

  • mqtt.server.uri is the endpoint our connector will connect to
  • mqtt.topics is the MQTT topic our connector will subscribe to
  • kafka.topic defines the Kafka topic the connector will send the received data to
  • value.converter defines a converter which will be applied to the received payload. We need the ByteArrayConverter, as the MQTT Connector uses Base64 by default, while we want to use plain text
  • confluent.topic.bootstrap.servers is required by the newest version of the connector
  • The same applies to confluent.topic.replication.factor: it defines the replication factor for a Confluent-internal topic – as we have only one node in our cluster, we have to set that value to 1
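After posting the configuration, we can ask the REST API whether the connector actually started. The sketch below extracts the connector state from the status response without extra tools; first_state is a hypothetical helper, and it assumes the connector's own state appears before the task states in the response:

```shell
#!/usr/bin/env bash

# first_state is a hypothetical helper: it reads a Connect status
# document on stdin and prints the first "state" value it finds.
# Assumption: the connector's state precedes the task states.
first_state() {
    grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
        | head -n 1 \
        | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Example usage (needs the running stack, so it is left commented out):
# curl -s http://localhost:8083/connectors/mqtt-source/status | first_state
```

A healthy connector reports RUNNING; a FAILED state usually comes with a stack trace in the same status response.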

3.2. Test Source Connector

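Let’s run a quick test by publishing a short message to the MQTT broker. Note that Compose derives the network name from the project directory, so 04_custom_default may need to be adjusted: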

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto  -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"

And if we listen to the topic, connect-custom:

docker run \
--rm --network 04_custom_default \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --bootstrap-server kafka:9092 --topic connect-custom --from-beginning

then we should see our test message.

3.3. Setup Sink Connector

Next, we need our sink connector. Let’s again use the REST API:

curl -d @<path-to-config-file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Our connect-mongodb-sink.json file looks like this:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

We have the following MongoDB-specific properties here:

  • mongodb.connection.uri contains the connection string for our MongoDB instance
  • mongodb.collection defines the collection
  • Since the MongoDB connector is expecting JSON, we have to set JsonConverter for key.converter and value.converter
  • And we also need schemaless JSON for MongoDB, so we have to set key.converter.schemas.enable and value.converter.schemas.enable to false

3.4. Test Sink Connector

Since our topic connect-custom already contains messages from the MQTT connector test, the MongoDB connector should have fetched them directly after creation.

Hence, we should find them immediately in our MongoDB. We can use the web interface for that, by opening the URL http://localhost:3000/. After login, we can select our MyCollection on the left, hit Execute, and our test message should be displayed.

3.5. End-to-end Test

Now, we can send any JSON struct using the MQTT client:

{
    "firstName": "John",
    "lastName": "Smith",
    "age": 25,
    "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021"
    },
    "phoneNumber": [{
        "type": "home",
        "number": "212 555-1234"
    }, {
        "type": "fax",
        "number": "646 555-4567"
    }],
    "gender": {
        "type": "male"
    }
}

MongoDB supports schema-free JSON documents, and as we disabled schemas for our converter, any struct is immediately passed through our connector chain and stored in the database.
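To publish a pretty-printed document like the one above, it can help to join it into a single line first, so each message stays on one line in the console consumer output. This is a sketch only: to_single_line is a hypothetical helper, person.json is an assumed file name, and the publish command reuses the client invocation from the source connector test:

```shell
#!/usr/bin/env bash

# to_single_line is a hypothetical helper: it joins a pretty-printed
# JSON document from stdin into one line by stripping the indentation
# at the start of each line and then removing the line breaks.
# This is safe for JSON because string values cannot contain raw newlines.
to_single_line() {
    sed 's/^[[:space:]]*//' | tr -d '\n'
}

# Example usage (needs the running stack, so it is left commented out):
# docker run -it --rm --network 04_custom_default efrecon/mqtt-client \
#     pub -h mosquitto -t "baeldung" -m "$(to_single_line < person.json)"
```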

Again, we can use the web interface at http://localhost:3000/.

3.6. Clean Up

Once we’re done, we can clean up our experiment and remove the two connectors:

curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink
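To confirm that both connectors are gone, we can list the remaining connectors, which the REST API returns as a JSON array of names. The helper no_connectors_left below is hypothetical and only checks for an empty array:

```shell
#!/usr/bin/env bash

# no_connectors_left is a hypothetical helper: it reads the response of
# GET /connectors from stdin and succeeds only if it is an empty JSON array.
no_connectors_left() {
    [ "$(tr -d '[:space:]')" = "[]" ]
}

# Example usage (needs the running stack, so it is left commented out):
# curl -s http://localhost:8083/connectors | no_connectors_left \
#     && echo "All connectors removed"
```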

After that, we can shut down the Compose stack with Ctrl + C.

4. Conclusion

In this tutorial, we built an example using Kafka Connect to collect data via MQTT and write the gathered data to MongoDB.

The code backing this article is available on GitHub.