General Concepts
A HiveMQ extension is essentially a folder containing a Java JAR file and an XML descriptor file, which is dropped into the extensions folder of the HiveMQ installation.
Each HiveMQ extension has its own Extension Lifecycle, which allows extensions to be installed, enabled and disabled at runtime.
Each HiveMQ extension callback contains 1 or 2 parameters, called input and output. A detailed explanation of this concept can be found here.
Extension Lifecycle
HiveMQ extensions can be hot-reloaded. This means that extensions can be installed, enabled and disabled at runtime. Hot-reloading of extensions enhances HiveMQs high-availability strategy and provides a way to change extensions without introducing any kind of downtime.
An installed HiveMQ extension can have two different states. It can either be Enabled
or Disabled
.
These states can be switched at runtime by enabling or disabling the extension via an administrative action.
Always disable your extensions by creating a DISABLED marker in the extension home folder first before removing the extension at runtime. |
The following diagram shows the typical lifecycle of HiveMQ extensions at HiveMQ Start. The extension "Extension 1" is already installed at the start up of HiveMQ.


All currently enabled extensions are automatically stopped, when HiveMQ is stopped. The following diagram shows the lifecycle for the extension "Extension 1 " in this case.


Extensions can also be enabled or disabled at runtime. The following diagram show the lifecycle for the extension "Extension 2", which is enabled at runtime.


When an extension is disabled at runtime, by adding the DISABLED-marker to the extension’s folder, the following lifecycyle can be expected.


A HiveMQ extension is installed by putting the extension’s folder into HiveMQ’s extensions folder <hivemq-home>/extensions/.
Installed HiveMQ extensions are enabled by default. To prevent HiveMQ from automatically starting up the extension, a disabled marker needs to be present in the folder of the newly installed extension.
An extension can be marked as disabled when a file with the name DISABLED is placed inside its home folder.
A previously disabled extension can be enabled by removing the file DISABLED
from its extension folder.
HiveMQ picks up the change automatically and enables or disables the extension accordingly.
- hivemq/
|- bin/
|- conf/
...
|- extensions/
| |- extension-1/
| | |- extension.jar
| | |- hivemq-extension.xml
| |- extension-2/
| | |- extension.jar
| | |- hivemq-extension.xml
| | |- DISABLED
The example above shows the folders contents for 2 HiveMQ extensions. In this case extension-2 has the DISABLED marker present.
Extensions, which have the disabled marker present in their extension folder, are not enabled by HiveMQ on broker start up.
They can be enabled at runtime by removing the disabled marker.
To update an installed extension to a newer version, you disable the running extension, make the necessary changes, and then enable the extension again.
If an extension is started, whether at HiveMQ start up or during runtime, HiveMQ calls the extensionStart
of the extension’s implementation of ExtensionMain imethod .
ClientInitializer, Authenticators, Authorizers, and LifecycleCallbacks need to be registered in this method.
You can also use extensionStart
to initialize and set up resources for an extension. For example, open a connection to a database.
public class HelloWorldMain implements ExtensionMain {
private static final @NotNull Logger log = LoggerFactory.getLogger(HelloWorldMain.class);
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
try {
//Open Resources here.
log.info("Started " + extensionStartInput.getExtensionInformation().getName() + ":" + extensionStartInput.getExtensionInformation().getVersion());
} catch (Exception e) {
log.error("Exception thrown at extension start: ", e);
}
}
@Override
public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {
log.info("Stopped " + extensionStartInput.getExtensionInformation().getName() + ":" + extensionStartInput.getExtensionInformation().getVersion());
}
}
When an extension is stopped, whether at HiveMQ shutdown or during runtime, the method extensionStop
of the extension’s implementation of ExtensionMain is called by HiveMQ.
This method can be used to clean up the extension’s resources and stop running tasks. (E.g.closing a database connection)
By calling the method preventExtensionStartup
on the ouput
parameter of the extensionStart
method an extension can prevent its further startup.
If an extension prevents its own startup that extension is automatically disabled by HiveMQ. The extension must give a reason why the extension startup has been prevented. The more valuable and informative this information is, the better the HiveMQ operations capabilities of debugging prevented Extension startups will be.
If an Exception is thrown and not caught with the implementation of extensionStart
, the startup of the extension is interpreted by HiveMQ as unsuccessful and the extension will be disabled.
Extensions are responsible for handling Exceptions thrown in their methods or Exceptions thrown as a result of code called by the extension. |
The stopping of an extension can not be prevented by the extension itself. If an Exception is thrown from the extension’s implementation of
extensionStop
the extension is stopped and disabled by HiveMQ.
Extension Input / Output Principles
Almost every callback method provided by the HiveMQ Extenstion SDK API contains 1 or 2 parameters.
The first parameter is always a read-only Input object.
The second parameter is always a modifiable Output object.
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
}
...
Input
An input object is an informational object. It provides context about the callback and global information. See examples.
Examples
The following lists a number possible ExtensionInputs, where they occur and what information they hold.
Extension Start Input
This examples illustrates the content of a ExtensionStartInput
from the extension start callback.
These are:
-
Previous extension version ( if extension was updated )
-
Enabled extensions
-
Current extension version
-
Extension name
-
Extension home folder
-
Extension ID
-
Extension Author ( if set in hivemq-extension.xml )
...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {
final Optional<String> previousVersion = extensionStartInput.getPreviousVersion();
final Map<String, ExtensionInformation> enabledExtensions = extensionStartInput.getEnabledExtensions();
final ExtensionInformation extensionInformation = extensionStartInput.getExtensionInformation();
final String version = extensionInformation.getVersion();
final String name = extensionInformation.getName();
final File extensionFolder = extensionInformation.getExtensionHomeFolder();
final String id = extensionInformation.getId();
final Optional<String> author = extensionInformation.getAuthor();
}
...
Publish Inbound Input
This examples illustrates the content of a PublishInboundInput from the inbound publish callback of a Publish Inbound Interceptor^.
It holds this information:
-
MQTT PUBLISH packet
-
Client ID
-
MQTT Version
-
IP Address
-
Listener with port, bind address and type
-
TLS
-
Proxy Information
-
ConnectionAttributeStore
...
new PublishInboundInterceptor() {
@Override
public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
final PublishPacket publishPacket = publishInboundInput.getPublishPacket();
final ClientInformation clientInformation = publishInboundInput.getClientInformation();
final ConnectionInformation connectionInformation = publishInboundInput.getConnectionInformation();
}
};
...
Output
Output objects provide the ability to extend default HiveMQ behaviour to accommodate your specific business case. Depending on the callback, MQTT packets can be modified, dropped, or authorized, connecting clients can be authenticated or disconnected and many more possibilities.
The output object itself is blocking , it can easily be used to create an asynchronous output object.
Blocking Output
The default execution of any output object is blocking. The next callback for the same client will be called when the previous callback is completely executed. It is recommended to use this way of execution if the callback code is simple and not time consuming. See examples.
There is no difference in ordering when using blocking or asynchronous callbacks. Execution of callbacks is always ordered.
Be sure that your blocking code returns, otherwise the extension task execution for a client can not complete. |
Examples
This sections illustrates examples where blocking output usage is useful.
Simple Publish Interceptor Output
This example shows a very simple Publish Inbound Interceptor^.
The interceptor checks the client id of the publishing client and prevents delivery if the id contains the String "prevent".
This use case is a great candidate for using a blocking output implementation.
public class SimplePreventInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
final String clientId = publishInboundInput.getClientInformation().getClientId();
if (clientId.contains("prevent")) {
//prevent publish delivery on the output object
publishInboundOutput.preventPublishDelivery();
}
}
}
Simple Subscription Authorizer Output
This example shows how to implement a simple Subscription Authorizer^.
The authorizer denys any shared subscription.
This use case is a great candidate for using a blocking output implementation.
public class DenySharedAuthorizer implements SubscriptionAuthorizer {
@Override
public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {
//disallow a shared subscription
if (input.getTopicFilter().startsWith("$shared")) {
output.failAuthorization();
return;
}
//otherwise let the other extensions or default permissions decide
output.nextExtensionOrDefault();
}
}
Async Output
When your business case requires any external service calls, like http requests or database reads/writes it is recommended to use an asynchronous output. An output object can be made asynchronous by calling one of the following methods:
-
output.async(Duration timeout);
(fallback is FAILURE) -
output.async(Duration timeout, TimeoutFallback fallback);
Some output objects (eg. PublishInboundOutput
provide more async()
implementations. But they are described in their specific sections.
Multiple async() calls are not allowed and will throw an exception.
|
The next callback for the same context will be called when the output.resume()
is called or the timeout is exceeded.
There is no difference in ordering when using blocking or asynchronous callbacks. Execution of callbacks is always ordered.
It is recommended to use this way of execution if the callback code is very complex or time consuming. See examples.
Be sure that your async callback execution does not block! |
Duration (Timeout) Parameter
The timeout parameter is mandatory as it tells HiveMQ how long HiveMQ waits for any output result.
TimeoutFallback Parameter
The default fallback is FAILURE
which usually means that HiveMQ aborts any further action. The actual behaviour is defined in the specific implementation.
SUCCESS
usually means that HiveMQ either sees the outcome as successful or asks the next extension.
Examples
This sections illustrates examples where asynchronous output usage is useful.
Async Publish Interceptor Output
This example shows how to implement a PublishInboundInterceptor
which asynchronously modifies an inbound PUBLISH message in an external task
with the use of the Managed Extension Executor Service^
The timeout strategy is failure, which means that the PUBLISH
message onward delivery
will be prevented if the task takes longer than 2 seconds.
public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {
@Override
public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
//make output object async with a duration of 2 seconds and timeout fallback failure
final Async<PublishInboundOutput> asyncOutput = publishInboundOutput.async(Duration.ofSeconds(2), TimeoutFallback.FAILURE);
//submit external task to extension executor service
final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
@Override
public void run() {
//get the modifiable publish object from the output
final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();
//call external publish modification (method not provided)
callExternalTask(publish);
}
});
//wait for completion of the task
taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
if(throwable != null){
//please use more sophisticated logging
throwable.printStackTrace();
}
//Always resume the async output, otherwise it will time out
asyncOutput.resume();
}
});
}
}
Async Subscription Authorizer Output
This example shows how to use the SubscriptionAuthorizer
with an async output to call an external service.
public class MyAsyncSubscriptionAuthorizer implements SubscriptionAuthorizer {
@Override
public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {
//get the managed extension executor service
final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();
//make the output async with a timeout of 2 seconds
final Async<SubscriptionAuthorizerOutput> async = output.async(Duration.ofSeconds(2));
//submit a task to the extension executor
extensionExecutorService.submit(new Runnable() {
@Override
public void run() {
//call an external service to decide the outcome
final boolean result = callExternalTaks();
if (result) {
output.authorizeSuccessfully();
} else {
output.failAuthorization();
}
//Always resume the async output, otherwise it will time out
async.resume();
}
});
}
}
Initializing Objects
The HiveMQ Extension SDK provides static Services
and Builder
classes to enable the creation of desired Java objects inside extension classes.
final EventRegistry eventRegistry = Services.eventRegistry();
final TopicPermission permission = Builders.topicPermission()
.topicFilter("allowed/topic")
.qos(TopicPermission.Qos.ALL)
.activity(TopicPermission.MqttActivity.ALL)
.type(TopicPermission.PermissionType.ALLOW)
.retain(TopicPermission.Retain.ALL)
.build();
Builders validate the given values. The Javadoc lists the Exceptions that may be thrown when an invalid value is passed. |
Don’t Block
The single most important rule for all extension is: Don’t block any outputs. Never. You can use the ManagedExtensionExecutorService for every action that might potentially block in the callback of an output.
By the way: It’s also a great practise to use Connection Pools (e.g. HikariCP) if you are dealing with databases.
The following demonstrates an example, where the ManagedExtensionExecutorService
is used to accomplish non-blocking callback behavior.
This is a suitable example of an asynchronous Subscription Authorizer that calls an external server.
A callback like this has the potential to block the entire flow, as a call to an external service always bares the risk of taking long or timing out.
For this reason the PublishAuthorizesOutput
has been made asynchronous with the help of the ManagedExtensionExecutorService
.
public class MyAsyncSubscriptionAuthorizer implements SubscriptionAuthorizer {
@Override
public void authorizeSubscribe(@NotNull final SubscriptionAuthorizerInput input, @NotNull final SubscriptionAuthorizerOutput output) {
//get the managed extension executor service
final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();
//make the output async with a timeout of 2 seconds
final Async<SubscriptionAuthorizerOutput> async = output.async(Duration.ofSeconds(2));
//submit a task to the extension executor
extensionExecutorService.submit(new Runnable() {
@Override
public void run() {
//call an external service to decide the outcome
final boolean result = callExternalTaks();
if (result) {
output.authorizeSuccessfully();
} else {
output.failAuthorization();
}
//Always resume the async output, otherwise it will time out
async.resume();
}
});
}
}
Extension Isolation
The HiveMQ Extension SDK uses extension isolation. That means, each extension has its own classloader and resources and classes cannot be shared between extensions. This adds additional security for your extension and helps to avoid tricky to debug failure behavior that could be caused by using the same libraries in different extensions.
Some libraries like Jersey are trying and want to interfere with other class loaders, which may lead to side effects. HiveMQ implements workarounds for such libraries internally. If you are using such a library, which isn’t covered yet by HiveMQ, please contact support@hivemq.com so we can include workarounds for such libraries.