Introduction

Since version 1.4, HiveMQ offers a free and open source plugin SDK with service provider interfaces. This allows everyone to extend HiveMQ and add custom functionality via plugins.

With custom HiveMQ plugins, it’s easy to add functionality like writing messages to databases, integrating with other service buses, collecting statistics, adding fine-grained security and virtually anything else you can imagine.

Since version 1.5, there is even deeper integration with the HiveMQ core through the HiveMQ Services concept. See the HiveMQ Services chapter for more information.

HiveMQ 3.0 brought major improvements to the plugin system in all aspects and allows even deeper integration into HiveMQ message processing and lifecycle.

Plugin development for HiveMQ is as easy as writing a Java main method once you grasp the core concepts. This documentation covers all relevant topics to get you started as quickly as possible. If you prefer to learn from real-world plugins, visit the HiveMQ GitHub page or use the Maven Archetype for plugins.

General Concepts

A HiveMQ plugin is essentially a Java JAR file which is dropped into the plugins folder of the HiveMQ installation. HiveMQ utilizes the standard Java Service Loader mechanism [1].

Using Dependency Injection

The recommended way of developing plugins is using Dependency Injection. HiveMQ utilizes Google Guice as its dependency injection provider. Typically, plugins use the JSR 330 API, such as the javax.inject.Inject annotation.

HiveMQ plugins (and all dependent classes) can be written with constructor injection, field injection and method injection. Constructor injection tends to be the most convenient way as it allows maximum testability.
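
Constructor injection is used throughout this guide; for completeness, the following sketch shows field and method (setter) injection. MyDependency and MyOtherDependency are illustrative placeholders for classes bound in your module.

Field and Method Injection Example (sketch)
import javax.inject.Inject;

public class InjectionStyles {

    @Inject
    private MyDependency myDependency; // field injection: assigned by Guice right after instantiation

    private MyOtherDependency other;

    @Inject
    public void setOther(final MyOtherDependency other) { // method (setter) injection
        this.other = other;
    }
}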

It is also possible to use annotations from JSR 250, also known as Lifecycle Management Annotations, in your plugin. You can annotate a method in your class with @javax.annotation.PostConstruct or @javax.annotation.PreDestroy to hook into the lifecycle.

Table 1. Commonly used annotations for HiveMQ plugins
Name | Target | Description

@javax.inject.Inject | Constructor, Field, Method (Setter) | Injects the object(s) specified.

@javax.inject.Singleton | Class, Method (Provider) | Defines that an object is a Singleton.

@javax.annotation.PostConstruct | Method | Executes this method after all injection took place.

@javax.annotation.PreDestroy | Method | Executes shutdown logic when HiveMQ's LifecycleManager shuts down. This is only the case when HiveMQ itself shuts down.

@com.google.inject.Provides | Method | Used for provider methods defined in the HiveMQPluginModule.


The following shows an example of constructor injection in the PluginEntryPoint of a HiveMQ plugin. It also shows the use of the javax.annotation.PostConstruct annotation to execute a method after the injection (and the constructor call) has finished.

Constructor Injection and Lifecycle Example
import com.hivemq.spi.PluginEntryPoint;
import javax.annotation.PostConstruct;
import javax.inject.Inject;

public class TestPlugin extends PluginEntryPoint {

    private final MyDependency myDependency;

    @Inject (1)
    public TestPlugin(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    @PostConstruct (2)
    public void postConstruct() {

        myDependency.doSomething();
    }
}
1 Constructor Injection
2 JSR 250 Lifecycle method which runs after injections took place
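A @PreDestroy method works the same way on shutdown. The following is a minimal sketch; the cleanup body is illustrative.

@PreDestroy Example (sketch)
import javax.annotation.PreDestroy;

public class ShutdownAware {

    @PreDestroy
    public void preDestroy() {
        // Runs when HiveMQ's LifecycleManager shuts down, i.e. when HiveMQ itself stops.
        // Release resources here, e.g. close connections or flush buffers.
        // Like @PostConstruct, this is only honored on objects created by the DI container.
    }
}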
Don’t like dependency injection?
Although not recommended, it is possible to write HiveMQ plugins the old-fashioned Java way without dependency injection. Please make sure that you have a no-argument constructor in your PluginEntryPoint subclass. Note that you won’t be able to use the Services APIs without dependency injection.

Plugin Project Structure

Minimal Plugin Project Structure
Figure 1. HiveMQ Plugin Structure

For a minimal HiveMQ plugin you need at least two classes and a text file for the service loader mechanism:

  1. A class which extends com.hivemq.spi.HiveMQPluginModule

  2. A class which extends com.hivemq.spi.PluginEntryPoint

  3. A text file which resides in META-INF/services and is named com.hivemq.spi.HiveMQPluginModule

Using the Maven Archetype
There is a Maven Archetype available which generates all classes and files needed for a plugin.
HiveMQPluginModule

Your class which extends com.hivemq.spi.HiveMQPluginModule is responsible for bootstrapping your module. That means, it is responsible for the following things:

  • Defining bindings for your dependency injection.

  • Declaring the class which is your Plugin Entry Point. This class acts as the "main method" of your plugin’s business logic.

Using Guice Provider methods
You can use standard Guice provider methods in your implementation of com.hivemq.spi.HiveMQPluginModule.

The following shows the simplest possible implementation of com.hivemq.spi.HiveMQPluginModule.

Example com.hivemq.spi.HiveMQPluginModule implementation
import com.hivemq.spi.HiveMQPluginModule;
import com.hivemq.spi.PluginEntryPoint;

public class TestPluginModule extends HiveMQPluginModule {

    @Override
    protected void configurePlugin() {
	(1)
    }

    @Override
    protected Class<? extends PluginEntryPoint> entryPointClass() {
        return TestPlugin.class; (2)
    }
}
1 You can wire your dependencies here with standard Guice bindings.
2 This is your plugin’s main entry class.
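Building on the callouts above, the following sketch wires a dependency with a standard Guice binding and provides an object via a provider method. MyDependency, MyDependencyImpl and MyConfiguration are hypothetical classes.

Example module with a binding and a provider method (sketch)
import com.google.inject.Provides;
import com.hivemq.spi.HiveMQPluginModule;
import com.hivemq.spi.PluginEntryPoint;

import javax.inject.Singleton;

public class WiredPluginModule extends HiveMQPluginModule {

    @Override
    protected void configurePlugin() {
        // Standard Guice binding: requests for MyDependency yield a MyDependencyImpl
        bind(MyDependency.class).to(MyDependencyImpl.class);
    }

    @Provides
    @Singleton
    MyConfiguration provideConfiguration() {
        // Provider method: constructs an object that can then be injected elsewhere
        return new MyConfiguration("my-plugin.properties");
    }

    @Override
    protected Class<? extends PluginEntryPoint> entryPointClass() {
        return TestPlugin.class;
    }
}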
PluginEntryPoint

Your class which extends com.hivemq.spi.PluginEntryPoint can be seen as the "main method" of your plugin. The constructor and the @PostConstruct annotated method are called when the HiveMQ server starts up.

You can use Dependency Injection and inject all the dependencies you wired up in your HiveMQPluginModule subclass. It’s even possible to inject some HiveMQ specific objects like the CallbackRegistry.

Example com.hivemq.spi.PluginEntryPoint implementation
import com.hivemq.spi.PluginEntryPoint;

import javax.annotation.PostConstruct;
import javax.inject.Inject;

public class TestPlugin extends PluginEntryPoint {

    private final MyDependency dependency;

    @Inject
    public TestPlugin(MyDependency dependency) { (1)
        this.dependency = dependency;
    }

    @PostConstruct
    public void postConstruct() { (2)
        dependency.doSomething();
    }
}
1 You can inject any dependency with constructor injection.
2 This method gets executed after the injections are done and the constructor was executed.
META-INF/services/com.hivemq.spi.HiveMQPluginModule file

To enable HiveMQ to detect your plugin, a text file called com.hivemq.spi.HiveMQPluginModule has to be created in META-INF/services. The file contains a single line: the fully qualified class name of your HiveMQPluginModule subclass.

Example of META-INF/services/com.hivemq.spi.HiveMQPluginModule contents
com.hivemq.testplugin.TestPluginModule

CallbackRegistry

Typically, a HiveMQ plugin implements at least one callback. See the Callbacks chapter for more information on the concrete callbacks.

Registering your own callbacks is pretty easy from your PluginEntryPoint implementation. You can call getCallbackRegistry() to get the callback registry.

Example of registering a callback
import com.hivemq.spi.PluginEntryPoint;


public class TestPlugin extends PluginEntryPoint {

    public TestPlugin() {

        getCallbackRegistry().addCallback(new MyPublishReceivedCallback());
    }
}
Getting a reference to the callback registry in other classes
When you want to use the callback registry in classes other than your PluginEntryPoint, you can simply inject it, as shown below. If you prefer not to use dependency injection, you have to pass the CallbackRegistry around manually.
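
A minimal sketch, assuming the CallbackRegistry interface from the com.hivemq.spi.callback.registry package and a hypothetical MyPublishReceivedCallback:

Example of injecting the CallbackRegistry (sketch)
import com.hivemq.spi.callback.registry.CallbackRegistry;

import javax.inject.Inject;

public class CallbackInstaller {

    private final CallbackRegistry callbackRegistry;

    @Inject
    public CallbackInstaller(final CallbackRegistry callbackRegistry) { // injected like any other dependency
        this.callbackRegistry = callbackRegistry;
    }

    public void installCallbacks() {
        callbackRegistry.addCallback(new MyPublishReceivedCallback());
    }
}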

Plugin Information

It is strongly recommended to annotate your HiveMQPluginModule with com.hivemq.spi.plugin.meta.Information to provide additional metadata about the plugin. HiveMQ uses this metadata for monitoring and information purposes. When starting HiveMQ with your plugin, HiveMQ will log your defined plugin name and version on startup.

If you forget to add the @Information annotation to your HiveMQPluginModule implementation, HiveMQ will log a warning on startup.
Example of using @Information
import com.hivemq.spi.HiveMQPluginModule;
import com.hivemq.spi.plugin.meta.Information;

@Information(
        name = "My test plugin",
        author = "John Doe",
        version = "1.0",
        description = "A test plugin to show the HiveMQ plugin capabilities")
public class TestPluginModule extends HiveMQPluginModule {
…
}

Logging

For logging purposes, HiveMQ plugins are encouraged to utilize the excellent SLF4J API. When using SLF4J, plugins don’t have to wrestle with logging configuration as the HiveMQ server takes care of it. No surprises here, just use the standard SLF4J API.

Example of using a SLF4J Logger
import com.hivemq.spi.PluginEntryPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestPlugin extends PluginEntryPoint {

    private static final Logger log = LoggerFactory.getLogger(TestPlugin.class); (1)

    public TestPlugin() {

        log.info("Hello, I'm logging here");
    }
}
1 Standard SLF4J Logger creation
LogService
You can change the logging level of HiveMQ programmatically with the Log Service.

Don’t block!

The single most important rule for all plugins is: Don’t block in callbacks. Never. Use the com.hivemq.spi.services.PluginExecutorService for everything which potentially blocks in callbacks. By the way: It’s also a great idea to use Connection Pools (e.g. HikariCP) if you are dealing with databases.

The following shows example code which uses the PluginExecutorService for nonblocking behaviour.

Example code for nonblocking actions
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.events.broker.OnBrokerStart;
import com.hivemq.spi.callback.exception.BrokerUnableToStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import com.hivemq.spi.services.PluginExecutorService;

import static com.hivemq.spi.callback.CallbackPriority.HIGH;


public class TestPlugin extends PluginEntryPoint {

    private static final Logger log = LoggerFactory.getLogger(TestPlugin.class);

    private final PluginExecutorService executorService;

    @Inject
    public TestPlugin(final PluginExecutorService executorService) { (1)
        this.executorService = executorService;
    }

    @PostConstruct
    public void postConstruct() { (2)
        getCallbackRegistry().addCallback(new OnBrokerStart() { (3)
            @Override
            public void onBrokerStart() throws BrokerUnableToStartException {
                executorService.submit(new Runnable() { (4)
                    @Override
                    public void run() {
                        log.info("Starting long operation");
                        try {
                            Thread.sleep(5000); (5)
                        } catch (InterruptedException e) {
                            log.error("Error", e);
                        }
                        log.info("Stopping long operation");

                    }
                });
            }

            @Override
            public int priority() {
                return HIGH; (6)
            }
        });
    }
}
1 Injecting the PluginExecutorService
2 After the injections are done this method gets executed
3 We add an anonymous inner class which implements the OnBrokerStart Callback
4 We submit a Runnable — which gets executed asynchronously — to the ExecutorService
5 This is our long running method. We simulate it with a simple Thread.sleep.
6 This defines that our Callback has High Priority compared to other OnBrokerStart callbacks for HiveMQ.

Caching

The single most important rule in HiveMQ plugins is: Don’t block. Under some circumstances you have to block, for example when you are waiting for responses from your database, a REST API call or when reading something from the filesystem. If you are doing this in a callback which is called very often (like the OnPublishReceivedCallback), you most likely want to implement caching.

In general, all plugins are responsible for proper caching by themselves.

HiveMQ offers the convenient annotation com.hivemq.spi.aop.cache.Cached which caches the return values of methods. The annotation also respects the parameters you pass to the method and only caches return values for method executions with the same parameters. You can define how long the method return values are cached.

Here is an example of how method caching works in a plugin:

Example of a plugin which uses Caching
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.aop.cache.Cached;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.Date;
import com.hivemq.spi.services.PluginExecutorService;
import java.util.concurrent.TimeUnit;


public class TestPlugin extends PluginEntryPoint {

    private static final Logger log = LoggerFactory.getLogger(TestPlugin.class);
    private final PluginExecutorService pluginExecutorService;

    @Inject
    public TestPlugin(PluginExecutorService pluginExecutorService) {
        this.pluginExecutorService = pluginExecutorService;
    }

    @PostConstruct
    public void postConstruct() {
        pluginExecutorService.scheduleAtFixedRate(new Runnable() { (1)
            @Override
            public void run() {
                log.info(getDate().toString()); (2)
            }
        }, 1, 1, TimeUnit.SECONDS); (3)
    }


    @Cached(timeToLive = 10, timeUnit = TimeUnit.SECONDS) (4)
    public Date getDate() {
        return new Date();
    }
}
1 We are scheduling a new Runnable Task which gets executed at a fixed rate
2 We output the value from the getDate() method to verify it was actually cached
3 We are scheduling this task every second
4 Here we define that the result of the method has to be cached for 10 seconds.
Caching with the com.hivemq.spi.aop.cache.Cached annotation only works on objects created by the Dependency Injection container. That means if you instantiate a class with that annotation yourself, the caching won’t work.

For more fine-grained caching, we recommend implementing caching on your own with a full-fledged caching library. For simple caching, you can utilize Google Guava (included as a dependency in the SPI).
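
The following is a minimal sketch of such a hand-rolled cache with Guava’s CacheBuilder; the ten-second expiry mirrors the @Cached example above, and DateCache and the cache key are illustrative.

Example of a simple Guava cache (sketch)
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class DateCache {

    private final LoadingCache<String, Date> cache = CacheBuilder.newBuilder()
            .maximumSize(1000) // bound the cache to avoid unbounded memory growth
            .expireAfterWrite(10, TimeUnit.SECONDS) // entries are recomputed after 10 seconds
            .build(new CacheLoader<String, Date>() {
                @Override
                public Date load(final String key) {
                    return new Date(); // the expensive lookup would go here
                }
            });

    public Date getDate() {
        return cache.getUnchecked("now");
    }
}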

Measuring execution times

HiveMQ offers several convenient annotations to monitor your plugins. The data gathered for the annotated methods is stored in the HiveMQ MetricRegistry.

Name | Description

Counted | Methods annotated with this annotation are added to the HiveMQ MetricRegistry automatically as Counters. Invocations of the annotated method are counted.

ExceptionMetered | Methods annotated with this annotation are added to the HiveMQ MetricRegistry automatically as Meters. Exceptions thrown by the annotated method are counted.

Metered | Methods annotated with this annotation are added to the HiveMQ MetricRegistry automatically as Meters. The invocation rate of the annotated method is measured.

Timed | Methods annotated with this annotation are added to the HiveMQ MetricRegistry automatically as Timers. The invocation times of the annotated method are captured automatically.

Example of a method which uses Timed
@Timed(name = "get.date.duration")
public Date getDate() {
    return new Date();
}
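The other annotations are used the same way. For example, a counter; the metric name is illustrative:

Example of a method which uses Counted (sketch)
@Counted(name = "get.date.count")
public Date getDate() {
    return new Date();
}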
These annotations only work on objects created by the Dependency Injection container. That means if you instantiate a class with these annotations yourself, the measurements won’t work.

Plugin Isolation

Since version 2.1.0, HiveMQ uses plugin isolation. That means each plugin has its own classloader, and resources and classes cannot be shared between plugins. This adds additional security for your plugin and avoids nasty bugs if you include the same libraries in different plugins.

Some libraries, like Jersey, interfere with other class loaders, which can lead to side effects. HiveMQ implements workarounds for such libraries internally. If you are using such a library which isn’t covered yet by HiveMQ, please contact support@hivemq.com so we can include a workaround for it.

Miscellaneous

Don’t use experimental APIs
Some APIs are marked with the annotation @Experimental. These APIs are subject to change and you should prepare for updating your plugin with a new HiveMQ release. Typically these APIs aren’t documented in this plugin guide but can be found in the source code.

Get started quickly with the Plugin Maven Archetype

If you have just read the theoretical parts of the documentation, or you want to dive into plugin development quickly, you should read this chapter carefully. It explains what a Maven Archetype is, how you can benefit from it and how to use it.

What is a Maven Archetype?

A Maven archetype is a project template tool: it allows creating a template for a Maven project, which can then be used to generate new Maven projects. In short, the HiveMQ Plugin Archetype provides you with a fully functional HelloWorld plugin to get started with developing your own plugins. This is by far the simplest way to get started with plugin development. If you want to gain more insights into Maven and/or the Maven archetype, please refer to the official Maven guides and the archetype guide.

Maven Central
All HiveMQ dependencies, including the Maven archetype, are available in Maven Central.

Creating your first Plugin Project from the command line

  • Open a terminal and switch to the folder in which you want to create a new project folder with the plugin source files.

Project Structure
The generation process will create a folder with all the necessary files inside the directory in which the following command is executed.
  • Make sure you have Apache Maven available (for instructions look here)

  • Execute Archetype command

    Generate a new Maven project with the archetype
    	mvn archetype:generate -DarchetypeGroupId=com.hivemq -DarchetypeArtifactId=hivemq-plugin-archetype -DarchetypeVersion=3.0.0
  • Specify the common Maven identifiers for the new project in the prompt: GroupId, ArtifactId, Version

  • Congrats, you have just created your first HiveMQ plugin :)

Next Steps
If you want to learn more about the generated HelloWorld example, please read the provided JavaDoc. For information on how to run and debug your new plugin see Development with Maven Plugin.

Creating your first Plugin Project using IntelliJ IDEA

  • Go to File → New Project and select Maven

  • In the next step click Add archetype... and fill in the details

    • Archetype Group Id: com.hivemq

    • Archetype Artifact Id: hivemq-plugin-archetype

    • Archetype Version: 3.0.0

Add Archetype
  • Click OK and select the Archetype in the list

Select Archetype and enter own GroupId
  • Done! IntelliJ is now creating the project and resolving the Maven dependencies.

Finished project
Want to run your newly created plugin on HiveMQ?
The next chapter explains how to run and debug plugins within your IDE. Go directly to the IntelliJ section.

Creating your first Plugin Project using Eclipse

  • Make sure you have the Eclipse IDE for Java Developers, otherwise Maven is not included by default

  • Go to File → New → Project and select Maven Project

  • In the next step click Add archetype... and fill in the details

    • Archetype Group Id: com.hivemq

    • Archetype Artifact Id: hivemq-plugin-archetype

    • Archetype Version: 3.0.0

Add Archetype
  • Click Next, enter the desired Group Id, Artifact Id, Version for your plugin and click Finish

Select Archetype in list
  • Done! Eclipse will create the project and resolve all dependencies!

Finished project
Want to run your newly created plugin on HiveMQ?
The next chapter explains how to run and debug plugins within your IDE. Go directly to the Eclipse section.

Development with the Maven Plugin

This chapter introduces the HiveMQ Maven plugin, which is a great help for any developer throughout the development lifecycle of a HiveMQ plugin.

Introduction

The plugin must be packaged as a JAR file (see Packaging & Deployment) and copied manually to the HiveMQ plugin folder (<HiveMQHome>/plugins) in order to initialize the plugin on HiveMQ startup. This process is very inconvenient and not suited to fast switching between development and manual testing. Another setback is that a plugin cannot be debugged the way a normal Java application can, because the plugin cannot run without a running HiveMQ instance. All of these scenarios are essential for effective plugin development, and that’s why we created the HiveMQ Maven Plugin to solve these problems.

The Maven plugin helps developers to

  • easily run their developed plugin on HiveMQ for testing purposes

  • successfully debug their plugin

Functionality

The Maven plugin automates the steps stated above, which are necessary to run a plugin on a real HiveMQ instance. For this you need a HiveMQ instance on your development machine, to which the Maven plugin can deploy the plugin under development. When the Maven plugin is invoked, it creates an ad-hoc plugin directory in the Maven build directory (this can be customized, see the configuration options) and moves the plugin jar file to the newly created folder. If you want to test configuration files for your plugin, please put them in src/main/resources, because they are then copied to the ad-hoc plugins folder as well.

Plugin folder of HiveMQ, when working with the Maven Plugin
The regular plugin folder of the HiveMQ instance used by the Maven Plugin is ignored. Plugins or configuration files located there won’t be used when starting HiveMQ from within the Maven Plugin.

Afterwards, the HiveMQ located in the specified directory (this needs to be declared using the configuration options) is started with the ad-hoc plugin folder. The Maven plugin will then show the HiveMQ console output. This gives plugin developers an easy way to test newly created plugins. Additionally, it provides the capability to debug the plugin at runtime using the Java remote application debugging mechanism. Java Remote Debugging is a client/server concept in which both parties, the application (HiveMQ with plugin) and the IDE, can act as the server or the client. Each scenario has its own advantages and disadvantages:

  • HiveMQ with Plugin: Server; IDE: Client

    • Advantage

      • Easier handling, because HiveMQ can be started without the IDE debugger being started first.

    • Disadvantage

      • If the code you want to debug is executed at startup, then you have to be quick in starting the IDE client debugger, otherwise the code you wish to debug has already been passed through.

  • HiveMQ with Plugin: Client; IDE: Server

    • Advantage

      • It is possible to debug code at startup

    • Disadvantage

      • Before HiveMQ can be started, your IDE’s debugging process needs to be up and running.

Both of these scenarios have their place and can be switched through the configuration options of the Maven plugin. It is also possible to specify the port on which they connect and, if HiveMQ is running in client mode, the host name of the server.

Best Practice
Use CLIENT mode when you want to debug code in your callbacks, and only use SERVER mode when debugging the Plugin Module or Plugin Entry Point is necessary.

This was just a quick overview of Java Remote Debugging in the context of HiveMQ plugin development; for more insights see this link.

Usage

A dedicated Maven profile attached to the Maven package goal has proven to be an effective workflow for using the plugin. It triggers the plugin to be run on HiveMQ every time the Maven package goal is executed and the profile RunWithHiveMQ is active. To make it work as described, the following snippet has to be added to the pom.xml in the root folder of the plugin project. This is already part of the project if the provided Maven Archetype was used. The only change which needs to be made is to provide the correct HiveMQ home folder.

IDE Support
For more information on how to do this with your favorite IDE look here.
HiveMQ Maven Plugin Usage
<project....>
...

 <profiles>
        <profile>
            <id>RunWithHiveMQ</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>com.hivemq</groupId>
                        <artifactId>hivemq-maven-plugin</artifactId>
                        <version>3.0.0</version>
                        <executions>
                            <execution>
                                <id>hivemq</id>
                                <phase>package</phase> (3)
                                <goals>
                                    <goal>hivemq</goal>
                                </goals>

                                <configuration>(1)
                                    <hiveMQDir>
                                        /Applications/hivemq (2)
                                    </hiveMQDir>
                                </configuration>

                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
...
</project>
1 All configuration properties for the Maven plugin have to be placed in here.
2 Please specify the HiveMQ home directory. If you have not downloaded HiveMQ yet, please download the latest HiveMQ.
3 Here you can change the Maven phase during which the plugin is executed. It only makes sense to do this in the package phase or later, because the plugin jar file is generated during package.

Configuration Options

Do I need additional configuration for the Maven plugin?
In general, the Maven plugin has reasonable defaults, but sometimes it is necessary to adjust some settings or just to get an overview of the default settings. The HiveMQ directory configuration property is the only mandatory one.
Table 2. Configuration Options
Name | Default | Required | Description

hivemqDir | (none) | true | This needs to be set to your local HiveMQ directory. If you have not downloaded HiveMQ yet, please download the latest HiveMQ.

pluginJarName | {artifactId}-{version}.jar | false | The name of the plugin jar file.

pluginDir | ${project.build.directory} | false | The directory in which your plugin jar file is located.

hivemqJar | hivemq.jar | false | The name of the HiveMQ jar file in the bin directory.

verbose | true | false | Specifies whether the console output logged by HiveMQ is shown.

noPlugins | false | false | When this is set to true, HiveMQ starts without the plugin currently under development. A possible use case is comparing the behaviour of HiveMQ with and without the plugin.

debugMode | SERVER | false | Specifies the debug mode of the plugin. Valid values are NONE, SERVER and CLIENT. Use NONE for starting HiveMQ + plugin without debugging. When you want to debug the bootstrapping part of your plugin use CLIENT, otherwise use SERVER. For more insights about debug modes see the functionality section, and make sure to configure your IDE according to your debug mode.

debugPort | 5005 | false | The debug port on which the debug server starts when in SERVER mode, or to which the client connects when in CLIENT mode.

debugServerHostName | localhost | false | If the debugMode is CLIENT, this property specifies the host name of the debug server (for example the machine your IDE is running on). This is only needed if you want to remotely debug your plugin on another machine; see also the functionality section.

Workflow with common IDEs

Prerequisites
You can use any plugin where the HiveMQ Maven plugin is configured, which means that the pom.xml should contain the snippet shown here. The simplest method is to create your own HelloWorld plugin project with the provided Maven Archetype. Remember to change the HiveMQ home directory!

IntelliJ IDEA

Run HiveMQ with Plugin
  • Open the pom.xml and create a new configuration element debugMode in the Maven Plugin and set value to NONE

    Run-Plugin-In-IntelliJ-Step-1
  • Run Maven Goal package and make sure Profile RunWithHiveMQ is selected

    Run-Plugin-In-IntelliJ-Step-2
Debug Plugin in Server Mode
  • Create new configuration element debugMode and set value to SERVER, Run Maven Goal package and make sure Profile RunWithHiveMQ is selected

    Debug-Plugin-In-IntelliJ-Server-Mode-Step-1
  • Create a new Run Configuration

    Debug-Plugin-In-IntelliJ-Server-Mode-Step-2
  • Select Remote Configuration

    Debug-Plugin-In-IntelliJ-Server-Mode-Step-3
  • Make sure the Transport is set to Socket, Mode is set to Attach and the port is 5005.

    Debug-Plugin-In-IntelliJ-Server-Mode-Step-4
  • Run the newly created Configuration

    Debug-Plugin-In-IntelliJ-Server-Mode-Step-5
  • Wait until the Debugger Console opens and shows Connected

    Debug-Plugin-In-IntelliJ-Server-Mode-Step-6
Debug Plugin in Client Mode
  • Create new configuration element debugMode and set value to CLIENT and create a new Run Configuration

    Debug-Plugin-In-IntelliJ-Client-Mode-Step-1
  • Select Remote Configuration

    Debug-Plugin-In-IntelliJ-Client-Mode-Step-2
  • Make sure the Transport is set to Socket, Mode is set to Listen and the port is 5005.

    Debug-Plugin-In-IntelliJ-Client-Mode-Step-3
  • Run the newly created Configuration

    Debug-Plugin-In-IntelliJ-Client-Mode-Step-4
  • Wait until the Debugger Console opens and shows Connected, then run Maven package

    Debug-Plugin-In-IntelliJ-Client-Mode-Step-5
  • Wait until HiveMQ is started

    Debug-Plugin-In-IntelliJ-Client-Mode-Step-6
  • Switch to the Debug window and check if it shows Connected

    Debug-Plugin-In-IntelliJ-Client-Mode-Step-7

Eclipse

Maven Goal Package
  • Create new Run Configuration for Maven Goal

    Run-Maven-Package-In-Eclipse-Step-1
  • Insert Maven Goal package and Profile RunWithHiveMQ

    Run-Maven-Package-In-Eclipse-Step-2
  • Click Apply and you are done!

Run HiveMQ with Plugin
  • Open the pom.xml and create a new configuration element debugMode in the Maven Plugin and set value to NONE

    Run-Plugin-In-Eclipse-Step-1
  • Click on the Run icon to run the above created Run Configuration.

  • Done! HiveMQ is starting in the Maven console.

Debug Plugin in Server Mode
  • Create new configuration element debugMode and set value to SERVER; Run Maven Goal package

    Debug-Plugin-In-Eclipse-Server-Mode-Step-1
  • Create a new Debug Configuration: Remote Java Application

    Debug-Plugin-In-Eclipse-Server-Mode-Step-2
  • Make sure Connection Type is Socket Attach, the host is localhost, the port is 5005 and then click Debug

    Debug-Plugin-In-Eclipse-Server-Mode-Step-3
  • Change to the Debug Perspective and check if the debugger is running

    Debug-Plugin-In-Eclipse-Server-Mode-Step-4
Debug Plugin in Client Mode
  • Create a new Debug Configuration: Remote Java Application

    Debug-Plugin-In-Eclipse-Client-Mode-Step-1
  • Make sure Connection Type is Socket Listen, the port is 5005 and then click Debug

    Debug-Plugin-In-Eclipse-Client-Mode-Step-2
  • Change to the Debug Perspective and check if the debugger is listening

    Debug-Plugin-In-Eclipse-Client-Mode-Step-3
  • Create new configuration element debugMode and set value to CLIENT; Run the Maven Goal package

    Debug-Plugin-In-Eclipse-Client-Mode-Step-4
  • Wait until HiveMQ is started

    Debug-Plugin-In-Eclipse-Client-Mode-Step-5
  • Change to the Debug Perspective and check if the debugger is connected

    Debug-Plugin-In-Eclipse-Client-Mode-Step-6

Callbacks

A core concept of HiveMQ plugin development is using the callbacks HiveMQ provides to execute custom business logic when certain events occur.

To hook your callback implementations into HiveMQ, you can use the Callback Registry.

Example of Registering a Plugin Callback
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.lowlevel.OnPingCallback;
import com.hivemq.spi.security.ClientData;

import javax.annotation.PostConstruct;

public class TestPlugin extends PluginEntryPoint {

    @PostConstruct
    public void postConstruct() {
        getCallbackRegistry().addCallback(new OnPingCallback() { (1)
            @Override
            public void onPingReceived(ClientData clientData) {
                System.out.println("Ping received");
            }
        });
    }
}
1 Registering the anonymous inner callback class
Callback Hierarchy
Figure 2. The Callback Hierarchy

Every callback interface which can be implemented is either an Asynchronous or a Synchronous callback.

The only difference that matters for plugin implementors is that Synchronous Callbacks have priorities. Priorities define the order of callback execution when there is more than one implementation of a callback. Use the constants from the com.hivemq.spi.callback.CallbackPriority class or define your own numerical return value. Lower values mean higher priority.
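
For example, a synchronous callback chooses its place in the execution order by returning a constant or a custom number:

Example of defining a callback priority (sketch)
@Override
public int priority() {
    // Use a constant like CallbackPriority.HIGH, MEDIUM or LOW, or any custom int;
    // lower values mean higher priority and run earlier.
    return CallbackPriority.HIGH;
}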

Synchronous callbacks are executed in order because they typically allow interfering with the message processing/authentication/authorization mechanisms of HiveMQ. Naturally, it is very important to prevent blocking in Synchronous Callbacks.

Synchronous Callback Execution
Figure 3. Synchronous Callback Execution

Asynchronous Callbacks are executed in parallel. HiveMQ synchronizes after the parallel execution. That means that a slow callback can block HiveMQ even though all other callbacks have already finished.

Asynchronous Callback Execution
Figure 4. Asynchronous Callback Execution
Same priorities on callbacks
If callbacks have the same priority, the execution order of these callbacks is not predictable.

Overview of all Callbacks

Table 3. Callbacks
Name | Type | Description

Broker Event Callbacks

OnBrokerStart | Synchronous | Called when the broker starts up. This happens before bootstrapping functionality like binding network interfaces.

OnBrokerStop | Synchronous | Called when the broker stops.

MQTT Message Callbacks

OnConnectCallback | Synchronous | Called when a CONNECT message arrives.

OnDisconnectCallback | Asynchronous | Called when a DISCONNECT message arrives or a TCP connection loss occurs.

OnPublishReceivedCallback | Synchronous | Called when a PUBLISH MQTT message arrives.

OnPublishSend | Asynchronous | Called when the broker is sending a PUBLISH MQTT message.

OnSubscribeCallback | Synchronous | Called when a MQTT SUBSCRIBE message arrives.

OnUnsubscribeCallback | Asynchronous | Called when a MQTT UNSUBSCRIBE message arrives.

OnConnackSend | Asynchronous | Called when the broker is sending a MQTT CONNACK message.

OnPingCallback | Asynchronous | Called when a MQTT PINGREQ message arrives.

OnPubackReceived | Asynchronous | Called when a MQTT PUBACK message arrives.

OnPubackSend | Asynchronous | Called when the broker is sending a MQTT PUBACK message.

OnPubcompReceived | Asynchronous | Called when a MQTT PUBCOMP message arrives.

OnPubcompSend | Asynchronous | Called when the broker is sending a MQTT PUBCOMP message.

OnPubrecReceived | Asynchronous | Called when a MQTT PUBREC message arrives.

OnPubrecSend | Asynchronous | Called when the broker is sending a MQTT PUBREC message.

OnPubrelReceived | Asynchronous | Called when a MQTT PUBREL message arrives.

OnPubrelSend | Asynchronous | Called when the broker is sending a MQTT PUBREL message.

OnSubackSend | Asynchronous | Called when the broker is sending a MQTT SUBACK message.

OnUnsubackSend | Asynchronous | Called when the broker is sending a MQTT UNSUBACK message.

Security Callbacks

AfterLoginCallback | Asynchronous | Called when a client made a successful or unsuccessful login attempt.

OnAuthenticationCallback | Synchronous | Called after a CONNECT message arrived to check the credentials.

OnAuthorizationCallback | Synchronous | Returns MqttTopicPermissions when a client publishes or subscribes.

OnInsufficientPermissionDisconnect | Asynchronous | Called when a client was disconnected due to insufficient permissions when publishing or subscribing.

RestrictionsAfterLoginCallback | Synchronous | Executed after a CONNECT message arrives and the client was authenticated successfully.

Other Callbacks

ClusterDiscoveryCallback | Asynchronous | Called periodically by HiveMQ to discover other HiveMQ cluster nodes.

ScheduledCallback | Asynchronous | Executed periodically based on a quartz-style cron expression.

OnBrokerStart

Type

Synchronous

Purpose

The com.hivemq.spi.callback.events.broker.OnBrokerStart callback is useful for implementing custom startup logic and verifications. A common use case is for example verifying that a database connection can be obtained or that expected files are available and valid.

Interfering with HiveMQ

It is possible to throw a com.hivemq.spi.callback.exception.BrokerUnableToStartException. When the onBrokerStart() method of the OnBrokerStart callback implementation throws this exception, HiveMQ refuses to start.

OnBrokerStartCallback Example
import com.google.inject.Inject;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerStart;
import com.hivemq.spi.callback.exception.BrokerUnableToStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveMQStart implements OnBrokerStart {

    private static final Logger log = LoggerFactory.getLogger(HiveMQStart.class);


    @Override
    public void onBrokerStart() throws BrokerUnableToStartException {
        log.info("HiveMQ is starting");

        if (checkStartupSuccessful()) {
            log.info("Startup check was successful");
        } else {
            throw new BrokerUnableToStartException("Could not start HiveMQ :("); (1)
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Throwing the BrokerUnableToStartException prevents HiveMQ from starting.

OnBrokerStop

Type

Synchronous

Purpose

The com.hivemq.spi.callback.events.broker.OnBrokerStop callback is useful for implementing custom shutdown logic. A common use case is for example shutting down a database connection.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnBrokerStopCallback Example
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerStop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.io.FileOutputStream;
import java.io.IOException;

public class HiveMQStop implements OnBrokerStop {

    final Logger log = LoggerFactory.getLogger(HiveMQStop.class);
    final FileOutputStream fileOutputStream;

    @Inject
    public HiveMQStop(final FileOutputStream fileOutputStream) {
        this.fileOutputStream = fileOutputStream;
    }

    @Override
    public void onBrokerStop() {
        try {
            fileOutputStream.close();
        } catch (IOException e) {
            log.warn("Output stream could not be closed on broker shutdown.", e);
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

OnConnectCallback

Type

Synchronous

Purpose

The com.hivemq.spi.callback.events.OnConnectCallback is useful for performing custom logic when a MQTT CONNECT message arrives. A common use case is, for example, logging when a client connects. Useful edge cases for this callback are when you need to verify parts of the CONNECT message, like the LWT part.

Interfering with HiveMQ

It is possible to throw a com.hivemq.spi.callback.exception.RefusedConnectionException to disconnect a client with a specific return code for the CONNACK message. A CONNECT message object and a ClientData object — which contains information about the credentials and the optional client authentication certificate — are passed as parameters to the onConnect method.

Although possible for scenarios with only one authentication resource, this callback is not designed to perform authentication. Please use the OnAuthenticationCallback for this purpose.
OnConnectCallback Example
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnConnectCallback;
import com.hivemq.spi.callback.exception.RefusedConnectionException;
import com.hivemq.spi.message.CONNECT;
import com.hivemq.spi.message.ReturnCode;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientConnect implements OnConnectCallback {

    Logger log = LoggerFactory.getLogger(ClientConnect.class);

    @Override
    public void onConnect(CONNECT connect, ClientData clientData) throws RefusedConnectionException {
        if (clientData.getClientId().equals("bad id")) {
            throw new RefusedConnectionException(ReturnCode.REFUSED_IDENTIFIER_REJECTED);
        }
        log.info("Client {} is connecting", clientData.getClientId());
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

OnDisconnectCallback

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.events.OnDisconnectCallback is useful for performing custom logic when a client disconnects due to a MQTT DISCONNECT message or a TCP connection loss.

If the client disconnected due to a connection loss, the parameter abruptDisconnect is true. If the client disconnected gracefully, the parameter is false.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback. The abruptDisconnect parameter indicates whether there was a TCP connection loss (abruptDisconnect = true) or a graceful disconnect with a MQTT DISCONNECT message.

OnDisconnectCallback Example
import com.hivemq.spi.callback.events.OnDisconnectCallback;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientDisconnect implements OnDisconnectCallback {

    final Logger log = LoggerFactory.getLogger(ClientDisconnect.class);

    @Override
    public void onDisconnect(ClientData clientData, boolean abruptDisconnect) {
        if (abruptDisconnect) {
            log.warn("Connection to client {}, with ip {}, was lost abruptly.", clientData.getClientId(), clientData.getInetAddress());
        } else {
            log.info("Client {} is disconnected.", clientData.getClientId());
        }
    }
}

OnPublishReceivedCallback

Type

Synchronous

Purpose

The com.hivemq.spi.callback.events.OnPublishReceivedCallback gets called when a PUBLISH MQTT message arrives. This callback is useful for validating PUBLISH messages, e.g. verifying that the message has a specific payload format. Don’t use this callback for topic-based permissions.

This callback gets called very often. Please make sure you use proper caching and that you don’t block.
Interfering with HiveMQ

It’s possible to throw a com.hivemq.spi.callback.exception.OnPublishReceivedException when the published message is not valid from a business logic point of view. It’s possible to disconnect the publishing client by passing true as parameter to the OnPublishReceivedException.

This callback is not meant to be used for topic restrictions. Please see the Client Authorization chapter for more details. Also, if more than one plugin throws an OnPublishReceivedException for the same callback, only the first one will be handled. Therefore the first exception decides whether the client gets disconnected or not.
OnPublishReceivedCallback Example
import com.google.common.base.Charsets;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnPublishReceivedCallback;
import com.hivemq.spi.callback.exception.OnPublishReceivedException;
import com.hivemq.spi.message.PUBLISH;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishReceived implements OnPublishReceivedCallback {

    final Logger logger = LoggerFactory.getLogger(PublishReceived.class);

    @Override
    public void onPublishReceived(final PUBLISH publish, final ClientData clientData) throws OnPublishReceivedException {
        final String clientID = clientData.getClientId();
        final String topic = publish.getTopic();
        final String message = new String(publish.getPayload(), Charsets.UTF_8);
        logger.debug("Client {} sent a message to topic {}: {}", clientID, topic, message);
        if (topic.equals("database/import") && message.contains("DROP TABLE")) {
            throw new OnPublishReceivedException(true);
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.LOW;
    }
}

OnPublishSend

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.events.OnPublishSend callback gets called when an outgoing MQTT PUBLISH is going to be sent to a subscribing client.

This callback gets called very often. Please make sure you use proper caching and that you don’t block.
Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPublishSend Example
import com.google.common.base.Charsets;
import com.hivemq.spi.callback.events.OnPublishSend;
import com.hivemq.spi.message.PUBLISH;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishSend implements OnPublishSend {

    final Logger logger = LoggerFactory.getLogger(PublishSend.class);

    @Override
    public void onPublishSend(final PUBLISH publish, final ClientData clientData) {
        if (logger.isDebugEnabled()) {
            final String clientID = clientData.getClientId();
            final String topic = publish.getTopic();
            final String message = new String(publish.getPayload(), Charsets.UTF_8);

            logger.debug("Client {} received a message on topic {}: {}", clientID, topic, message);
        }
    }
}

OnSubscribeCallback

Type

Synchronous

Purpose

The com.hivemq.spi.callback.events.OnSubscribeCallback callback gets called when a MQTT SUBSCRIBE message arrives. Useful for validating SUBSCRIBE messages or logging subscriptions. This callback is not designed to be used for Authorization. Please see the Authorization chapter for more details.

Interfering with HiveMQ

It’s possible to throw a com.hivemq.spi.callback.exception.InvalidSubscriptionException when the SUBSCRIBE message is invalid. The client gets disconnected when the exception is thrown.

OnSubscribeCallback Example
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnSubscribeCallback;
import com.hivemq.spi.callback.exception.InvalidSubscriptionException;
import com.hivemq.spi.message.SUBSCRIBE;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Subscribe implements OnSubscribeCallback {

    final Logger logger = LoggerFactory.getLogger(Subscribe.class);

    @Override
    public void onSubscribe(final SUBSCRIBE message, final ClientData clientData) throws InvalidSubscriptionException {
        logger.debug("Client {} is now subscribed to the topics: {}.", clientData.getClientId(), message.getTopics());
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

OnUnsubscribeCallback

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.events.OnUnsubscribeCallback callback gets called when a MQTT UNSUBSCRIBE message arrives.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnUnsubscribeCallback Example
import com.hivemq.spi.callback.events.OnUnsubscribeCallback;
import com.hivemq.spi.message.UNSUBSCRIBE;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnSubscribe implements OnUnsubscribeCallback {

    final Logger logger = LoggerFactory.getLogger(UnSubscribe.class);

    @Override
    public void onUnsubscribe(final UNSUBSCRIBE message, final ClientData clientData) {
        logger.debug("Client {} is no longer subscribed to the topics: {}.", clientData.getClientId(), message.getTopics());
    }
}

OnConnackSend

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnConnackSend callback gets called when the broker is sending a MQTT CONNACK message. Useful to check if a client was actually connected successfully or if the connection failed for some reason.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnConnackSend Example
import com.hivemq.spi.callback.lowlevel.OnConnackSend;
import com.hivemq.spi.message.CONNACK;
import com.hivemq.spi.message.ReturnCode;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnackSent implements OnConnackSend {

    final Logger logger = LoggerFactory.getLogger(ConnackSent.class);

    @Override
    public void onConnackSend(CONNACK connack, ClientData clientData) {
        if (connack.getReturnCode() == ReturnCode.ACCEPTED) {
            logger.debug("Client {} connected successfully.", clientData.getClientId());
        } else {
            logger.warn("Client {} failed to connect. Return code = {}", clientData.getClientId(), connack.getReturnCode().name());
        }
    }
}

OnPingCallback

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPingCallback callback gets called when a MQTT PINGREQ message arrives. The callback is useful to check if a client is still active. Note that PINGREQ messages are only sent by clients if no other messages are sent during the keep alive period.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPingCallback Example
import com.hivemq.spi.callback.lowlevel.OnPingCallback;
import com.hivemq.spi.security.ClientData;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class PingReceived implements OnPingCallback {

    private final Map<String, Date> clientActivity = new HashMap<>();

    @Override
    public void onPingReceived(final ClientData clientData) {
        clientActivity.put(clientData.getClientId(), new Date());
    }
}

OnPubackReceived

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPubackReceived callback gets called when a MQTT PUBACK message arrives. The callback can be used to implement business logic that should not be processed until a message has been received by the client at least once.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPubackReceived Example
import com.hivemq.spi.callback.lowlevel.OnPubackReceived;
import com.hivemq.spi.message.PUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubackReceived implements OnPubackReceived {

    final Logger logger = LoggerFactory.getLogger(PubackReceived.class);

    @Override
    public void onPubackReceived(PUBACK puback, ClientData clientData) {
        logger.debug("Client {} received a PUBLISH message with id {}", puback.getMessageId());
    }
}

OnPubackSend

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPubackSend callback gets called when the broker is sending a MQTT PUBACK message.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPubackSend Example
import com.hivemq.spi.callback.lowlevel.OnPubackSend;
import com.hivemq.spi.message.PUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubackSend implements OnPubackSend {

    final Logger logger = LoggerFactory.getLogger(PubackSend.class);

    @Override
    public void onPubackSend(PUBACK puback, ClientData clientData) {
        logger.debug("A PUBACK message with id {} sent by client {} was received.", puback.getMessageId(), clientData.getClientId());
    }
}

OnPubcompReceived

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPubcompReceived callback gets called when a MQTT PUBCOMP message arrives. The callback is executed as soon as the transfer of a PUBLISH message with QoS 2 to a subscriber is completed.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPubcompReceived Example
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.hivemq.spi.callback.lowlevel.OnPubcompReceived;
import com.hivemq.spi.message.PUBCOMP;
import com.hivemq.spi.security.ClientData;

public class PubcompReceived implements OnPubcompReceived {

    final Multimap<String, Integer> incompleteMessageTransfers = HashMultimap.create();

    @Override
    public void onPubcompReceived(final PUBCOMP pubcomp, final ClientData clientData) {
        incompleteMessageTransfers.remove(clientData.getClientId(), pubcomp.getMessageId());
    }
}

OnPubcompSend

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPubcompSend callback gets called when the broker is sending a MQTT PUBCOMP message. The callback is executed as soon as the transfer of a PUBLISH message with QoS 2 to the broker is completed.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPubcompSend Example
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.hivemq.spi.callback.lowlevel.OnPubcompSend;
import com.hivemq.spi.message.PUBCOMP;
import com.hivemq.spi.security.ClientData;

public class PubcompSend implements OnPubcompSend {

    final Multimap<String, Integer> incompleteMessageTransfers = HashMultimap.create();

    @Override
    public void onPubcompSend(final PUBCOMP pubcomp, final ClientData clientData) {
        incompleteMessageTransfers.remove(clientData.getClientId(), pubcomp.getMessageId());
    }
}

OnPubrecReceived

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPubrecReceived callback gets called when a MQTT PUBREC message arrives.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPubrecReceived Example
import com.hivemq.spi.callback.lowlevel.OnPubrecReceived;
import com.hivemq.spi.message.PUBREC;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubrecReceived implements OnPubrecReceived {

    final Logger logger = LoggerFactory.getLogger(PubrecReceived.class);

    @Override
    public void onPubrecReceived(final PUBREC pubrec, final ClientData clientData) {
        logger.debug("Client {} received a PUBLISH message with id {}", clientData.getClientId(), pubrec.getMessageId());
    }
}

OnPubrecSend

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPubrecSend callback gets called when the broker is sending a MQTT PUBREC message.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPubrecSend Example
import com.hivemq.spi.callback.lowlevel.OnPubrecSend;
import com.hivemq.spi.message.PUBREC;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubrecSend implements OnPubrecSend {

    final Logger logger = LoggerFactory.getLogger(PubrecSend.class);

    @Override
    public void onPubrecSend(final PUBREC pubrec, final ClientData clientData) {
        logger.debug("A PUBLISH message with id {} sent by publisher {} was received.", pubrec.getMessageId(), clientData.getClientId());
    }
}

OnPubrelReceived

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPubrelReceived callback gets called when a MQTT PUBREL message arrives.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPubrelReceived Example
import com.hivemq.spi.callback.lowlevel.OnPubrelReceived;
import com.hivemq.spi.message.PUBREL;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubrelReceived implements OnPubrelReceived {

    Logger logger = LoggerFactory.getLogger(PubrelReceived.class);

    @Override
    public void onPubrelReceived(final PUBREL pubrel, final ClientData clientData) {
        logger.debug("A PUBREL message with id {} was sent by publisher {}.", pubrel.getMessageId(), clientData.getClientId());
    }
}

OnPubrelSend

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnPubrelSend callback gets called when the broker is sending a MQTT PUBREL message.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnPubrelSend Example
import com.hivemq.spi.callback.lowlevel.OnPubrelSend;
import com.hivemq.spi.message.PUBREL;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubrelSend implements OnPubrelSend {

    final Logger logger = LoggerFactory.getLogger(PubrelSend.class);

    @Override
    public void onPubrelSend(final PUBREL pubrel, final ClientData clientData) {
        logger.debug("A PUBREL message with id {} was sent to subscriber {}.", pubrel.getMessageId(), clientData.getClientId());
    }
}

OnSubackSend

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnSubackSend callback gets called when the broker is sending an MQTT SUBACK message. This is useful to check whether a client was subscribed successfully. Note that a client which sends an invalid SUBSCRIBE message is usually disconnected immediately.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnSubackSend Example
import com.hivemq.spi.callback.lowlevel.OnSubackSend;
import com.hivemq.spi.message.SUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubackSend implements OnSubackSend {

    final Logger logger = LoggerFactory.getLogger(SubackSend.class);

    @Override
    public void onSubackSend(final SUBACK suback, final ClientData clientData) {
        logger.debug("The subscription of client {} with id {} was successful.", clientData.getClientId(), suback.getMessageId());
    }
}

OnUnsubackSend

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.lowlevel.OnUnsubackSend callback gets called when the broker is sending an MQTT UNSUBACK message. This is useful to check whether a client was unsubscribed successfully.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

OnUnsubackSend Example
import com.hivemq.spi.callback.lowlevel.OnUnsubackSend;
import com.hivemq.spi.message.UNSUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnsubackSend implements OnUnsubackSend {

    final Logger logger = LoggerFactory.getLogger(UnsubackSend.class);

    @Override
    public void onUnsubackSend(final UNSUBACK unsuback, final ClientData clientData) {
        logger.debug("The unsubscribe of client {} with id {} was successful.", clientData.getClientId(), unsuback.getMessageId());
    }
}

AfterLoginCallback

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.security.AfterLoginCallback gets called after a client made a successful or unsuccessful login attempt, i.e. after the client sent a CONNECT message and authentication took place. The callback offers the afterSuccessfulLogin method for successful logins and the afterFailedLogin method for unsuccessful login attempts.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.

AfterLoginCallback Example
import com.hivemq.spi.callback.exception.AuthenticationException;
import com.hivemq.spi.callback.security.AfterLoginCallback;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AfterLogin implements AfterLoginCallback {

    final Logger logger = LoggerFactory.getLogger(AfterLogin.class);

    @Override
    public void afterSuccessfulLogin(final ClientData clientData) {
        logger.debug("Client {} logged in successfully.", clientData.getUsername().get());
    }

    @Override
    public void afterFailedLogin(final AuthenticationException exception, final ClientData clientData) {
        logger.warn("Client {} failed to log in. Return code: {}",
                clientData.getUsername().or(clientData.getClientId()), exception.getReturnCode().name());
    }
}

OnAuthenticationCallback

Type

Synchronous

Purpose

The com.hivemq.spi.callback.security.OnAuthenticationCallback gets called after a CONNECT message arrives and handles the authentication of the client. The username/password from the MQTT CONNECT message can be used to authenticate the client. When client certificates are used for transport layer authentication, the certificate can also be used to authenticate the client on the application layer.

If your scenario allows this, you should use Caching.
Interfering with HiveMQ

The checkCredentials method must return either true or false, depending on whether the authentication was successful. When the authentication wasn’t successful and you want to control the return code of the CONNACK MQTT message, you can throw an AuthenticationException. This has the side effect that all other plugins which implement authentication are ignored once the exception is thrown.

OnAuthenticationCallback Example

See username/password authentication.


OnAuthorizationCallback

Type

Synchronous

Purpose

The com.hivemq.spi.callback.security.OnAuthorizationCallback is responsible for returning `com.hivemq.spi.topic.MqttTopicPermission`s. Every time a client performs a specific action on a topic, like publishing or subscribing, the callback is executed to check if the client is allowed to do so.

This callback gets called very often, so make sure you use proper Caching.
Interfering with HiveMQ

HiveMQ uses the returned MqttTopicPermissions to check if a client is allowed to do a specific action.

OnAuthorizationCallback Example

See authorization example.


OnInsufficientPermissionsCallback

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.security.OnInsufficientPermissionDisconnect gets called when a client was disconnected due to insufficient permissions while publishing or subscribing. By the time the callback is executed, the client has already been disconnected, so this is a purely informational callback.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback.


RestrictionsAfterLoginCallback

Type

Synchronous

Purpose

The com.hivemq.spi.callback.security.RestrictionsAfterLoginCallback gets executed after a CONNECT message arrives and the client was authenticated successfully. This callback provides restrictions which affect only a specific client, e.g. throttling or the maximum allowed MQTT message size.

Interfering with HiveMQ

The callback returns a set of restrictions which are applied to the specific client.

RestrictionsAfterLoginCallback Example
import com.hivemq.spi.callback.security.RestrictionsAfterLoginCallback;
import com.hivemq.spi.security.ClientData;
import com.hivemq.spi.security.Restriction;
import com.hivemq.spi.security.RestrictionType;

import java.util.HashSet;
import java.util.Set;

public class RestrictionsAfterLogin implements RestrictionsAfterLoginCallback {

    @Override
    public Set<Restriction> getRestrictions(final ClientData clientData) {

        final Set<Restriction> restrictions = new HashSet<>();

        restrictions.add(new Restriction(RestrictionType.MAX_PUBLISH_MESSAGE_SIZE, 128L)); (1)
        restrictions.add(new Restriction(RestrictionType.MAX_OUTGOING_BYTES_SEC, 128L)); (2)
        restrictions.add(new Restriction(RestrictionType.MAX_INCOMING_BYTES, 128L)); (3)

        return restrictions;
    }
}
1 Limit the maximum size of a message the client can send.
2 Add throttling for sending messages out to the client.
3 Add throttling for receiving messages from the client.

ClusterDiscoveryCallback

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.cluster.ClusterDiscoveryCallback is used to discover other nodes to form a cluster with. Nodes are discovered via getNodeAddresses, which is called periodically by HiveMQ. The interval at which getNodeAddresses is called can be configured in HiveMQ’s configuration file. The callback also provides you with the means to register/unregister this HiveMQ node with the methods init and destroy.

For example, the ClusterDiscoveryCallback can be used to implement discovery via a database, shared files or an external registry like etcd or Consul.

Like any other callback, you have to manually add a concrete ClusterDiscoveryCallback to the Callback Registry at plugin startup time or at runtime.

Interfering with HiveMQ

The callback returns a list of ClusterNodeAddress objects which are used by HiveMQ to form a cluster. A ClusterNodeAddress essentially consists of a hostname/IP and a port. Every available node of the cluster must be represented by one ClusterNodeAddress in the returned list.

If more than one ClusterDiscoveryCallback is registered at a time, the results of all plugins are combined to form the cluster.

ClusterDiscoveryCallback Example
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.hivemq.spi.callback.cluster.ClusterDiscoveryCallback;
import com.hivemq.spi.callback.cluster.ClusterNodeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class ClusterDiscovery implements ClusterDiscoveryCallback {

    private static final Logger log = LoggerFactory.getLogger(ClusterDiscovery.class);

    private final ExternalRegistry externalRegistry; (1)

    private String clusterId;

    @Inject
    public ClusterDiscovery(final ExternalRegistry externalRegistry) {
        this.externalRegistry = externalRegistry;
    }

    @Override
    public void init(final String clusterId, final ClusterNodeAddress ownAddress) { (2)
        this.clusterId = clusterId;

        log.info("Registering HiveMQ with clusterId {} with central registry", clusterId);
        externalRegistry.put(clusterId, ownAddress.getHost(), ownAddress.getPort());
    }

    @Override
    public ListenableFuture<List<ClusterNodeAddress>> getNodeAddresses() { (3)
        final ListenableFuture<List<ClusterNodeAddress>> allClusterNodesFuture = externalRegistry.getAllNodes();
        return allClusterNodesFuture;
    }

    @Override
    public void destroy() { (4)
        log.info("Removing HiveMQ with clusterId {} from central registry", clusterId);
        externalRegistry.remove(clusterId);
    }
}
1 Some kind of central registry which holds the information about all your cluster nodes
2 Gets called once when HiveMQ starts its cluster
3 Gets called periodically to discover new cluster nodes at runtime
4 Gets called once when HiveMQ shuts down
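
Since the discovery callback must be registered like any other callback, the following is a minimal sketch of wiring it up from the plugin’s entry point. The DiscoveryPluginEntryPoint class name is illustrative; the ClusterDiscovery class is the one from the example above.

Example of registering the ClusterDiscoveryCallback
import com.hivemq.spi.PluginEntryPoint;

import javax.annotation.PostConstruct;
import javax.inject.Inject;

public class DiscoveryPluginEntryPoint extends PluginEntryPoint {

    private final ClusterDiscovery clusterDiscovery;

    @Inject
    public DiscoveryPluginEntryPoint(final ClusterDiscovery clusterDiscovery) { (1)
        this.clusterDiscovery = clusterDiscovery;
    }

    @PostConstruct
    public void postConstruct() {
        getCallbackRegistry().addCallback(clusterDiscovery); (2)
    }
}
1 The discovery callback is injected with its own dependencies already resolved.
2 Adding the callback to the Callback Registry activates the discovery mechanism.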

ScheduledCallback

Type

Asynchronous

Purpose

The com.hivemq.spi.callback.schedule.ScheduledCallback gets executed periodically. See the Scheduled Callback Execution chapter for more information.

Interfering with HiveMQ

It’s not possible to interfere with HiveMQ directly with this callback. When the callback is added to the Callback Registry, a validation of the quartz-style cron expression will take place. If the expression is invalid, the callback won’t be executed.

ScheduledCallback Example

See scheduled callback example.

Scheduled Callback Execution

While most of the callbacks for HiveMQ plugins are used for reacting to certain events, it is sometimes desirable to run tasks on a periodic schedule.

With ScheduledCallbacks it’s easy to implement plugin behaviour which runs periodically, based on quartz-style cron expressions. This gives your plugins the possibility to run at regular intervals like every minute or every day at midnight. Even non-trivial intervals like every last Friday of every month during the years from 2002 to 2017 at 15:15 and 20:30 can be represented. See the Quartz-Style Cron Expressions chapter for more information.

The scheduled callback executions are especially useful for tasks like:

  • Maintenance tasks

  • Watchdog services

  • Reconfiguration services

  • Integration with other systems (especially in conjunction with the Publish Service)

  • Backup services

Usage

A ScheduledCallback is a callback which gets executed asynchronously. Like any other callback, you have to manually add a concrete ScheduledCallback to the Callback Registry at plugin startup time or at runtime.

A ScheduledCallback consists of two methods:

  • cronExpression(): This method returns the quartz-styled cron expression to schedule the callback.

  • execute(): This method executes the actual code to run on every interval.

Example of adding a scheduled callback
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.registry.CallbackRegistry;
import com.hivemq.spi.callback.schedule.ScheduledCallback;

import javax.annotation.PostConstruct;

class CallbackSchedulerMainClass extends PluginEntryPoint {


    @PostConstruct
    public void postConstruct() {

        final CallbackRegistry callbackRegistry = getCallbackRegistry();

        callbackRegistry.addCallback(new ScheduledCallback() {
            @Override
            public void execute() {
                System.out.println("Executing");
            }

            @Override
            public String cronExpression() {
                // Every second
                return "* * * * * ?";
            }
        });
    }
}
The cronExpression() method only gets called once, when the callback is added to the Callback Registry. If you need to change the expression at runtime or if it is created dynamically, you have to reload it at runtime.

The cron expression will be evaluated when adding it to the Callback Registry and all callbacks with invalid expressions will be rejected.

Quartz-Style Cron Expressions

To allow maximum flexibility for scheduling callbacks, HiveMQ recognizes quartz-style cron expressions. This allows scheduling based on time intervals and dates.

Table 4. Examples of quartz-style cron expressions
Expression Meaning

0/5 * * * * ?

every 5 seconds

0 0 0/2 * * ?

every 2 hours

0 0 0 * * ?

on midnight, every day

0 0/5 15 * * ?

every 5 minutes starting at 3:00 PM and ending at 3:55 PM, every day

0 15 11 ? * MON-FRI

every Monday, Tuesday, Wednesday, Thursday and Friday at 11:15 AM

0 15 10 ? * 6L

on the last Friday of every month at 10:15 AM

Learn more about Quartz Cron Expressions here.

Cron expression changes at runtime

The return value of the cronExpression() method of a ScheduledCallback is only evaluated once, after it was added to the Callback Registry. However, sometimes static cron expressions are not enough for your scheduled callbacks. Sometimes it’s desirable to (re-)load cron expressions from databases, configuration files, web services or other sources on demand.

A reload of the cron expression of a ScheduledCallback can be triggered manually with the reloadScheduledCallbackExpression(ScheduledCallback scheduledCallback) method of the CallbackRegistry.

Example of reloading a ScheduledCallback
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.registry.CallbackRegistry;
import com.hivemq.spi.callback.schedule.ScheduledCallback;

import javax.annotation.PostConstruct;

class CallbackSchedulerMainClass extends PluginEntryPoint {


    @PostConstruct
    public void postConstruct() {

        final CallbackRegistry callbackRegistry = getCallbackRegistry();

        callbackRegistry.addCallback(new ScheduledCallback() {
            @Override
            public void execute() {

                System.out.println("Business Logic Stuff");

                callbackRegistry.reloadScheduledCallbackExpression(this); (1)
            }

            @Override
            public String cronExpression() {
                String expression = dynamicExpressionFromDatabase(); (2)

                return expression;
            }
        });
    }
}
1 Trigger a manual reload of the expression, which calls the cronExpression() method after executing this callback
2 Loads a dynamic expression

Utils

For convenience, HiveMQ offers some utilities for working with quartz-style cron expressions. The following utility class is worth a look if you are working with cron expressions:

  • com.hivemq.spi.callback.schedule.ScheduleExpressions: A utility class which offers constants for common cron expressions and some utility methods for calculating expressions.

Authentication and Authorization

One of the many use cases for writing a HiveMQ plugin is the implementation of client authentication and authorization. The callbacks enable the plugin developer, among other things, to completely customize the authentication and authorization behavior.

Client Authentication

The following sequence diagram shows an overview of what happens in the HiveMQ core, when a new client is trying to connect and how a plugin can interfere with it.

Client Authentication Callbacks
Figure 5. Execution flow of callbacks during and after the client authentication
Default Behavior
When no plugin is present, all clients are authenticated successfully.

Implement username/password authentication

Example of a username/password authentication callback
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.hivemq.spi.aop.cache.Cached;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.exception.AuthenticationException;
import com.hivemq.spi.callback.security.OnAuthenticationCallback;
import com.hivemq.spi.message.ReturnCode;
import com.hivemq.spi.security.ClientCredentialsData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class UserAuthentication implements OnAuthenticationCallback {

    final Logger log = LoggerFactory.getLogger(UserAuthentication.class);

    @Override
    public Boolean checkCredentials(ClientCredentialsData clientData) throws AuthenticationException { (1)


        String username;
        if (!clientData.getUsername().isPresent()) { (4)
            throw new AuthenticationException("No Username provided", ReturnCode.REFUSED_NOT_AUTHORIZED); (2)
        }
        username = clientData.getUsername().get();


        if (Strings.isNullOrEmpty(username)) {
            throw new AuthenticationException("No Username provided", ReturnCode.REFUSED_NOT_AUTHORIZED); (2)
        }

        Optional<String> password = Optional.fromNullable(retrievePasswordFromDatabase(username));

        if (!password.isPresent()) {
            throw new AuthenticationException("No Account with the credentials was found!", ReturnCode.REFUSED_NOT_AUTHORIZED); (2)
        } else {
            if (clientData.getPassword().get().equals(password.get())) {
                return true;
            }
            return false;
        }

    }

    @Cached(timeToLive = 10, timeUnit = TimeUnit.MINUTES) (3)
    private String retrievePasswordFromDatabase(String username) {

        String password = ...;     // call to any database to ask for the password of the user

        return password;
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 ClientData holds all data provided by the client and can be used to identify it.
2 If an AuthenticationException is thrown, the client will be disconnected regardless of possible other authentication plugins, see the guidelines for multiple plugins.
3 The @Cached annotation is used to cache the result for a particular username for 10 minutes, therefore the plugin only blocks when it actually fetches the value from the database.
4 ClientData uses the Optional class from Google Guava, which allows better handling of possible null values.

Client Authorization

The authorization is verified every time a client tries to publish to a certain topic or subscribes to topics. The following diagram shows all possible flows of a publish message (the same flow applies for a subscribe message):

Client Authorization Callback

The most interesting callback here is the OnAuthorizationCallback, which returns a list of `MqttTopicPermission`s for the client.

The list of `MqttTopicPermission`s should contain all permissions the client has. HiveMQ performs the matching that decides whether a certain action is allowed by the client’s permissions.

The following snippet shows the available constructors for the MqttTopicPermission class.

Available constructors for MqttTopicPermission
MqttTopicPermission(final String topic, final TYPE type) (1)

MqttTopicPermission(final String topic, final TYPE type, final ACTIVITY activity) (2)

MqttTopicPermission(final String topic, final TYPE type, final QOS qos) (3)

MqttTopicPermission(final String topic, final TYPE type, final QOS qos, final ACTIVITY activity) (4)

MqttTopicPermission(final String topic, final TYPE type, final QOS qos, final ACTIVITY activity, final RETAIN publishRetain) (5)
1 Allow or deny the topic a client can publish/subscribe to, with all Quality of Service (QoS) levels.
2 Allow or deny the topic and the client’s ability to publish, subscribe or do both, for all QoS.
3 Allow or deny the topic and restrict the QoS levels the client can use (some or all of them), for both publishing and subscribing.
4 Allow or deny the topic, the client’s ability to use QoS and to subscribe/publish.
5 Allow or deny the topic, the client’s ability to use QoS and to subscribe/publish, and the ability to send retained messages.
Example implementation of a permission which allows a client to only publish/subscribe to topics prefixed with its client id.
import com.hivemq.spi.aop.cache.Cached;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.security.OnAuthorizationCallback;
import com.hivemq.spi.callback.security.authorization.AuthorizationBehaviour;
import com.hivemq.spi.security.ClientData;
import com.hivemq.spi.topic.MqttTopicPermission;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ClientIdTopic implements OnAuthorizationCallback {

    @Override
    @Cached(timeToLive = 10, timeUnit = TimeUnit.MINUTES) (3)
    public List<MqttTopicPermission> getPermissionsForClient(ClientData clientData) {

        List<MqttTopicPermission> mqttTopicPermissions = new ArrayList<>();
        mqttTopicPermissions.add(new MqttTopicPermission(clientData.getClientId() + "/#", MqttTopicPermission.TYPE.ALLOW)); (1)

        return mqttTopicPermissions;

    }

    @Override
    public AuthorizationBehaviour getDefaultBehaviour() {
        return AuthorizationBehaviour.DENY; (2)
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 The permission allows a client only to publish/subscribe to topics which begin with its client id.
2 The client is not allowed to publish/subscribe to any other topic.
3 The method is another ideal candidate for the @Cached annotation.

Black and Whitelists

HiveMQ 2 uses a whitelist-only approach.

HiveMQ 3 allows you to implement fine-grained permissions based on blacklists and whitelists.

Guidelines for multiple plugins

In case more than one authentication or authorization plugin is running simultaneously, additional aspects have to be taken into consideration.

AuthenticationException or false

There are two ways in an OnAuthenticationCallback to tell HiveMQ that the client is not allowed to connect:

  • return false

  • throw an AuthenticationException

If there is only a single plugin implementing the OnAuthenticationCallback, the only reason to throw an AuthenticationException is to customize the return code. In a multiple plugin scenario there is another important difference: when an AuthenticationException is caught by HiveMQ, the results of the other plugins have no meaning, because the client gets disconnected instantly. By contrast, when returning false, HiveMQ synchronizes all callbacks and then computes the overall result. More on that in the next chapter.

How to decide whether an AuthenticationException or return false should be used?
This is up to the developer, because it highly depends on the authentication logic of the particular use case. See the following example for advice.

An example use case to show the difference: the client’s user account can be located either in a MySQL or a PostgreSQL database, so there are two plugins, one checking for the existence of the user in each database. If one of the plugins encounters a client that has not provided a username and password, an AuthenticationException should be thrown, because without this information the other plugin is most likely not able to authenticate the client either.

Difference between returning false and throwing an AuthenticationException
...
    @Override
    @Cached(timeToLive = 1, timeUnit = TimeUnit.MINUTES)
    public Boolean checkCredentials(ClientCredentialsData clientData) throws AuthenticationException {


        final String clientUsername;
        final String clientPassword;
        if (clientData.getUsername().isPresent() && clientData.getPassword().isPresent()) {
            clientUsername = clientData.getUsername().get();
            clientPassword = clientData.getPassword().get();
        } else {
            throw new AuthenticationException(ReturnCode.REFUSED_BAD_USERNAME_OR_PASSWORD); (1)
        }

        final String savedPassword = retrievePassword(clientUsername);

        if (clientPassword.equals(savedPassword)) {
            return true;
        } else {
            return false; (2)
        }
    }
...
1 Username and password are not present and an AuthenticationException is thrown, because in the use case stated above no authentication can be granted without them.
2 If the user is not present or the password does not match, returning false is the correct thing to do, preserving the chance for the other plugin to successfully authenticate the client.

Successful authentication/authorization

If none of the plugins has thrown an AuthenticationException, the return value of each plugin will be taken into consideration to determine the overall result.

Criterion to authenticate or authorize a client
If one of the plugins returned true or a matching permission, HiveMQ authenticates the client or accepts the authorization request.

Services

HiveMQ Services are a powerful concept introduced with HiveMQ 1.5. They provide a convenient way to interact with the HiveMQ core from plugins.

Blocking vs Async Services

For some services there is a blocking and an async version available. When a method of a blocking service is called, it returns as soon as the results are available. The methods of the async services return a ListenableFuture object which allows the developer to add a callback that is called as soon as the results are available. It is highly recommended to use the async services to avoid performance losses.
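
The following is a minimal sketch contrasting both styles, using the size() method of the Retained Message Store (the store instances and the log field are assumed to be injected/declared as shown in the following chapters):

Blocking vs async call
// Blocking: the calling thread waits until the result is available.
final long count = blockingRetainedMessageStore.size();
log.info("{} retained messages are stored", count);

// Async: the method returns immediately, a callback handles the result.
final ListenableFuture<Long> future = asyncRetainedMessageStore.size();
Futures.addCallback(future, new FutureCallback<Long>() {
    @Override
    public void onSuccess(final Long result) {
        log.info("{} retained messages are stored", result);
    }

    @Override
    public void onFailure(final Throwable throwable) {
        log.error("Could not read the retained message count", throwable);
    }
});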

Table 5. Available HiveMQ Services
Name Available since Description

Blocking Retained Message Store

3.1

Gives access to retained messages and lets plugins read, create, delete and modify retained messages

Async Retained Message Store

3.1

Gives access to retained messages and lets plugins read, create, delete and modify retained messages

Blocking Subscription Store

3.1

Gives access to subscriptions and lets plugins read, create, delete and modify subscriptions

Async Subscription Store

3.1

Gives access to subscriptions and lets plugins read, create, delete and modify subscriptions

Blocking Client Service

3.1

Gives access to information about connected clients and clients with a persistent session

Async Client Service

3.1

Gives access to information about connected clients and clients with a persistent session

Publish Service

1.5

Allows to publish new MQTT messages to subscribed clients

$SYS Topic Service

2.0

Allows to query, add, remove and modify $SYS Topics at runtime.

Log Service

3.0

Allows to change the HiveMQ Log level at runtime.

Blocking Metric Service

3.1

Allows to inspect and add custom HiveMQ metrics at runtime.

Async Metric Service

3.1

Allows to inspect and add custom HiveMQ metrics at runtime.

Configuration Service

3.0

Allows to change HiveMQ configuration at runtime.

REST Service

3.0

Allows to add custom HiveMQ resources at runtime.

Shared Subscription Service

3.0

Allows to add and remove shared subscriptions at runtime.

Some of the old services have been deprecated since 3.1 and will be removed in HiveMQ 4.0. Avoid using these services in future development. The documentation of these services is still available here.

More services are planned for the future. Please contact us if you have specific needs for new services.

Inject Services!
At the moment it’s only possible to use the HiveMQ Services in your plugin if you inject them via Dependency Injection.

ListenableFuture

Some of the plugin services use com.google.common.util.concurrent.ListenableFuture objects as return values. ListenableFuture objects enable the developer to add callbacks that are executed as soon as the result is available.

Example Usage
Add callback to ListenableFuture
ListenableFuture<Boolean> future = someMethod();
Futures.addCallback(future, new FutureCallback<Boolean>() {
    @Override
    public void onSuccess(Boolean result) {
        log.info("'someMethod' succeeded. Result is {}", result);
    }

    @Override
    public void onFailure(Throwable throwable) {
        throwable.printStackTrace();
    }
});

For more information about ListenableFuture see the documentation.
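
Besides adding callbacks, Guava’s Futures class can also derive new futures from existing ones, which is handy when a plugin only needs a transformed result. The following is a minimal sketch, assuming an injected async Client Service (covered later in this chapter); note that the exact transform signature differs between Guava versions (older versions accept a Function without an executor argument):

Derive a new future with Futures.transform
final ListenableFuture<Set<String>> clientsFuture = clientService.getConnectedClients();

// Derive a new future which only contains the number of connected clients.
final ListenableFuture<Integer> countFuture = Futures.transform(clientsFuture,
        new Function<Set<String>, Integer>() {
            @Override
            public Integer apply(final Set<String> clients) {
                return clients.size();
            }
        });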

Retained Message Store

The Retained Message Store enables HiveMQ plugins to directly add and remove retained messages from HiveMQ’s retained message store. It’s also possible to retrieve a certain retained message or the number of messages that are currently stored by HiveMQ.

Blocking API

Blocking Retained Message Store API
/**
 * @return all retained messages which are currently stored on this HiveMQ instance.
 */
Set<RetainedMessage> getLocalRetainedMessages();

/**
 * @return the number of all retained messages on this HiveMQ instance.
 */
Long localSize();

/**
 * Checks if a retained message is present in the retained message store, on this HiveMQ instance.
 *
 * @param topic the topic associated with the retained message
 * @return true if there's a message for the given topic
 */
boolean containsLocally(String topic);

/**
 * @return all retained messages which are currently stored
 */
Set<RetainedMessage> getRetainedMessages();

/**
 * @param topic a topic
 * @return the retained message for the specific topic or <code>null</code> if there is none
 */
@Nullable
RetainedMessage getRetainedMessage(String topic);

/**
 * Removes the retained message from given topic.
 * If there isn't any retained message on the topic yet, nothing will happen.
 *
 * @param topic from which the message should be removed
 */
void remove(String topic);

/**
 * Removes all retained messages from the message store.
 */
void clear();

/**
 * This method adds or replaces a retained message
 *
 * @param retainedMessage which should be added or replaced
 */
void addOrReplace(RetainedMessage retainedMessage);

/**
 * Checks if a retained message is present in the retained message store.
 *
 * @param topic the topic associated with the retained message
 * @return true if there's a message for the given topic
 */
boolean contains(String topic);

/**
 * @return the number of all retained messages
 */
long size();

Example Usage

This section discusses common examples on how to use the Blocking Retained Message Store in your own plugins.

Inject the Message Store
import com.hivemq.spi.services.BlockingRetainedMessageStore;

import javax.inject.Inject;

public class MyPlugin {

    private final BlockingRetainedMessageStore retainedMessageStore;

    @Inject
    public MyPlugin(final BlockingRetainedMessageStore retainedMessageStore) { (1)
        this.retainedMessageStore = retainedMessageStore;
    }
}
1 The message store implementation is injected by the HiveMQ core.


Programmatically add a new Retained Message
    public void addRetainedMessage(String topic, String message) {

        if (!retainedMessageStore.contains(topic)) { (1)
            retainedMessageStore.addOrReplace(
                    new RetainedMessage(topic, message.getBytes(), QoS.valueOf(1))); (2)
        }
    }
1 Check if there is currently a retained message for a certain topic.
2 Add a retained message to the topic if there is none.


Clear the whole Retained Message Store
    public void cleanUp() {

        if (retainedMessageStore.size() >= TOO_MANY) { (1)
            retainedMessageStore.clear(); (2)
        }
    }
1 Get the number of retained messages that are currently stored and check if there are too many.
2 Delete all retained messages.


Read and remove specific retained messages.
    public void logAndDeleteRetainedMessage(String topic) {
        RetainedMessage rm = retainedMessageStore.getRetainedMessage(topic); (1)

        if (rm != null) {
            logger.info(new String(rm.getMessage()));
            retainedMessageStore.remove(topic); (2)
        }
    }
1 Get the retained message for a certain topic if there is any.
2 Remove the message.

Async API

Async Retained Message Store API
/**
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all retained messages which are currently stored on this HiveMQ instance.
 */
ListenableFuture<Set<RetainedMessage>> getLocalRetainedMessages();

/**
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the number of all retained messages on this HiveMQ instance.
 */
ListenableFuture<Long> localSize();

/**
 * Checks if a retained message is present in the retained message store, on this HiveMQ instance.
 *
 * @param topic the topic associated with the retained message
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains true if there's a message for the given topic
 */
ListenableFuture<Boolean> containsLocally(String topic);

/**
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all retained messages which are currently stored
 */
ListenableFuture<Set<RetainedMessage>> getRetainedMessages();

/**
 * @param topic a topic
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the retained message for the specific topic or <code>null</code>.
 */
ListenableFuture<RetainedMessage> getRetainedMessage(String topic);

/**
 * Removes the retained message from given topic.
 * If there isn't any retained message on the topic yet, nothing will happen.
 *
 * @param topic from which the message should be removed
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which returns after removal
 */
ListenableFuture<Void> remove(String topic);

/**
 * Removes all retained messages from the message store.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which returns after removal
 */
ListenableFuture<Void> clear();

/**
 * This method adds or replaces a retained message
 *
 * @param retainedMessage which should be added or replaced
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which returns after adding or replacing
 */
ListenableFuture<Void> addOrReplace(RetainedMessage retainedMessage);

/**
 * Checks if a retained message is present in the retained message store.
 *
 * @param topic the topic associated with the retained message
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains true if there's a message for the given topic
 */
ListenableFuture<Boolean> contains(String topic);

/**
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the number of all retained messages
 */
ListenableFuture<Long> size();

Example Usage

This section discusses common examples on how to use the Async Retained Message Store in your own plugins.

Programmatically add a new Retained Message
public void addRetainedMessage(final String topic, final String message) {

    final ListenableFuture<Void> future = retainedMessageStore.addOrReplace(new RetainedMessage (1)
            (topic, message.getBytes(), QoS.valueOf(1)));
    Futures.addCallback(future, new FutureCallback<Void>() {
        @Override
        public void onSuccess(Void aVoid) {
            log.debug("Set new retained message for topic {}", topic); (2)
        }

        @Override
        public void onFailure(Throwable throwable) {
            log.error("Failed to set retained message for topic {}.", topic, throwable);
        }
    });
}
1 Set a retained message for a given topic.
2 Log that the retained message was set, as soon as the future succeeds.


Read and remove specific retained messages.
public void logAndDeleteRetainedMessage(final String topic) {
    final ListenableFuture<RetainedMessage> future = retainedMessageStore.getRetainedMessage(topic); (1)
    Futures.addCallback(future, new FutureCallback<RetainedMessage>() {
        @Override
        public void onSuccess(final RetainedMessage retainedMessage) {
            if (retainedMessage != null) {
                log.info("Removing retained message on topic {}.", topic);
                retainedMessageStore.remove(topic); (2)
            }
        }

        @Override
        public void onFailure(Throwable throwable) {
            log.error("Failed to read the retained message for topic {}.", topic, throwable);
        }
    });
}
1 Get the retained message for a certain topic if there is any.
2 Remove the message.


Subscription Store Service

The Subscription Store is designed for programmatic interaction with MQTT client subscriptions. The Subscription Store service provides methods to get, add or remove subscriptions and lets you find out which topics a certain subscriber has subscribed to. It’s also possible to find out which clients subscribed to a certain topic.

The Subscription Store allows querying for subscription information on all cluster nodes or just the local HiveMQ node.

There are separate "local" methods available in case the broker instance might be connected to a cluster and you want to gather information about this instance only. Local and cluster methods return the same result if the broker instance is not part of a cluster.
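
The following is a minimal sketch of the difference, assuming an injected BlockingSubscriptionStore (see below) and an slf4j logger:

Compare local and cluster-wide subscribers
private void compareSubscriberCounts(final String topic) {

    // Subscribers known to this HiveMQ node only
    final Set<String> local = subscriptionStore.getLocalSubscribers(topic);

    // Subscribers across the whole cluster (identical if this node is not clustered)
    final Set<String> clusterWide = subscriptionStore.getSubscribers(topic);

    logger.info("Topic {} has {} local and {} cluster-wide subscribers.",
            topic, local.size(), clusterWide.size());
}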

Blocking API

/**
 * This method returns all subscriptions on this HiveMQ Node as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
 * You won't receive subscriptions of connected
 * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
 * <p/>
 * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
 * negative performance effects.
 * <p/>
 * The returned Multimap is read-only and must not be modified.
 *
 * @return a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
 */
@ReadOnly
Multimap<String, Topic> getLocalSubscriptions();

/**
 * Returns all MQTT client subscriber identifiers for a given topic, for this HiveMQ instance.
 * MQTT Wildcards are allowed.
 * <p/>
 * Don't pass <code>null</code> as topic. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param topic the topic
 * @return client identifiers of all subscribers that subscribed to the topic
 */
@ReadOnly
Set<String> getLocalSubscribers(@NotNull String topic);

/**
 * Returns all topics a client is subscribed to, on this HiveMQ instance.
 * <p/>
 * If the client does not exist, an empty Set is returned.
 * <p/>
 * Don't pass <code>null</code> as clientId. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param clientID of the client
 * @return all topics the client subscribed to
 */
@ReadOnly
Set<Topic> getLocalTopics(@NotNull String clientID);

/**
 * This method adds a subscription for a certain client to a certain topic.
 * If HiveMQ is connected to a cluster, the subscription will be broadcast to all other Cluster Nodes.
 * <p/>
 * This method is lenient, so if the clientId or the topic
 * is <code>null</code>, nothing will happen.
 *
 * @param clientID client, which should be subscribed
 * @param topic    topic to which the client should be subscribed
 */
void addSubscription(@NotNull String clientID, @NotNull Topic topic);

/**
 * This method removes a subscription for a certain client and a certain topic.
 * If HiveMQ is connected to a cluster, the subscription will be removed by other Cluster Nodes as well.
 *
 * @param clientID client, which should get unsubscribed
 * @param topic    topic from which the client should get unsubscribed
 */
void removeSubscription(@NotNull String clientID, @NotNull String topic);

/**
 * This method returns all subscriptions this HiveMQ instance and all other nodes in a HiveMQ cluster,
 * as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
 * <p/>
 * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
 * negative performance effects.
 * <p/>
 * The returned Multimap is read-only and must not be modified.
 *
 * @return a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
 */
@ReadOnly
Multimap<String, Topic> getSubscriptions();

/**
 * Returns all MQTT client subscriber identifiers for a given topic, this HiveMQ instance and all other nodes in a HiveMQ cluster.
 * MQTT Wildcards are allowed.
 * <p/>
 * Don't pass <code>null</code> as topic. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param topic the topic
 * @return client identifiers of all subscribers that subscribed to the topic
 */
@ReadOnly
Set<String> getSubscribers(@NotNull String topic);

/**
 * Returns all topics a client is subscribed to, on this HiveMQ instance and all other nodes in a HiveMQ cluster.
 * <p/>
 * If the client does not exist, an empty Set is returned.
 * <p/>
 * Don't pass <code>null</code> as clientId. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param clientID of the client
 * @return all topics the client subscribed to
 */
@ReadOnly
Set<Topic> getTopics(@NotNull String clientID);

Example Usage

This section discusses common examples on how to use the Subscription Store in your own plugins.

Inject the Subscription Store
import com.hivemq.spi.services.BlockingSubscriptionStore;

import javax.inject.Inject;

public class MyPlugin {

    private final BlockingSubscriptionStore subscriptionStore;

    @Inject
    public MyPlugin(final BlockingSubscriptionStore subscriptionStore) { (1)
        this.subscriptionStore = subscriptionStore;
    }
}
1 The subscription store implementation is injected by the HiveMQ core.


Setup a default Subscription
public class ConnackSend implements OnConnackSend {

    private static final String DEFAULT_TOPIC = "default/topic";
    private final BlockingSubscriptionStore subscriptionStore;

    @Inject
    public ConnackSend(BlockingSubscriptionStore subscriptionStore) {
        this.subscriptionStore = subscriptionStore;
    }

    @Override
    public void onConnackSend(CONNACK connack, ClientData clientData) { (1)
        subscriptionStore.addSubscription(clientData.getClientId(), new Topic(DEFAULT_TOPIC, QoS.valueOf(1))); (2)
    }

}
1 Add a subscription as soon as a client has connected successfully.
2 As soon as a subscription is added, the MQTT client will receive all messages published to the topic.


List Subscribers for a certain topic
private void logSubscribers(String topic) {

    Set<String> subscribers = subscriptionStore.getLocalSubscribers(topic); (1)
    for (String clientId : subscribers) {
        logger.info(clientId);
    }
}
1 Get all subscribers for a certain topic, on this broker instance only.


Async API

/**
 * This method returns all subscriptions on this HiveMQ Node as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
 * You won't receive subscriptions of connected
 * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
 * <p/>
 * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
 * negative performance effects.
 * <p/>
 * The returned Multimap is read-only and must not be modified.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
 */
@ReadOnly
ListenableFuture<Multimap<String, Topic>> getLocalSubscriptions();

/**
 * Returns all MQTT client subscriber identifiers for a given topic, for this HiveMQ instance.
 * MQTT Wildcards are allowed.
 * <p/>
 * Don't pass <code>null</code> as topic. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param topic the topic
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all subscribers that subscribed to the topic
 */
@ReadOnly
ListenableFuture<Set<String>> getLocalSubscribers(@NotNull String topic);

/**
 * Returns all topics a client is subscribed to, on this HiveMQ instance.
 * <p/>
 * If the client does not exist, an empty Set is returned.
 * <p/>
 * Don't pass <code>null</code> as clientId. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param clientID of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all topics the client subscribed to
 */
@ReadOnly
ListenableFuture<Set<Topic>> getLocalTopics(@NotNull String clientID);

/**
 * This method adds a subscription for a certain client to a certain topic.
 * If HiveMQ is connected to a cluster, the subscription will be broadcast to all other Cluster Nodes.
 * <p/>
 * This method is lenient, so if the clientId or the topic
 * is <code>null</code>, nothing will happen.
 *
 * @param clientID client, which should be subscribed
 * @param topic    topic to which the client should be subscribed
 * @return A {@link com.google.common.util.concurrent.ListenableFuture} object that will succeed,
 * as soon as the subscription was added by all Cluster Nodes.
 */
ListenableFuture<Void> addSubscription(@NotNull String clientID, @NotNull Topic topic);

/**
 * This method removes a subscription for a certain client and a certain topic.
 * If HiveMQ is connected to a cluster, the subscription will be removed by other Cluster Nodes as well.
 *
 * @param clientID client, which should get unsubscribed
 * @param topic    topic from which the client should get unsubscribed
 * @return A {@link com.google.common.util.concurrent.ListenableFuture} object that will succeed,
 * as soon as the subscription was removed by all Cluster Nodes.
 */
ListenableFuture<Void> removeSubscription(@NotNull String clientID, @NotNull String topic);

/**
 * This method returns all subscriptions this HiveMQ instance and all other nodes in a HiveMQ cluster,
 * as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
 * <p/>
 * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
 * negative performance effects.
 * <p/>
 * The returned Multimap is read-only and must not be modified.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
 */
@ReadOnly
ListenableFuture<Multimap<String, Topic>> getSubscriptions();

/**
 * Returns all MQTT client subscriber identifiers for a given topic, this HiveMQ instance and all other nodes in a HiveMQ cluster.
 * MQTT Wildcards are allowed.
 * <p/>
 * Don't pass <code>null</code> as topic. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param topic the topic
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all subscribers that subscribed to the topic
 */
@ReadOnly
ListenableFuture<Set<String>> getSubscribers(@NotNull String topic);

/**
 * Returns all topics a client is subscribed to, on this HiveMQ instance and all other nodes in a HiveMQ cluster.
 * <p/>
 * If the client does not exist, an empty Set is returned.
 * <p/>
 * Don't pass <code>null</code> as clientId. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param clientID of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all topics the client subscribed to
 */
@ReadOnly
ListenableFuture<Set<Topic>> getTopics(@NotNull String clientID);

Example Usage

Remove unwanted subscriptions
public void removeTopicSubscriptions(final Topic topicToRemove) {

    Futures.addCallback(subscriptionStore.getSubscriptions(), new FutureCallback<Multimap<String, Topic>>() { (1)

        @Override
        public void onSuccess(Multimap<String, Topic> subscriptions) { (2)(3)

            for (String clientId : subscriptions.keySet()) { (4)

                if (subscriptions.get(clientId).contains(topicToRemove)) { (5)
                    subscriptionStore.removeSubscription(clientId, topicToRemove.getTopic());
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
1 The getSubscriptions method returns a ListenableFuture.
2 The callback succeeds as soon as the subscriptions have been collected.
3 Get a Multimap with client identifiers as keys and topic subscriptions as value.
4 Iterate all client identifiers.
5 Find all clients that subscribed to a certain topic and remove the subscriptions.
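
Another common task is listing all topics a certain client has subscribed to. The following is a minimal sketch using the asynchronous getTopics method, assuming the store and logger are available as in the previous examples:

List all topics of a certain client
private void logTopicsOfClient(final String clientId) {

    Futures.addCallback(subscriptionStore.getTopics(clientId), new FutureCallback<Set<Topic>>() { (1)

        @Override
        public void onSuccess(Set<Topic> topics) {
            for (Topic topic : topics) { (2)
                logger.info("Client {} is subscribed to {}.", clientId, topic.getTopic());
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
1 The getTopics method returns a ListenableFuture which contains all topics the client subscribed to, cluster-wide.
2 Iterate the topics as soon as the future succeeds.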

Gotchas

  • The Multimap class is part of the com.google.common.collect API. See the documentation for further information.

  • Topic class objects are equal if their topics are equal, independent of their quality of service (QoS) levels.

  • Subscriptions added in the OnConnectCallback for the connecting clients are removed after the callback execution, if the client is requesting a clean session. Consider using the OnConnackSend callback instead.


Client Service

The Client Service allows plugins to gather information about clients, such as whether a client is currently connected, its username, client identifier and SSL/TLS certificate, and whether the client is anonymous or authenticated.

The Client Service allows querying for client information on all cluster nodes or just the local HiveMQ node.

There are separate "local" methods available in case the broker instance might be connected to a cluster and you want to gather information about this instance only. In case the HiveMQ instance is not connected to a cluster, both methods will return the same results.

Blocking API

/**
 * Returns all identifiers of connected clients of this HiveMQ Node. You won't receive client identifiers of connected
 * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
 * <p/>
 * If you have many client connections, please note that calling this method frequently could have negative performance
 * effects.
 *
 * @return client identifiers of all connected clients
 */
Set<String> getLocalConnectedClients();

/**
 * Returns all disconnected clients which have a persistent MQTT session on this instance of HiveMQ (MQTT clean session=false).
 * <p/>
 * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
 *
 * @return all disconnected clients with a persistent MQTT session
 */
Set<String> getLocalDisconnectedClients();

/**
 * Check if a client with a given identifier is currently connected to this HiveMQ instance.
 *
 * @param clientId client, which should be checked
 * @return true, if a certain client is currently connected and false otherwise
 */
boolean isClientConnectedLocal(String clientId);

/**
 * Returns client information for clients that are connected to this broker instance.
 * <p/>
 * If the client isn't connected, <code>null</code> will be returned.
 *
 * @param clientId the client identifier of the client
 * @return {@link ClientData} for a specific client or <code>null</code> if the client is not connected
 */
@Nullable
ClientData getLocalClientData(String clientId);

/**
 * Returns all identifiers of connected clients of this HiveMQ instance and all other nodes in a HiveMQ cluster
 * <p/>
 * Calling this method frequently in a clustered environment could have negative performance effects.
 *
 * @return client identifiers of all connected clients
 */
Set<String> getConnectedClients();

/**
 * Returns all disconnected clients which have a persistent MQTT session on this broker or any other cluster node.
 * <p/>
 * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
 *
 * @return all disconnected clients with a persistent MQTT session
 */
Set<String> getDisconnectedClients();

/**
 * Check if a client with a given identifier is currently connected to this HiveMQ broker instance or any other instance in the cluster.
 *
 * @param clientId client, which should be checked
 * @return true, if a certain client is currently connected and false otherwise
 */
boolean isClientConnected(String clientId);

/**
 * Returns additional client information about a given client with a given client identifier.
 * <p/>
 * This method will also get client information from other cluster nodes if needed.
 * <p/>
 * If the client isn't connected, <code>null</code> will be returned.
 *
 * @param clientId the client identifier of the client
 * @return {@link ClientData} for a specific client or <code>null</code> if the client is not connected
 */
@Nullable
ClientData getClientData(String clientId);

Example Usage

This section discusses common examples on how to use the Client Service in your own plugins.

Inject the Client Service
import com.hivemq.spi.services.BlockingClientService;

import javax.inject.Inject;

public class MyPlugin {

    private final BlockingClientService clientService;

    @Inject
    public MyPlugin(final BlockingClientService clientService) { (1)
        this.clientService = clientService;
    }
}
1 The client service implementation is injected by the HiveMQ core.


Check if a client is connected
final boolean clientConnected = clientService.isClientConnected(client);
if (clientConnected) {
    log.info("Client {} is connected.", client);
}
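
In the same way the blocking service can be used to list all clients that are currently offline but still have a persistent session; a minimal sketch:

List disconnected clients with a persistent session
final Set<String> disconnectedClients = clientService.getLocalDisconnectedClients();
for (String clientId : disconnectedClients) {
    // Only clients which connected with cleanSession=false show up here.
    log.info("Client {} is disconnected but has a persistent session.", clientId);
}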


Async API

/**
 * Returns all identifiers of connected clients of this HiveMQ Node. You won't receive client identifiers of connected
 * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
 * <p/>
 * If you have many client connections, please note that calling this method frequently could have negative performance
 * effects.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all client identifiers of all connected clients
 */
ListenableFuture<Set<String>> getLocalConnectedClients();

/**
 * Returns all disconnected clients which have a persistent MQTT session on this instance of HiveMQ (MQTT clean session=false).
 * <p/>
 * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all disconnected clients with a persistent MQTT session
 */
ListenableFuture<Set<String>> getLocalDisconnectedClients();

/**
 * Check if a client with a given identifier is currently connected to this HiveMQ instance.
 *
 * @param clientId client, which should be checked
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains true, if a certain client is currently connected and false otherwise
 */
ListenableFuture<Boolean> isClientConnectedLocal(String clientId);

/**
 * Returns client information for clients that are connected to this broker instance.
 * <p/>
 * If the client isn't connected, you will receive an {@link Optional} with absent data.
 *
 * @param clientId the client identifier of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the {@link ClientData} for a specific client.
 */
ListenableFuture<ClientData> getLocalClientData(String clientId);

/**
 * Returns all identifiers of connected clients of this HiveMQ instance and all other nodes in a HiveMQ cluster
 * <p/>
 * Calling this method frequently in a clustered environment could have negative performance effects.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all connected clients
 */
ListenableFuture<Set<String>> getConnectedClients();

/**
 * Returns all disconnected clients which have a persistent MQTT session on this broker or any other cluster node.
 * <p/>
 * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all disconnected clients with a persistent MQTT session
 */
ListenableFuture<Set<String>> getDisconnectedClients();

/**
 * Check if a client with a given identifier is currently connected to this HiveMQ broker instance or any other instance in the cluster.
 *
 * @param clientId client, which should be checked
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains true, if a certain client is currently connected and false otherwise
 */
ListenableFuture<Boolean> isClientConnected(String clientId);

/**
 * Returns additional client information about a given client with a given client identifier.
 * <p/>
 * This method will also get client information from other cluster nodes if needed.
 * <p/>
 * If the client isn't connected, you will receive an {@link Optional} with absent data.
 *
 * @param clientId the client identifier of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the {@link ClientData} for a specific client.
 */
ListenableFuture<ClientData> getClientData(String clientId);

Example Usage

Log Client Information
private void logClientData(String clientId) {

    Futures.addCallback(clientService.getClientData(clientId), new FutureCallback<ClientData>() {

        @Override
        public void onSuccess(ClientData clientData) {

            if (clientData != null) {
                logger.info(clientData.getUsername() + " " + clientData.getClientId() + " " + clientData.isAuthenticated());
            }
        }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}


List Connected Clients
private void logConnectedClients() {
    Futures.addCallback(clientService.getConnectedClients(), new FutureCallback<Set<String>>() {
        @Override
        public void onSuccess(Set<String> connectedClients) {
            for (String connectedClient : connectedClients) {
                logger.info("client {} is currently connected.", connectedClient);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
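
The cluster-wide connection check from the API above can be used the same way; a minimal sketch:

Check the connection state of a single client
private void checkClientConnected(final String clientId) {
    Futures.addCallback(clientService.isClientConnected(clientId), new FutureCallback<Boolean>() {
        @Override
        public void onSuccess(Boolean connected) {
            logger.info("Client {} connected: {}", clientId, connected);
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}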
Gotchas
  • If an MQTT client is connected with cleanSession=true, all information about the client will be lost as soon as the client disconnects. This means the client will not be considered a "disconnected client" by the getDisconnectedClients method.


Publish Service

The Publish Service is used for programmatically sending MQTT publish messages from HiveMQ plugins.

API

     /**
     * Publishes a new MQTT {@link PUBLISH} message. The standard MQTT topic matching mechanism of HiveMQ will apply.
     * <p/>
     * If the given {@link PUBLISH} or any of its information (topic,qos,message) is null, a {@link NullPointerException}
     * will be thrown
     *
     * @param publish object with topic, QoS and message, which should be published to all subscribed clients
     * @throws NullPointerException if the given object is <code>null</code> or any relevant information like topic, qos
     *                              or message is <code>null</code>
     */
    void publish(@NotNull PUBLISH publish);


    /**
     * Publishes a new MQTT {@link PUBLISH} message.
     * The PUBLISH will only be delivered to the client with the specified client identifier.
     * Also the client needs to be subscribed on the topic of the PUBLISH in order to receive it.
     * <p/>
     * If the given {@link PUBLISH} or any of its information (topic,qos,message) is null, a {@link NullPointerException}
     * will be thrown
     *
     * @param publish  object with topic, QoS and message, which should be published to the specified client
     * @param clientId the client identifier of the client which should receive the message
     * @throws NullPointerException if the given object is <code>null</code> or any relevant information like topic, qos
     *                              or message is <code>null</code>
     */
    void publishtoClient(@NotNull PUBLISH publish, @NotNull String clientId);

Example Usage

This section discusses common examples on how to use the PublishService in your own plugins.

Copy and redirect a PUBLISH message to a subtopic
    @Override
    public void onPublishReceived(PUBLISH publish, ClientData clientData)
                                        throws OnPublishReceivedException {

        PUBLISH copy = PUBLISH.copy(publish); (1)
        copy.setTopic(publish.getTopic()+"/"+subTopic);
        publishService.publish(copy); (2)
    }
1 Make a copy of the incoming message.
2 Publish the copied message to another topic.
Deep copy
Never modify the original MQTT PUBLISH message directly; always make a deep copy with PUBLISH.copy of the original PUBLISH object and apply further modifications to the copy.


Publish a new MQTT message
    public void publishNewMessage() {
        final PUBLISH message = new PUBLISH(); (1)
        message.setQoS(QoS.EXACTLY_ONCE);
        message.setTopic("the/topic");
        message.setPayload("payload".getBytes(Charsets.UTF_8));
        message.setRetain(true);

        publishService.publish(message); (2)
    }
1 Create a new PUBLISH object.
2 Publish the message.
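
Sending a message to a single client works analogously with the publishtoClient method shown in the API above; a minimal sketch, assuming the target client is subscribed to the topic (the clientId parameter is illustrative):

Publish a message to a single client
    public void publishToSingleClient(String clientId) {
        final PUBLISH message = new PUBLISH();
        message.setQoS(QoS.AT_LEAST_ONCE);
        message.setTopic("the/topic");
        message.setPayload("payload".getBytes(Charsets.UTF_8));

        // The client only receives the message if it is subscribed to the topic.
        publishService.publishtoClient(message, clientId);
    }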

Gotchas

  • Never modify the original MQTT PUBLISH message directly; always make a deep copy with PUBLISH.copy and apply your modifications to the copy only.

  • The publishtoClient method is rather expensive. Don’t use this method excessively.

$SYS Topic Service

The $SYS Topic Service is used for querying, adding, removing and modifying $SYS Topics at runtime.

$SYS topic publishes are executed periodically if the HiveMQ $SYS topic plugin is installed.

The HiveMQ SYS Topic Plugin is activated by default. The source code is open source under the business-friendly Apache 2 license, and it’s recommended to study the source code if you want to learn how to use $SYS topics in your own plugins.

Custom $SYS topics can be hooked into HiveMQ with this service. When the default $SYS topic plugin is installed, clients cannot publish to $SYS topics.

API

    /**
     * Returns a Collection of all {@link SYSTopicEntry}s available.
     * It is possible to filter the returned list by {@link Type}.
     * By default, this method will return all {@link SYSTopicEntry}s.
     *
     * @param types the {@link Type}s to filter
     * @return a Collection of all $SYS topics registered to HiveMQ of the given Types
     */
    Collection<SYSTopicEntry> getAllEntries(Type... types);

    /**
     * Checks if the SYSTopicService contains a given {@link SYSTopicEntry}.
     *
     * @param entry the {@link SYSTopicEntry} to check
     * @return <code>true</code> if the Service contains the given {@link SYSTopicEntry},
     * <code>false</code> otherwise
     */
    boolean contains(final SYSTopicEntry entry);

    /**
     * Returns the {@link SYSTopicEntry} for a given topic.
     *
     * @param topic the topic
     * @return an {@link Optional} of a {@link SYSTopicEntry}
     */
    Optional<SYSTopicEntry> getEntry(String topic);

    /**
     * Adds a new $SYS topic entry.
     * The topic of the given {@link SYSTopicEntry} must start with '$SYS'.
     *
     * @param entry a new {@link SYSTopicEntry} to add
     */
    void addEntry(final SYSTopicEntry entry);

    /**
     * Removes a {@link SYSTopicEntry}
     *
     * @param entry the entry to remove
     * @return <code>true</code> if the {@link SYSTopicEntry} could be removed, <code>false</code> otherwise
     */
    boolean removeEntry(final SYSTopicEntry entry);

    /**
     * Triggers a PUBLISH of all registered SYSTopicEntries with {@link Type} STANDARD to all clients which are subscribed to the SYSTopics
     *
     * @since 3.0
     */
    void triggerStandardSysTopicPublish();

    /**
     * Triggers a PUBLISH of all registered SYSTopicEntries with {@link Type} STATIC to a specified client.
     * The client receives all messages from topics it is subscribed to.
     *
     * @param clientId The clientid for which to trigger the publishes
     * @since 3.0
     */
    void triggerStaticSysTopicPublishToClient(final String clientId);

    /**
     * Triggers a PUBLISH of all registered SYSTopicEntries with {@link Type} STANDARD to a specified client.
     * The client receives all messages from topics it is subscribed to.
     *
     * @param clientId The clientid for which to trigger the publishes
     * @since 3.0
     */
    void triggerStandardSysTopicPublishToClient(final String clientId);

Example Usage

Add a new $SYS Topic at runtime
 sysTopicService.addEntry(new SYSTopicEntry() {
         @Override
         public String topic() {
             return "$SYS/new";
         }

         @Override
         public Supplier<byte[]> payload() {
             return Suppliers.ofInstance("test".getBytes(Charsets.UTF_8));
         }

         @Override
         public Type type() {
             return Type.STANDARD;
         }

         @Override
         public String description() {
             return "the description";
         }
 });


Remove all static $SYS topics at runtime
 final Collection<SYSTopicEntry> allStaticEntries = sysTopicService.getAllEntries(Type.STATIC);

 for (SYSTopicEntry entry : allStaticEntries) {
     sysTopicService.removeEntry(entry);
 }


Get payload for a specific $SYS topic at runtime
 final Optional<SYSTopicEntry> entry = sysTopicService.getEntry("$SYS/broker/uptime");

 if (entry.isPresent()) {
     log.info("Uptime is {}", new String(entry.get().payload().get(), Charsets.UTF_8));
 }


Send standard sys topic every minute
@Override
public void onBrokerStart() throws BrokerUnableToStartException {
    sysTopicRegistration.registerSysTopics();

    final long publishInterval = 60;
    pluginExecutorService.scheduleAtFixedRate(new TriggerDynamicSysTopic(), publishInterval, publishInterval, TimeUnit.SECONDS);

}

private class TriggerDynamicSysTopic implements Runnable {

    @Override
    public void run() {
        sysTopicService.triggerStandardSysTopicPublish();
    }
}
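
To push the static $SYS topics to a single client, e.g. right after it connected, triggerStaticSysTopicPublishToClient can be used; a minimal sketch inside an OnConnectCallback (sysTopicService is assumed to be injected):

Send static sys topics to a newly connected client
@Override
public void onConnect(CONNECT connect, ClientData clientData) throws RefusedConnectionException {
    sysTopicService.triggerStaticSysTopicPublishToClient(clientData.getClientId());
}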

Gotchas

  • To add custom topics to the $SYS topic tree, these topics must start with $SYS/.

  • As long as the HiveMQ SYS Topic Plugin is located in the plugins folder, $SYS topic messages will be published automatically.

Plugin Executor Service

Many MQTT integrations depend on expensive (in terms of CPU time) operations like:

  • Calling webservices

  • Persisting or querying data from a database

  • Writing data to disk

  • Any other blocking operation

The main paradigm in HiveMQ plugins is that blocking is not allowed. But what to do if your plugin business logic depends on blocking operations? In order to make these operations asynchronous, use the HiveMQ managed PluginExecutorService. This HiveMQ-managed executor service is shared between all HiveMQ plugins and can be monitored by the standard HiveMQ monitoring system.

The PluginExecutorService is a sophisticated implementation which can be used as a ListeningExecutorService. This allows callback-based future handling for truly non-blocking behaviour.

The plugin executor service also allows you to schedule tasks periodically.

It’s important that you never create your own thread pools. This can decrease the performance of HiveMQ significantly. This is especially true for Java cached thread pools, since these spawn new threads without any limit and can make your system unresponsive. If you’re using a library which uses such thread pools (like the Jersey Client library), make sure to limit the number of threads!

Inject and use Plugin Executor Service
import com.google.inject.Inject;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnConnectCallback;
import com.hivemq.spi.callback.exception.RefusedConnectionException;
import com.hivemq.spi.message.CONNECT;
import com.hivemq.spi.security.ClientData;
import com.hivemq.spi.services.PluginExecutorService;

public class ClientConnect implements OnConnectCallback {

    private final PluginExecutorService pluginExecutorService;

    @Inject
    public ClientConnect(PluginExecutorService pluginExecutorService) {
        this.pluginExecutorService = pluginExecutorService;
    }

    @Override
    public void onConnect(CONNECT connect, ClientData clientData) throws RefusedConnectionException {
        pluginExecutorService.execute(new Runnable() {

            @Override
            public void run() {
                expensiveTask();
            }
        });
    }

    private void expensiveTask(){
        ...
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}


Log incoming publishes per minute
@Override
public void onBrokerStart() throws BrokerUnableToStartException {
    pluginExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            Meter incomingPublishRate = metricService.getHiveMQMetric(HiveMQMetrics.INCOMING_PUBLISH_RATE);
            logger.info("Incoming publishes last minute = {}", incomingPublishRate.getOneMinuteRate());
        }
    }, 1, 1, TimeUnit.MINUTES);
}


Adding a callback for return values
private void methodWithListenableFuture() {
    final ListenableFuture<Object> result = pluginExecutorService.submit(new Callable<Object>() {
          @Override
          public Object call() throws Exception {
              return expensiveTask();
          }
      });

      Futures.addCallback(result, new FutureCallback<Object>() {
          @Override
          public void onSuccess(@Nullable Object result) {
              log.info("Expensive result {}", result.toString());
          }

          @Override
          public void onFailure(Throwable t) {
              t.printStackTrace();
          }
      });
}

private Object expensiveTask() {
    ...
}

Log Service

The Log Service enables plugins to change the log level of the internal HiveMQ logger programmatically at runtime.

API

public interface LogService {

    /**
     * The loglevels HiveMQ supports
     */
    enum LogLevel {

        /**
         * The TRACE log level for finest HiveMQ logging
         */
        TRACE,
        /**
         * The DEBUG log level for fine HiveMQ logging
         */
        DEBUG,
        /**
         * The INFO log level. INFO logging is the default HiveMQ log behaviour
         */
        INFO,
        /**
         * The WARN log level which only logs warnings
         */
        WARN,
        /**
         * The ERROR log level which only logs severe HiveMQ errors
         */
        ERROR

    }

    /**
     * Changes the log level of the internal HiveMQ logger
     * <p/>
     * This does not support <code>null</code> parameters. If you pass
     * <code>null</code>, this method is lenient and will ignore the parameter
     *
     * @param logLevel the new loglevel
     */
    void setLogLevel(@NotNull LogLevel logLevel);

    /**
     * Returns the current log level of the internal HiveMQ logger
     *
     * @return the current log level of the internal HiveMQ logger
     */
    LogLevel getLogLevel();
}

Example Usage

Set log level to debug if plugin is deployed
import com.google.inject.Inject;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerStart;
import com.hivemq.spi.callback.exception.BrokerUnableToStartException;
import com.hivemq.spi.services.LogService;

public class HiveMQStart implements OnBrokerStart {

    private final LogService logService;

    @Inject
    public HiveMQStart(LogService logService) {
        this.logService = logService;
    }

    @Override
    public void onBrokerStart() throws BrokerUnableToStartException {
        if (logService.getLogLevel() != LogService.LogLevel.DEBUG) {
            logService.setLogLevel(LogService.LogLevel.DEBUG);
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

Metric Service

HiveMQ 3 introduced a new, holistic monitoring model. While HiveMQ 2 recalculated metrics at specific points in time, HiveMQ 3 metrics are always current and accurate.

HiveMQ uses the de-facto standard Java metrics library Dropwizard Metrics, so plugin developers can benefit from the whole ecosystem around that library, and writing custom integrations is a breeze.

The MetricService of HiveMQ can be used to gather information about pre-defined HiveMQ metrics or can be used to hook in custom metrics defined by your plugin. There’s no artificial wrapper, so you can use the feature-rich Dropwizard Metrics library directly.

For gathering pre-defined HiveMQ metrics, the constant class com.hivemq.spi.metrics.HiveMQMetrics can be used.

Blocking API

/**
 * Returns a specific HiveMQ metric. If the metric does not exist, this method will return
 * <code>null</code>.
 * <p/>
 * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
 *
 * @param metric the metric
 * @param <T>    the metric type
 * @return the metric (if available) or <code>null</code>
 */
@Nullable
<T extends Metric> T getHiveMQMetric(HiveMQMetric<T> metric);

/**
 * Returns a specific HiveMQ metric. If the metric does not exist, this method will return
 * <code>null</code>.
 * <p/>
 * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
 *
 * @param metric the metric
 * @param <T>    the metric type
 * @return a Map which contains the metric for each cluster node (if available) or <code>null</code>
 */
@Nullable
<T extends Metric> Map<String, T> getClusterMetric(HiveMQMetric<T> metric);

/**
 * Returns the metric registry of HiveMQ.
 *
 * @return the metric registry
 */
MetricRegistry getMetricRegistry();

Example Usage

Monitor specific metrics
private void logConnectionsHistogram() {
    Histogram connections = metricService.getHiveMQMetric(HiveMQMetrics.CONNECTIONS_OVERALL_MEAN);

    final Snapshot snapshot = connections.getSnapshot();

    logger.info("Mean: {}ms", snapshot.getMean() / 1000 / 1000);
    logger.info("StdDev: {}ms", snapshot.getStdDev() / 1000 / 1000);
    logger.info("Min: {}ms", snapshot.getMin() / 1000 / 1000);
    logger.info("Max: {}ms", snapshot.getMax() / 1000 / 1000);
    logger.info("Median: {}ms", snapshot.getMedian() / 1000 / 1000);
    logger.info("75th percentile: {}ms", snapshot.get75thPercentile() / 1000 / 1000);
    logger.info("95th percentile: {}ms", snapshot.get95thPercentile() / 1000 / 1000);
    logger.info("98th percentile: {}ms", snapshot.get98thPercentile() / 1000 / 1000);
    logger.info("99th percentile: {}ms", snapshot.get99thPercentile() / 1000 / 1000);
    logger.info("999th percentile: {}ms", snapshot.get999thPercentile() / 1000 / 1000);
}
Add custom metrics
private void addCustomMetrics() {

    //Adding different metric types
    final Meter meter = metricService.getMetricRegistry().meter("my-meter");
    final Timer timer = metricService.getMetricRegistry().timer("my-timer");
    final Counter counter = metricService.getMetricRegistry().counter("my-counter");
    final Histogram histogram = metricService.getMetricRegistry().histogram("my-histogram");


    //Remove a metric
    metricService.getMetricRegistry().remove("my-meter");
}
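
The getClusterMetric method returns one metric instance per cluster node in a Map; a minimal sketch, assuming the map is keyed by the node identifier:

Read a metric from all cluster nodes
private void logClusterConnectionsHistogram() {
    final Map<String, Histogram> histograms =
            metricService.getClusterMetric(HiveMQMetrics.CONNECTIONS_OVERALL_MEAN);

    if (histograms != null) {
        for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
            logger.info("Node {}: mean = {}", entry.getKey(), entry.getValue().getSnapshot().getMean());
        }
    }
}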

Async API

/**
 * Returns a specific HiveMQ metric. If the metric does not exist, this method will return
 * <code>null</code>.
 * <p/>
 * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
 *
 * @param metric the metric
 * @param <T>    the metric type
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} with the metric (if available) or a <code>null</code> result.
 */
<T extends Metric> ListenableFuture<T> getHiveMQMetric(HiveMQMetric<T> metric);

/**
 * Returns a specific HiveMQ metric. If the metric does not exist, this method will return
 * <code>null</code>.
 * <p/>
 * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
 *
 * @param metric the metric
 * @param <T>    the metric type
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} with a Map of the metric for each cluster node (if available) or a <code>null</code> result.
 */
<T extends Metric> ListenableFuture<Map<String, T>> getClusterMetric(HiveMQMetric<T> metric);

/**
 * Returns the metric registry of HiveMQ.
 *
 * @return the metric registry
 */
MetricRegistry getMetricRegistry();

Example Usage

Monitor specific metrics
ListenableFuture<Counter> future = metricService
        .getHiveMQMetric(HiveMQMetrics.INCOMING_DISCONNECT_COUNT);
Futures.addCallback(future, new FutureCallback<Counter>() {
    @Override
    public void onSuccess(Counter counter) {
        log.info("Incoming disconnects = {}", counter.getCount());
    }

    @Override
    public void onFailure(Throwable throwable) {
        throwable.printStackTrace();
    }
});

Configuration Service

HiveMQ configurations are typically set at startup time via the config.xml configuration file. However, it may be desirable to change configurations at runtime. Most HiveMQ configurations can be changed at runtime through the different configuration services of the plugin system.

The following table shows the available configuration services:

Table 6. Configuration Services
Name Purpose API

GeneralConfigurationService

General HiveMQ configurations

Available on Github

ListenerConfigurationService

Allows binding new listeners at runtime

Available on Github

MqttConfigurationService

MQTT-specific configurations like retryIntervals, max client identifier length, …​

Available on Github

ThrottlingConfigurationService

Configurations for throttling traffic globally or limiting the maximum number of MQTT connections

Available on Github

In order to use one of the concrete subservices in your plugin, you can use one of the following approaches (see the sketch after the API listing below):

  • Inject the desired configuration subservice directly

  • Inject the ConfigurationService interface and use your desired configuration service via this interface.

API

public interface ConfigurationService {

    /**
     * Returns the configuration service for general HiveMQ configuration
     *
     * @return the general HiveMQ configuration service
     */
    GeneralConfigurationService generalConfiguration();

    /**
     * Returns the configuration service which allows you to add and inspect listeners at runtime.
     *
     * @return the listener configuration service
     */
    ListenerConfigurationService listenerConfiguration();

    /**
     * Returns the configuration service for MQTT configuration
     *
     * @return the mqtt configuration service
     */
    MqttConfigurationService mqttConfiguration();

    /**
     * Returns the throttling configuration service for global throttling
     *
     * @return the global throttling service
     */
    ThrottlingConfigurationService throttlingConfiguration();

}
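
Example Usage

Both injection approaches look like this in practice; a minimal sketch (field names are illustrative, and no configuration methods beyond the API above are assumed):

Inject a configuration service
import javax.inject.Inject;

public class MyPlugin {

    private final ConfigurationService configurationService;
    private final MqttConfigurationService mqttConfigurationService;

    @Inject
    public MyPlugin(final ConfigurationService configurationService, (1)
                    final MqttConfigurationService mqttConfigurationService) { (2)
        this.configurationService = configurationService;
        this.mqttConfigurationService = mqttConfigurationService;
    }

    public void useMqttConfiguration() {
        // Both references point to the same MQTT configuration service.
        final MqttConfigurationService viaInterface = configurationService.mqttConfiguration();
    }
}
1 Inject the umbrella ConfigurationService interface.
2 Or inject the desired configuration subservice directly.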

REST Service

API

public interface RESTService {

    /**
     * Adds a new listener to the REST Service programmatically.
     * <p/>
     * Note: Typically the listeners are added via the configuration file. If adding
     * the configuration via the config file does not suit your needs, the programmatic API
     * offers a convenient way to add an arbitrary number of listeners at runtime.
     * <p/>
     * Note: If you add listeners at runtime, existing servlets or JAX-RS resources on other listeners won't be added
     * to this new listener automatically.
     *
     * @param listener the {@link Listener} implementation
     * @throws NullPointerException     if the passed listener is <code>null</code>
     * @throws IllegalArgumentException if the passed listener is not a valid listener type (like {@link HttpListener})
     */
    void addListener(@NotNull Listener listener);

    /**
     * Returns an immutable view of all listeners.
     * <p/>
     * In order to get more information you need to downcast to a specific listener type. You can do this e.g. by
     * code such as
     * <pre>
     * if (listener instanceof HttpListener) {
     *      HttpListener httpListener = (HttpListener) listener;
     * }
     * </pre>
     *
     * @return an immutable view of all listeners
     */
    @ReadOnly
    Collection<Listener> getListeners();

    /**
     * Adds a servlet instance to a specific path and adds the servlet to all available listeners.
     *
     * @param servlet the servlet to add
     * @param path    the path to bind the servlet to
     * @throws NullPointerException if the servlet or the path is null
     */
    void addServlet(@NotNull HttpServlet servlet, @NotNull String path);

    /**
     * Adds a servlet instance to a specific path and adds the servlet to all specified listeners. If the
     * collection of listeners is empty, the servlet will be added to all available listeners
     *
     * @param servlet             the servlet to add
     * @param path                the path to bind the servlet to
     * @param listenerIdentifiers a collection with identifiers of listeners
     * @throws NullPointerException if the servlet, the path or the listener identifiers collection is null
     */
    void addServlet(@NotNull HttpServlet servlet, @NotNull String path, @NotNull Collection<String> listenerIdentifiers);


    /**
     * Adds a servlet to a specific path and adds the servlet to all available listeners.
     * <p/>
     * The given servlet class will be instantiated by HiveMQ and the servlet can use dependency injection.
     * <p/>
     * The servlet will be instantiated once and not per request, so it's essentially a singleton.
     *
     * @param servlet the servlet to add
     * @param path    the path to bind the servlet to
     * @return The instantiated servlet
     * @throws NullPointerException if the servlet or the path is null
     */
    <T extends HttpServlet> T addServlet(@NotNull Class<T> servlet, @NotNull String path);

    /**
     * Adds a servlet to a specific path and adds the servlet to all specified listeners. If the
     * collection of listeners is empty, the servlet will be added to all available listeners
     * <p/>
     * The given servlet class will be instantiated by HiveMQ and the servlet can use dependency injection.
     * <p/>
     * The servlet will be instantiated once and not per request, so it's essentially a singleton.
     *
     * @param servlet             the servlet to add
     * @param path                the path to bind the servlet to
     * @param listenerIdentifiers a collection with identifiers of listeners
     * @return The instantiated servlet
     * @throws NullPointerException if the servlet or the path is null
     */
    <T extends HttpServlet> T addServlet(@NotNull Class<T> servlet, @NotNull String path, @NotNull Collection<String> listenerIdentifiers);


    /**
     * Adds a specific {@link ServletFilter} with a given path to the RESTService on all available listeners.
     *
     * @param filter the {@link ServletFilter}
     * @param path   the Path
     * @param <T>    the {@link Filter} which is contained in the {@link ServletFilter}
     * @return the concrete {@link Filter} instance
     * @throws NullPointerException if the {@link ServletFilter} or path is null
     */
    <T extends Filter> T addFilter(@NotNull ServletFilter<T> filter, @NotNull String path);

    /**
     * Adds a specific {@link ServletFilter} with a given path to the RESTService on specific listeners.
     *
     * @param filter    the {@link ServletFilter}
     * @param path      the Path
     * @param listeners a collection of listeners
     * @param <T>       the {@link Filter} which is contained in the {@link ServletFilter}
     * @return the concrete {@link Filter} instance
     * @throws NullPointerException if the {@link ServletFilter}, path or listener collection is null
     */
    <T extends Filter> T addFilter(@NotNull ServletFilter<T> filter, @NotNull String path, @NotNull Collection<String> listeners);


    /**
     * Adds a servlet instance to a specific path and adds the servlet to all available listeners.
     * <p/>
     * Additionally a servlet filter is added directly to the servlet path mapping. This is a convenient
     * method if you need a specific filter only for one servlet
     *
     * @param servlet the servlet to add
     * @param path    the path to bind the servlet to
     * @param filters an arbitrary amount of filters for this specific servlet
     * @throws NullPointerException if the servlet, the path or the filter array is null
     */
    @SuppressWarnings("unchecked")
    void addServletWithFilters(@NotNull HttpServlet servlet, @NotNull String path, @NotNull ServletFilter<? extends Filter>... filters);

    /**
     * Adds a servlet instance to a specific path and adds the servlet to all specific listeners.
     * <p/>
     * Additionally a servlet filter is added directly to the servlet path mapping. This is a convenient
     * method if you need a specific filter only for one servlet and only on specific listeners
     *
     * @param servlet             the servlet to add
     * @param path                the path to bind the servlet to
     * @param listenerIdentifiers a collection of listener identifiers
     * @param filters             an arbitrary amount of filters for this specific servlet
     * @throws NullPointerException if the servlet, the path, the listener collection or the filter array is null
     */
    @SuppressWarnings("unchecked")
    void addServletWithFilters(@NotNull HttpServlet servlet, @NotNull String path, @NotNull Collection<String> listenerIdentifiers, @NotNull ServletFilter<? extends Filter>... filters);

    /**
     * Adds a servlet to a specific path and adds the servlet to all available listeners. If the
     * collection of listeners is empty, the servlet will be added to all available listeners
     * <p/>
     * The given servlet class will be instantiated by HiveMQ and the servlet can use dependency injection.
     * <p/>
     * Additionally a servlet filter is added directly to the servlet path mapping. This is a convenient
     * method if you need a specific filter only for one servlet
     *
     * @param servlet the servlet to add
     * @param path    the path to bind the servlet to
     * @param filters an arbitrary amount of filters for this specific servlet
     * @return the instantiated servlet
     * @throws NullPointerException if the servlet, the path or the filter array is null
     */
    @SuppressWarnings("unchecked")
    <T extends HttpServlet> T addServletWithFilters(@NotNull Class<T> servlet, @NotNull String path, @NotNull ServletFilter<? extends Filter>... filters);

    /**
     * Adds a servlet to a specific path and adds the servlet to all specified listeners. If the
     * collection of listeners is empty, the servlet will be added to all available listeners
     * <p/>
     * The given servlet class will be instantiated by HiveMQ and the servlet can use dependency injection.
     * <p/>
     * Additionally a servlet filter is added directly to the servlet path mapping. This is a convenient
     * method if you need a specific filter only for one servlet
     *
     * @param servlet             the servlet to add
     * @param path                the path to bind the servlet to
     * @param listenerIdentifiers the collection of listeners
     * @param filters             an arbitrary amount of filters for this specific servlet
     * @return the instantiated servlet
     * @throws NullPointerException if the servlet, the path or the filter array is null
     */
    @SuppressWarnings("unchecked")
    <T extends HttpServlet> T addServletWithFilters(@NotNull Class<T> servlet, @NotNull String path, @NotNull Collection<String> listenerIdentifiers, @NotNull ServletFilter<? extends Filter>... filters);


    /**
     * Adds a {@link Application} to the RESTService.
     * <p/>
     * Please be aware that when using this method, only the properties, classes and singletons are
     * added to the RESTService; the {@link Application} object itself and all annotations on the {@link Application}
     * are ignored. So essentially this is a convenience method which allows you to add many resources at once
     * <p/>
     * Important: {@link javax.ws.rs.ApplicationPath} annotations are ignored.
     * <p/>
     * All resources defined in the Application will be added to all available listeners.
     *
     * @param application the {@link Application}
     * @throws NullPointerException if the passed {@link Application} is null
     */
    void addJaxRsApplication(@NotNull Application application);

    /**
     * Adds a {@link Application} to the RESTService.
     * <p/>
     * Please be aware that when using this method, only the properties, classes and singletons are
     * added to the RESTService; the {@link Application} object itself and all annotations on the {@link Application}
     * are ignored. So essentially this is a convenience method which allows you to add many resources at once
     * <p/>
     * Important: {@link javax.ws.rs.ApplicationPath} annotations are ignored.
     * <p/>
     * All resources defined in the Application will be added to all specified listeners.
     *
     * @param application         the {@link Application}
     * @param listenerIdentifiers a collection of listeners
     * @throws NullPointerException if the passed {@link Application} or the collection of listeners is null
     */
    void addJaxRsApplication(@NotNull Application application, @NotNull Collection<String> listenerIdentifiers);

    /**
     * Adds all given JAX-RS resources as singletons to all available listeners.
     * Since you have to instantiate the singleton objects on your own,
     * these singletons can't use HiveMQ's dependency injection and you have to pass dependencies
     * to the objects yourself.
     * <p/>
     * If you want singletons which use dependency injection, consider using another method of the RESTService
     * which accepts classes instead of objects. You can annotate these classes with {@link javax.inject.Singleton}
     *
     * @param singletons an arbitrary number of singleton resources
     * @throws NullPointerException if the singleton array is null
     */
    void addJaxRsSingletons(@NotNull Object... singletons);

    /**
     * Adds all given JAX-RS resources as singletons to all specified listeners.
     * Since you have to instantiate the singleton objects on your own,
     * these singletons can't use HiveMQ's dependency injection and you have to pass dependencies
     * to the objects yourself.
     * <p/>
     * If you want singletons which use dependency injection, consider using another method of the RESTService
     * which accepts classes instead of objects. You can annotate these classes with {@link javax.inject.Singleton}
     *
     * @param singletons          a collection of singleton resources
     * @param listenerIdentifiers a collection of listeners
     * @throws NullPointerException if the singleton array is null
     */
    void addJaxRsSingletons(@NotNull Collection<Object> singletons, @NotNull Collection<String> listenerIdentifiers);

    /**
     * Adds an arbitrary number of JAX-RS resources to all available listeners.
     * These resources can use dependency injection and HiveMQ will instantiate the resources for you at runtime
     *
     * @param resources an arbitrary number of JAX-RS resource classes
     * @throws NullPointerException if the resources array is null
     */
    void addJaxRsResources(@NotNull Class<?>... resources);

    /**
     * Adds an arbitrary number of JAX-RS resources to all specified listeners.
     * These resources can use dependency injection and HiveMQ will instantiate the resources for you at runtime
     *
     * @param resources           a collection of JAX-RS resource classes
     * @param listenerIdentifiers a collection of listeners
     * @throws NullPointerException if the resources array or the listener collection is null
     */
    void addJaxRsResources(@NotNull Collection<Class<?>> resources, @NotNull Collection<String> listenerIdentifiers);


}
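
Example Usage

The RESTService is injected like any other HiveMQ service; a minimal sketch which registers a servlet instance on all available listeners (HelloWorldServlet is a hypothetical HttpServlet of your plugin):

Add a servlet at plugin startup
import com.hivemq.spi.PluginEntryPoint;

import javax.annotation.PostConstruct;
import javax.inject.Inject;

public class MyPlugin extends PluginEntryPoint {

    private final RESTService restService;

    @Inject
    public MyPlugin(final RESTService restService) { (1)
        this.restService = restService;
    }

    @PostConstruct
    public void postConstruct() {
        restService.addServlet(new HelloWorldServlet(), "/hello"); (2)
    }
}
1 The RESTService implementation is injected by the HiveMQ core.
2 Binds the (hypothetical) servlet to the path /hello on all available listeners.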

Deprecated Services

The following services are deprecated since 3.1. Most of these services are replaced by a Blocking/Async version of the original service. Shared subscriptions can now be created with the Blocking Subscription Store or the Async Subscription Store, due to the changes to the shared subscription concept in 3.1. See the HiveMQ documentation for more information on shared subscriptions.

Table 7. Deprecated HiveMQ Services
Name Available since Description

Retained Message Store Service

1.5

Gives access to retained messages and lets plugins read, create, delete and modify retained messages

Subscription Store Service

1.5

Gives access to subscriptions and lets plugins read, create, delete and modify subscriptions

Client Service

1.5

Gives access to information about connected clients and clients with a persistent session

Metric Service

3.0

Allows inspecting and adding custom HiveMQ metrics at runtime.

Shared Subscription Service

3.0

Allows adding and removing shared subscriptions at runtime.

Retained Message Store Service

The Retained Message Store enables HiveMQ plugins to directly add and remove retained messages from HiveMQ's retained message store. It’s also possible to retrieve a certain retained message or the number of retained messages currently stored by HiveMQ.

API

Retained Message Store API
           /**
            * @return all retained messages which are currently stored
            */
           public Set<RetainedMessage> getRetainedMessages();

           /**
            * @param topic a topic
            * @return retained message for the specific topic, otherwise an Optional
            *         instance with an empty reference
            */
           public Optional<RetainedMessage> getRetainedMessage(String topic);

           /**
            * Removes the retained message from the given topic. null values are ignored.
            * If there isn't any retained message on the topic yet, nothing will happen.
            *
            * @param topic from which the message should be removed
            */
           public void remove(String topic);

           /**
            * Removes a given retained message. null values are ignored.
            * If the given retained message doesn't exist, nothing will happen.
            *
            * @param retainedMessage which should be removed
            */
           public void remove(RetainedMessage retainedMessage);

           /**
            * Removes all retained messages from the message store.
            */
           public void clear();

           /**
            * This method adds or replaces a retained message
            *
            * @param retainedMessage which should be added or replaced
            */
           public void addOrReplace(RetainedMessage retainedMessage);

           /**
            * Checks if the retained message is present in the retained message store.
            * Only the topic of a retained message is considered for this check, QoS and message are ignored.
            *
            * @param retainedMessage to check if it's already in the message store
            * @return true if there's already a message on the topic of the given retained message
            */
           public boolean contains(RetainedMessage retainedMessage);

           /**
            * @return the number of all retained messages
            */
           public int size();

Example Usage

This section discusses common examples on how to use the Retained Message Store in your own plugins.

Inject the Message Store
import com.hivemq.spi.services.RetainedMessageStore;

import javax.inject.Inject;

public class MyPlugin {

    private final RetainedMessageStore retainedMessageStore;

    @Inject
    public MyPlugin(final RetainedMessageStore retainedMessageStore) { (1)
        this.retainedMessageStore = retainedMessageStore;
    }
}
1 The message store implementation is injected by the HiveMQ core.


Programmatically add a new Retained Message
    public void addRetainedMessage(String topic, String message){

        if(!retainedMessageStore.getRetainedMessage(topic).isPresent()) (1)
            retainedMessageStore.addOrReplace(new RetainedMessage
                                 (topic, message.getBytes(Charsets.UTF_8), QoS.valueOf(1)));(2)
    }
1 Check if there is currently a retained message for a certain topic.
2 Add a retained message to the topic if there is none.


Clear the whole Retained Message Store
    public void cleanUp(){

        if(retainedMessageStore.size() >= TOO_MANY) (1)
            retainedMessageStore.clear(); (2)
    }
1 Get the amount of retained messages that are currently stored and check if there are too many retained messages.
2 Delete all retained messages.


Read and remove specific retained messages.
    public void logAndDeleteRetainedMessage(String topic){
        Optional<RetainedMessage> rm = retainedMessageStore.getRetainedMessage(topic); (1)

        if(rm.isPresent()){
            logger.info(new String(rm.get().getMessage()));
            retainedMessageStore.remove(rm.get()); (2)
        }
    }
1 Get the retained message for a certain topic if there is any.
2 Remove the message.

Gotchas

  • When calling the getRetainedMessage method with a topic for which no retained message is stored, you will receive an Optional.absent() object instead of null. See the com.google.common.base.Optional documentation for further information about the Optional class.

Subscription Store Service

The Subscription Store is designed for programmatic interaction with MQTT client subscriptions. It provides methods to get, add or remove subscriptions, to find out which topics a certain subscriber has subscribed to, and to find out which clients subscribed to a certain topic.

The Subscription Store allows querying for subscription information on all cluster nodes or just the local HiveMQ node.

There are separate "local" methods available in case the broker instance might be connected to a cluster and you want to gather information about this instance only. The return values of the local methods are not wrapped in a ListenableFuture. Local and cluster methods will return the same result if the broker instance is not part of a cluster.

API

    /**
     * This method returns all subscriptions on this HiveMQ Node as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
     * You won't receive subscriptions of connected
     * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
     * <p/>
     * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
     * negative performance effects.
     * <p/>
     * The returned Multimap is read-only and must not be modified.
     *
     * @return a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
     */
    @ReadOnly
    Multimap<String, Topic> getLocalSubscriptions();

    /**
     * Returns all MQTT client subscriber identifiers for a given topic, for this HiveMQ instance.
     * MQTT Wildcards are allowed.
     * <p/>
     * Don't pass <code>null</code> as topic. This method is lenient, so
     * it will just return an empty Set.
     * <p/>
     * The returned Set is read-only and must not be modified.
     *
     * @param topic the topic
     * @return client identifiers of all subscribers that subscribed to the topic
     */
    @ReadOnly
    Set<String> getLocalSubscribers(@NotNull String topic);

    /**
     * Returns all topics a client is subscribed to, on this HiveMQ instance.
     * <p/>
     * If the client does not exist, an empty Set is returned.
     * <p/>
     * Don't pass <code>null</code> as clientId. This method is lenient, so
     * it will just return an empty Set.
     * <p/>
     * The returned Set is read-only and must not be modified.
     *
     * @param clientID of the client
     * @return all topics the client subscribed to
     */
    @ReadOnly
    Set<Topic> getLocalTopics(@NotNull String clientID);

    /**
     * This method adds a subscription for a certain client to a certain topic.
     * If HiveMQ is connected to a cluster, the subscription will be broadcast to all other Cluster Nodes.
     * <p/>
     * This method is lenient, so if the clientId or the topic
     * is <code>null</code>, nothing will happen.
     *
     * @param clientID client, which should be subscribed
     * @param topic    topic to which the client should be subscribed
     * @return A {@link com.google.common.util.concurrent.ListenableFuture} object that will succeed,
     * as soon as the subscription was added by all Cluster Nodes.
     */
    ListenableFuture<Void> addSubscription(@NotNull String clientID, @NotNull Topic topic);

    /**
     * This method removes a subscription for a certain client and a certain topic.
     * If HiveMQ is connected to a cluster, the subscription will be removed by other Cluster Nodes as well.
     *
     * @param clientID client, which should get unsubscribed
     * @param topic    topic from which the client should get unsubscribed
     * @return A {@link com.google.common.util.concurrent.ListenableFuture} object that will succeed,
     * as soon as the subscription was removed by all Cluster Nodes.
     */
    ListenableFuture<Void> removeSubscription(@NotNull String clientID, @NotNull String topic);

    /**
     * This method returns all subscriptions of this HiveMQ instance and all other nodes in a HiveMQ cluster,
     * as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
     * <p/>
     * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
     * negative performance effects.
     * <p/>
     * The returned Multimap is read-only and must not be modified.
     *
     * @return a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
     */
    @ReadOnly
    ListenableFuture<Multimap<String, Topic>> getSubscriptions();

    /**
     * Returns all MQTT client subscriber identifiers for a given topic, on this HiveMQ instance and all other nodes in a HiveMQ cluster.
     * MQTT Wildcards are allowed.
     * <p/>
     * Don't pass <code>null</code> as topic. This method is lenient, so
     * it will just return an empty Set.
     * <p/>
     * The returned Set is read-only and must not be modified.
     *
     * @param topic the topic
     * @return client identifiers of all subscribers that subscribed to the topic
     */
    @ReadOnly
    ListenableFuture<Set<String>> getSubscribers(@NotNull String topic);

    /**
     * Returns all topics a client is subscribed to, on this HiveMQ instance and all other nodes in a HiveMQ cluster.
     * <p/>
     * If the client does not exist, an empty Set is returned.
     * <p/>
     * Don't pass <code>null</code> as clientId. This method is lenient, so
     * it will just return an empty Set.
     * <p/>
     * The returned Set is read-only and must not be modified.
     *
     * @param clientID of the client
     * @return all topics the client subscribed to
     */
    @ReadOnly
    ListenableFuture<Set<Topic>> getTopics(@NotNull String clientID);
All ListenableFuture methods will return immediately. Please use callbacks or use the get() method to wait for the result.
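
If blocking at the call site is acceptable, the result can also be awaited directly; a minimal sketch:

Wait for a result with get()
private Set<String> getSubscribersBlocking(String topic) throws InterruptedException, ExecutionException {
    // Blocks until the result is available.
    return subscriptionStore.getSubscribers(topic).get();
}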

Example Usage

This section discusses common examples on how to use the Subscription Store in your own plugins.

Inject the Subscription Store
import com.hivemq.spi.services.SubscriptionStore;

import javax.inject.Inject;

public class MyPlugin {

    private final SubscriptionStore subscriptionStore;

    @Inject
    public MyPlugin(final SubscriptionStore subscriptionStore) { (1)
        this.subscriptionStore = subscriptionStore;
    }
}
1 The subscription store implementation is injected by the HiveMQ core.


Remove unwanted subscriptions
public void removeTopicSubscriptions(final Topic topicToRemove) {

    Futures.addCallback(subscriptionStore.getSubscriptions(), new FutureCallback<Multimap<String, Topic>>() { (1)

        @Override
        public void onSuccess(Multimap<String, Topic> subscriptions) { (2)(3)

            for (String clientId : subscriptions.keySet()) { (4)

                if (subscriptions.get(clientId).contains(topicToRemove)) { (5)
                    subscriptionStore.removeSubscription(clientId, topicToRemove.getTopic());
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
1 The getSubscriptions method returns a ListenableFuture.
2 The callback succeeds as soon as the subscriptions have been collected.
3 Get a Multimap with client identifiers as keys and topic subscriptions as value.
4 Iterate all client identifiers.
5 Find all clients that subscribed to a certain topic and remove the subscriptions.


Setup a default Subscription
public class ConnackSend implements OnConnackSend {

    private static final String DEFAULT_TOPIC = "default/topic";
    private final SubscriptionStore subscriptionStore;

    @Inject
    public ConnackSend(SubscriptionStore subscriptionStore) {
        this.subscriptionStore = subscriptionStore;
    }

    @Override
    public void onConnackSend(CONNACK connack, ClientData clientData) { (1)
        subscriptionStore.addSubscription(clientData.getClientId(), new Topic(DEFAULT_TOPIC, QoS.valueOf(1))); (2)
    }

}
1 Add a subscription as soon as a client connected successfully.
2 As soon as a subscription is added, the MQTT client will receive all messages published to the topic.


List Subscribers for a certain topic
private void logSubscribers(String topic) {

    Set<String> subscribers = subscriptionStore.getLocalSubscribers(topic); (1)
    for (String clientId : subscribers) {
        logger.info(clientId);
    }
}
1 Get all subscribers for a certain topic, on this broker instance only.

Gotchas

  • The Multimap class is part of the com.google.common.collect API. See the documentation for further information.

  • Topic class objects are equal if their topics are equal, independent of their quality of service (QoS) levels.

  • Subscriptions added in the OnConnectCallback for the connecting clients are removed after the callback execution if the client requests a clean session. Consider using the OnConnackSend callback instead.


Client Service

The Client Service allows plugins to gather information about clients, such as whether a client is currently connected, its username, client identifier and SSL/TLS certificate, and whether the client is anonymous or authenticated.

The Client Service allows querying for client information on all cluster nodes or just the local HiveMQ node.

There are separate "local" methods available in case the broker instance might be connected to a cluster and you want to gather information about this instance only. The return values of the local methods are not wrapped in a ListenableFuture. If the HiveMQ instance is not connected to a cluster, both variants will return the same results.

API

    /**
     * Returns all identifiers of connected clients of this HiveMQ Node. You won't receive client identifiers of connected
     * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
     * <p/>
     * If you have many client connections, please note that calling this method frequently could have negative performance
     * effects.
     *
     * @return client identifiers of all connected clients
     */
    Set<String> getLocalConnectedClients();

    /**
     * Returns all disconnected clients which have a persistent MQTT session on this instance of HiveMQ (MQTT clean session=false).
     * <p/>
     * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
     *
     * @return all disconnected clients with a persistent MQTT session
     */
    Set<String> getLocalDisconnectedClients();

    /**
     * Check if a client with a given identifier is currently connected to this HiveMQ instance.
     *
     * @param clientId client, which should be checked
     * @return true, if a certain client is currently connected and false otherwise
     */
    boolean isClientConnectedLocal(String clientId);

    /**
     * Returns client information for clients that are connected to this broker instance.
     * <p/>
     * If the client isn't connected, you will receive an {@link Optional} with absent data.
     *
     * @param clientId the client identifier of the client
     * @return {@link ClientData} for a specific client.
     */
    Optional<ClientData> getLocalClientDataForClientId(String clientId);

    /**
     * Returns all identifiers of connected clients of this HiveMQ instance and all other nodes in a HiveMQ cluster
     * <p/>
     * Calling this method frequently in a clustered environment could have negative performance effects.
     *
     * @return client identifiers of all connected clients
     */
    ListenableFuture<Set<String>> getConnectedClients();

    /**
     * Returns all disconnected clients which have a persistent MQTT session on this broker or any other cluster node.
     * <p/>
     * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
     *
     * @return all disconnected clients with a persistent MQTT session
     */
    ListenableFuture<Set<String>> getDisconnectedClients();

    /**
     * Check if a client with a given identifier is currently connected to this HiveMQ broker instance or any other instance in the cluster.
     *
     * @param clientId client, which should be checked
     * @return true, if a certain client is currently connected and false otherwise
     */
    ListenableFuture<Boolean> isClientConnected(final String clientId);

    /**
     * Returns additional client information about a given client with a given client identifier.
     * <p/>
     * This method will also get client information from other cluster nodes if needed.
     * <p/>
     * If the client isn't connected, you will receive an {@link Optional} with absent data.
     *
     * @param clientId the client identifier of the client
     * @return {@link ClientData} for a specific client.
     */
    ListenableFuture<Optional<ClientData>> getClientDataForClientId(final String clientId);

Example Usage

This section discusses common examples on how to use the Client Service in your own plugins.

Inject the Client Service
import com.hivemq.spi.services.ClientService;

import javax.inject.Inject;

public class MyPlugin {

    private final ClientService clientService;

    @Inject
    public MyPlugin(final ClientService clientService) { (1)
        this.clientService = clientService;
    }
}
1 The client service implementation is injected by the HiveMQ core.


Log Client Information
private void logClientData(String clientId) {

    Futures.addCallback(clientService.getClientDataForClientId(clientId), new FutureCallback<Optional<ClientData>>() {

        @Override
        public void onSuccess(Optional<ClientData> result) {

            if (result.isPresent()) {
                ClientData clientData = result.get();
                logger.info(clientData.getUsername() + " " + clientData.getClientId() + " " + clientData.isAuthenticated());
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
List Connected Clients
private void logConnectedClients() {
    Futures.addCallback(clientService.getConnectedClients(), new FutureCallback<Set<String>>() {
        @Override
        public void onSuccess(Set<String> connectedClients) {
            for (String connectedClient : connectedClients) {
                logger.info("client {} is currently connected.", connectedClient);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
Gotchas
  • If an MQTT client is connected with cleanSession=true, all information about the client will be lost as soon as the client disconnects. This means the client will not be considered a "disconnected client" by the getDisconnectedClients method.

Metric Service

HiveMQ 3 introduced a new, holistic monitoring model. While HiveMQ 2 recalculated metrics at specific points in time, HiveMQ 3 metrics are always current and accurate.

HiveMQ uses the de-facto standard Java metrics library Dropwizard Metrics, so plugin developers can benefit from the whole ecosystem around that library, and writing custom integrations is a breeze.

The MetricService of HiveMQ can be used to gather information about pre-defined HiveMQ metrics or can be used to hook in custom metrics defined by your plugin. There’s no artificial wrapper, so you can use the feature-rich Dropwizard Metrics library directly.

For gathering pre-defined HiveMQ metrics, the constant class com.hivemq.spi.metrics.HiveMQMetrics can be used.

API

    /**
     * Returns a specific HiveMQ metric. If the metric does not exist, this method will return
     * <code>null</code>.
     * <p/>
     * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
     *
     * @param metric the metric
     * @param <T>    the metric type
     * @return the metric (if available) or <code>null</code>
     */
    @Nullable
    <T extends Metric> T getHiveMQMetric(HiveMQMetric<T> metric);

    /**
     * Returns the metric registry of HiveMQ.
     *
     * @return the metric registry
     */
    MetricRegistry getMetricRegistry();

Example Usage

Monitor specific metrics
private void logConnectionsHistogram() {
    Histogram connections = metricService.getHiveMQMetric(HiveMQMetrics.CONNECTIONS_OVERALL_MEAN);

    final Snapshot snapshot = connections.getSnapshot();

    logger.info("Mean: {}ms", snapshot.getMean() / 1000 / 1000);
    logger.info("StdDev: {}ms", snapshot.getStdDev() / 1000 / 1000);
    logger.info("Min: {}ms", snapshot.getMin() / 1000 / 1000);
    logger.info("Max: {}ms", snapshot.getMax() / 1000 / 1000);
    logger.info("Median: {}ms", snapshot.getMedian() / 1000 / 1000);
    logger.info("75th percentile: {}ms", snapshot.get75thPercentile() / 1000 / 1000);
    logger.info("95th percentile: {}ms", snapshot.get95thPercentile() / 1000 / 1000);
    logger.info("98th percentile: {}ms", snapshot.get98thPercentile() / 1000 / 1000);
    logger.info("99th percentile: {}ms", snapshot.get99thPercentile() / 1000 / 1000);
    logger.info("999th percentile: {}ms", snapshot.get999thPercentile() / 1000 / 1000);
}
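Since getMetricRegistry exposes the plain Dropwizard MetricRegistry, you can also enumerate everything that is currently registered. This minimal sketch simply logs all metric names:

List all registered metrics
private void logAvailableMetrics() {

    //getNames() returns a read-only, sorted set of all registered metric names
    for (String name : metricService.getMetricRegistry().getNames()) {
        logger.info("metric available: {}", name);
    }
}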
Add custom metrics
private void addCustomMetrics() {

    //Adding different metric types
    final Meter meter = metricService.getMetricRegistry().meter("my-meter");
    final Timer timer = metricService.getMetricRegistry().timer("my-timer");
    final Counter counter = metricService.getMetricRegistry().counter("my-counter");
    final Histogram histogram = metricService.getMetricRegistry().histogram("my-histogram");


    //Remove a metric
    metricService.getMetricRegistry().remove("my-meter");
}
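Creating a metric is only half the story; your plugin code also has to update it. The following sketch uses plain Dropwizard Metrics APIs to time a hypothetical doExpensiveWork() method and to attach a standard ConsoleReporter to HiveMQ's registry:

Update and report custom metrics
private void timeExpensiveWork() {

    final Timer timer = metricService.getMetricRegistry().timer("my-timer");

    //Timer.Context measures the elapsed time between time() and stop()
    final Timer.Context context = timer.time();
    try {
        doExpensiveWork(); //hypothetical business logic
    } finally {
        context.stop();
    }
}

private void startConsoleReporter() {

    //Prints all metrics of the HiveMQ registry to stdout once a minute
    final ConsoleReporter reporter = ConsoleReporter.forRegistry(metricService.getMetricRegistry())
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build();
    reporter.start(1, TimeUnit.MINUTES);
}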

Shared Subscription Service

HiveMQ 3 introduces Shared Subscriptions, a new concept for load balancing message consumption across multiple subscribing clients. The concept is explained in the Official User Guide.

These Shared Subscriptions can be added and removed at runtime.

API

@ThreadSafe
public interface SharedSubscriptionService {

    /**
     * Adds shared subscriptions to the SharedSubscriptionService.
     * <p/>
     * Although the contract of this method is to disallow null values, this method is lenient.
     * If <code>null</code> values are passed, these values are ignored.
     *
     * @param sharedSubscriptions the shared subscriptions to add
     */
    void addSharedSubscriptions(@NotNull final String... sharedSubscriptions);

    /**
     * Returns <code>true</code> if shared subscriptions are available, <code>false</code> otherwise.
     *
     * @return <code>true</code> if shared subscriptions are available, <code>false</code> otherwise
     */
    boolean sharedSubscriptionsAvailable();

    /**
     * Removes a shared subscription from the SharedSubscriptionService.
     * <p/>
     * Although the contract of this method is to disallow null values, this method is lenient.
     * If a <code>null</code> value is passed, the value is ignored.
     *
     * @param sharedSubscription the shared subscription to remove
     */
    void removeSharedSubscription(@NotNull final String sharedSubscription);

    /**
     * Returns all shared subscriptions. The returned List is a read-only representation
     * of all shared subscriptions.
     *
     * @return a read-only List of all shared subscriptions
     */
    @ReadOnly
    List<String> getSharedSubscriptions();

    /**
     * Returns the number of all shared subscriptions.
     *
     * @return the number of all shared subscriptions
     */
    long getSharedSubscriptionsSize();
}

Example Usage

Inspect all registered Shared Subscriptions
private final SharedSubscriptionService sharedSubscriptionService;

@Inject
public MyCallback(SharedSubscriptionService sharedSubscriptionService){
    this.sharedSubscriptionService = sharedSubscriptionService;
}

@PostConstruct
public void init(){
    listSharedSubscriptions();
}

public void listSharedSubscriptions(){
    final List<String> sharedSubscriptions = sharedSubscriptionService.getSharedSubscriptions();
    log.info("The following shared subscriptions were registered: {}", sharedSubscriptions);
}
Adding Shared Subscriptions
public void addSharedSubscriptions(){
   sharedSubscriptionService.addSharedSubscriptions("first/shared", "second/shared");
}
Removing Shared Subscriptions
public void removeSharedSubscription(){
    sharedSubscriptionService.removeSharedSubscription("my/shared/subscription");
}
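The sharedSubscriptionsAvailable and getSharedSubscriptionsSize methods are handy for guarding logic that only makes sense when shared subscriptions are in use. A minimal sketch:

Checking for Shared Subscriptions
public void logSharedSubscriptionUsage(){
    if (sharedSubscriptionService.sharedSubscriptionsAvailable()) {
        log.info("{} shared subscriptions are registered.", sharedSubscriptionService.getSharedSubscriptionsSize());
    }
}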


Packaging and Deployment

If you want to package the plugin and deploy it to your production environment, just follow these steps:

  • Execute the Maven goal package without the RunWithHiveMQ Profile (see Maven Plugin usage)

  • Find the jar file following the naming convention <artifactId>-<version>.jar in your project build folder, usually target

  • Copy the jar file and all configuration files (if needed) to the HiveMQ plugin folder (<HiveMQHome>/plugins) in your production environment

  • Start HiveMQ … Done!


1. Actually, HiveMQ utilizes the META-INF/services/ file approach with an enhanced service loader mechanism that allows dependency injection in plugins