Popular HiveMQ Extension Use Cases

A HiveMQ extension can customize almost every aspect of how the broker handles MQTT clients and messages. The following typical use cases help you dive deeper into the details of the HiveMQ extension SDK and its associated APIs:

Authentication

Authentication is the mechanism that verifies the identity of your communication partner.
Air travel provides a classic example of authentication. Before you are permitted to board a plane, airport security checks your passport. A passport that identifies you as the person whose name is on the ticket allows you to proceed.
In the context of MQTT, authentication is used to check if an MQTT client is allowed to connect to a broker and if it is allowed to use a particular client identifier.

To apply authentication in a HiveMQ extension, you implement a SimpleAuthenticator and register an AuthenticatorProvider with HiveMQ. The registration is done in the extensionStart method of the extension's main class, where the custom AuthenticatorProvider is registered with the SecurityRegistry.

Registering Authenticator
public void extensionStart(ExtensionStartInput extensionStartInput, ExtensionStartOutput extensionStartOutput) {
    //register the provider with the SecurityRegistry
    Services.securityRegistry().setAuthenticatorProvider(new MyAuthenticatorProvider());
}

In this example, the Authenticator Provider returns a new Authenticator for every new MQTT connection. It is also possible to share an instance of an authenticator between multiple MQTT connections, see this example.

Example Authenticator Provider
public class MyAuthenticatorProvider implements AuthenticatorProvider {

    @Override
    public Authenticator getAuthenticator(AuthenticatorProviderInput authenticatorProviderInput) {
        //return an instance of an Authenticator
        return new MyAuthenticator();
    }

}

A SimpleAuthenticator that implements username/password based authentication for MQTT clients uses the information from the input object to determine whether a client is allowed to connect.

In this example, only clients with the username "admin" and password "hivemq" are allowed to connect. Clients that lack the username or password are not allowed to connect.

Example Username/Password Authenticator
public class MyAuthenticator implements SimpleAuthenticator {

    @Override
    public void onConnect(SimpleAuthInput simpleAuthInput, SimpleAuthOutput simpleAuthOutput) {
        ...
        //check if the user is "admin" and the password is "hivemq"
        if (username.equals("admin") && password.equals("hivemq")) {
            simpleAuthOutput.authenticateSuccessfully();
        } else {
            simpleAuthOutput.failAuthentication();
        }
    }
}
Full Source of Example Username/Password Authenticator
public class MyAuthenticator implements SimpleAuthenticator {

    @Override
    public void onConnect(SimpleAuthInput simpleAuthInput, SimpleAuthOutput simpleAuthOutput) {

        //get the contents of the MQTT connect packet from the input object
        ConnectPacket connect = simpleAuthInput.getConnectPacket();

        //check if the client set username and password
        if (!connect.getUserName().isPresent() || !connect.getPassword().isPresent()) {
            simpleAuthOutput.failAuthentication();
            //stop here, otherwise the get() calls below throw on the empty Optional
            return;
        }

        //get username and password from the connect packet
        String username = connect.getUserName().get();
        String password = Charset.forName("UTF-8").decode(connect.getPassword().get()).toString();

        //check if the user is "admin" and the password is "hivemq"
        if (username.equals("admin") && password.equals("hivemq")) {
            simpleAuthOutput.authenticateSuccessfully();
        } else {
            simpleAuthOutput.failAuthentication();
        }
    }
}

In the authenticator, the extension developer has access to all client information that can be used for authentication. This information includes all of the information from the CONNECT packet and the TLS certificate of the client. Access to all available authentication information makes it possible to implement even the most complex authentication mechanisms.
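The credential check in the username/password example can be factored into a small helper that is testable without a broker. The following is a minimal sketch under stated assumptions: `CredentialCheck` and its `matches` method are hypothetical names and are not part of the HiveMQ extension SDK. The sketch treats missing credentials as a failed match and compares the raw bytes with `MessageDigest.isEqual`, a JDK method whose running time does not depend on where the inputs differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Optional;

// Hypothetical helper for illustration; not part of the HiveMQ extension SDK.
final class CredentialCheck {

    private CredentialCheck() {
    }

    // Returns true only if both values are present and equal to the expected credentials.
    static boolean matches(Optional<String> username, Optional<ByteBuffer> password,
                           String expectedUser, String expectedPassword) {
        if (!username.isPresent() || !password.isPresent()) {
            return false; // missing credentials never match
        }

        // Read the bytes through a read-only view so the caller's buffer position is untouched.
        ByteBuffer view = password.get().asReadOnlyBuffer();
        byte[] actualPassword = new byte[view.remaining()];
        view.get(actualPassword);

        boolean userOk = MessageDigest.isEqual(
                username.get().getBytes(StandardCharsets.UTF_8),
                expectedUser.getBytes(StandardCharsets.UTF_8));
        boolean passwordOk = MessageDigest.isEqual(
                actualPassword,
                expectedPassword.getBytes(StandardCharsets.UTF_8));

        // Non-short-circuit AND: both comparisons have already been evaluated.
        return userOk & passwordOk;
    }
}
```

In an authenticator, the result of such a helper would decide between `authenticateSuccessfully()` and `failAuthentication()`. Hard-coded credentials are used here only to mirror the example above.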

Authorization

Authorization is the mechanism that controls access based on certain identities.
Let’s continue the air travel example that we began in the authentication section. After your ticket and passport authenticate your identity, a boarding pass authorizes your access to a specific plane, row, and seat.
In the context of an MQTT client, 'access' refers to the ability to publish and subscribe to certain topics.

HiveMQ extensions support two authorization mechanisms for the MQTT publish and subscribe actions:

  • Default Permissions: This mechanism provides access control on a per topic basis.

  • Authorizers: This mechanism implements authorization on a per-packet basis.

Default Permissions are the simplest and most scalable of the mechanisms. The permissions for a client are represented by a list of topics and attributes to which a client can or cannot publish or subscribe. Default permissions are independent for each client and can be set in an authenticator or in a ClientInitializer.

In this example, each client is allowed to publish and subscribe only to topics that start with its own client identifier. Here, we extend the existing authenticator with a few lines that set the default permissions of the client.

Default Permissions
...

//create a topic permission for all topics starting with the client identifier
final TopicPermission permission = Builders.topicPermission() (1)
        .topicFilter(simpleAuthInput.getConnectPacket().getClientId() + "/#") (2)
        .type(TopicPermission.PermissionType.ALLOW) (3)
        .build();

//add this permission to the client's default permissions
simpleAuthOutput.getDefaultPermissions().add(permission);

...
1 The extension SDK provides builders for classes that need to be constructed by an extension developer. The class Builders has static methods to access the specific builders.
2 The topic filter for this permission. The MQTT wildcards # and + can be used to create topic filters that match multiple topics.
3 This example uses a white-list approach that only allows access to specific topics. A black-list approach is also possible.
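To make the wildcard behavior of a topic filter such as `<client identifier>/#` concrete, the following sketch checks a topic against a filter level by level. It is an illustration only: `TopicFilterMatcher` is a hypothetical class, HiveMQ performs this matching internally, and the sketch deliberately ignores the special MQTT rule for topics that start with `$`.

```java
// Hypothetical helper for illustration; HiveMQ evaluates topic filters internally.
final class TopicFilterMatcher {

    private TopicFilterMatcher() {
    }

    // Returns true if the topic matches the filter under the MQTT wildcard rules.
    // Simplification: the special handling of topics that start with '$' is ignored.
    static boolean matches(String filter, String topic) {
        // limit -1 keeps trailing empty levels, for example "a/" has two levels
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // multi-level wildcard: valid only as the last level,
                // matches the parent level and everything below it
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // a literal level differs
            }
        }
        // without a trailing '#', the number of levels must be equal
        return filterLevels.length == topicLevels.length;
    }
}
```

With the permission from the example, a client with the identifier client1 gets the filter client1/#, which matches client1/sensors/temp and client1 itself, but not client10/sensors/temp or client2/status.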
Full Authenticator with default permissions
public class MyAuthenticator implements SimpleAuthenticator {

    @Override
    public void onConnect(SimpleAuthInput simpleAuthInput, SimpleAuthOutput simpleAuthOutput) {

        //get the contents of the MQTT connect packet from the input object
        ConnectPacket connect = simpleAuthInput.getConnectPacket();

        //check if the client set username and password
        if (!connect.getUserName().isPresent() || !connect.getPassword().isPresent()) {
            simpleAuthOutput.failAuthentication();
            //stop here, otherwise the get() calls below throw on the empty Optional
            return;
        }

        //get username and password from the connect packet
        String username = connect.getUserName().get();
        String password = Charset.forName("UTF-8").decode(connect.getPassword().get()).toString();

        //check if the user is "admin" and the password is "hivemq"
        if (username.equals("admin") && password.equals("hivemq")) {

            //create a topic permission for all topics starting with the client identifier
            TopicPermission permission = Builders.topicPermission()
                    .topicFilter(simpleAuthInput.getConnectPacket().getClientId() + "/#")
                    .build();

            //add this permission to the client's default permissions
            simpleAuthOutput.getDefaultPermissions().add(permission);

            simpleAuthOutput.authenticateSuccessfully();
        } else {
            simpleAuthOutput.failAuthentication();
        }

    }

}
The permissions for an MQTT client can include more detail than only the topic, for example, the QoS level or the retained-message flag. For a list of all available permission options, see the Topic Permission chapter.

The other authorization mechanism is Authorizers. Authorizers implement authorization on a per-packet basis.

To learn more about Authorizers, see Publish Authorizer and Subscription Authorizer.

Lifecycle Events

To get notified when certain events in the lifecycle of an MQTT client occur, an extension can listen to lifecycle events. These events include a client establishing a connection, a client disconnecting, successful authentication of a client, and more.

To create a lifecycle-event listener, register a ClientLifecycleEventListenerProvider with the EventRegistry.

Register provider with EventRegistry
Services.eventRegistry().setClientLifecycleEventListener(new MyClientLifecycleEventListenerProvider());

The provider returns a new instance of a ClientLifecycleEventListener for each client. If desired, you can share the same instance of a listener between multiple clients. For more information, see this example.

Example ClientLifecycleEventListenerProvider
public class MyClientLifecycleEventListenerProvider implements ClientLifecycleEventListenerProvider {

    @Override
    public ClientLifecycleEventListener getClientLifecycleEventListener(ClientLifecycleEventListenerProviderInput input) {
        return new MyClientLifecyleEventListener();
    }
}

In this example, a log statement is issued when a client connects, when authentication is successful, and when a client disconnects.

Example ClientLifecycleEventListener
public class MyClientLifecyleEventListener implements ClientLifecycleEventListener {

    private static final Logger log = LoggerFactory.getLogger(MyClientLifecyleEventListener.class);

    @Override
    public void onMqttConnectionStart(ConnectionStartInput connectionStartInput) {
        log.info("Client {} connects.", connectionStartInput.getConnectPacket().getClientId());
    }

    @Override
    public void onAuthenticationSuccessful(AuthenticationSuccessfulInput authenticationSuccessfulInput) {
        log.info("Client {} authenticated successfully.", authenticationSuccessfulInput.getClientInformation().getClientId());
    }

    @Override
    public void onDisconnect(DisconnectEventInput disconnectEventInput) {
        log.info("Client {} disconnected.", disconnectEventInput.getClientInformation().getClientId());
    }
}
You can listen to more detailed events by overriding other methods from the interface ClientLifecycleEventListener.


Intercept & Manipulate MQTT Messages

To manipulate the content of an MQTT message before the broker processes the message, a HiveMQ extension can intercept the message. The developer registers an Interceptor callback that HiveMQ calls each time an MQTT message is received.

In this example, a PublishInboundInterceptor that manipulates the topic of incoming publishes is registered. An Interceptor is always registered in a ClientInitializer. The initializer is a callback that HiveMQ calls each time an MQTT client session is initialized.

Register ClientInitializer with the InitializerRegistry
Services.initializerRegistry().setClientInitializer(new MyClientInitializer());

An Interceptor is then added to the clientContext of the Initializer.

Example ClientInitializer that adds a PublishInboundInterceptor
public class MyClientInitializer implements ClientInitializer {

    @Override
    public void initialize(InitializerInput initializerInput, ClientContext clientContext) {

        clientContext.addPublishInboundInterceptor(new PublishInboundInterceptor() { (1)
            @Override
            public void onInboundPublish(PublishInboundInput publishInboundInput, PublishInboundOutput publishInboundOutput) {
                //set the topic to "new/topic"
                publishInboundOutput.getPublishPacket().setTopic("new/topic"); (2)
            }
        });
    }
}
1 A new Interceptor is created and added to the context of the client.
2 The changed topic. Most of the attributes of an MQTT PUBLISH packet can be manipulated.
The Publish Interceptor can also be used to prevent onward delivery of incoming messages. For more information, see this example.
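The example sets a fixed topic. In practice, the new topic is often derived from the original one. The sketch below shows one possible rewrite rule as a pure function that can be unit-tested without a broker: it prefixes the topic with the client identifier unless the prefix is already present. `TopicRewriter` and `prefixWithClientId` are hypothetical names and are not part of the extension SDK. The check for wildcards reflects the MQTT rule that the topic of a PUBLISH packet must not contain # or +.

```java
// Hypothetical helper for illustration; not part of the HiveMQ extension SDK.
final class TopicRewriter {

    private TopicRewriter() {
    }

    // Returns the topic prefixed with "<clientId>/", unless it already has that prefix.
    static String prefixWithClientId(String clientId, String topic) {
        if (topic.isEmpty()) {
            throw new IllegalArgumentException("topic must not be empty");
        }
        // PUBLISH topics must not contain MQTT wildcard characters
        if (topic.indexOf('#') >= 0 || topic.indexOf('+') >= 0) {
            throw new IllegalArgumentException("topic must not contain wildcards: " + topic);
        }
        String prefix = clientId + "/";
        return topic.startsWith(prefix) ? topic : prefix + topic;
    }
}
```

Inside an interceptor, the result of such a function would be passed to the setTopic call that the example uses.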

Publish MQTT Messages

You can use an extension to manipulate the messages clients send or to create and publish new messages.

The Publish Service allows you to publish messages to all subscribers of a topic or to a specific client. For more information, see this example.

Publish an MQTT message
Publish message = Builders.publish()
    .topic("the/topic")
    .qos(Qos.EXACTLY_ONCE)
    .payload(Charset.forName("UTF-8").encode("payload"))
    .build();

Services.publishService().publish(message);

The PUBLISH messages that the Publish Service sends are processed in the same way as a PUBLISH message that a client sends.
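The payload in the example is a ByteBuffer that is produced from a string. The following sketch, which uses only the JDK, shows one way to convert between text and a UTF-8 payload buffer. `PayloadCodec` is a hypothetical helper name. Decoding through a read-only view is a design choice of this sketch: it leaves the position of the original buffer unchanged, so the same buffer can be read again.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; uses only JDK classes.
final class PayloadCodec {

    private PayloadCodec() {
    }

    // Encodes text as a UTF-8 payload buffer.
    static ByteBuffer encode(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a UTF-8 payload buffer without moving the position of the given buffer.
    static String decode(ByteBuffer payload) {
        return StandardCharsets.UTF_8.decode(payload.asReadOnlyBuffer()).toString();
    }
}
```

`StandardCharsets.UTF_8` avoids the charset lookup by name that `Charset.forName("UTF-8")` performs and cannot fail because of a misspelled charset name.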


Modify Subscriptions

The Subscription Store allows extensions to manipulate the subscriptions of a client. The Subscription Store contains methods to add, remove, or get subscriptions for a client.

In this example, a subscription for the topic new/topic with QoS 1 is added for the client with the identifier client-ID.

Add a subscription for a client
TopicSubscription subscription = Builders.topicSubscription()
    .topicFilter("new/topic")
    .qos(Qos.AT_LEAST_ONCE)
    .build();

Services.subscriptionStore().addSubscription("client-ID", subscription);
When a subscription for a client is added via the extension system or the HiveMQ Control Center, the retained messages for the topics that are included in the subscription are not published to the client.


Modify Retained Messages

In the extension SDK, the RetainedMessageStore can set, remove, and fetch retained messages.

For this example, a retained message is added (or replaced) for the topic retain/topic with QoS 0 and the payload content.

Add or replace a retained message
RetainedPublish publish = Builders.retainedPublish()
    .topic("retain/topic")
    .payload(Charset.forName("UTF-8").encode("content"))
    .qos(Qos.AT_MOST_ONCE)
    .build();

Services.retainedMessageStore().addOrReplace(publish);
Unlike the retained messages that clients send, retained messages that are added with the Retained Message Store are not published to active subscribers.

Cluster Discovery

The HiveMQ cluster discovery feature can utilize the custom discovery mechanisms that an extension provides. This functionality enables deep integration of a HiveMQ cluster into an existing infrastructure.

Register cluster discovery callback
Services.clusterService().addDiscoveryCallback(new MyClusterDiscovery());
Example cluster discovery
public class MyClusterDiscovery implements ClusterDiscoveryCallback {

    //this is a placeholder for an implementation of your custom external service
    MyExternalService externalService = new MyExternalService(); (1)

    @Override
    public void init(ClusterDiscoveryInput clusterDiscoveryInput, ClusterDiscoveryOutput clusterDiscoveryOutput) {
        externalService.registerNode(clusterDiscoveryInput.getOwnAddress()); (2)
        clusterDiscoveryOutput.provideCurrentNodes(externalService.getAllClusterNodes());
    }

    @Override
    public void reload(ClusterDiscoveryInput clusterDiscoveryInput, ClusterDiscoveryOutput clusterDiscoveryOutput) {
        clusterDiscoveryOutput.provideCurrentNodes(externalService.getAllClusterNodes()); (3)
    }

    @Override
    public void destroy(ClusterDiscoveryInput clusterDiscoveryInput) {
        externalService.removeNode(clusterDiscoveryInput.getOwnAddress()); (4)
    }
}
1 This class is a placeholder for a custom external service.
2 When this cluster node starts, the node registers with the external service.
3 The reload method is called regularly and a list of addresses for all cluster nodes is provided to the output object. You can configure the reload interval.
4 When this cluster node stops, the node is unregistered with the external service.
If you want to use cluster discovery from an extension, the cluster configuration of HiveMQ must be set to extension. For more information, see configuration example.
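The MyExternalService class in the example is only a placeholder. For local experiments or unit tests of a discovery callback, an in-memory stand-in such as the following sketch can be enough. The class name `InMemoryNodeRegistry` is hypothetical, and the address type is left generic so that the sketch compiles without the SDK. Because the registry lives in a single JVM, it cannot connect real cluster nodes. A real implementation would store the addresses in a service that all nodes can reach.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the external service placeholder.
// A is the address type, for example the address object from the discovery input.
final class InMemoryNodeRegistry<A> {

    // concurrent set: discovery callbacks may run on different threads
    private final Set<A> nodes = ConcurrentHashMap.newKeySet();

    // Adds the address; registering the same address twice has no additional effect.
    void registerNode(A address) {
        nodes.add(address);
    }

    // Removes the address if it is registered.
    void removeNode(A address) {
        nodes.remove(address);
    }

    // Returns a snapshot copy so callers cannot modify the registry.
    List<A> getAllClusterNodes() {
        return new ArrayList<>(nodes);
    }
}
```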


Metrics

HiveMQ extensions can provide their own metrics and access HiveMQ metrics.

Metrics in HiveMQ extensions are handled by the widely-used and well-known Dropwizard Metrics library.

Example read HiveMQ metric
Counter currentConnectionsCounter = Services.metricRegistry().counter("com.hivemq.networking.connections.current");
log.info("currently connected {}", currentConnectionsCounter.getCount());
Example add custom metric
final Counter customCounter = Services.metricRegistry().counter("demo.helloworld.started");
customCounter.inc();
Many types of metrics are available. Metric types include: Gauges, Counters, Meters, Histograms, and Timers.

Next steps

  • Get acquainted with the APIs and Services that the extension SDK provides.

  • Look at existing extension code on GitHub.

  • Implement your custom logic in a HiveMQ extension.