1. Overview

In a previous article, we had a quick introduction to Kafka Connect, including the different types of connectors, basic features of Connect, as well as the REST API.

In this tutorial, we'll use Kafka connectors to build a more “real world” example.

We'll use a connector to collect data via MQTT, and we'll write the gathered data to MongoDB.

2. Setup Using Docker

We'll use Docker Compose to set up the infrastructure. That includes an MQTT broker as the source, Zookeeper, one Kafka broker as well as Kafka Connect as middleware, and finally a MongoDB instance including a GUI tool as the sink.

2.1. Connector Installation

The connectors required for our example, an MQTT source as well as a MongoDB sink connector, are not included in plain Kafka or the Confluent Platform.

As we discussed in the previous article, we can download the connectors (MQTT as well as MongoDB) from the Confluent Hub. After that, we have to unpack the jars into a folder, which we'll mount into the Kafka Connect container in the following section.

Let's use the folder /tmp/custom/jars for that. We have to move the jars there before starting the compose stack in the following section, as Kafka Connect only loads connectors from its plugin path during startup.
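
As a minimal sketch of that step, assuming a connector archive has already been downloaded and unpacked locally, a small shell helper could copy it into the mounted folder. The function name stage_connector and the source path are hypothetical; only /tmp/custom/jars comes from the setup described here:

```shell
# Hypothetical helper: copy an unpacked connector archive into the folder
# that is later mounted into the Kafka Connect container.
# $1: directory of the unpacked archive (assumed to exist locally)
# $2: target folder, defaults to /tmp/custom/jars as used in this tutorial
stage_connector() {
    src_dir="$1"
    dest_dir="${2:-/tmp/custom/jars}"
    mkdir -p "$dest_dir" || return 1
    # -R copies the whole directory, so the connector's jars stay together
    # in their own subfolder below the target folder
    cp -R "$src_dir" "$dest_dir"/
}
```

We'd call it once per connector, passing the directory of the unpacked archive as the first argument, before starting the compose stack.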

2.2. Docker Compose File

We describe our setup as a simple Docker Compose file, which consists of six containers:

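version: '3.3'

services:
  mosquitto:
    image: eclipse-mosquitto:1.5.5
    hostname: mosquitto
    container_name: mosquitto
    expose:
      - "1883"
    ports:
      - "1883:1883"
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Assumption: single-node listener and replication settings
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      # Assumption: worker connection settings matching the kafka service above
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      # Assumption: replication factor 1, as the cluster has a single broker
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
    volumes:
      - /tmp/custom/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - mosquitto
  mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - "3000:3000"
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"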

The mosquitto container provides a simple MQTT broker based on Eclipse Mosquitto.

The containers zookeeper and kafka define a single-node Kafka cluster.

kafka-connect defines our Connect application in distributed mode.

And finally, mongo-db defines our sink database, while the web-based mongoclient helps us verify whether the sent data arrived correctly in the database.

We can start the stack using the following command:

docker-compose up
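
Kafka Connect usually needs a moment before its REST API answers. The following sketch polls the /connector-plugins endpoint on the published port 8083 until it responds. The function name, the number of attempts, and the delay are assumptions chosen for illustration:

```shell
# Hypothetical helper: poll the Kafka Connect REST API until it answers
# or the attempts run out.
# $1: base URL (default http://localhost:8083, the port published above)
# $2: maximum number of attempts (assumed default: 30)
# $3: seconds to sleep between attempts (assumed default: 2)
wait_for_connect() {
    base_url="${1:-http://localhost:8083}"
    attempts="${2:-30}"
    delay="${3:-2}"
    i=0
    while [ "$i" -lt "$attempts" ]; do
        # -f makes curl exit with a non-zero code on HTTP errors
        if curl -fsS "$base_url/connector-plugins" >/dev/null 2>&1; then
            return 0
        fi
        i=$((i + 1))
        sleep "$delay"
    done
    return 1
}
```

Once wait_for_connect returns successfully, a plain GET on http://localhost:8083/connector-plugins should list the MQTT source and MongoDB sink connector classes, provided the jars were picked up from /tmp/custom/jars.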

3. Connector Configuration

Now that Kafka Connect is up and running, we can configure the connectors.

3.1. Configure Source Connector

Let's configure the source connector using the REST API:

curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Our connect-mqtt-source.json file looks like this:

{
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": 1,
        "mqtt.server.uri": "tcp://mosquitto:1883",
        "mqtt.topics": "baeldung",
        "kafka.topic": "connect-custom",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": 1
    }
}

There are a few properties that we haven't used before:

  • mqtt.server.uri is the endpoint our connector will connect to
  • mqtt.topics is the MQTT topic our connector will subscribe to
  • kafka.topic defines the Kafka topic the connector will send the received data to
  • value.converter defines a converter which will be applied to the received payload. We need the ByteArrayConverter, as the MQTT Connector uses Base64 by default, while we want to use plain text
  • confluent.topic.bootstrap.servers is required by the newest version of the connector
  • The same applies to confluent.topic.replication.factor: it defines the replication factor for a Confluent-internal topic – as we have only one node in our cluster, we have to set that value to 1
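
After posting the configuration, we can ask the REST API for the connector's status at /connectors/mqtt-source/status. As a rough sketch, the helper below reads that status document from standard input and succeeds only if it reports RUNNING and nowhere FAILED. The function name is hypothetical, and matching JSON with grep is a simplification rather than a real parser:

```shell
# Hypothetical helper: read a Kafka Connect status document on stdin and
# succeed only if no part reports FAILED and at least one reports RUNNING.
status_is_healthy() {
    status_json=$(cat)
    # a failed connector or task makes the whole check fail
    if printf '%s' "$status_json" | grep -q '"state" *: *"FAILED"'; then
        return 1
    fi
    printf '%s' "$status_json" | grep -q '"state" *: *"RUNNING"'
}
```

A possible call would be:

curl -s http://localhost:8083/connectors/mqtt-source/status | status_is_healthy && echo "mqtt-source is running"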

3.2. Test Source Connector

Let's run a quick test by publishing a short message to the MQTT broker:

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto  -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"

And if we listen to the topic, connect-custom:

docker run \
--rm --network 04_custom_default \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --bootstrap-server kafka:9092 --topic connect-custom --from-beginning

then we should see our test message.

3.3. Setup Sink Connector

Next, we need our sink connector. Let's again use the REST API:

curl -d @<path-to-config-file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Our connect-mongodb-sink.json file looks like this:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

We have the following MongoDB-specific properties here:

  • mongodb.connection.uri contains the connection string for our MongoDB instance
  • mongodb.collection defines the collection
  • Since the MongoDB connector is expecting JSON, we have to set JsonConverter for key.converter and value.converter
  • And we also need schemaless JSON for MongoDB, so we have to set key.converter.schemas.enable and value.converter.schemas.enable to false

3.4. Test Sink Connector

Since our topic connect-custom already contains messages from the MQTT connector test, the MongoDB connector should have fetched them directly after creation.

Hence, we should find them immediately in our MongoDB. We can use the web interface for that by opening the URL http://localhost:3000/. After login, we can select MyCollection on the left and hit Execute, and our test message should be displayed.
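
As an alternative to the web interface, we could query the collection from the command line. The following is a minimal sketch, assuming the mongo-db container ships the legacy mongo shell (as the mongo:4.0.5 image does) and that the data lands in the test database named in the connection URI. Both function names are hypothetical:

```shell
# Hypothetical helper: build the mongo shell expression that prints
# every document of the given collection.
find_all_expr() {
    printf 'db.getCollection("%s").find().forEach(printjson)\n' "$1"
}

# Hypothetical helper: run that expression inside the mongo-db container.
# $1: collection name, $2: database name (default: test)
show_collection() {
    docker exec mongo-db mongo "${2:-test}" --quiet --eval "$(find_all_expr "$1")"
}
```

Calling show_collection MyCollection should then print the stored test message.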

3.5. End-to-end Test

Now, we can send any JSON struct using the MQTT client:

{
    "firstName": "John",
    "lastName": "Smith",
    "age": 25,
    "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021"
    },
    "phoneNumber": [{
        "type": "home",
        "number": "212 555-1234"
    }, {
        "type": "fax",
        "number": "646 555-4567"
    }],
    "gender": {
        "type": "male"
    }
}

MongoDB supports schema-free JSON documents, and as we disabled schemas for our converter, any struct is immediately passed through our connector chain and stored in the database.

Again, we can use the web interface at http://localhost:3000/.
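
To publish such a document from a file rather than typing it inline, we could wrap the MQTT client call from section 3.2 in a small helper. This is only a sketch: the function names and the NETWORK variable are hypothetical, while the image, broker host, topic, and default network name are taken from the earlier test:

```shell
# Hypothetical helper: collapse a JSON file onto a single line so that it
# can be passed as one -m argument. JSON strings cannot contain raw
# newlines, so removing them does not change the document.
json_one_line() {
    tr -d '\n' < "$1"
}

# Hypothetical helper: publish the file's content to the MQTT broker.
# $1: path to the JSON file, $2: MQTT topic (default: baeldung)
publish_json() {
    docker run --rm --network "${NETWORK:-04_custom_default}" \
        efrecon/mqtt-client \
        pub -h mosquitto -t "${2:-baeldung}" -m "$(json_one_line "$1")"
}
```

With the document above saved to a local file, publish_json followed by the file's path sends it through the whole connector chain.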

3.6. Clean Up

Once we're done, we can clean up our experiment and remove the two connectors:

curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink

After that, we can shut down the Compose stack with Ctrl + C.

4. Conclusion

In this tutorial, we built an example using Kafka Connect to collect data via MQTT and write the gathered data to MongoDB.

As always, the config files can be found over on GitHub.
