Managing Azure Event Hubs with kafkactl

Some messages in one of our Azure Event Hubs could not be processed. To analyze the issue, we wanted to fetch these messages together with all additional information, such as their headers. Because Azure Event Hubs provides an Apache Kafka compatible interface, we decided to use kafkactl, which is quite flexible.


About kafkactl

kafkactl is a small command-line utility written in Go and developed by Device Insight. As the name implies, the tool tries to be for Kafka clusters what kubectl is for Kubernetes clusters. It lets you manage multiple clusters and inspect and control various aspects of Kafka, such as topics and consumer groups.

Connection information from Azure Event Hubs

The broker for kafkactl is the “Host name” value of the Azure Event Hubs namespace, followed by port 9093 (as in the configuration further down).

Where to find the Event Hubs Namespace Host Name in the Azure Portal

Authentication uses a connection string. This can be the connection string of a specific event hub (a topic in Kafka terms) or a namespace-level connection string, which allows all event hubs (topics) in the namespace to be consumed.

Go to “Shared access policies” and select the “RootManageSharedAccessKey” policy. Then copy the primary connection string.

How to find the RootManageSharedAccessKey Connection String
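
Since the Endpoint part of the connection string already contains the namespace host name, the broker address can also be derived from it. The following is a minimal sketch in Python, not part of kafkactl; the function names are hypothetical, and the key in the usage example is a placeholder.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str: str) -> dict:
    """Split a 'Key=Value;Key=Value' connection string into a dict.

    Only the first '=' of each segment separates key and value, so
    base64 padding at the end of SharedAccessKey is preserved.
    """
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def broker_from_connection_string(conn_str: str, port: int = 9093) -> str:
    """Build the '<host>:<port>' broker address from the Endpoint part."""
    endpoint = parse_connection_string(conn_str)["Endpoint"]
    host = urlparse(endpoint).hostname  # 'sb://host/' -> 'host'
    return f"{host}:{port}"


if __name__ == "__main__":
    # Placeholder key, not a real secret.
    example = (
        "Endpoint=sb://dev-blog-demo.servicebus.windows.net/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;"
        "SharedAccessKey=abc123="
    )
    print(broker_from_connection_string(example))
```

A connection string copied from a single event hub carries an additional EntityPath part; the sketch simply keeps it as one more dictionary entry.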

Configuration of kafkactl

The Kafka interface of Azure Event Hubs supports Kafka protocol version 1.0 and later and requires SASL authentication over TLS. Both must be configured in the kafkactl config.yml file.

contexts:
    dev-blog-demo:
        brokers:
        - "dev-blog-demo.servicebus.windows.net:9093"
        sasl:
            enabled: true
            mechanism: "plaintext"
            password: "Endpoint=sb://dev-blog-demo.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=qVyjNrYLvqJmh5GVmlar0Yb5RRdmJGAXSWc498sktQw="
            username: "$ConnectionString"
        tls:
            enabled: true

To adapt this to your environment, set brokers to the host name of your namespace (with port 9093) and password to your primary connection string. The username stays the literal string "$ConnectionString".
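
If you maintain several namespaces, the context entry can be generated instead of edited by hand. The following Python sketch renders the same layout as the configuration above; the function name and the example values are hypothetical. It quotes values with json.dumps on the assumption that JSON string literals are read as YAML double-quoted strings, which holds for the plain ASCII values used here.

```python
import json


def render_kafkactl_context(name: str, broker: str, connection_string: str) -> str:
    """Render a kafkactl context for Azure Event Hubs as YAML text.

    The layout mirrors the config.yml shown in this post: SASL with the
    "plaintext" mechanism, TLS enabled, and the fixed user name
    "$ConnectionString".
    """
    quote = json.dumps  # produces a double-quoted, escaped string
    lines = [
        "contexts:",
        f"    {name}:",
        "        brokers:",
        f"        - {quote(broker)}",
        "        sasl:",
        "            enabled: true",
        '            mechanism: "plaintext"',
        f"            password: {quote(connection_string)}",
        '            username: "$ConnectionString"',
        "        tls:",
        "            enabled: true",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Placeholder values; replace them with your namespace and connection string.
    print(
        render_kafkactl_context(
            "my-context",
            "my-namespace.servicebus.windows.net:9093",
            "Endpoint=sb://my-namespace.servicebus.windows.net/;"
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=",
        )
    )
```

The output contains the connection string in clear text, so treat the generated file like any other secret.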

Usage

Now you can activate the context and use kafkactl as needed.

For example, the following session creates a new topic, writes a message to it, consumes the message again, and finally deletes the existing example topic.

psc@psc2 ~ % kafkactl config use-context dev-blog-demo

psc@psc2 ~ % kafkactl get topics
TOPIC       PARTITIONS
example     4

psc@psc2 ~ % kafkactl create topic my-new-event-hub --partitions 8 
topic created: my-new-event-hub

psc@psc2 ~ % kafkactl get topics
TOPIC                PARTITIONS
example              4
my-new-event-hub     8

psc@psc2 ~ % kafkactl produce my-new-event-hub --header KEY:VALUE --value '{"msg":"test"}'
message produced (partition=1	offset=0)

psc@psc2 ~ % kafkactl consume my-new-event-hub --from-beginning --print-headers
KEY:VALUE#{"msg":"test"}

psc@psc2 ~ % kafkactl delete topic example
topic deleted: example

psc@psc2 ~ % kafkactl get topics
TOPIC                PARTITIONS
my-new-event-hub     8

For more details about its usage, see kafkactl’s GitHub repository.
