1. Overview

Monitoring an event-driven system that uses an Apache Kafka cluster often requires us to get the list of active brokers. In this tutorial, we’ll explore a few shell commands to get the list of active brokers in a running cluster.

2. Setup

For this article, let’s use the following docker-compose.yml file to set up a two-node Kafka cluster:

$ cat docker-compose.yml
---
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
  
  kafka-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Now, let’s spin up the Kafka cluster using the docker-compose command:

$ docker-compose up -d

We can verify that the Zookeeper server is listening on port 2181, while the Kafka brokers are listening on ports 29092 and 39092, respectively:

$ ports=(2181 29092 39092)
$ for port in "${ports[@]}"
do
nc -z localhost "$port"
done
Connection to localhost port 2181 [tcp/eforward] succeeded!
Connection to localhost port 29092 [tcp/*] succeeded!
Connection to localhost port 39092 [tcp/*] succeeded!
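
The same check can be wrapped in a small reusable function. The following is only a minimal sketch: check_ports is a hypothetical helper name, and it assumes that the installed nc binary supports the -z flag used above.

```shell
#!/bin/bash
# check_ports is a hypothetical helper, not part of Kafka or Docker tooling.
# It prints "<port> open" or "<port> closed" for each port given as an argument.
check_ports() {
    local port
    for port in "$@"; do
        # -z only probes the port without sending any data
        if nc -z localhost "$port" >/dev/null 2>&1; then
            echo "$port open"
        else
            echo "$port closed"
        fi
    done
}
```

Calling check_ports 2181 29092 39092 prints one status line per port, which is easier to consume from another script than the free-form nc messages.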

3. Using Zookeeper APIs

In a Kafka cluster, the Zookeeper server stores metadata related to the Kafka broker servers. So, let’s use the filesystem APIs exposed by Zookeeper to get the broker details.

3.1. zookeeper-shell Command

Most Kafka distributions ship with either the zookeeper-shell or the zookeeper-shell.sh binary, so it’s the de facto standard tool for interacting with the Zookeeper server.

First, let’s connect to the Zookeeper server running at localhost:2181:

$ /usr/local/bin/zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!

Once we’re connected to the Zookeeper server, we can execute typical filesystem commands such as ls to get metadata information stored in the server. Let’s find the ids of the brokers that are currently alive:

ls /brokers/ids
[1, 2]

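We can see that there are currently two active brokers, with ids 1 and 2. Each broker registers an ephemeral znode under /brokers/ids, and that znode disappears when the broker’s Zookeeper session ends, so this list reflects only the live brokers. Using the get command, we can fetch more details for a specific broker with a given id: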

get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2:9092","PLAINTEXT_HOST://localhost:39092"],"jmx_port":-1,"port":9092,"host":"kafka-2","version":5,"timestamp":"1625336133967"}

Note that the broker with id=1 advertises port 29092 to clients on the host through its PLAINTEXT_HOST listener, while the broker with id=2 advertises port 39092.

Finally, to exit the Zookeeper shell, we can use the quit command:

quit
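
For scripting, an interactive session isn’t always convenient. As a minimal sketch, the helper below turns the output of ls /brokers/ids into one broker id per line. The function name parse_broker_ids is hypothetical, and it assumes that the id list is printed on a line of its own in the [1, 2] form shown above.

```shell
#!/bin/bash
# parse_broker_ids is a hypothetical helper name.
# Reads Zookeeper CLI output on stdin and prints one broker id per line.
# Assumes the id list appears on a line of its own, e.g. "[1, 2]".
parse_broker_ids() {
    # Keep the id-list line, strip brackets and spaces, then split on commas
    grep '^\[.*\]$' | sed 's/[][ ]//g' | tr ',' '\n'
}
```

It could be fed from a one-shot invocation such as zookeeper-shell localhost:2181 ls /brokers/ids | parse_broker_ids, assuming the installed zookeeper-shell accepts a command after the connection string.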

3.2. zkCli Command

Just as Kafka distributions ship with the zookeeper-shell binary, Zookeeper distributions ship with the zkCli or zkCli.sh binary.

As such, interacting with zkCli is exactly like interacting with zookeeper-shell, so let’s go ahead and confirm that we’re able to get the required details for the broker with id=1:

$ zkCli -server localhost:2181 get /brokers/ids/1
Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}

As expected, we can see that broker details fetched using zookeeper-shell match those obtained using zkCli.
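
Because the binary name varies between distributions, a script may want to detect which one is installed. The following is a rough sketch under that assumption; find_zk_cli is a hypothetical helper name, and the default candidates are simply the four binary names mentioned in this article.

```shell
#!/bin/bash
# find_zk_cli is a hypothetical helper name.
# Prints the first candidate command found on PATH; returns 1 if none is found.
# With no arguments, it tries the four binary names mentioned in this article.
find_zk_cli() {
    local candidate
    if [ "$#" -eq 0 ]; then
        set -- zookeeper-shell zookeeper-shell.sh zkCli zkCli.sh
    fi
    for candidate in "$@"; do
        if command -v "$candidate" >/dev/null 2>&1; then
            echo "$candidate"
            return 0
        fi
    done
    return 1
}
```

A caller could then store the result, for example zk_cli="$(find_zk_cli)", and stop early with a clear error message when the function returns a non-zero status.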

4. Using the Broker Version API

At times, we might have an incomplete list of active brokers, and we want to get all the available brokers in the cluster. In such a scenario, we can use the kafka-broker-api-versions command shipped with the Kafka distributions.

Let’s assume that we know about a broker running at localhost:29092, so let’s try to find out all active brokers participating in the Kafka cluster:

$ kafka-broker-api-versions --bootstrap-server localhost:29092 | awk '/id/{print $1}'
localhost:39092
localhost:29092

It’s worth noting that we used the awk command to filter the output and show only the broker address. Further, the result correctly shows that there are two active brokers in the cluster.
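
If we also want the broker ids, the filter can be made a little stricter. The sketch below is illustrative only: list_brokers_with_ids is a hypothetical helper name, and it assumes that each broker header line has the form "host:port (id: N rack: ...) -> (", which is the shape the /id/ filter above relies on. It’s worth checking this against the output of the Kafka version in use.

```shell
#!/bin/bash
# list_brokers_with_ids is a hypothetical helper name.
# Reads kafka-broker-api-versions output on stdin and prints
# "broker_id=<id>,address=<host:port>" for every broker header line.
# Assumes header lines look like: "localhost:39092 (id: 2 rack: null) -> ("
list_brokers_with_ids() {
    awk '/\(id: [0-9]+/ { print "broker_id=" $3 ",address=" $1 }'
}
```

We could use it as kafka-broker-api-versions --bootstrap-server localhost:29092 | list_brokers_with_ids. Matching on "(id: " followed by digits avoids picking up unrelated lines that merely contain the letters "id".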

Although this approach looks simpler than the Zookeeper CLI approach, the kafka-broker-api-versions binary is a relatively recent addition to the Kafka distribution, so older installations might not include it.

5. Shell Script

In any practical scenario, executing the zkCli or zookeeper-shell commands manually for each broker would be taxing. So, let’s write a shell script that takes the Zookeeper server address as an input and returns the list of all active brokers.

5.1. Helper Functions

Let’s write all the helper functions in the functions.sh script:

$ cat functions.sh
#!/bin/bash
ZOOKEEPER_SERVER="${1:-localhost:2181}"

# Helper Functions Below

First, let’s write the get_broker_ids function, which calls the zkCli command internally to get the set of active broker ids:

function get_broker_ids {
broker_ids_out=$(zkCli -server "$ZOOKEEPER_SERVER" <<EOF
ls /brokers/ids
quit
EOF
)
broker_ids_csv="$(echo "${broker_ids_out}" | grep '^\[.*\]$')"
# Replace every comma (note the g flag) so that more than two ids are handled
echo "$broker_ids_csv" | sed 's/\[//;s/]//;s/,/ /g'
}

Next, let’s write the get_broker_details function to get the verbose broker details using the broker_id:

function get_broker_details {
broker_id="$1"
echo "$(zkCli -server $ZOOKEEPER_SERVER <<EOF
get /brokers/ids/$broker_id
quit
EOF
)"
}

Now that we have the verbose broker details, let’s write the parse_endpoint_detail function to get the broker’s endpoint detail:

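function parse_endpoint_detail {
broker_detail="$1"
# Keep only the JSON line from the zkCli output
json="$(echo "$broker_detail" | grep '^{.*}$')"
# jq pretty-prints the endpoints array with one endpoint per line
json_endpoints="$(echo "$json" | jq .endpoints)"
# Pick the host-facing listener and drop indentation and any trailing comma
echo "$json_endpoints" | grep HOST | tr -d ' ,'
}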

Internally, we used the jq command for the JSON parsing.
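
As an alternative to filtering the pretty-printed array with grep and tr, jq can select the listener itself. This is a minimal sketch rather than a replacement for the function above: endpoint_for_listener is a hypothetical helper name, and it assumes the endpoints field has the shape shown in the earlier get output.

```shell
#!/bin/bash
# endpoint_for_listener is a hypothetical helper name.
# Reads a broker's JSON registration on stdin and prints the endpoint whose
# listener name matches the first argument (default: PLAINTEXT_HOST).
endpoint_for_listener() {
    local listener="${1:-PLAINTEXT_HOST}"
    jq -r --arg prefix "${listener}://" \
        '.endpoints[] | select(startswith($prefix))'
}
```

Because of the -r flag, the endpoint is printed without the surrounding JSON quotes. Matching on the full listener name followed by :// also avoids accidental matches on other endpoints that merely contain the text HOST.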

5.2. Main Script

Now, let’s write the main script get_all_active_brokers.sh that uses the helper functions defined in functions.sh:

$ cat get_all_active_brokers.sh
#!/bin/bash
. functions.sh "$1"

function get_all_active_brokers {
broker_ids=$(get_broker_ids)
for broker_id in $broker_ids
do
    broker_details="$(get_broker_details "$broker_id")"
    broker_endpoint=$(parse_endpoint_detail "$broker_details")
    echo "broker_id=$broker_id,endpoint=$broker_endpoint"
done
}

get_all_active_brokers

Notice that the get_all_active_brokers function iterates over all the broker ids to collect the endpoints of all the active brokers.

Finally, let’s execute the get_all_active_brokers.sh script so that we can see the list of active brokers for our two-node Kafka cluster:

$ ./get_all_active_brokers.sh localhost:2181
broker_id=1,endpoint="PLAINTEXT_HOST://localhost:29092"
broker_id=2,endpoint="PLAINTEXT_HOST://localhost:39092"

We can see that the results are accurate. It looks like we nailed it!
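
For monitoring, we might also want an exit status that tells us whether enough brokers are up. The following is a minimal sketch under that assumption; check_broker_count is a hypothetical helper name, and it expects the broker_id=... lines printed by the script above on its standard input.

```shell
#!/bin/bash
# check_broker_count is a hypothetical helper name.
# Reads lines such as "broker_id=1,endpoint=..." on stdin and succeeds only
# if at least the expected number of brokers (first argument) is listed.
check_broker_count() {
    local expected="$1"
    local actual
    # grep -c exits non-zero when it counts 0 lines, hence the "|| true"
    actual="$(grep -c '^broker_id=' || true)"
    if [ "$actual" -ge "$expected" ]; then
        echo "OK: $actual of $expected brokers active"
    else
        echo "WARN: only $actual of $expected brokers active"
        return 1
    fi
}
```

For our two-node cluster, we could run ./get_all_active_brokers.sh localhost:2181 | check_broker_count 2 and let a scheduler or an alerting tool react to a non-zero exit status.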

6. Conclusion

In this tutorial, we learned about shell commands such as zookeeper-shell, zkCli, and kafka-broker-api-versions to get the list of active brokers in a Kafka cluster. Additionally, we wrote a shell script to automate the process of finding broker details in real-world scenarios.
