General Extension Concepts

A HiveMQ extension is essentially a folder that holds a Java archive and an XML descriptor file that you place in the extensions folder of your HiveMQ installation.
Each HiveMQ extension has a particular extension lifecycle that allows the extension to be installed, enabled, or disabled at runtime.

Each HiveMQ extension callback takes one or two parameters, called input and output. For more information, see Extension Input/Output Principles.

Extension Lifecycle

HiveMQ extensions offer hot-reload functionality that allows you to install, enable, and disable extensions at runtime. Hot-reloading of extensions supports the high-availability strategy of HiveMQ by providing a way to change or update extensions with no downtime.

Each installed HiveMQ extension can be in one of two available states: Enabled or Disabled. You can use an administrative action to switch the state of an extension during runtime.

To disable an extension at runtime, create a DISABLED marker in the extension home folder before you remove the extension.

The following diagram shows the typical lifecycle of HiveMQ extensions at HiveMQ start.
In the diagram, 'Extension 1' is already installed when HiveMQ starts.

Figure 1. HiveMQ Extension installed at startup diagram
Figure 2. HiveMQ startup with extension

When you stop HiveMQ, all currently-enabled extensions stop automatically. The following diagram shows the lifecycle of 'Extension 1' when HiveMQ stops.

Figure 3. HiveMQ extension stopped with HiveMQ stop
Figure 4. HiveMQ stop with extension

HiveMQ allows extensions to be enabled or disabled during runtime. The following diagram shows the lifecycle of an extension called Extension 2 that is enabled at runtime.

Figure 5. HiveMQ extension installed during runtime
Figure 6. Add extension to running HiveMQ

When you add a disabled marker to the folder of an extension to disable the extension during runtime, the following lifecycle can be expected.

Figure 7. HiveMQ Extension stopped during runtime
Figure 8. Add extension during runtime

To install a HiveMQ extension, you simply move the folder that contains the extension to the <hivemq-home>/extensions/ folder of your HiveMQ instance.

By default, HiveMQ enables each extension that you install. To prevent HiveMQ from automatically starting up an extension, you need to add a disabled marker to the folder of the extension.

To mark an installed extension as disabled, place a file with the name DISABLED in the home folder of the extension.

To enable a previously-disabled extension, remove the DISABLED file from the extension folder.

HiveMQ automatically registers the changes you make to the extension, and enables or disables the extension accordingly.
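Because the marker is an ordinary file, enabling and disabling an extension can be scripted. The following is a minimal sketch that toggles the DISABLED marker with the standard java.nio.file API. It is not part of the HiveMQ Extension SDK: the class name ExtensionMarker and its methods are hypothetical helpers, and the path that you pass is assumed to be the home folder of an installed extension.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that toggles the DISABLED marker in an extension home folder. */
final class ExtensionMarker {

    private static final String MARKER = "DISABLED";

    private ExtensionMarker() {
    }

    /** Creates the marker file if it is missing. */
    static void disable(final Path extensionHome) {
        final Path marker = extensionHome.resolve(MARKER);
        try {
            if (Files.notExists(marker)) {
                Files.createFile(marker);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes the marker file if it is present. */
    static void enable(final Path extensionHome) {
        try {
            Files.deleteIfExists(extensionHome.resolve(MARKER));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reports whether the folder currently contains the marker. */
    static boolean isMarkedDisabled(final Path extensionHome) {
        return Files.exists(extensionHome.resolve(MARKER));
    }
}
```

For example, ExtensionMarker.disable(Path.of("/opt/hivemq/extensions/extension-2")) would create the marker for extension-2. The installation path is an assumption for illustration only.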

The following example shows the folder content of two HiveMQ extensions:

- hivemq/
  |- bin/
  |- conf/
  ...
  |- extensions/
  |   |- extension-1/
  |   |   |- extension.jar
  |   |   |- hivemq-extension.xml
  |   |- extension-2/
  |   |   |- extension.jar
  |   |   |- hivemq-extension.xml
  |   |   |- DISABLED

In the example, extension-2 has a DISABLED marker.
HiveMQ does not enable extensions that have a disabled marker in their extension folder during broker startup. The disabled extension can be enabled at runtime by removing the disabled marker.
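The startup rule can be illustrated with a small helper that scans an extensions folder and lists the subfolders that carry no DISABLED marker. This is only a sketch of the rule described above, assuming the folder layout from the example. It does not reproduce how HiveMQ itself loads extensions, and the name ExtensionFolderScan is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper that lists extension folders without a DISABLED marker. */
final class ExtensionFolderScan {

    private ExtensionFolderScan() {
    }

    /** Returns the sorted names of all subfolders that do not contain a DISABLED file. */
    static List<String> withoutDisabledMarker(final Path extensionsFolder) {
        try (Stream<Path> children = Files.list(extensionsFolder)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(dir -> Files.notExists(dir.resolve("DISABLED")))
                    .map(dir -> dir.getFileName().toString())
                    .sorted()
                    .collect(Collectors.toList());
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For the folder layout shown above, the helper would return only extension-1, because extension-2 contains the marker.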

To update an installed extension to a newer version, disable the running extension with a DISABLED marker, make the necessary changes, and then remove the DISABLED marker to re-enable the extension.

When an extension starts at HiveMQ startup or during runtime, HiveMQ calls the extensionStart method of the class in the extension that implements the ExtensionMain interface.

ClientInitializers, Authenticators, Authorizers, and LifecycleCallbacks need to be registered in the extensionStart method.

You can also use extensionStart to initialize and set up resources for an extension. For example, open a connection to a database.

public class HelloWorldMain implements ExtensionMain {

    private static final @NotNull Logger log = LoggerFactory.getLogger(HelloWorldMain.class);

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

        try {
            //Open Resources here.
            log.info("Started " + extensionStartInput.getExtensionInformation().getName() + ":" + extensionStartInput.getExtensionInformation().getVersion());

        } catch (Exception e) {
            log.error("Exception thrown at extension start: ", e);
        }

    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {

        log.info("Stopped " + extensionStopInput.getExtensionInformation().getName() + ":" + extensionStopInput.getExtensionInformation().getVersion());

    }

}

When an extension stops at HiveMQ shutdown or during runtime, HiveMQ calls the extensionStop method of the class in the extension that implements the ExtensionMain interface.

The extensionStop method can be used to clean up extension resources and stop tasks that the extension runs. For example, close a database connection.

To prevent further starts, an extension can call the preventExtensionStartup method on the output parameter of the extensionStart method. When an extension prevents its own startup, HiveMQ automatically disables the extension. Additionally, the extension must provide a reason why extension startup has been prevented. Detailed reason information makes it easier for HiveMQ operations teams to debug the prevented extension startup.
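The following sketch shows one way an extension might decide to prevent its own startup and report a useful reason. So that it compiles without the SDK, it declares a one-method stand-in interface, StartOutput, in place of the SDK's ExtensionStartOutput. The stand-in, the ConfigGuard class, and the config.properties file name are assumptions for illustration. Only the preventExtensionStartup call with a reason string mirrors the method described above.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Stand-in for the SDK's ExtensionStartOutput, declared here only so the sketch compiles without the SDK. */
interface StartOutput {
    void preventExtensionStartup(String reason);
}

/** Hypothetical startup check that refuses to start without a readable configuration file. */
final class ConfigGuard {

    private ConfigGuard() {
    }

    /** Returns true if startup may continue, false if startup was prevented. */
    static boolean checkConfig(final Path extensionHome, final StartOutput output) {
        // The file name is an assumption made for this example.
        final Path config = extensionHome.resolve("config.properties");
        if (!Files.isReadable(config)) {
            // A specific reason helps operators find the cause quickly.
            output.preventExtensionStartup(
                    "Required configuration file '" + config + "' is missing or unreadable");
            return false;
        }
        return true;
    }
}
```

In a real extension, a check like this would typically run at the beginning of extensionStart and call preventExtensionStartup on the ExtensionStartOutput parameter.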

If an exception is thrown and not caught within the implementation of extensionStart, HiveMQ interprets the startup of the extension as unsuccessful and disables the extension.

Extensions are responsible for handling all exceptions their methods throw as well as any exceptions that are thrown as a result of code the extension calls.

If the extensionStop method of an extension throws an exception, HiveMQ stops and disables the extension. An extension cannot prevent its own stop.
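One way to keep exceptions from escaping a stop routine is to close each resource in its own try block and collect the failures for logging. The sketch below uses only the JDK. The SafeShutdown class is a hypothetical helper, not an SDK type, and it assumes that the resources of the extension implement AutoCloseable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that closes resources without letting an exception escape. */
final class SafeShutdown {

    private SafeShutdown() {
    }

    /** Closes every resource in order and returns the failures instead of throwing them. */
    static List<Exception> closeAll(final List<? extends AutoCloseable> resources) {
        final List<Exception> failures = new ArrayList<>();
        for (final AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (final Exception e) {
                // Record the failure and continue so that the remaining resources are still closed.
                failures.add(e);
            }
        }
        return failures;
    }
}
```

An extensionStop implementation could call closeAll on its open resources and log each returned exception.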

Extension Input / Output Principles

In general, callback methods in the HiveMQ Extension SDK API contain one or two parameters:

  • The first parameter in a HiveMQ callback method is a read-only Input object.

  • The second parameter in a HiveMQ callback method is a modifiable Output object.

Extension Start Input / Output
...

@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

}
...

Extension Input

An input object is an informational object. This object provides callback context and global information.

Input Examples

The following examples show several input objects, the callback in which each input occurs, and the information that each input holds.

Example Extension Start Input

This example illustrates the content of an ExtensionStartInput from the extension start callback:

  • Previous extension version (for updated extensions)

  • Enabled extensions

  • Current extension version

  • Extension name

  • Extension home folder

  • Extension ID

  • Extension author (if author information is provided in the hivemq-extension.xml file)

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

    final Optional<String> previousVersion = extensionStartInput.getPreviousVersion();
    final Map<String, ExtensionInformation> enabledExtensions = extensionStartInput.getEnabledExtensions();

    final ExtensionInformation extensionInformation = extensionStartInput.getExtensionInformation();
    final String version = extensionInformation.getVersion();
    final String name = extensionInformation.getName();
    final File extensionFolder = extensionInformation.getExtensionHomeFolder();
    final String id = extensionInformation.getId();
    final Optional<String> author = extensionInformation.getAuthor();

}
...

Example Publish Inbound Input

This example illustrates the content of a PublishInboundInput from the inbound publish callback of a Publish Inbound Interceptor:

  • MQTT PUBLISH packet

  • Client ID

  • MQTT version

  • IP address

  • Listener with port, bind address, and type

  • TLS

  • Proxy information

  • Connection attribute store

  • Server information

...

new PublishInboundInterceptor() {
    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        final PublishPacket publishPacket = publishInboundInput.getPublishPacket();
        final ClientInformation clientInformation = publishInboundInput.getClientInformation();
        final ConnectionInformation connectionInformation = publishInboundInput.getConnectionInformation();

    }
};
...

Extension Output

Output objects give you the ability to extend the default behavior of HiveMQ to accommodate your specific business case. Depending on the callback, you can modify, drop, or authorize MQTT packets, authenticate or disconnect connecting clients, and more.

Output objects are blocking by default and can easily be made asynchronous.

Blocking Output

The default execution of every output object is blocking.

In a blocking execution, the next callback for the same client cannot be called until the previous callback is completely executed.

Blocking execution is recommended for simple callback code that is not time-consuming.

There is no difference in ordering when using blocking or asynchronous callbacks. Execution of the callbacks is always ordered.

Be sure that your blocking code returns. Otherwise, the extension task execution for a client cannot complete.

Blocking Examples

The following examples highlight use cases that benefit from a blocking output.

Example Publish Interceptor Output

This example shows a very simple Publish Inbound Interceptor.
The interceptor checks the client ID of the publishing client and prevents delivery if the ID contains the string "prevent".

public class SimplePreventInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        final String clientId = publishInboundInput.getClientInformation().getClientId();

        if (clientId.contains("prevent")) {

            //prevent publish delivery on the output object
            publishInboundOutput.preventPublishDelivery();

        }

    }
}

Example Subscription Authorizer Output

This example shows how to implement a simple Subscription Authorizer.
The authorizer denies any shared subscription.

public class DenySharedAuthorizer implements SubscriptionAuthorizer {

    @Override
    public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {

        //disallow a shared subscription (shared subscription topic filters start with "$share/")
        if (input.getSubscription().getTopicFilter().startsWith("$share/")) {
            output.failAuthorization();
            return;
        }

        //otherwise let the other extensions or default permissions decide
        output.nextExtensionOrDefault();
    }
}

Asynchronous Output

If your use case requires external service calls such as HTTP requests or database reads/writes, we recommend the use of asynchronous (async) output.

To make an output object asynchronous, call one of the following methods:

  • output.async(Duration timeout); (fallback is FAILURE)

  • output.async(Duration timeout, TimeoutFallback fallback);

For more information, see HiveMQ Extension SDK JavaDoc.

Some output objects, such as PublishInboundOutput, provide additional async() implementations. For more information, see Publish Inbound Interceptor.

The use of multiple async() calls is not allowed and throws an exception.

When output.resume() is called or the timeout duration is exceeded, HiveMQ calls the next callback for the same context.

Blocking and asynchronous callbacks do not affect callback ordering. Execution of callbacks is always ordered.

We recommend this type of execution for complex or time-consuming callback code. For more information, see async examples.

Always verify that your async callback execution is non-blocking.

Duration (Timeout) Parameter

The mandatory timeout parameter defines how long HiveMQ must wait for the result of an output.

TimeoutFallback Parameter

The default fallback is FAILURE. A FAILURE fallback usually causes HiveMQ to end any further action. The actual behavior is defined in the specific implementation. A SUCCESS fallback usually means that HiveMQ either sees the outcome as successful or asks the next extension.
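The interplay of timeout and fallback can be modeled with the JDK's CompletableFuture. The sketch below is a simplified model, not the SDK's implementation. The Fallback enum is a local stand-in for TimeoutFallback, and mapping FAILURE to false and SUCCESS to true is an assumption made for illustration, since the actual behavior depends on the specific callback. It requires Java 9 or later for completeOnTimeout.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Simplified model of how a timeout fallback decides an outcome that arrives too late. */
final class TimeoutFallbackModel {

    /** Local stand-in for the SDK's TimeoutFallback enum. */
    enum Fallback { SUCCESS, FAILURE }

    private TimeoutFallbackModel() {
    }

    /**
     * Returns a future that holds the task's own decision if the task finishes in time,
     * or the fallback decision if the timeout expires first.
     */
    static CompletableFuture<Boolean> withFallback(final CompletableFuture<Boolean> task,
                                                   final Duration timeout,
                                                   final Fallback fallback) {
        final boolean fallbackDecision = fallback == Fallback.SUCCESS;
        return task.completeOnTimeout(fallbackDecision, timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

In this model, a task that never completes yields false with Fallback.FAILURE and true with Fallback.SUCCESS once the timeout expires, while a task that completes in time keeps its own result.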

Asynchronous Output Examples

The following examples highlight use cases that benefit from asynchronous output.

Example Async Publish Interceptor Output

This example shows how to implement a PublishInboundInterceptor that asynchronously modifies an inbound PUBLISH message in an external task with the use of the Managed Extension Executor Service.

Since the timeout strategy is FAILURE, HiveMQ prevents onward delivery of the PUBLISH if the duration of the task exceeds 2 seconds.

public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        //make output object async with a duration of 2 seconds and timeout fallback failure
        final Async<PublishInboundOutput> asyncOutput = publishInboundOutput.async(Duration.ofSeconds(2), TimeoutFallback.FAILURE);

        //submit external task to extension executor service
        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                //get the modifiable publish object from the output
                final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();

                //call external publish modification (method not provided)
                callExternalTask(publish);
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
                if(throwable != null){
                    //please use more sophisticated logging
                    throwable.printStackTrace();
                }

                //Always resume the async output, otherwise it will time out
                asyncOutput.resume();
            }
        });

    }
}

Example Asynchronous Subscription Authorizer Output

This example shows how to use the SubscriptionAuthorizer with an async output to call an external service.

public class MyAsyncSubscriptionAuthorizer implements SubscriptionAuthorizer {

    @Override
    public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {

        //get the managed extension executor service
        final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

        //make the output async with a timeout of 2 seconds
        final Async<SubscriptionAuthorizerOutput> async = output.async(Duration.ofSeconds(2));

        //submit a task to the extension executor
        extensionExecutorService.submit(new Runnable() {
            @Override
            public void run() {

                //call an external service to decide the outcome
                final boolean result = callExternalTask();

                if (result) {
                    output.authorizeSuccessfully();
                } else {
                    output.failAuthorization();
                }

                //Always resume the async output, otherwise it will time out
                async.resume();
            }
        });

    }
}

Initializing Objects

The HiveMQ Extension SDK provides static Services and Builder classes to enable the creation of your desired Java objects inside extension classes.

Example Services Event Registry

final EventRegistry eventRegistry = Services.eventRegistry();

Example TopicPermission Builder

final TopicPermission permission = Builders.topicPermission()
        .topicFilter("allowed/topic")
        .qos(TopicPermission.Qos.ALL)
        .activity(TopicPermission.MqttActivity.ALL)
        .type(TopicPermission.PermissionType.ALLOW)
        .retain(TopicPermission.Retain.ALL)
        .build();

Builders validate the given values. The HiveMQ Extension SDK JavaDoc lists the exceptions that can be thrown when an invalid value is passed.

Never Block

The single most important rule for all extensions is: Never block an output.

You can use the ManagedExtensionExecutorService for every action that has the potential to block in the callback of an output.

It is a best practice to use a connection pool such as HikariCP when you work with databases.

The following example shows how the ManagedExtensionExecutorService can be used to accomplish non-blocking callback behavior.
The example is an asynchronous Subscription Authorizer that calls an external service.
Calls to an external service always carry the risk of taking a long time or timing out, so a callback that waits for such a call can block an entire flow.
To prevent potential blocks, the SubscriptionAuthorizerOutput is made asynchronous and the external call runs in the ManagedExtensionExecutorService.

public class MyAsyncSubscriptionAuthorizer implements SubscriptionAuthorizer {

    @Override
    public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {

        //get the managed extension executor service
        final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

        //make the output async with a timeout of 2 seconds
        final Async<SubscriptionAuthorizerOutput> async = output.async(Duration.ofSeconds(2));

        //submit a task to the extension executor
        extensionExecutorService.submit(new Runnable() {
            @Override
            public void run() {

                //call an external service to decide the outcome
                final boolean result = callExternalTask();

                if (result) {
                    output.authorizeSuccessfully();
                } else {
                    output.failAuthorization();
                }

                //Always resume the async output, otherwise it will time out
                async.resume();
            }
        });

    }
}

Extension Isolation

The HiveMQ Extension SDK uses extension isolation:

  • Each extension has its own class loader

  • Extension resources and classes are not shared between extensions

Extension isolation adds security for your extension and avoids difficult-to-debug failure behavior that can occur when extensions share libraries.

Undesirable side effects can result when you use libraries like Jersey that tend to interfere with other class loaders. HiveMQ implements workarounds for such libraries internally. If you use a library that HiveMQ does not yet cover, notify contact@hivemq.com to request an appropriate workaround.