General Concepts

A HiveMQ extension is essentially a folder containing a Java JAR file and an XML descriptor file, which is dropped into the extensions folder of the HiveMQ installation.
Each HiveMQ extension has its own Extension Lifecycle, which allows extensions to be installed, enabled and disabled at runtime.

Each HiveMQ extension callback contains 1 or 2 parameters, called input and output. A detailed explanation of this concept can be found here.

Extension Lifecycle

HiveMQ extensions can be hot-reloaded. This means that extensions can be installed, enabled and disabled at runtime. Hot-reloading of extensions enhances HiveMQs high-availability strategy and provides a way to change extensions without introducing any kind of downtime.

An installed HiveMQ extension can have two different states. It can either be Enabled or Disabled. These states can be switched at runtime by enabling or disabling the extension via an administrative action.

Always disable your extensions by creating a DISABLED marker in the extension home folder first before removing the extension at runtime.

The following diagram shows the typical lifecycle of HiveMQ extensions at HiveMQ Start. The extension "Extension 1" is already installed at the start up of HiveMQ.

extension lifecycle start
Figure 1. HiveMQ Extension installed at startup diagram
startup 1
Figure 2. HiveMQ startup with extension


All currently enabled extensions are automatically stopped, when HiveMQ is stopped. The following diagram shows the lifecycle for the extension "Extension 1 " in this case.

extension lifecycle stop
Figure 3. HiverMQ Extension stopped with HiveMQ stop
stop 1
Figure 4. HiveMQ stop with extension


Extensions can also be enabled or disabled at runtime. The following diagram show the lifecycle for the extension "Extension 2", which is enabled at runtime.

extension lifecycle start runtime
Figure 5. HiveMQ Extension installed during runtime
startup 2
Figure 6. Add extension to running HiveMQ


When an extension is disabled at runtime, by adding the DISABLED-marker to the extension’s folder, the following lifecycyle can be expected.

extension lifecycle stop runtime
Figure 7. HiveMQ Extenstion stopped during runtime
stop 2
Figure 8. Add extension during runtime

A HiveMQ extension is installed by putting the extension’s folder into HiveMQ’s extensions folder <hivemq-home>/extensions/.
Installed HiveMQ extensions are enabled by default. To prevent HiveMQ from automatically starting up the extension, a disabled marker needs to be present in the folder of the newly installed extension.

An extension can be marked as disabled when a file with the name DISABLED is placed inside its home folder.

A previously disabled extension can be enabled by removing the file DISABLED from its extension folder. HiveMQ picks up the change automatically and enables or disables the extension accordingly.

- hivemq/
  |- bin/
  |- conf/
  ...
  |- extensions/
  |   |- extension-1/
  |   |   |- extension.jar
  |   |   |- hivemq-extension.xml
  |   |- extension-2/
  |   |   |- extension.jar
  |   |   |- hivemq-extension.xml
  |   |   |- DISABLED

The example above shows the folders contents for 2 HiveMQ extensions. In this case extension-2 has the DISABLED marker present.
Extensions, which have the disabled marker present in their extension folder, are not enabled by HiveMQ on broker start up. They can be enabled at runtime by removing the disabled marker.

To update an installed extension to a newer version, you disable the running extension, make the necessary changes, and then enable the extension again.

If an extension is started, whether at HiveMQ start up or during runtime, HiveMQ calls the extensionStart of the extension’s implementation of ExtensionMain imethod . ClientInitializer, Authenticators, Authorizers, and LifecycleCallbacks need to be registered in this method. You can also use extensionStart to initialize and set up resources for an extension. For example, open a connection to a database.

public class HelloWorldMain implements ExtensionMain {

    private static final @NotNull Logger log = LoggerFactory.getLogger(HelloWorldMain.class);

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

        try {
            //Open Resources here.
            log.info("Started " + extensionStartInput.getExtensionInformation().getName() + ":" + extensionStartInput.getExtensionInformation().getVersion());

        } catch (Exception e) {
            log.error("Exception thrown at extension start: ", e);
        }

    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {

        log.info("Stopped " + extensionStartInput.getExtensionInformation().getName() + ":" + extensionStartInput.getExtensionInformation().getVersion());

    }

}

When an extension is stopped, whether at HiveMQ shutdown or during runtime, the method extensionStop of the extension’s implementation of ExtensionMain is called by HiveMQ. This method can be used to clean up the extension’s resources and stop running tasks. (E.g.closing a database connection)

By calling the method preventExtensionStartup on the ouput parameter of the extensionStart method an extension can prevent its further startup. If an extension prevents its own startup that extension is automatically disabled by HiveMQ. The extension must give a reason why the extension startup has been prevented. The more valuable and informative this information is, the better the HiveMQ operations capabilities of debugging prevented Extension startups will be.

If an Exception is thrown and not caught with the implementation of extensionStart, the startup of the extension is interpreted by HiveMQ as unsuccessful and the extension will be disabled.

Extensions are responsible for handling Exceptions thrown in their methods or Exceptions thrown as a result of code called by the extension.

The stopping of an extension can not be prevented by the extension itself. If an Exception is thrown from the extension’s implementation of extensionStop the extension is stopped and disabled by HiveMQ.




Extension Input / Output Principles

Almost every callback method provided by the HiveMQ Extenstion SDK API contains 1 or 2 parameters.
The first parameter is always an Input object.
The second parameter is always an Output object.

Extension Start Input / Output
...

@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

}
...




Input

An input object is an informational object. It provides context about the callback and global information. See examples.



Examples

The following lists a number possible ExtensionInputs, where they occur and what information they hold.

Extension Start Input

This examples illustrates the content of a ExtensionStartInput from the extension start callback.

These are:

  • Previous extension version ( if extension was updated )

  • Enabled extensions

  • Current extension version

  • Extension name

  • Extension home folder

  • Extension ID

  • Extension Author ( if set in hivemq-extension.xml )

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

    final Optional<String> previousVersion = extensionStartInput.getPreviousVersion();
    final Map<String, ExtensionInformation> enabledExtensions = extensionStartInput.getEnabledExtensions();

    final ExtensionInformation extensionInformation = extensionStartInput.getExtensionInformation();
    final String version = extensionInformation.getVersion();
    final String name = extensionInformation.getName();
    final File extensionFolder = extensionInformation.getExtensionHomeFolder();
    final String id = extensionInformation.getId();
    final Optional<String> author = extensionInformation.getAuthor();

}
...




Publish Inbound Input

This examples illustrates the content of a PublishInboundInput from the inbound publish callback of a Publish Inbound Interceptor^.

It holds this information:

  • MQTT PUBLISH packet

  • Client ID

  • MQTT Version

  • IP Address

  • Listener with port, bind address and type

  • TLS

  • Proxy Information

  • ConnectionAttributeStore

  • The ServerInformation

...

new PublishInboundInterceptor() {
    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        final PublishPacket publishPacket = publishInboundInput.getPublishPacket();
        final ClientInformation clientInformation = publishInboundInput.getClientInformation();
        final ConnectionInformation connectionInformation = publishInboundInput.getConnectionInformation();

    }
};
...


Output

Output objects provide the ability to extend default HiveMQ behaviour to accommodate your specific business case. Depending on the callback, MQTT packets can be modified, dropped, or authorized, connecting clients can be authenticated or disconnected and many more possibilities.

The output object itself is blocking , it can easily be used to create an asynchronous output object.




Blocking Output

The default execution of any output object is blocking. The next callback for the same client will be called when the previous callback is completely executed. It is recommended to use this way of execution if the callback code is simple and not time consuming. See examples.

There is no difference in ordering when using blocking or asynchronous callbacks. Execution of callbacks is always ordered.

Be sure that your blocking code returns, otherwise the extension task execution for a client can not complete.

Examples

This sections illustrates examples where blocking output usage is useful.

Simple Publish Interceptor Output

This example shows a very simple Publish Inbound Interceptor^.
The interceptor checks the client id of the publishing client and prevents delivery if the id contains the String "prevent".
This use case is a great candidate for using a blocking output implementation.

public class SimplePreventInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        final String clientId = publishInboundInput.getClientInformation().getClientId();

        if (clientId.contains("prevent")) {

            //prevent publish delivery on the output object
            publishInboundOutput.preventPublishDelivery();

        }

    }
}


Simple Subscription Authorizer Output

This example shows how to implement a simple Subscription Authorizer^.
The authorizer denys any shared subscription.
This use case is a great candidate for using a blocking output implementation.

public class DenySharedAuthorizer implements SubscriptionAuthorizer {

        @Override
        public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {

            //disallow a shared subscription
            if (input.getTopicFilter().startsWith("$shared")) {
                output.failAuthorization();
                return;
            }

            //otherwise let the other extensions or default permissions decide
            output.nextExtensionOrDefault();
        }
    }


Async Output

When your business case requires any external service calls, like http requests or database reads/writes it is recommended to use an asynchronous output. An output object can be made asynchronous by calling one of the following methods:

  • output.async(Duration timeout); (fallback is FAILURE)

  • output.async(Duration timeout, TimeoutFallback fallback);

Some output objects (eg. PublishInboundOutput provide more async() implementations. But they are described in their specific sections.

Multiple async() calls are not allowed and will throw an exception.

The next callback for the same context will be called when the output.resume() is called or the timeout is exceeded.

There is no difference in ordering when using blocking or asynchronous callbacks. Execution of callbacks is always ordered.

It is recommended to use this way of execution if the callback code is very complex or time consuming. See examples.

Be sure that your async callback execution does not block!

Duration (Timeout) Parameter

The timeout parameter is mandatory as it tells HiveMQ how long HiveMQ waits for any output result.

TimeoutFallback Parameter

The default fallback is FAILURE which usually means that HiveMQ aborts any further action. The actual behaviour is defined in the specific implementation. SUCCESS usually means that HiveMQ either sees the outcome as successful or asks the next extension.

Examples

This sections illustrates examples where asynchronous output usage is useful.

Async Publish Interceptor Output

This example shows how to implement a PublishInboundInterceptor which asynchronously modifies an inbound PUBLISH message in an external task with the use of the Managed Extension Executor Service^

The timeout strategy is failure, which means that the PUBLISH message onward delivery will be prevented if the task takes longer than 2 seconds.

public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        //make output object async with a duration of 2 seconds and timeout fallback failure
        final Async<PublishInboundOutput> asyncOutput = publishInboundOutput.async(Duration.ofSeconds(2), TimeoutFallback.FAILURE);

        //submit external task to extension executor service
        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                //get the modifiable publish object from the output
                final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();

                //call external publish modification (method not provided)
                callExternalTask(publish);
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
                if(throwable != null){
                    //please use more sophisticated logging
                    throwable.printStackTrace();
                }

                //Always resume the async output, otherwise it will time out
                asyncOutput.resume();
            }
        });

    }
}




Async Subscription Authorizer Output

This example shows how to use the SubscriptionAuthorizer with an async output to call an external service.

public class MyAsyncSubscriptionAuthorizer implements SubscriptionAuthorizer {

    @Override
    public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {

        //get the managed extension executor service
        final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

        //make the output async with a timeout of 2 seconds
        final Async<SubscriptionAuthorizerOutput> async = output.async(Duration.ofSeconds(2));

        //submit a task to the extension executor
        extensionExecutorService.submit(new Runnable() {
            @Override
            public void run() {

                //call an external service to decide the outcome
                final boolean result = callExternalTaks();

                if (result) {
                    output.authorizeSuccessfully();
                } else {
                    output.failAuthorization();
                }

                //Always resume the async output, otherwise it will time out
                async.resume();
            }
        });

    }
}



Initializing Objects

The HiveMQ Extension SDK provides static Services and Builder classes to enable the creation of desired Java objects inside extension classes.

Services Example Event Registry
final EventRegistry eventRegistry = Services.eventRegistry();


Builder Example TopicPermission
final TopicPermission permission = Builders.topicPermission()
        .topicFilter("allowed/topic")
        .qos(TopicPermission.Qos.ALL)
        .activity(TopicPermission.MqttActivity.ALL)
        .type(TopicPermission.PermissionType.ALLOW)
        .retain(TopicPermission.Retain.ALL)
        .build();
Builders validate the given values. The Javadoc lists the Exceptions that may be thrown when an invalid value is passed.



Don’t Block

The single most important rule for all extension is: Don’t block any outputs. Never. You can use the ManagedExtensionExecutorService for every action that might potentially block in the callback of an output.
By the way: It’s also a great practise to use Connection Pools (e.g. HikariCP) if you are dealing with databases.

The following demonstrates an example, where the ManagedExtensionExecutorService is used to accomplish non-blocking callback behavior.
This is a suitable example of an asynchronous Subscription Authorizer that calls an external server.
A callback like this has the potential to block the entire flow, as a call to an external service always bares the risk of taking long or timing out.
For this reason the PublishAuthorizesOutput has been made asynchronous with the help of the ManagedExtensionExecutorService.

public class MyAsyncSubscriptionAuthorizer implements SubscriptionAuthorizer {

    @Override
    public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {

        //get the managed extension executor service
        final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

        //make the output async with a timeout of 2 seconds
        final Async<SubscriptionAuthorizerOutput> async = output.async(Duration.ofSeconds(2));

        //submit a task to the extension executor
        extensionExecutorService.submit(new Runnable() {
            @Override
            public void run() {

                //call an external service to decide the outcome
                final boolean result = callExternalTaks();

                if (result) {
                    output.authorizeSuccessfully();
                } else {
                    output.failAuthorization();
                }

                //Always resume the async output, otherwise it will time out
                async.resume();
            }
        });

    }
}



Extension Isolation

The HiveMQ Extension SDK uses extension isolation. That means, each extension has its own classloader and resources and classes cannot be shared between extensions. This adds additional security for your extension and helps to avoid tricky to debug failure behavior that could be caused by using the same libraries in different extensions.
Some libraries like Jersey are trying and want to interfere with other class loaders, which may lead to side effects. HiveMQ implements workarounds for such libraries internally. If you are using such a library, which isn’t covered yet by HiveMQ, please contact support@hivemq.com so we can include workarounds for such libraries.