HiveMQ Enterprise Extension for Kafka

The HiveMQ Enterprise Extension for Kafka implements the native Kafka protocol inside the HiveMQ broker. This allows the seamless integration of MQTT messages with one or multiple Kafka clusters.

Features

The HiveMQ Enterprise Extension for Kafka allows you to:

  • Forward messages from IoT devices that are connected to HiveMQ to one or more Kafka clusters.

  • Select multiple MQTT topic filters per Kafka topic, with full support for MQTT wildcards.

  • Monitor MQTT messages that are written to Kafka via the HiveMQ Control Center.

  • Buffer messages at the broker if a Kafka cluster is not available, to ensure high availability and fault tolerance.


The HiveMQ Enterprise Extension for Kafka allows you to select multiple MQTT topic filters that are forwarded to configurable Kafka topics.
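The MQTT wildcard support mentioned above (`+` matches a single topic level, `#` matches all remaining levels) can be sketched as a small matching function. This is an illustrative Python sketch of standard MQTT filter semantics, not the extension's implementation:

```python
def matches(topic_filter: str, topic: str) -> bool:
    """Check whether an MQTT topic matches a topic filter with + and # wildcards."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, fp in enumerate(filter_parts):
        if fp == "#":
            # '#' matches the remaining levels (it must be the last filter level)
            return True
        if i >= len(topic_parts):
            return False
        if fp != "+" and fp != topic_parts[i]:
            return False
    return len(filter_parts) == len(topic_parts)

# A filter like 'mytopic/#' therefore forwards every message published
# under 'mytopic', at any depth, to the mapped Kafka topic.
```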

The HiveMQ Enterprise Extension for Kafka does not interfere with the regular application of the MQTT Pub/Sub mechanism, and can be used in addition to normal MQTT subscriptions and Shared Subscriptions.

[Figure: Publishing MQTT messages to Kafka]


The HiveMQ Enterprise Extension for Kafka acts as multiple Kafka producers that send every selected MQTT Publish message to the configured Kafka cluster(s). The extension on every HiveMQ node in a HiveMQ cluster automatically opens connections to all required Kafka brokers in the configured Kafka clusters.

[Figure: HiveMQ cluster publishing to Kafka clusters]

The selected MQTT Publish messages are sent to Kafka with the original MQTT topic of the Publish message as the record key and its payload as the record value.
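This key/value mapping can be sketched as a pure function (an illustrative sketch; the function name is hypothetical). Because the MQTT topic serves as the record key, Kafka's default key-based partitioning places all messages of one MQTT topic in the same partition, which preserves their order:

```python
from typing import Tuple

def to_kafka_record(mqtt_topic: str, payload: bytes) -> Tuple[bytes, bytes]:
    """Map an MQTT PUBLISH to a Kafka record: the topic becomes the key, the payload the value."""
    return mqtt_topic.encode("utf-8"), payload

# All messages with the same MQTT topic share a key, so Kafka's
# key-based partitioning keeps them in one partition, in order.
key, value = to_kafka_record("mytopic/sensors/1", b'{"temp": 21.5}')
```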

Requirements

To run the HiveMQ Enterprise Extension for Kafka, the following requirements must be met:

  • A running HiveMQ Professional or Enterprise Edition installation (versions 4.1.0 and higher)

  • A running Kafka Cluster (versions 0.10.2 and higher)

Installation

  • To install the HiveMQ Enterprise Extension for Kafka, download the extension from the HiveMQ Marketplace and unzip the downloaded file into the extensions folder of your HiveMQ installation.

 |- <HiveMQ folder>
   |- bin
   |- config
   |- extensions
     |- hivemq-kafka-extension
   |- data
 ...
The HiveMQ Enterprise Extension for Kafka must be installed on all HiveMQ broker nodes in a HiveMQ cluster to function properly.
  • The extension is now installed and needs to be configured for your individual Kafka cluster(s) and topics. An example configuration file can be found inside the unzipped folder. See the configuration chapter for all possible configuration options.

If any of the configured Kafka topics do not already exist, the HiveMQ Enterprise Extension for Kafka will try to create the topic automatically. The topic will be created with a default replication factor of 1 and a default partition count of 10.

You can verify that the extension is installed and configured properly by checking the HiveMQ logs and the newly added Kafka tab of the HiveMQ Control Center.

[Figure: Kafka tab in the HiveMQ Control Center]

Configuration

The HiveMQ Enterprise Extension for Kafka is configured with sensible default settings. Those default values will be sufficient for most users to get started.

The configuration is divided into Kafka Clusters and Topic Mappings.
A Kafka Cluster represents a physical Kafka cluster and its connection settings; an arbitrary number of clusters can be configured.
A Topic Mapping maps a set of MQTT topic filters to a Kafka topic.
Each Topic Mapping refers to exactly one Kafka Cluster, while a single Kafka Cluster can be referenced by an arbitrary number of Topic Mappings.
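The relationship between Topic Mappings and incoming messages can be illustrated with a small routing sketch. The data structures and function names are hypothetical; the wildcard matching follows standard MQTT semantics. Note that one published message can match several Topic Mappings:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class TopicMapping:
    id: str
    cluster_id: str
    mqtt_topic_filters: List[str]
    kafka_topic: str

def mqtt_filter_matches(topic_filter: str, topic: str) -> bool:
    """Minimal MQTT wildcard matching ('+' one level, '#' all remaining levels)."""
    fparts, tparts = topic_filter.split("/"), topic.split("/")
    for i, fp in enumerate(fparts):
        if fp == "#":
            return True
        if i >= len(tparts) or (fp != "+" and fp != tparts[i]):
            return False
    return len(fparts) == len(tparts)

def route(mappings: List[TopicMapping], mqtt_topic: str) -> List[TopicMapping]:
    """Return every topic mapping whose filters match the published MQTT topic."""
    return [m for m in mappings
            if any(mqtt_filter_matches(f, mqtt_topic) for f in m.mqtt_topic_filters)]

mappings = [
    TopicMapping("mapping01", "cluster01", ["topic1/#"], "topic01"),
    TopicMapping("mapping02", "cluster01", ["topic1/+/status"], "status-topic"),
]
# A publish to 'topic1/dev42/status' matches both mappings, so the message
# would be produced to both Kafka topics.
matched = route(mappings, "topic1/dev42/status")
```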

Configuration Files

The configuration file kafka-configuration.xml is located in the folder of the extension inside the extensions folder of the HiveMQ installation.

The extension uses a simple but powerful XML based configuration.

Example configuration
...
<?xml version="1.0" encoding="UTF-8" ?>
<kafka-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:noNamespaceSchemaLocation="kafka-extension.xsd">

    <kafka-clusters>
        <kafka-cluster>
            <id>cluster01</id>
            <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
        </kafka-cluster>
    </kafka-clusters>

    <topic-mappings>
        <topic-mapping>
            <id>mapping01</id>
            <cluster-id>cluster01</cluster-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>mytopic/#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <kafka-topic>my-kafka-topic</kafka-topic>
        </topic-mapping>
    </topic-mappings>

</kafka-configuration>
...


Changes to kafka-configuration.xml can be applied at runtime; no restart of the extension or of HiveMQ is required for the changes to take effect. The previous configuration file is automatically archived to the config-archive sub-folder in the extension’s folder.

If the configuration file is changed at runtime and the configuration contains errors, the previous configuration is kept.

When the configuration is changed at runtime, the affected Topic Mappings are stopped and restarted automatically to apply the changes, which also closes and re-establishes the existing Kafka connections for those mappings. If only the mqtt-topic-filters of a Topic Mapping are changed, a restart of the mapping is not necessary.
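The keep-the-previous-configuration behavior corresponds to a validate-then-swap pattern, sketched below in simplified form (this is an illustration of the pattern, not the extension's actual implementation; the validation rules shown are minimal assumptions):

```python
import xml.etree.ElementTree as ET

def reload_config(current_config: dict, new_xml: str) -> dict:
    """Parse and validate the new configuration; on any error keep the current one."""
    try:
        root = ET.fromstring(new_xml)
        clusters = {c.findtext("id"): c.findtext("bootstrap-servers")
                    for c in root.iter("kafka-cluster")}
        if not clusters or None in clusters:
            raise ValueError("every kafka-cluster needs an <id>")
        return {"clusters": clusters}      # valid: swap in the new configuration
    except (ET.ParseError, ValueError):
        return current_config              # invalid: the previous configuration is kept

old = {"clusters": {"cluster01": "127.0.0.1:9092"}}
broken = "<kafka-configuration><kafka-clusters><kafka-cluster>"  # unbalanced XML
assert reload_config(old, broken) is old  # errors leave the old configuration active
```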


Kafka Cluster

The <kafka-cluster> settings are used by the extension to establish the connection to a Kafka cluster.

Minimal Example configuration
...
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
    </kafka-cluster>
</kafka-clusters>
...


The following values can be configured:

Table 1. Kafka Cluster options

  id (default: -)
    The unique identifier for this Kafka cluster. This string can only contain the characters abcdefghijklmnopqrstuvwxyz0123456789-_

  bootstrap-servers (default: -)
    A comma-separated list of the Kafka bootstrap servers with host and port.

  tls (default: disabled)
    TLS configuration.

  authentication (default: none)
    Authentication configuration.

If you need to connect to the same Kafka cluster with different settings, you can just add the same cluster multiple times with a different id and different settings.


Topic Mapping

Topic mappings represent the routing information for MQTT topic filters to Kafka topics.

Minimal Example configuration
...
<topic-mappings>
    <topic-mapping>
        <id>mapping01</id>

        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster01</cluster-id>

        <!-- List of MQTT topic filters -->
        <mqtt-topic-filters>
            <mqtt-topic-filter>topic1/#</mqtt-topic-filter>
            <mqtt-topic-filter>topic2/#</mqtt-topic-filter>
        </mqtt-topic-filters>

        <!-- Target Kafka topic -->
        <kafka-topic>topic01</kafka-topic>
    </topic-mapping>
</topic-mappings>
...


The following values can be configured:

Table 2. Topic mapping options

  id (default: -)
    The unique identifier for this topic mapping. This string can only contain the characters abcdefghijklmnopqrstuvwxyz0123456789-_

  cluster-id (default: -)
    The identifier of the referenced Kafka cluster.

  mqtt-topic-filters (default: -)
    A list of MQTT topic filters.

  kafka-topic (default: -)
    The Kafka topic that the messages are produced to.

  kafka-acks (default: ONE)
    The acknowledgement mode for Kafka. Can be ZERO (do not wait for acknowledgements), ONE (at least one acknowledgement), or ALL (all in-sync replicas must acknowledge; depends on the Kafka server configuration).

If you want to send the same messages to multiple Kafka topics, simply duplicate the topic mapping and change the id and the kafka-topic of the second topic mapping.
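The three ack modes correspond to the standard acks setting of a Kafka producer. The mapping below is an illustrative assumption based on standard Kafka producer semantics, not a statement about the extension's internals:

```python
# Assumed mapping of the kafka-acks modes to the Kafka producer's
# 'acks' property (standard Kafka producer semantics):
KAFKA_ACKS = {
    "ZERO": "0",    # fire and forget: no broker acknowledgement
    "ONE":  "1",    # the partition leader must acknowledge the write
    "ALL":  "all",  # all in-sync replicas must acknowledge (see min.insync.replicas)
}

def producer_acks(mode: str) -> str:
    """Translate a topic mapping's kafka-acks mode to a producer 'acks' value."""
    return KAFKA_ACKS[mode]
```

Higher ack levels trade throughput for stronger delivery guarantees: ZERO is fastest but can lose messages, while ALL survives the loss of the partition leader.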


TLS

If you want to use a secured TCP connection to connect to your Kafka cluster(s), you can configure the TLS options inside the <kafka-cluster> setting.

Example TLS configuration
...
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>

        <tls>
            <enabled>true</enabled>

            <!-- Truststore, to trust the Kafka server's certificate -->
            <truststore>
                <path>/opt/hivemq/conf/kafka-trust.jks</path>
                <password>truststorepassword</password>
            </truststore>

            <!-- Keystore, when mutual TLS is used -->
            <keystore>
                <path>/opt/hivemq/conf/kafka-key.jks</path>
                <password>keystorepassword</password>
                <private-key-password>privatekeypassword</private-key-password>
            </keystore>

            <!-- Cipher suites supported by the client -->
            <cipher-suites>
                <cipher-suite>TLS_AES_128_GCM_SHA256</cipher-suite>
                <cipher-suite>TLS_AES_256_GCM_SHA384</cipher-suite>
            </cipher-suites>

            <!-- Supported TLS protocols -->
            <protocols>
                <protocol>TLSv1.2</protocol>
            </protocols>

            <!-- If the client should verify the server's hostname -->
            <hostname-verification>true</hostname-verification>
        </tls>

    </kafka-cluster>
</kafka-clusters>
...
Table 3. TLS element options

  enabled (default: false)
    If TLS is enabled. Can be true or false.

  protocols (default: all protocols enabled by the JVM)
    The enabled TLS protocols.

  cipher-suites (default: all cipher suites enabled by the JVM)
    The enabled cipher suites.

  keystore.path (default: -)
    The path to the key store that contains your certificate and private key.

  keystore.password (default: -)
    The password to open the key store.

  keystore.private-key-password (default: -)
    The password for the private key.

  truststore.path (default: -)
    The path to the trust store that contains the trusted certificates of the Kafka brokers.

  truststore.password (default: -)
    The password to open the trust store.

  hostname-verification (default: true)
    If hostname verification is enabled. Can be true or false.


Authentication

Authentication against Kafka servers can be configured per Kafka cluster inside the <kafka-cluster> settings.

The HiveMQ Enterprise Extension for Kafka supports the following authentication mechanisms: none, plain, SCRAM SHA256, SCRAM SHA512, and GSSAPI (Kerberos).

By default no authentication is enabled.
Example authentication configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <plain>
                <username>kafka-user</username>
                <password>kafka-password</password>
            </plain>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


No authentication

The none option disables authentication against the Kafka cluster.

No authentication configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <none/>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


Plain

Plain authentication with username and password (not recommended).

Example plain configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <plain>
                <username>kafka-user</username>
                <password>kafka-password</password>
            </plain>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


SCRAM SHA256

Authentication with Salted Challenge Response Authentication Mechanism (SCRAM).
SCRAM alleviates most of the security concerns that come with a plain username/password authentication.

Example SCRAM SHA256 configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <scram-sha256>
                <username>kafka-user</username>
                <password>kafka-user-secret</password>
            </scram-sha256>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


SCRAM SHA512

Authentication with Salted Challenge Response Authentication Mechanism (SCRAM).
SCRAM alleviates most of the security concerns that come with a plain username/password authentication.

Example SCRAM SHA512 configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <scram-sha512>
                <username>kafka-user</username>
                <password>kafka-user-secret</password>
            </scram-sha512>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


GSSAPI

The Generic Security Service Application Program Interface (GSSAPI) is used to allow authentication against Kerberos.

Example GSSAPI configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <gssapi>
                <key-tab-file>/opt/hivemq/kafka-client.keytab</key-tab-file>
                <kerberos-service-name>kafka</kerberos-service-name>
                <principal>kafka/kafka1.hostname.com@EXAMPLE.COM</principal>
                <store-key>true</store-key>
                <use-key-tab>true</use-key-tab>
            </gssapi>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...

The use of a Kerberos ticket cache can be configured as follows:

Example GSSAPI configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <gssapi>
                <use-ticket-cache>true</use-ticket-cache>
            </gssapi>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...
Table 4. GSSAPI element options

  use-key-tab (default: -)
    If a keytab file should be used. Allowed values are true or false. Requires key-tab-file to be configured.

  key-tab-file (default: -)
    Absolute path to the keytab file.

  kerberos-service-name (default: -)
    Name of the Kerberos service.

  principal (default: -)
    The name of the principal that should be used. The principal can be a simple username or a service name.

  store-key (default: -)
    If the keytab or the principal’s key should be stored in the Subject’s private credentials. Allowed values are true or false.

  use-ticket-cache (default: -)
    If the ticket cache should be used. Allowed values are true or false.


Control Center

The HiveMQ Enterprise Extension for Kafka adds additional pages to HiveMQ’s Control Center. These pages allow you to monitor the messages that are processed by the extension.

[Figure: Kafka tab in the HiveMQ Control Center]


The Dashboard for the Enterprise Extension consists of an overview at the top and more detailed graphs below. The top bar gives a quick overview of the most important metrics for the extension.

[Figure: Control Center top bar]
Table 5. Top bar values

  Current status (value type: status)
    Current operational status of the extension. Shows errors or warnings if manual intervention is necessary.

  Mapped MQTT messages (value type: messages per second)
    The number of inbound MQTT Publish messages per second that are processed by the HiveMQ Enterprise Extension for Kafka.

  Written to Kafka (value type: messages per second)
    The number of outbound messages per second that have been written to Kafka.

  Topic Mappings (value type: absolute value)
    The number of configured topic mappings.

  Kafka Clusters (value type: absolute value)
    The number of configured Kafka clusters.

  Kafka Brokers (value type: absolute value)
    The number of available Kafka brokers that are visible to the extension.


Below the top bar, two larger graphs provide additional insights into the messages processed by the HiveMQ Enterprise Extension for Kafka. The left graph displays the inbound MQTT Publish messages per second that are processed by the extension, grouped by HiveMQ cluster node.

[Figure: Mapped MQTT messages graph]

The right graph displays the outbound messages that have already been written to and acknowledged by Kafka, grouped by Kafka cluster. Both graphs are expected to show an identical overall sum.

[Figure: Written to Kafka graph]

Support

If you need any help with the HiveMQ Enterprise Extension for Kafka or have any suggestions for improvement, please contact support@hivemq.com.