HiveMQ Enterprise Extension for Kafka

The HiveMQ Enterprise Extension for Kafka implements the native Kafka protocol inside the HiveMQ broker. This implementation allows you to seamlessly integrate MQTT messages with one or more Kafka clusters.

Features

The HiveMQ Enterprise Extension for Kafka allows you to:

  • Forward messages from IoT devices that are connected to HiveMQ to multiple Kafka clusters.

  • Select multiple MQTT topic filters per Kafka topic (with full support for MQTT wildcards).

  • Monitor MQTT messages that are written to Kafka in the HiveMQ Control Center.

  • Buffer messages at the broker to ensure high availability and failure tolerance if a Kafka cluster is not available.


The HiveMQ Enterprise Extension for Kafka allows you to select multiple MQTT topic filters that are forwarded to configurable Kafka topics.
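
As an illustration of how such topic filters match, the following sketch (not part of the extension) implements the standard MQTT wildcard rules, where `+` matches exactly one topic level and `#` matches all remaining levels; it omits edge cases such as topics that begin with `$`:

```python
def topic_matches_filter(topic: str, topic_filter: str) -> bool:
    """Simplified MQTT wildcard matching: '+' matches one level,
    '#' matches the current level and everything below it."""
    topic_levels = topic.split("/")
    filter_levels = topic_filter.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard matches the rest
        if i >= len(topic_levels):
            return False  # filter is more specific than the topic
        if level != "+" and level != topic_levels[i]:
            return False
    # without '#', topic and filter must have the same depth
    return len(topic_levels) == len(filter_levels)

topic_matches_filter("mytopic/device1/state", "mytopic/#")  # True
topic_matches_filter("mytopic/device1/state", "mytopic/+")  # False: '+' spans one level
```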

The HiveMQ Enterprise Extension for Kafka does not interfere with the regular application of the MQTT Pub/Sub mechanism and can be used in addition to normal MQTT subscriptions and shared subscriptions.

[Figure: Publishing MQTT messages to Kafka]


The HiveMQ Enterprise Extension for Kafka acts as multiple Kafka producers that send every selected MQTT Publish message to the configured Kafka clusters. In a HiveMQ cluster, each HiveMQ extension on every HiveMQ node automatically opens connections to all needed Kafka brokers in the desired Kafka clusters.

[Figure: HiveMQ cluster publishing to a Kafka cluster]

The selected MQTT Publish messages are sent to Kafka with the original MQTT topic of the Publish message as the record key and the payload of the MQTT Publish message as the record value.
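
As a sketch (not part of the extension), this key/value layout can be illustrated with a small helper; the function name is hypothetical:

```python
def mqtt_publish_to_kafka_record(mqtt_topic: str, payload: bytes) -> tuple[bytes, bytes]:
    """Illustrates the layout of the Kafka records the extension produces:
    the record key is the original MQTT topic (UTF-8 encoded),
    the record value is the untouched MQTT payload."""
    return mqtt_topic.encode("utf-8"), payload

key, value = mqtt_publish_to_kafka_record("sensors/livingroom/temperature", b"21.5")
# key  == b"sensors/livingroom/temperature"
# value == b"21.5"
```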

Requirements

To run the HiveMQ Enterprise Extension for Kafka, the following requirements must be met:

  • A running HiveMQ Professional or Enterprise Edition installation (version 4.1.0 or higher)

  • A running Kafka cluster (version 0.10.2 or higher)

Installation

  • To install the HiveMQ Enterprise Extension for Kafka, download the extension from our marketplace.

  • Unzip the downloaded file into the extensions folder of your HiveMQ installation.

└─ <HiveMQ folder>
    ├─ bin
    ├─ config
    ├─ data
    ├─ extensions
    │   ├─ hivemq-kafka-extension
    │   └─ ...
    ├─ license
    ├─ log
    └─ ...
To function properly, the HiveMQ Enterprise Extension for Kafka must be installed on all of the HiveMQ broker nodes in a HiveMQ cluster.
  • Once installed, configure the extension for your individual Kafka clusters and topics. An example configuration file is included in the download. For information on all possible configuration options, see the configuration section.

If any of the configured Kafka topics do not already exist, the HiveMQ Enterprise Extension for Kafka attempts to create the topic automatically. The topic is created with a default replication factor of 1 and a default partition count of 10.
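
If these defaults do not suit your deployment (for example, a replication factor of 1 provides no fault tolerance), you can create the topic yourself before the extension first produces to it. A sketch using the kafka-topics tool that ships with Kafka (broker address, topic name, and settings are illustrative; older Kafka versions use --zookeeper instead of --bootstrap-server):

```shell
# Pre-create the Kafka topic with explicit settings instead of relying
# on the extension's defaults (replication factor 1, 10 partitions).
kafka-topics.sh --create \
  --bootstrap-server 127.0.0.1:9092 \
  --topic my-kafka-topic \
  --partitions 10 \
  --replication-factor 3
```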

To verify that the extension is installed and configured properly, check the HiveMQ logs and the newly-created Kafka tab of your HiveMQ Control Center.

[Figure: Kafka tab in the HiveMQ Control Center]

Configuration

The HiveMQ Enterprise Extension for Kafka is preconfigured with sensible default settings. These default values are usually sufficient to get you started.

Configuration of the extension is divided into Kafka clusters and topic mappings.
Kafka clusters represent an arbitrary number of physical Kafka clusters and their respective connection settings.
Topic mappings map sets of MQTT topic filters to a Kafka topic.
Each topic mapping refers to exactly one Kafka cluster. However, a single Kafka cluster can be used by multiple topic mappings.

Configuration Files

The kafka-configuration.xml file is located in the Kafka extension folder within the extensions folder of your HiveMQ installation.

The extension uses a simple but powerful XML-based configuration.

Example configuration
...
<?xml version="1.0" encoding="UTF-8" ?>
<kafka-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:noNamespaceSchemaLocation="kafka-extension.xsd">

    <kafka-clusters>
        <kafka-cluster>
            <id>cluster01</id>
            <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
        </kafka-cluster>
    </kafka-clusters>

    <topic-mappings>
        <topic-mapping>
            <id>mapping01</id>
            <cluster-id>cluster01</cluster-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>mytopic/#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <kafka-topic>my-kafka-topic</kafka-topic>
        </topic-mapping>
    </topic-mappings>

</kafka-configuration>
...


You can apply changes to the kafka-configuration.xml file at runtime. There is no need to restart the extension or HiveMQ for the changes to take effect. The previous configuration file is automatically archived to the config-archive sub-folder in the extensions folder.

If the configuration file is changed at runtime and the new configuration contains errors, the previous configuration is kept.

When the configuration changes at runtime, the affected topic mappings pause and restart automatically to apply the changes. During a restart, the existing Kafka connections for the topic mapping are closed and re-established. If only the mqtt-topic-filters of a topic mapping change, a restart of the topic mapping is not necessary.


Kafka Cluster

The extension uses the <kafka-cluster> settings to establish the connection to a Kafka cluster.

Minimal Example configuration
...
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
    </kafka-cluster>
</kafka-clusters>
...


The following values can be configured:

Table 1. Kafka Cluster options

  • id (default: none): The unique identifier of the Kafka cluster. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_

  • bootstrap-servers (default: none): A comma-separated list of the Kafka bootstrap servers with host and port.

  • tls (default: disabled): The TLS configuration that is used for the connection to the Kafka cluster.

  • authentication (default: none): The type of authentication that is used for the Kafka cluster.

If you need to connect to the same Kafka cluster with different settings, you can simply add the same cluster multiple times with a different id and different settings.
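
For example, a sketch of such a configuration (ids, addresses, and the TLS setting are illustrative):

```xml
<kafka-clusters>
    <!-- Plain connection to the cluster -->
    <kafka-cluster>
        <id>cluster01-plain</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
    </kafka-cluster>
    <!-- Same physical cluster, different id and TLS enabled -->
    <kafka-cluster>
        <id>cluster01-tls</id>
        <bootstrap-servers>127.0.0.1:9093</bootstrap-servers>
        <tls>
            <enabled>true</enabled>
        </tls>
    </kafka-cluster>
</kafka-clusters>
```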


Topic Mapping

Topic mappings represent the routing information for MQTT topic filters to Kafka topics.

Minimal Example configuration
...
<topic-mappings>

    <topic-mapping>
        <id>mapping01</id>

        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster01</cluster-id>

        <!-- List of MQTT topic filters -->
        <mqtt-topic-filters>
            <mqtt-topic-filter>topic1/#</mqtt-topic-filter>
            <mqtt-topic-filter>topic2/#</mqtt-topic-filter>
        </mqtt-topic-filters>

        <!-- Target Kafka topic -->
        <kafka-topic>topic01</kafka-topic>

    </topic-mapping>

</topic-mappings>
...


The following values can be configured:

Table 2. Topic mapping options

  • id (default: none): The unique identifier for this topic mapping. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_

  • cluster-id (default: none): The identifier of the referenced Kafka cluster.

  • mqtt-topic-filters (default: none): A list of MQTT topic filters.

  • kafka-topic (default: none): The Kafka topic that the messages are produced to.

  • kafka-acks (default: ONE): The acknowledgement mode for Kafka. Three modes are possible: ZERO (do not wait for acknowledgements), ONE (at least one acknowledgement), ALL (all replicas must acknowledge; depends on the Kafka server configuration).

If you want to send the same messages to multiple Kafka topics, you can simply duplicate the topic mapping and change the id and the kafka-topic of the additional topic mappings.
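
A sketch of such a duplicated mapping (ids and topic names are illustrative):

```xml
<topic-mappings>
    <topic-mapping>
        <id>mapping01</id>
        <cluster-id>cluster01</cluster-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>mytopic/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <kafka-topic>kafka-topic-a</kafka-topic>
    </topic-mapping>
    <!-- Same filters and cluster, different id and target Kafka topic -->
    <topic-mapping>
        <id>mapping02</id>
        <cluster-id>cluster01</cluster-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>mytopic/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <kafka-topic>kafka-topic-b</kafka-topic>
    </topic-mapping>
</topic-mappings>
```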


TLS

To use a secured TCP connection to connect to your Kafka clusters, configure the TLS options in the <kafka-cluster> setting.

Example TLS configuration
...
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>

        <tls>
            <enabled>true</enabled>

            <!-- Truststore, to trust the Kafka server's certificate -->
            <truststore>
                <path>/opt/hivemq/conf/kafka-trust.jks</path>
                <password>truststorepassword</password>
            </truststore>

            <!-- Keystore, when mutual TLS is used -->
            <keystore>
                <path>/opt/hivemq/conf/kafka-key.jks</path>
                <password>keystorepassword</password>
                <private-key-password>privatekeypassword</private-key-password>
            </keystore>

            <!-- Cipher suites supported by the client -->
            <cipher-suites>
                <cipher-suite>TLS_AES_128_GCM_SHA256</cipher-suite>
                <cipher-suite>TLS_AES_256_GCM_SHA384</cipher-suite>
            </cipher-suites>

            <!-- Supported TLS protocols -->
            <protocols>
                <protocol>TLSv1.2</protocol>
            </protocols>

            <!-- If the client should verify the server's hostname -->
            <hostname-verification>true</hostname-verification>
        </tls>

    </kafka-cluster>
</kafka-clusters>
...
Table 3. TLS element options

  • enabled (default: false): Enables or disables TLS for the connection. Allowed values are true and false.

  • protocols (default: all protocols enabled by the JVM): The enabled TLS protocols.

  • cipher-suites (default: all cipher suites enabled by the JVM): The enabled cipher suites.

  • keystore.path (default: none): The path to the key store that contains your certificate and private key.

  • keystore.password (default: none): The password to open the key store.

  • keystore.private-key-password (default: none): The password for the private key.

  • truststore.path (default: none): The path to the trust store that contains the trusted server certificates.

  • truststore.password (default: none): The password to open the trust store.

  • hostname-verification (default: true): Enables or disables verification of the server's hostname. Allowed values are true and false.


Authentication

Authentication against Kafka servers can be configured per Kafka cluster in the <kafka-cluster> settings.

The HiveMQ Enterprise Extension for Kafka supports the following authentication mechanisms:

  • No authentication

  • Plain

  • SCRAM SHA256

  • SCRAM SHA512

  • GSSAPI (Kerberos)

By default, no authentication is enabled.
Example authentication configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <plain>
                <username>kafka-user</username>
                <password>kafka-password</password>
            </plain>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


No authentication

The none authentication option disables authentication against the Kafka cluster.

No authentication configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <none/>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


Plain

Plain authentication authenticates with username and password only (not recommended).

Example plain configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <plain>
                <username>kafka-user</username>
                <password>kafka-password</password>
            </plain>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


SCRAM SHA256

Authentication with Salted Challenge Response Authentication Mechanism (SCRAM) alleviates most of the security concerns that come with a plain username/password authentication.

Example SCRAM SHA256 configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <scram-sha256>
                <username>kafka-user</username>
                <password>kafka-user-secret</password>
            </scram-sha256>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


SCRAM SHA512

Authentication with Salted Challenge Response Authentication Mechanism (SCRAM) alleviates most of the security concerns that come with a plain username/password authentication.

Example SCRAM SHA512 configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <scram-sha512>
                <username>kafka-user</username>
                <password>kafka-user-secret</password>
            </scram-sha512>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


GSSAPI

The Generic Security Service Application Program Interface (GSSAPI) is used to allow authentication against Kerberos.

Example GSSAPI configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <gssapi>
                <key-tab-file>/opt/hivemq/kafka-client.keytab</key-tab-file>
                <kerberos-service-name>kafka</kerberos-service-name>
                <principal>kafka/kafka1.hostname.com@EXAMPLE.COM</principal>
                <store-key>true</store-key>
                <use-key-tab>true</use-key-tab>
            </gssapi>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...

The use of a Kerberos ticket cache can be configured as follows:

Example GSSAPI configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <gssapi>
                <use-ticket-cache>true</use-ticket-cache>
            </gssapi>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...
Table 4. GSSAPI element options

  • use-key-tab (default: none): Enables the use of a keytab file. Allowed values are true and false. This option requires a configured key-tab-file.

  • key-tab-file (default: none): The absolute path to the keytab file.

  • kerberos-service-name (default: none): The name of the Kerberos service.

  • principal (default: none): The name of the principal that is used. The principal can be a simple username or a service name.

  • store-key (default: none): Stores the keytab or the principal key in the private credentials of the subject. Allowed values are true and false.

  • use-ticket-cache (default: none): Enables the use of a ticket cache. Allowed values are true and false.


Control Center

The HiveMQ Enterprise Extension for Kafka adds additional pages to the Control Center of HiveMQ. These pages allow you to monitor the messages that the extension processes.

[Figure: Kafka pages in the HiveMQ Control Center]


The Dashboard for the HiveMQ Enterprise Extension for Kafka provides an overview bar and detailed graphs.

The overview bar shows you the key metrics for the extension at a glance:

[Figure: Control Center overview bar]
Table 5. Top bar values

  • Current status (status): The current operational status of the extension. Shows errors or warnings if manual intervention is necessary.

  • Mapped MQTT messages (messages per second): The number of inbound MQTT Publish messages per second that the HiveMQ Enterprise Extension for Kafka processes.

  • Written to Kafka (messages per second): The number of outbound messages per second that are written to Kafka.

  • Topic Mappings (absolute value): The number of configured topic mappings.

  • Kafka Clusters (absolute value): The number of configured Kafka clusters.

  • Kafka Brokers (absolute value): The number of available Kafka brokers that are visible to the extension.


The detailed graphs provide additional insights into the messages the HiveMQ Enterprise Extension for Kafka processes.

The Mapped MQTT Publish Messages graph displays the Inbound MQTT Publish messages per second that are processed by the extension. Information is shown per HiveMQ cluster node.

[Figure: Mapped MQTT Publish Messages graph]

The Messages Written to Kafka graph displays the Outbound messages that are written to Kafka. These messages have been acknowledged by Kafka. Information is shown per Kafka cluster. It is expected that both graphs show an identical overall sum.

[Figure: Messages Written to Kafka graph]

Support

If you need help with the HiveMQ Enterprise Extension for Kafka or have suggestions on how we can improve the extension, please contact us at contact@hivemq.com.