HiveMQ Enterprise Extension for Kafka

The HiveMQ Enterprise Extension for Kafka implements the native Kafka protocol inside the HiveMQ MQTT broker. This implementation allows you to seamlessly integrate MQTT messages with one or more Kafka clusters.

Features

The HiveMQ Enterprise Extension for Kafka adds monitored, bi-directional MQTT messaging to and from your Kafka installation:

  • Forward MQTT messages from IoT devices that are connected to your HiveMQ MQTT broker to topics in one or more Kafka clusters.

  • Poll information from a Kafka topic and publish this information as MQTT messages to one or more MQTT topics.

  • Use placeholders to dynamically generate MQTT messages at runtime with values extracted from a Kafka record.

  • Use multiple MQTT topic filters with full support of MQTT wildcards to route MQTT messages to the desired Kafka topic.

  • Buffer messages on the HiveMQ MQTT broker to ensure high-availability and failure tolerance whenever a Kafka cluster is temporarily unavailable.

  • Track all of the MQTT messages to and from Kafka on a centralized HiveMQ control center.

[Figure: Publishing from a HiveMQ cluster to a Kafka cluster]

With the HiveMQ Enterprise Extension for Kafka you can use your HiveMQ MQTT broker to forward MQTT messages from multiple MQTT topic filters to as many Kafka topics as you choose. Conversely, the HiveMQ extension enables you to poll information from the records in a Kafka topic and publish appropriate parts of the information to as many MQTT topics as you wish.

The MQTT-to-Kafka function of the HiveMQ Enterprise Extension for Kafka acts as multiple Kafka producers that route the selected MQTT publish messages to the desired Kafka topics. Each HiveMQ extension on every HiveMQ node in the HiveMQ cluster automatically opens connections to all of the Kafka brokers that are needed in the desired Kafka clusters.

HiveMQ sends the MQTT messages that are published to the selected MQTT topics to Kafka with the original MQTT topic of the publish message as the key and the payload of the MQTT publish message as the value.

The Kafka-to-MQTT function of the extension makes it possible to transform Kafka records and publish values that are extracted from the Kafka topic in new MQTT messages. There are many ways to configure how HiveMQ uses the information from Kafka.

All of the HiveMQ Enterprise Extensions for Kafka in a HiveMQ cluster work together as a consumer group that balances the workload and retrieves data from an individual Kafka topic.

By default, the Kafka topic is used as the MQTT topic and the Kafka value is the MQTT payload.

The ability to multicast targeted MQTT messages from individual records in a Kafka topic is attractive for a wide range of possible use cases. For example, you can collect event information from particular services in a Kafka topic, listen to that topic, and react with dynamically generated MQTT messages on the appropriate MQTT topics.

Since both the MQTT-to-Kafka and the Kafka-to-MQTT functions are managed through the HiveMQ extension, you can easily maintain a clear overview of all activity from the control center of your HiveMQ broker.

The HiveMQ Enterprise Extension for Kafka does not interfere with the regular application of the MQTT Pub/Sub mechanism and can be used in addition to normal MQTT subscriptions and shared subscriptions.

Requirements

  • A running HiveMQ Professional or Enterprise Edition installation (versions 4.1.0 and higher)

  • A running Kafka Cluster (versions 0.10.2 and higher)

  • A valid license for the HiveMQ Enterprise Extension for Kafka

If no license is provided, HiveMQ automatically uses a free evaluation license. The evaluation license for the HiveMQ Enterprise Extension for Kafka is valid for 5 hours. For more license information or to request an extended evaluation period, contact HiveMQ sales.

Installation

  • To install the HiveMQ Enterprise Extension for Kafka, download the extension from the HiveMQ Marketplace.

  • Unzip the downloaded file into the extensions folder of your HiveMQ installation.

  • Place the license for the HiveMQ Enterprise Extension for Kafka in the license folder of your HiveMQ installation.

    └─ <HiveMQ folder>
        ├─ bin
        ├─ config
        ├─ data
        ├─ extensions
        │   ├─ hivemq-kafka-extension
        │   └─ ...
        ├─ license
        ├─ log
        └─ ...
    To function properly, the HiveMQ Enterprise Extension for Kafka must be installed on all of the HiveMQ broker nodes in a HiveMQ cluster.
  • Once installed, you need to configure the extension for your individual Kafka clusters and Kafka topics. An example configuration file is included in the download file. For information on all possible configuration options, see the configuration section.

If any of the configured Kafka topics do not already exist, the HiveMQ Enterprise Extension for Kafka attempts to create the topic automatically. The topic is created with a default replication factor of 1 and a default partition count of 10.

To verify that the extension is installed and configured properly, check the HiveMQ logs and the newly created Kafka tab that is added to your HiveMQ Control Center.

[Figure: Kafka dashboard in the HiveMQ Control Center]

Configuration

The HiveMQ Enterprise Extension for Kafka provides seamless configuration hot reload. Changes that you make to the configuration of the extension are updated while the extension is running, without the need for a restart.

Version 1.1.0 of the HiveMQ Enterprise Extension for Kafka renames two topic-mapping tags:

  • <mqtt-to-kafka-mappings> replaces the <topic-mappings> tag.

  • <mqtt-to-kafka-mapping> replaces the <topic-mapping> tag.

The new tags reflect increased topic-mapping functionality in the extension that makes it possible to read records from Kafka and publish them as MQTT messages. Although the old topic-mapping tags are backward compatible, we highly recommend that you replace the old tags when you update to the new version of the extension to ensure clarity.

The HiveMQ Enterprise Extension for Kafka is preconfigured with sensible default settings. These default values are usually sufficient to get you started.

Configuration of the extension is divided into three sections:

  • Kafka Clusters: general information and connection settings for one or more Kafka clusters.

  • MQTT-to-Kafka Mappings: routing information from one or more MQTT topic filters to a Kafka topic.

  • Kafka-to-MQTT Mappings: routing information from one or more records in a Kafka topic to one or more MQTT topics.

Each mapping refers to exactly one Kafka cluster. However, a single Kafka cluster can use multiple mappings.
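
For example, the following sketch (all ids and topic names are illustrative) shows one Kafka cluster that is referenced by two separate MQTT-to-Kafka mappings:

...
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
    </kafka-cluster>
</kafka-clusters>

<mqtt-to-kafka-mappings>
    <!-- Both mappings reference the same cluster-id -->
    <mqtt-to-kafka-mapping>
        <id>mapping01</id>
        <cluster-id>cluster01</cluster-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>sensors/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <kafka-topic>sensor-data</kafka-topic>
    </mqtt-to-kafka-mapping>
    <mqtt-to-kafka-mapping>
        <id>mapping02</id>
        <cluster-id>cluster01</cluster-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>alerts/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <kafka-topic>alert-data</kafka-topic>
    </mqtt-to-kafka-mapping>
</mqtt-to-kafka-mappings>
...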

Configuration File

The kafka-configuration.xml file is located in the Kafka extension folder within the extensions folder of your HiveMQ installation.

The extension uses a simple but powerful XML-based configuration.

Example configuration
...
<?xml version="1.0" encoding="UTF-8" ?>
<kafka-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:noNamespaceSchemaLocation="kafka-extension.xsd">

    <kafka-clusters>
        <kafka-cluster>
            <id>cluster01</id>
            <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
        </kafka-cluster>
    </kafka-clusters>

    <mqtt-to-kafka-mappings>
        <mqtt-to-kafka-mapping>
            <id>mapping01</id>
            <cluster-id>cluster01</cluster-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>mytopic/#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <kafka-topic>my-kafka-topic</kafka-topic>
        </mqtt-to-kafka-mapping>
    </mqtt-to-kafka-mappings>

    <kafka-to-mqtt-mappings>
        <kafka-to-mqtt-mapping>
            <id>mapping02</id>
            <cluster-id>cluster01</cluster-id>
            <kafka-topics>
                <kafka-topic>first-kafka-topic</kafka-topic>
                <kafka-topic>second-kafka-topic</kafka-topic>
                <!-- arbitrary amount of kafka topics -->
            </kafka-topics>
        </kafka-to-mqtt-mapping>
    </kafka-to-mqtt-mappings>

</kafka-configuration>
...


You can apply changes to the kafka-configuration.xml file at runtime. There is no need to restart the extension or HiveMQ for the changes to take effect (hot reload). The previous configuration file is automatically archived to the config-archive subfolder in the extensions folder.

If you change the configuration file at runtime and the new configuration contains errors, the previous configuration is kept.

When the configuration changes at runtime, topic mappings pause and restart automatically to include the changes.

If only the mqtt-topic-filters in a topic mapping change, it is not necessary to restart the topic mapping. When a topic mapping restarts, the existing connections to Kafka for the topic mapping are closed and re-established.


Kafka Cluster

The extension uses the <kafka-cluster> settings to establish the connection to a Kafka cluster.

Example Minimal Configuration
...
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
    </kafka-cluster>
</kafka-clusters>
...


The following <kafka-cluster> values can be configured:

  • id (no default): The unique identifier of the Kafka cluster. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_

  • bootstrap-servers (no default): A comma-separated list of the Kafka bootstrap servers with host and port.

  • tls (default: disabled): Shows whether a TLS configuration is enabled for the Kafka cluster.

  • authentication (default: none): Shows the type of authentication that is configured for the Kafka cluster.

If you need to connect to the same Kafka cluster with different settings, simply add the same cluster multiple times with a different id and different settings.
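
For example, a sketch (ids, ports, and TLS settings are illustrative) that registers the same physical Kafka cluster twice, once without TLS and once with TLS on a separate listener:

...
<kafka-clusters>
    <!-- Same physical cluster, added twice with different ids and settings -->
    <kafka-cluster>
        <id>cluster01-plain</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>
    </kafka-cluster>
    <kafka-cluster>
        <id>cluster01-tls</id>
        <bootstrap-servers>127.0.0.1:9093</bootstrap-servers>
        <tls>
            <enabled>true</enabled>
        </tls>
    </kafka-cluster>
</kafka-clusters>
...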


Topic Mapping & Message Processing

[Figure: HiveMQ and Kafka topic-mapping architecture]

MQTT-to-Kafka Topic Mapping

MQTT-to-Kafka mappings represent the routing information from MQTT topic filters to Kafka topics.

Example Minimal Configuration
...
<mqtt-to-kafka-mappings>

    <mqtt-to-kafka-mapping>
        <id>mapping01</id>

        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster01</cluster-id>

        <!-- List of MQTT topic filters -->
        <mqtt-topic-filters>
            <mqtt-topic-filter>topic1/#</mqtt-topic-filter>
            <mqtt-topic-filter>topic2/#</mqtt-topic-filter>
        </mqtt-topic-filters>

        <!-- Target Kafka topic -->
        <kafka-topic>topic01</kafka-topic>

    </mqtt-to-kafka-mapping>

</mqtt-to-kafka-mappings>
...


The following <mqtt-to-kafka-mapping> values can be configured:

  • id (no default): The unique identifier for this topic mapping. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_

  • cluster-id (no default): The identifier of the referenced Kafka cluster.

  • mqtt-topic-filters (no default): A list of MQTT topic filters.

  • kafka-topic (no default): The Kafka topic to which the matching MQTT messages are routed.

  • kafka-acks (default: ONE): The acknowledgement mode for Kafka. Specifies the way that the Kafka cluster has to acknowledge incoming messages. Three modes are possible: ZERO (do not wait for acknowledgements), ONE (at least one acknowledgement), and ALL (all replicas must acknowledge, depends on the Kafka server configuration).

If you want to send the same messages to multiple Kafka topics, simply duplicate the topic mapping and change the id and the kafka-topic of the additional topic mappings.
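
For example, a sketch (ids and topic names are illustrative) that sends the same MQTT messages to two Kafka topics; the second mapping also sets the kafka-acks mode that is described in the table above:

...
<mqtt-to-kafka-mappings>
    <mqtt-to-kafka-mapping>
        <id>mapping01</id>
        <cluster-id>cluster01</cluster-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>telemetry/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <kafka-topic>telemetry-hot</kafka-topic>
    </mqtt-to-kafka-mapping>
    <!-- Duplicate of mapping01 with a different id and kafka-topic -->
    <mqtt-to-kafka-mapping>
        <id>mapping02</id>
        <cluster-id>cluster01</cluster-id>
        <mqtt-topic-filters>
            <mqtt-topic-filter>telemetry/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <kafka-topic>telemetry-archive</kafka-topic>
        <!-- Wait for all replicas to acknowledge -->
        <kafka-acks>ALL</kafka-acks>
    </mqtt-to-kafka-mapping>
</mqtt-to-kafka-mappings>
...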


Kafka-to-MQTT Topic Mapping

Kafka-to-MQTT mappings represent the routing information from Kafka topics to MQTT topics. Information from the Kafka record is sent on the MQTT topics that you define in the configuration. If no MQTT topic is set, the Kafka topic is used as the MQTT topic. Unless otherwise configured, the payload of the MQTT message is the value of the Kafka record.

Each Kafka-to-MQTT Topic Mapping in the Kafka Extension has a dedicated Kafka consumer. The Kafka consumers for the same topic mapping in the HiveMQ cluster are combined in one consumer group. The number of Kafka-to-MQTT Topic Mappings equals the number of consumer groups.

The extension reads and processes Kafka records in batches. The offset of the last record of each processed batch is committed to Kafka as soon as the batch is processed. This method ensures continuity if an extension in the cluster is temporarily unavailable or any other rebalancing of the cluster occurs.

After a restart, the extension continues reading from the point where it stopped before the restart (same is true if one node crashes and another node takes over the reading).

Since each partition in your Kafka topic is assigned to exactly one consumer, it is a best practice to have more partitions than Kafka extensions (consumers). This ensures higher efficiency and maximum utilization of your HiveMQ extension.

To increase practical usability and fulfill a wide range of use cases, it is possible to extract information from the Kafka value. To enable the extraction of information, the Kafka value must be deserialized by the HiveMQ Enterprise Extension for Kafka. The supported information formats are listed in Supported Formats. The information can be extracted with a semantic that is similar to XPath. For more information, see Supported Path Syntax.

Example Minimal Configuration of a Kafka-to-MQTT Mapping
...
<kafka-to-mqtt-mappings>

    <kafka-to-mqtt-mapping>
        <!-- The id is used for logging messages.  -->
        <id>mapping01</id>

        <!-- Kafka cluster to use for this topic mapping -->
        <cluster-id>cluster02</cluster-id>

        <!-- List of kafka topics -->
        <kafka-topics>
            <kafka-topic>topic1</kafka-topic>
            <kafka-topic>topic2</kafka-topic>
        </kafka-topics>

    </kafka-to-mqtt-mapping>

</kafka-to-mqtt-mappings>
...

In the example, the MQTT topic of the MQTT publish messages defaults to the Kafka topic and the MQTT payload defaults to the Kafka value.


The following <kafka-to-mqtt-mapping> values can be configured:

  • id (no default): The unique identifier for this mapping. This string can only contain the following characters: abcdefghijklmnopqrstuvwxyz0123456789-_

  • cluster-id (no default): The identifier of the referenced Kafka cluster.

  • kafka-topics (no default): At least one Kafka topic and/or Kafka-topic pattern is required. Be careful when you combine kafka-topic-pattern with deserialization: you might poll messages from Kafka for which you have no schema, which can lead to message loss. The safe route is to explicitly specify all kafka-topics here and also for each schema.

  • use-schema-registry (default: none): Specifies whether the Kafka records are deserialized. Two modes are possible: none (no deserialization) and local (use local .avsc files to deserialize Avro objects).

  • mqtt-topics (default: ${KAFKA_TOPIC}; type in path: String-Array): Multiple MQTT topics to which the messages are sent.

  • mqtt-payload (default: ${KAFKA_VALUE}; type in path: Bytes): The payload for the MQTT messages. When no schema is applied, the original Kafka value is used as the MQTT payload and cannot be modified.

  • mqtt-publish-fields (no default): The metadata that is defined for the MQTT publish message. See the following MQTT Publish Fields table for information on all available fields.

If you use a schema registry, we do not recommend the use of Kafka topic patterns. The combination of Kafka topic patterns and a schema registry can lead to unwanted behavior that is difficult to interpret.
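
For example, a safer sketch (ids and topic names are illustrative) that avoids topic patterns and lists the Kafka topic explicitly; the same topic must also be listed in the matching schema entry (see Schema Registries):

...
<kafka-to-mqtt-mappings>
    <kafka-to-mqtt-mapping>
        <id>mapping03</id>
        <cluster-id>cluster01</cluster-id>
        <kafka-topics>
            <!-- Explicit topic instead of a kafka-topic-pattern -->
            <kafka-topic>device-events</kafka-topic>
        </kafka-topics>
        <!-- Deserialize the records with the local schema registry -->
        <use-schema-registry>local</use-schema-registry>
    </kafka-to-mqtt-mapping>
</kafka-to-mqtt-mappings>
...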


MQTT Publish

The metadata of the MQTT Publish for each Kafka-to-MQTT mapping is defined in the <mqtt-publish-fields> of the configuration file.

Each MQTT Publish field can contain only one variable.

  • retained-flag (default: false; type in path: Boolean): Defines whether all MQTT messages from the corresponding Kafka-to-MQTT mapping are retained. Possible values are true or false. If set to true and retained messages are disabled in HiveMQ, the messages are not retained.

  • payload-format-indicator (no default; type in path: String): Sets the payload format indicator of the corresponding Kafka-to-MQTT mapping. Possible values are UTF-8 or UNSPECIFIED.

  • message-expiry-interval (no default; type in path: Long, integer, or string): A long value that indicates the lifetime in seconds of a published message. The value must be positive.

  • response-topic (no default; type in path: String): Sets the response topic for the MQTT messages.

  • correlation-data (no default; type in path: Bytes): Sets the correlation data for the MQTT messages. This data is used to show to which request a response message belongs. If you insert the correlation-data directly into the configuration (that is, you do not use the path syntax to extract it), you must encode the data in Base64.

  • user-properties (no default; type in path: String): Sets multiple <user-properties> with <key> and <value> pairs for the MQTT messages.

  • content-type (no default; type in path: String): Specifies the content type of the MQTT message.

  • qos (default: 1; type in path: Long, integer, or string): Sets the Quality of Service (QoS) level for the MQTT message: 0 (at most once), 1 (at least once), or 2 (exactly once).

Example of MQTT Publish Fields
...
<mqtt-publish-fields>
    <retained-flag>true</retained-flag>
    <payload-format-indicator>${//payload-format}</payload-format-indicator>
    <message-expiry-interval>10000</message-expiry-interval>
    <response-topic>${//response-topic}</response-topic>
    <correlation-data>${//correlation-data}</correlation-data>
    <user-properties>
        <user-property>
            <key>key1</key>
            <value>value1</value>
        </user-property>
        <user-property>
            <key>key2</key>
            <value>value2</value>
        </user-property>
    </user-properties>
    <content-type>${//content-type}</content-type>
    <qos>2</qos>
</mqtt-publish-fields>
...
In this example, multiple placeholders are used. See the following documentation for more information about the use of placeholders.
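
For example, when you set the correlation-data directly in the configuration instead of extracting it with the path syntax, the value must be Base64 encoded. A minimal sketch (aGVsbG8= is the Base64 encoding of the string hello):

...
<mqtt-publish-fields>
    <!-- Literal correlation data must be Base64 encoded; aGVsbG8= decodes to "hello" -->
    <correlation-data>aGVsbG8=</correlation-data>
</mqtt-publish-fields>
...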

Kafka Constants

Kafka Constants are placeholders that represent various sections of information from a Kafka record.
These placeholders are replaced with the associated content from the Kafka record at runtime.
No transformations are applied to the information that is read from the Kafka record.
Kafka Constants are denoted with ${}.

  • ${KAFKA_TOPIC}: The value from the topic of the Kafka record.

  • ${KAFKA_KEY}: The value from the key of the Kafka record.

  • ${KAFKA_VALUE}: The value from the value of the Kafka record.

Examples of Kafka Constants usage
...
<mqtt-topics>
    <mqtt-topic>topic/test/topic-1</mqtt-topic>
    <mqtt-topic>${KAFKA_TOPIC}</mqtt-topic>
    <mqtt-topic>topic/test/${KAFKA_KEY}</mqtt-topic>
</mqtt-topics>
<mqtt-payload>${KAFKA_VALUE}</mqtt-payload>
...


Schema Registries

To be able to use the values that you extract from a Kafka record, the values must be deserialized with an appropriate schema. Each schema file that you want to use must be added to your local schema registry.
Your local schema registry can contain more than one schema file of the same type. When <use-schema-registry> is set to local in a topic mapping, you can specify <schema-registries> in the configuration.

Currently, the HiveMQ Enterprise Extension for Kafka only supports deserialization of the Avro schema format.

If you use a schema file, the HiveMQ extension validates each message that arrives. Only valid messages are processed.

A Kafka topic can only have one schema file. If a Kafka topic matches multiple entries in the schema registry, the first schema that appears in the configuration file is used.
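
For example, in the following sketch (file and topic names are illustrative), a record from the topic orders matches both entries; because the entry with schema-a.avsc appears first in the configuration file, that schema is used:

...
<avro-schema-entries>
    <!-- Listed first: this schema is used for the topic "orders" -->
    <avro-schema-entry>
        <serialization>binary</serialization>
        <avro-file>/local-schema-registry/schema-a.avsc</avro-file>
        <kafka-topics>
            <kafka-topic>orders</kafka-topic>
        </kafka-topics>
    </avro-schema-entry>
    <!-- This pattern also matches "orders" but is never used for that topic -->
    <avro-schema-entry>
        <serialization>binary</serialization>
        <avro-file>/local-schema-registry/schema-b.avsc</avro-file>
        <kafka-topics>
            <kafka-topic-pattern>order.*</kafka-topic-pattern>
        </kafka-topics>
    </avro-schema-entry>
</avro-schema-entries>
...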
Table 1. Schema Registry
  • name (no default): The name of the local schema registry.

  • avro-schema-entries (no default): Contains multiple Avro schema entries.

  • avro-schema-entry (no default): Defines an Avro schema with serialization, Avro file, and Kafka topic fields.

  • serialization (default: binary): Specifies whether the Avro object is serialized in binary or JSON format. The format must be provided for objects that you want to deserialize.

  • avro-file (no default): Specifies the relative path to the Avro schema file.

  • kafka-topics (no default): The Kafka topics for the specified schema. Either one Kafka topic or one Kafka-topic pattern is required. These fields specify the Kafka topics to which the schema applies.

  • kafka-topic (no default): The Kafka topic of the entry.

  • kafka-topic-pattern (no default): A pattern that identifies which schema file a Kafka topic uses. Be careful when you use kafka-topic-pattern: you might poll messages from topics that do not use this schema and therefore cannot be deserialized, which can lead to message loss. The safe route is to explicitly specify all kafka-topics to which the schema applies.

To use the Schema Registry for all polled kafka-topics, use <kafka-topic-pattern>.*</kafka-topic-pattern>. This pattern is a regular expression that matches every topic.
Example Schema Registry
<kafka-configuration>
    ...
    <schema-registries>
        <local-schema-register>
            <name>local-register</name>
            <avro-schema-entries>
                <avro-schema-entry>
                    <serialization>binary</serialization>
                    <avro-file>/local-schema-registry/schemafile.avsc</avro-file>
                    <kafka-topics>
                        <!-- Caution: Either use kafka-topic or kafka-topic-pattern. The usage of both isn't allowed -->
                        <kafka-topic>the-first-kafka-topic</kafka-topic>
                        <kafka-topic>a-second-kafka-topic</kafka-topic>
                        <kafka-topic-pattern>a-regex-.*</kafka-topic-pattern>
                    </kafka-topics>
                </avro-schema-entry>
            </avro-schema-entries>
        </local-schema-register>
    </schema-registries>
    ...
</kafka-configuration>


Supported Path Syntax

To extract values from fields of the deserialized Kafka value, the HiveMQ Enterprise Extension for Kafka uses a path syntax that is similar to the XPath syntax. The path syntax is denoted by ${}.

  • // selects the root element

  • / moves down in the tree

  • nodename specifies the node that gets selected or moved to

  • nodenames[x] selects the x-th element in the nodenames array

The path must begin with the root element //.
Examples of Path Syntax Usage
...
<mqtt-topics>
    <mqtt-topic>topic/example/topic-1/${//mqtt-information/topic}</mqtt-topic>
</mqtt-topics>
<mqtt-payload>${//mqtt-information/client[2]/payload}</mqtt-payload>
...


Path Examples

This section provides examples of path semantic usage.

We use the following Avro schema as a basis:

Example Avro Schema
{
  "type": "record",
  "name": "userInfo",
  "namespace": "my.example",
  "fields": [
    {
      "name": "clientIDs",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "payload",
      "type": "bytes"
    },
    {
      "name": "publish_metadata",
      "type": {
        "type": "record",
        "name": "publish_information",
        "fields": [
          {
            "name": "retained",
            "type": "boolean",
            "default": "NONE"
          },
          {
            "name": "payload_format_indicator",
            "type": "string"
          },
          {
            "name": "expiry",
            "type": "long"
          }
        ]
      },
      "default": {}
    }
  ]
}

This example shows how to extract client IDs and insert them into MQTT topics. The format of the topics is test/client/<clientId>. This format makes it possible to send one Kafka Record to many MQTT Topics. To get the client IDs, we can use the path //clientIDs.

The configuration in the kafka-configuration.xml file is done as follows:

Example Configuration
...
<mqtt-topics>
    <mqtt-topic>test/client/${//clientIDs}</mqtt-topic>
</mqtt-topics>
...
The path must be placed within ${ and }.

Now, assume that the field clientIDs contains the values ["clientId1", "clientId2", "clientId3"]. As a result, the MQTT publish is sent to the following three topics:

  • test/client/clientId1

  • test/client/clientId2

  • test/client/clientId3

To omit the first and last client IDs and send the MQTT Publish only to the second client ID, we can add [1] at the end of the desired entry: //clientIDs[1]. (Since array indexes begin at 0, we need to use [1] for the second entry.)

Example Configuration
...
<mqtt-topics>
    <mqtt-topic>test/client/${//clientIDs[1]}</mqtt-topic>
</mqtt-topics>
...
Supported Formats

The HiveMQ Enterprise Extension for Kafka currently supports deserialization of the following formats:

  • AVRO

The current version of the HiveMQ Enterprise Extension for Kafka can only deserialize Kafka records that use the Avro format. The path semantic cannot be used with Kafka records that use other formats.


TLS

To connect to your Kafka clusters over a secure TCP connection, configure the TLS options in your <kafka-cluster> settings.

Example TLS Configuration
...
<kafka-clusters>
    <kafka-cluster>
        <id>cluster01</id>
        <bootstrap-servers>127.0.0.1:9092</bootstrap-servers>

        <tls>
            <enabled>true</enabled>

            <!-- Truststore, to trust the Kafka server's certificate -->
            <truststore>
                <path>/opt/hivemq/conf/kafka-trust.jks</path>
                <password>truststorepassword</password>
            </truststore>

            <!-- Keystore, when mutual TLS is used -->
            <keystore>
                <path>/opt/hivemq/conf/kafka-key.jks</path>
                <password>keystorepassword</password>
                <private-key-password>privatekeypassword</private-key-password>
            </keystore>

            <!-- Cipher suites supported by the client -->
            <cipher-suites>
                <cipher-suite>TLS_AES_128_GCM_SHA256</cipher-suite>
                <cipher-suite>TLS_AES_256_GCM_SHA384</cipher-suite>
            </cipher-suites>

            <!-- Supported TLS protocols -->
            <protocols>
                <protocol>TLSv1.2</protocol>
            </protocols>

            <!-- If the client should verify the server's hostname -->
            <hostname-verification>true</hostname-verification>
        </tls>

    </kafka-cluster>
</kafka-clusters>
...
The following <tls> values can be configured:

  • enabled (default: false): Enables or disables TLS for the connection to the Kafka cluster. Possible values are true or false.

  • protocols (default: all protocols enabled by the JVM): The enabled TLS protocols.

  • cipher-suites (default: all cipher suites enabled by the JVM): The enabled cipher suites.

  • keystore.path (no default): The path to the key store that contains your certificate and private key.

  • keystore.password (no default): The password to open the key store.

  • keystore.private-key-password (no default): The password for the private key.

  • truststore.path (no default): The path to the trust store that contains the trusted certificates of the Kafka brokers.

  • truststore.password (no default): The password to open the trust store.

  • hostname-verification (default: true): Enables or disables verification of the server's hostname. Possible values are true or false.


Authentication

Authentication against Kafka servers can be configured per Kafka cluster in the <kafka-cluster> settings.

The HiveMQ Enterprise Extension for Kafka supports the following authentication mechanisms:

  • No Authentication

  • Plain

  • SCRAM SHA256

  • SCRAM SHA512

  • GSSAPI

By default, the No Authentication option is enabled.
Example Authentication Configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <plain>
                <username>kafka-user</username>
                <password>kafka-password</password>
            </plain>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


No Authentication

To disable authentication against the Kafka cluster, enter none as your authentication option.

Example No Authentication Configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <none/>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


Plain

Plain authentication uses only a username and password (not recommended).

Example Plain Authentication Configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <plain>
                <username>kafka-user</username>
                <password>kafka-password</password>
            </plain>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


SCRAM SHA256

Authentication with Salted Challenge Response Authentication Mechanism (SCRAM) alleviates most of the security concerns that come with a plain username/password authentication.

Example SCRAM SHA256 Authentication Configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <scram-sha256>
                <username>kafka-user</username>
                <password>kafka-user-secret</password>
            </scram-sha256>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


SCRAM SHA512

Authentication with Salted Challenge Response Authentication Mechanism (SCRAM) alleviates most of the security concerns that come with a plain username/password authentication.

Example SCRAM SHA512 Authentication Configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <scram-sha512>
                <username>kafka-user</username>
                <password>kafka-user-secret</password>
            </scram-sha512>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...


GSSAPI

The Generic Security Service Application Program Interface (GSSAPI) is used to allow authentication against Kerberos.

If you use GSSAPI to authenticate the HiveMQ Enterprise Extension to your Kafka clusters via Kerberos, the Kafka client in the extension must have administrative rights. If admin rights are not enabled, the extension is not permitted to connect to Kafka.
Example GSSAPI Authentication Configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <gssapi>
                <key-tab-file>/opt/hivemq/kafka-client.keytab</key-tab-file>
                <kerberos-service-name>kafka</kerberos-service-name>
                <principal>kafka/kafka1.hostname.com@EXAMPLE.COM</principal>
                <store-key>true</store-key>
                <use-key-tab>true</use-key-tab>
            </gssapi>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...

The use of a Kerberos ticket cache can be configured as follows:

Example GSSAPI Authentication Configuration
...
<kafka-clusters>
    <kafka-cluster>
        ...

        <authentication>
            <gssapi>
                <use-ticket-cache>true</use-ticket-cache>
            </gssapi>
        </authentication>

    </kafka-cluster>
</kafka-clusters>
...
The following <gssapi> values can be configured:

  • use-key-tab (no default): Enables use of a keytab file. Allowed values are true or false. This option requires a configured key-tab-file.

  • key-tab-file (no default): The absolute path to the keytab file.

  • kerberos-service-name (no default): The name of the Kerberos service.

  • principal (no default): The name of the principal that is used. The principal can be a simple username or a service name.

  • store-key (no default): Stores the keytab or the principal key in the private credentials of the subject. Allowed values are true or false.

  • use-ticket-cache (no default): Enables use of a ticket cache. Allowed values are true or false.


Kafka Dashboard in the HiveMQ Control Center

The HiveMQ Enterprise Extension for Kafka adds additional metrics to your HiveMQ control center. This information allows you to monitor the messages that the extension processes. Metrics are created for every action that can be measured.

[Figure: Kafka dashboard in the HiveMQ Control Center]


The Dashboard for the HiveMQ Enterprise Extension for Kafka provides an overview bar and detailed graphs for both messaging directions: from MQTT to Kafka and from Kafka to MQTT.

MQTT-to-Kafka Section

The MQTT-to-Kafka section of the Kafka page provides information on MQTT messages that are sent to Kafka. The overview bar in the MQTT-to-Kafka section shows you the key metrics for all MQTT-to-Kafka mappings at a glance:

[Figure: MQTT-to-Kafka overview bar]
  • Current Status (status): The current operational status of the extension. If manual intervention is required, errors or warnings display here.

  • Consumed MQTT Messages (messages per second): The number of inbound MQTT Publish messages per second that the HiveMQ Enterprise Extension for Kafka processes.

  • Written to Kafka (messages per second): The number of outbound messages per second that are written to Kafka.

  • Producer Topic Mappings (absolute value): The number of configured MQTT-to-Kafka topic mappings.

  • Kafka Clusters (absolute value): The number of configured Kafka clusters.

  • Kafka Brokers (absolute value): The number of available Kafka brokers that are visible to the extension.


The detailed graphs provide additional insights into the messages the HiveMQ Enterprise Extension for Kafka processes.

The Consumed MQTT Publish Messages graph displays the Inbound MQTT Publish messages per second that are processed by the extension. Information is shown per HiveMQ cluster node.

Figure 1. Consumed MQTT Publish Messages

The Messages Written to Kafka graph displays the Outbound messages that are written to Kafka. These messages have been acknowledged by Kafka. Information is shown per Kafka cluster. It is expected that both graphs show an identical overall sum.

Figure 2. Messages Written to Kafka

Kafka-to-MQTT Section

The Kafka-to-MQTT section of the Kafka page provides information on the data that is sent from Kafka to MQTT. The overview bar in the Kafka-to-MQTT section shows you the key metrics for all Kafka-to-MQTT mappings at a glance:

[Figure: Kafka-to-MQTT overview bar]
  • Non-deserializable Kafka Messages (absolute value): The number of Kafka messages that could not be deserialized and used.

  • Created MQTT Messages (messages per second): The number of outbound MQTT Publish messages per second that the HiveMQ Enterprise Extension for Kafka creates.

  • Polled from Kafka (messages per second): The number of inbound messages per second that are polled from Kafka.

  • Consumer Topic Mappings (absolute value): The number of configured Kafka-to-MQTT topic mappings.

  • Kafka Clusters (absolute value): The number of configured Kafka clusters.

  • Kafka Brokers (absolute value): The number of available Kafka brokers that are visible to the extension.

The Created MQTT Publish Messages graph displays the MQTT Publish messages per second that are created by the extension. Information is shown per HiveMQ cluster node.

[Figure: Created MQTT Publish Messages]

The Messages Polled From Kafka graph displays the inbound messages that are polled from Kafka. Information is shown per Kafka cluster. It is expected that the Created MQTT Publish Messages graph shows an identical or higher overall sum than the Messages Polled From Kafka graph. This discrepancy is due to scenarios in which multiple MQTT messages are created from a single Kafka record.

[Figure: Messages Polled From Kafka]

HiveMQ Enterprise Extension for Kafka Upgrade Process

Releases of the HiveMQ Enterprise Extension for Kafka use semantic versioning:

major.minor.patch
Major

Major versions are extensive feature releases that add functionality that is not downward compatible with previous versions.

Minor

Minor versions provide new features and functionality that is downward compatible with previous versions.

Patch

Patch versions (also known as bugfix releases) adjust existing functionality in a downward compatible manner and are drop-in replacements for the current minor version.

Prepare Your Upgrade Folder

The goal of your upgrade preparation is to collect and configure the files that you need to upgrade from the version of the extension that you currently use to the desired new version.

Before you begin the upgrade process, create a backup of the entire hivemq-kafka-extension folder that is located in the extensions folder of your HiveMQ installation. This backup preserves all of the configuration information that is associated with the extension version that you currently use.
  1. Download and unzip the new version of the extension from the HiveMQ Marketplace. In this procedure, you modify the files in the downloaded hivemq-kafka-extension folder as needed in preparation for the upgrade.

  2. Open the hivemq-kafka-extension folder that you downloaded and use the kafka-configuration.example.xml to create a new kafka-configuration.xml file. Based on your use case and new features that the version adds, adapt the kafka-configuration.xml file that you create accordingly. For more information, see Configuration.

  3. If you use a schema registry, add this information to the local-schema-registry folder in the hivemq-kafka-extension folder that you downloaded. For more information, see Schema Registries.

  4. If you currently use any third-party licenses with the extension, add this information to the third-party-licenses folder in the hivemq-kafka-extension folder that you downloaded.

Before you upgrade the extensions in your production environment, test the new configuration of the extension in a test environment.

Rolling Upgrade of Enterprise Extensions in Your Cluster

HiveMQ extensions can be hot-reloaded. This means that extensions can be installed, enabled, and disabled at runtime. Hot-reloading of extensions enhances HiveMQ's high-availability strategy and provides a way to change extensions without introducing any kind of downtime.

To avoid any unwanted impact on performance, the addition of a new node when you do a rolling upgrade is a best practice for all production environments. For more information, see steady resource utilization.

If you run your HiveMQ installation on a single node, you must add a node when you upgrade the extension to prevent data loss during the upgrade.
Example Upgrade Procedure

Cluster management methods vary; the following upgrade procedure assumes that the added node is installed with the same version of the extension as the rest of the cluster. Example commands use Linux.

  1. Add a node to your HiveMQ cluster and switch to the HiveMQ home folder. Addition of a node is a recommended optional step.

    #switch to hivemq home folder
    cd /path/to/hivemq
  2. Disable the old version of the HiveMQ Enterprise Extension for Kafka and wait until the extension stops successfully. You do not need to shut down HiveMQ. HiveMQ extensions can be enabled and disabled at runtime (hot reload). For more information on how to disable an extension, see Extension Lifecycle.

    #disable extension
    touch extensions/hivemq-kafka-extension/DISABLED
  3. Overwrite files in the hivemq-kafka-extension folder with new files from the hivemq-kafka-extension folder that you prepared from the download.

    #overwrite old extension files with files from new extension
    #you must overwrite individual files; if you try to overwrite the entire folder, HiveMQ throws an exception
    #to receive a confirm-overwrite prompt, use \cp -ri
    \cp -r /path/to/new/hivemq-kafka-extension/ extensions/
  4. Remove the hivemq-kafka-extension-x.x.x.jar file from the old version of the extension.

    #remove old jar
    rm -f extensions/hivemq-kafka-extension/hivemq-kafka-extension-1.0.0.jar
  5. To enable the new version, delete the DISABLED file that you added. Wait until the extension successfully starts and proceed to the next node.

    #enable extension again
    rm -f extensions/hivemq-kafka-extension/DISABLED
  6. Repeat the replacement procedure on each node in the cluster until all old versions of the extension are replaced.

  7. Once all nodes are updated, remove the node that you added at the start of this procedure.

To maintain a seamless record of all configuration changes across different versions of the extension, do not delete the config-archive subfolder from the previous version.

To verify that the extension is installed and configured properly, check your HiveMQ logs and the Kafka tab of your HiveMQ Control Center.

Maintain steady resource utilization during your upgrade

Before you disable an old version of the extension, we recommend that you add a node to your cluster that is configured with the new extension version. When you disable an extension on one of the nodes in your cluster during the upgrade process, the workload of the extension you disable is redistributed over the remaining extensions that are available in the cluster. This redistributed workload increases the amount of work each remaining extension must do. The addition of a new node with the new extension, before you disable any of the existing extensions, helps you maintain a steady workload in your cluster as you upgrade the extension on each node. Once the extensions on all previously-existing nodes are upgraded and enabled, you can remove the node that you added to the cluster.

Support

If you need help with the HiveMQ Enterprise Extension for Kafka or have suggestions on how we can improve the extension, please contact us at contact@hivemq.com.