Services

HiveMQ Services provide a convenient way to interact with the HiveMQ core from plugins.

Table 1. Available HiveMQ Services
Name Available since Description

Plugin Executor Service

3.0

Executor Service to run blocking or periodical tasks.

Blocking Session Attribute Store

3.3

Gives access to the MQTT Session-scoped attribute store.

Async Session Attribute Store

3.3

Gives access to the MQTT Session-scoped attribute store.

Blocking Connection Attribute Store

3.3

The Connection Attribute Store enables HiveMQ plugins to add, get and remove Connection Attributes for clients.

Async Connection Attribute Store

3.3

The Connection Attribute Store enables HiveMQ plugins to add, get and remove Connection Attributes for clients.

Blocking Retained Message Store

3.1

Gives access to retained messages and lets plugins read, create, delete and modify retained messages.

Async Retained Message Store

3.1

Gives access to retained messages and lets plugins read, create, delete and modify retained messages.

Blocking Subscription Store

3.1

Gives access to subscriptions and lets plugins read, create, delete and modify subscriptions.

Async Subscription Store

3.1

Gives access to subscriptions and lets plugins read, create, delete and modify subscriptions.

Blocking Client Service

3.1

Gives access to information about connected clients and clients with a persistent session.

Async Client Service

3.1

Gives access to information about connected clients and clients with a persistent session.

Blocking Client Group Service

3.3

Allows to organize multiple clients into custom groups for a fast lookup of group members.

Async Client Group Service

3.3

Allows to organize multiple clients into custom groups for a fast lookup of group members.

Publish Service

1.5

Allows to publish new MQTT messages to subscribed clients.

$SYS Topic Service

2.0

Allows to query, add, remove and modify $SYS Topics at runtime.

Log Service

3.0

Allows to change the HiveMQ Log level at runtime.

Blocking Metric Service

3.1

Allows to inspect and add custom HiveMQ metrics at runtime.

Async Metric Service

3.1

Allows to inspect and add custom HiveMQ metrics at runtime.

Configuration Service

3.0

Allows to change HiveMQ configuration at runtime.

REST Service

3.0

Allows to add custom HiveMQ resources at runtime.

There are more services planned in the future. Please contact us if you have specific needs for new services.

Some of the old services are deprecated since 3.1 and will be removed for HiveMQ 4.0. Avoid using these services in future development. The documentation of these services is still available here.
Inject Services!
At the moment it’s only possible to use the HiveMQ Services in your plugin if you inject them via Dependency Injection.

Blocking vs Async Services

For some services there is a blocking and async version of the service available. When a method of a blocking service is called, it will return as soon as the results are available. The methods of the async services return a ListenableFuture object which allows the developer to add a callback method that is called as soon as the results are available. It is highly recommended to use the async services to avoid performance losses.

ListenableFuture

Some of the plugins services are using com.google.common.util.concurrent.ListenableFuture object as a return value. The ListenableFuture objects enable the developer to add callbacks that are executed as soon as the method is finished.

Example Usage

Add callback to ListenableFuture
ListenableFuture<Boolean> future = someMethod();
Futures.addCallback(future, new FutureCallback<Boolean>() {
    @Override
    public void onSuccess(Boolean result) {
        log.info("'someMethod' succeeded. Result is {}", result);
    }

    @Override
    public void onFailure(Throwable throwable) {
        throwable.printStackTrace();
    }
});

For more information about Listenable Future see the documentation.

Standard vs Local Methods

Some services like the RetainedMessageStore, ClientService or the SubscriptionStore have most methods twice. Once in a standard variety and once with the Local infix. An example for this is below:

public interface AsyncClientService {
    ...
    ListenableFuture<Set<String>> getConnectedClients();

    ListenableFuture<Set<String>> getLocalConnectedClients();
    ...
}

If HiveMQ is not in Cluster-Mode both method behave effectively in the same way. Once clustering is enabled though the difference is quite significant. The scope of the Local methods is only the current node in which the method is called, whereas the standard method queries all known nodes in the current cluster view and returns an accumulation of all results.

In large cluster configurations or if called frequently this can have a negative performance impact.

Session Attribute Store

The Session Attribute Store enables HiveMQ plugins to add, get and remove Session Attributes for clients.

Session Attributes are stored for the duration of the MQTT Session of the client. When the session TTL expires, the Session Attributes are not removed immediately, but at a later time when HiveMQ’s internal cleanup-jobs run.

Session Attributes are cluster wide accessible and are replicated for fault tolerance. They can be configured to be stored either in memory or on disk.

The Session Attribute Store is only available for clients which already have a session available. When a client connects, the earliest point in time to use the Session Attribute Store for the client is the OnSessionReadyCallback.

Blocking API

Blocking Session Attribute Store API
public interface BlockingSessionAttributeStore {

    /**
     * Sets the given session attribute for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @param value    the value of the session attribute.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    void put(@NotNull String clientId, @NotNull String key, @NotNull byte[] value) throws NoSuchClientIdException;

    /**
     * Sets the given session attribute for a persistent client if the timestamp of the attribute is newer than the timestamp of the already stored attribute.
     *
     * @param clientId   the clientId of a persistent client.
     * @param key        the key of the session attribute.
     * @param value      the value of the session attribute.
     * @param timestamp  the timestamp of the session attribute.
     * @return {@link OptionalAttribute} with the previous associated value or null and a boolean if a value was replaced or not.
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4.0 exists in the cluster.
     * failing with a {@link com.hivemq.spi.services.exception.RateLimitExceededException} if the plugin service rate limit was exceeded.
     */
    @NotNull
    OptionalAttribute putIfNewer(@NotNull String clientId, @NotNull String key, @NotNull byte[] value, long timestamp) throws NoSuchClientIdException;

    /**
     * Sets the given session attribute for a persistent client if the timestamp of the attribute is newer than or equal to the timestamp of the already stored attribute.
     *
     * @param clientId   the clientId of a persistent client.
     * @param key        the key of the session attribute.
     * @param value      the value of the session attribute.
     * @param timestamp  the timestamp of the session attribute.
     * @return {@link OptionalAttribute} with the previous associated value or null and a boolean if a value was replaced or not.
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4.0 exists in the cluster.
     * failing with a {@link com.hivemq.spi.services.exception.RateLimitExceededException} if the plugin service rate limit was exceeded.
     */
    @NotNull
    OptionalAttribute putIfNewerOrEquals(@NotNull String clientId, @NotNull String key, @NotNull byte[] value, long timestamp) throws NoSuchClientIdException;

    /**
     * Sets the given session attribute for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @param value    the value of the session attribute as a string.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    void putAsString(@NotNull String clientId, @NotNull String key, @NotNull String value) throws NoSuchClientIdException;

    /**
     * Sets the given session attribute for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @param value    the value of the session attribute as a string.
     * @param charset  the {@link Charset} of the given value.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    void putAsString(@NotNull String clientId, @NotNull String key, @NotNull String value, @NotNull Charset charset) throws NoSuchClientIdException;

    /**
     * Retrieves the value of the session attribute with the given key for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @return an {@link Optional} containing the value of the session attribute if present.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    Optional<byte[]> get(@NotNull String clientId, @NotNull String key) throws NoSuchClientIdException;

    /**
     * Retrieves the value of the session attribute with the given key for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @return an {@link Optional} containing the value of the session attribute if present.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    Optional<String> getAsString(@NotNull String clientId, @NotNull String key) throws NoSuchClientIdException;

    /**
     * Retrieves the value of the session attribute with the given key for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @param charset  the {@link Charset} of the given value.
     * @return an {@link Optional} containing the value of the session attribute if present.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    Optional<String> getAsString(@NotNull String clientId, @NotNull String key, @NotNull Charset charset) throws NoSuchClientIdException;

    /**
     * Removes the session attribute with the given key for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @return an {@link Optional} containing the value of the removed session attribute if it was present.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    Optional<byte[]> remove(@NotNull String clientId, @NotNull String key) throws NoSuchClientIdException;

    /**
     * Retrieves all session attributes for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @return a Future of an {@link Optional} containing all session attributes as a map of key and value pairs if present.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    Optional<ImmutableMap<String, byte[]>> getAll(@NotNull String clientId) throws NoSuchClientIdException;

    /**
     * Clears all session attributes for the connected client.
     *
     * @param clientId the clientId of a persistent client.
     * @throws NoSuchClientIdException if no session for the given clientId exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    void clear(@NotNull String clientId) throws NoSuchClientIdException;
}

Example Usage

Set Session Attribute
public class SetSessionAttributeCallback implements OnPublishReceivedCallback {

    private final BlockingSessionAttributeStore sessionAttributeStore;

    @Inject
    SetAttributeCallback(final BlockingSessionAttributeStore sessionAttributeStore) {
        this.sessionAttributeStore = sessionAttributeStore; (1)
    }

    @Override
    public void onPublishReceived(final PUBLISH publish, final ClientData clientData) throws OnPublishReceivedException {
        final String clientId = clientData.getClientId();
        try {
            if (isSpecialClient(clientId)){
               sessionAttributeStore.putAsString(clientId, "session.token", "[...]", Charset.forName("UTF-8")); (2)
            }
        } catch (final NoSuchClientIdException noSuchClient) {
            ...
        } catch (final IncompatibleHiveMQVersionException incompatibleHiveMQ) {
            ...
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Inject the BlockingSessionAttributeStore.
2 Set a Session Attribute for the client.
Replace Session Attribute If Newer
public class SetSessionAttributeCallback implements OnPublishReceivedCallback {

    private final BlockingSessionAttributeStore sessionAttributeStore;

    @Inject
    SetAttributeCallback(final BlockingSessionAttributeStore sessionAttributeStore) {
        this.sessionAttributeStore = sessionAttributeStore; (1)
    }

    @Override
    public void onPublishReceived(final PUBLISH publish, final ClientData clientData) throws OnPublishReceivedException {
        final String clientId = clientData.getClientId();
        try {
            if (isSpecialClient(clientId)){

               // optional.getValue() = null
               // optional.isReplaced() = true
               OptionalAttribute optional = sessionAttributeStore.putIfNewer(clientId, "session.token", "test".getBytes(), 100); (2)

               // optional.getValue() = null
               // optional.isReplaced() = false
               optional = sessionAttributeStore.putIfNewer(clientId, "session.token", "test1".getBytes(), 10); (3)

               // new String(optional.getValue()) = "test"
               // optional.isReplaced() = true
               optional = sessionAttributeStore.putIfNewer(clientId, "session.token", "test2".getBytes(), 1000); (4)
            }
        } catch (final NoSuchClientIdException noSuchClient) {
            ...
        } catch (final IncompatibleHiveMQVersionException incompatibleHiveMQ) {
            ...
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Inject the BlockingSessionAttributeStore.
2 Put a Session Attribute for the client.
3 Put a Session Attribute for the client with an older timestamp.
4 Put a Session Attribute for the client with an newer timestamp.
Get and delete Session Attribute
public class UseSessionAttributeCallback implements BeforePublishSendCallback {

    private final BlockingSessionAttributeStore sessionAttributeStore;

    @Inject
    UseSessionAttributeCallback(final BlockingSessionAttributeStore sessionAttributeStore) {
        this.sessionAttributeStore = sessionAttributeStore; (1)
    }

    @Override
    public void beforePublishSend(final ModifiablePUBLISH publish, final ClientData clientData) throws BeforePublishSendException {

        final String clientId = clientData.getClientId();
        try {
            final Optional<byte[]> optionalAttribute = sessionAttributeStore.get(clientId, "session.token"); (2)
            if (optionalAttribute.isPresent()) {
                publish.setPayload(optionalAttribute.get());
            } else {
                sessionAttributeStore.clear(clientId); (3)
            }
        } catch (final NoSuchClientIdException noSuchClient) {
            ...
        } catch (final IncompatibleHiveMQVersionException incompatibleHiveMQ) {
            ...
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Inject the BlockingSessionAttributeStore.
2 Get a Session Attribute for the client.
3 Clear all Session Attributes for the client.

Async API

Async Session Attribute Store API
public interface AsyncSessionAttributeStore {

    /**
     * Sets the given session attribute for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @param value    the value of the session attribute.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Void> put(@NotNull String clientId, @NotNull String key, @NotNull byte[] value);

    /**
     * Sets the given session attribute for a persistent client if the timestamp of the attribute is newer than the timestamp of the already stored attribute.
     *
     * @param clientId   the clientId of a persistent client.
     * @param key        the key of the session attribute.
     * @param value      the value of the session attribute.
     * @param timestamp  the timestamp of the session attribute.
     * @return a {@link ListenableFuture<OptionalAttribute>} with the previous associated value or null and a boolean if a value was replaced or not.
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4.0 exists in the cluster.
     * failing with a {@link com.hivemq.spi.services.exception.RateLimitExceededException} if the plugin service rate limit was exceeded.
     */
    @NotNull
    ListenableFuture<OptionalAttribute> putIfNewer(@NotNull String clientId, @NotNull String key, @NotNull byte[] value, long timestamp);

    /**
     * Sets the given session attribute for a persistent client if the timestamp of the attribute is newer than or equal to the timestamp of the already stored attribute.
     *
     * @param clientId   the clientId of a persistent client.
     * @param key        the key of the session attribute.
     * @param value      the value of the session attribute.
     * @param timestamp  the timestamp of the session attribute.
     * @return a {@link ListenableFuture<OptionalAttribute>} with the previous associated value or null and a boolean if a value was replaced or not.
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4.0 exists in the cluster.
     * failing with a {@link com.hivemq.spi.services.exception.RateLimitExceededException} if the plugin service rate limit was exceeded.
     */
    @NotNull
    ListenableFuture<OptionalAttribute> putIfNewerOrEquals(@NotNull String clientId, @NotNull String key, @NotNull byte[] value, long timestamp);

    /**
     * Sets the given session attribute for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @param value    the value of the session attribute as a string.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Void> putAsString(@NotNull String clientId, @NotNull String key, @NotNull String value);

    /**
     * Sets the given session attribute for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @param value    the value of the session attribute as a string.
     * @param charset  the {@link Charset} of the given value.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Void> putAsString(@NotNull String clientId, @NotNull String key, @NotNull String value, @NotNull Charset charset);

    /**
     * Retrieves the value of the session attribute with the given key for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Optional<byte[]>> get(@NotNull String clientId, @NotNull String key);

    /**
     * Retrieves the value of the session attribute with the given key for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Optional<String>> getAsString(@NotNull String clientId, @NotNull String key);

    /**
     * Retrieves the value of the session attribute with the given key for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @param charset  the {@link Charset} of the given value.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Optional<String>> getAsString(@NotNull String clientId, @NotNull String key, @NotNull Charset charset);


    /**
     * Removes the session attribute with the given key for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @param key      the key of the session attribute.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Optional<byte[]>> remove(@NotNull String clientId, @NotNull String key);

    /**
     * Retrieves all session attributes for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Optional<ImmutableMap<String, byte[]>>> getAll(@NotNull String clientId);

    /**
     * Clears all session attributes for a persistent client.
     *
     * @param clientId the clientId of a persistent client.
     * @return a {@link ListenableFuture} indicating success or
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Void> clear(@NotNull String clientId);
}

Example Usage

Set Session Attribute
public class SetSessionAttributeCallback implements OnPublishReceivedCallback {

    private static final Logger log = LoggerFactory.getLogger(SetSessionAttributeCallback.class);

    private final AsyncSessionAttributeStore sessionAttributeStore;
    private final PluginExecutorService pluginExecutorService;

    @Inject
    SetSessionAttributeCallback(final AsyncSessionAttributeStore sessionAttributeStore,
                                final PluginExecutorService pluginExecutorService) {
        this.sessionAttributeStore = sessionAttributeStore; (1)
        this.pluginExecutorService = pluginExecutorService;
    }

    @Override
    public void onPublishReceived(final PUBLISH publish, final ClientData clientData) throws OnPublishReceivedException {
        final String clientId = clientData.getClientId();
        final ListenableFuture<Void> listenableFuture = sessionAttributeStore.put(clientId, publish.getTopic(), publish.getPayload()); (2)
        Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable final Void result) {
                log.trace("Successfully set last-payload for client: {}", clientId);
            }

            @Override
            public void onFailure(final Throwable t) {
                log.error("Failed to set last-payload for client: {}", clientId, t);
            }
        }, pluginExecutorService); (3)

    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Inject the AsyncSessionAttributeStore and the PluginExecutorService.
2 Set a Session Attribute.
3 Listen on the future and handle error case.
Put Session Attribute if newer or equal
public class SetSessionAttributeCallback implements OnPublishReceivedCallback {

    private static final Logger log = LoggerFactory.getLogger(SetSessionAttributeCallback.class);

    private final AsyncSessionAttributeStore sessionAttributeStore;
    private final PluginExecutorService pluginExecutorService;

    @Inject
    SetSessionAttributeCallback(final AsyncSessionAttributeStore sessionAttributeStore,
                                final PluginExecutorService pluginExecutorService) {
        this.sessionAttributeStore = sessionAttributeStore; (1)
        this.pluginExecutorService = pluginExecutorService;
    }

    @Override
    public void onPublishReceived(final PUBLISH publish, final ClientData clientData) throws OnPublishReceivedException {
        final String clientId = clientData.getClientId();
        final ListenableFuture<OptionalAttribute> listenableFuture = sessionAttributeStore.putIfNewerOrEquals(clientId, publish.getTopic(), publish.getPayload(), System.currentTimeMillis()); (2)
        Futures.addCallback(listenableFuture, new FutureCallback<OptionalAttribute>() {
            @Override
            public void onSuccess(@Nullable final OptionalAttribute result) {

                //value was replaced so getValue() is not null
                if(result.getValue() != null){
                    log.trace("Successfully updated last-payload for client: {}", clientId);

                //no previous value was set, so getValue() == null, but a new Value is inserted.
                } else if (result.isReplaced()){
                    log.trace("Successfully set last-payload for client: {}", clientId);

                //previous value was set, but could not be replaced. It was newer and not equal.
                } else {
                    log.trace("Update was not possible, because previous value was newer for client: {}", clientId);
                }
            }

            @Override
            public void onFailure(final Throwable t) {
                log.error("Failed to set last-payload for client: {}", clientId, t);
            }
        }, pluginExecutorService); (3)
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Inject the AsyncSessionAttributeStore and the PluginExecutorService.
2 Put a Session Attribute if newer or equal.
3 Listen on the future and handle error case.
Use Session Attribute
public class UseSessionAttributeCallback implements OnSubscribeCallback {

    private static final Logger log = LoggerFactory.getLogger(UseSessionAttributeCallback.class);

    private final AsyncSessionAttributeStore sessionAttributeStore;
    private final PluginExecutorService pluginExecutorService;

    @Inject
    UseSessionAttributeCallback(final AsyncSessionAttributeStore sessionAttributeStore,
                                final PluginExecutorService pluginExecutorService) {
        this.sessionAttributeStore = sessionAttributeStore; (1)
        this.pluginExecutorService = pluginExecutorService;
    }


    @Override
    public void onSubscribe(final SUBSCRIBE message, final ClientData clientData) throws InvalidSubscriptionException {
        final List<Topic> topics = message.getTopics();
        for (final Topic topic : topics) {
            final String clientId = clientData.getClientId();
            final String topicString = topic.getTopic();
            final ListenableFuture<Optional<byte[]>> listenableFuture = sessionAttributeStore.get(clientId, topicString); (2)
            Futures.addCallback(listenableFuture, new FutureCallback<Optional<byte[]>>() {
                @Override
                public void onSuccess(@Nullable final Optional<byte[]> result) {
                    if (result != null && result.isPresent()) {
                        log.trace("Client {} published last to topic {} this payload {}.", clientId, topicString, result.get());
                    }
                }

                @Override
                public void onFailure(final Throwable t) {
                    log.error("Failed to get last-payload for client: {}", clientId, t);
                }
            }, pluginExecutorService); (3)
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Inject the AsyncSessionAttributeStore and the PluginExecutorService.
2 Get a Session Attribute.
3 Listen on the future and handle error case.

Connection Attribute Store

The Connection Attribute Store enables HiveMQ plugins to add, get and remove Connection Attributes for clients. Connection Attributes are stored for the duration of the client connection.

For example Connection Attributes can be used to add authentication and authorization information to the client connection.

The Connection Attribute Store can only be accessed on the HiveMQ node which the client is connected to. A cluster wide alternative is the Session Attribute Store.

Connection Attributes are stored in memory and should only be used for a small amount of data.

Connection Attribute Store API
/**
 * Sets the given connection attribute for the connected client.
 *
 * @param key   the key of the connection attribute.
 * @param value the value of the connection attribute.
 * @throws LimitExceededException a {@link LimitExceededException} is thrown when the size of the passed value exceeds the maximum allowed size of 10 kilobytes for the value
 */
void put(@NotNull String key, @NotNull byte[] value);

/**
 * Sets the given connection attribute as UTF-8 String representation for the connected client.
 *
 * @param key   the key of the connection attribute.
 * @param value the value of the connection attribute as a string.
 * @throws LimitExceededException a {@link LimitExceededException} is thrown when the size of the passed value exceeds the maximum allowed size of 10 kilobytes for the value
 */
void putAsString(@NotNull String key, @NotNull String value);

/**
 * Sets the given connection attribute as String representation for the connected client with a given charset.
 *
 * @param key     the key of the connection attribute.
 * @param value   the value of the connection attribute as a string with the given charset.
 * @param charset the {@link Charset} of the given value.
 * @throws LimitExceededException a {@link LimitExceededException} is thrown when the size of the passed value exceeds the maximum allowed size of 10 kilobytes for the value
 */
void putAsString(@NotNull String key, @NotNull String value, @NotNull Charset charset);

/**
 * Retrieves the value of the connection attribute with the given key for the connected client.
 *
 * @param key the key of the connection attribute.
 * @return an {@link Optional} containing the value of the connection attribute if present.
 */
@NotNull
Optional<byte[]> get(@NotNull String key);

/**
 * Retrieves the value of the connection attribute with the given key for the connected client as UTF-8 string.
 *
 * @param key the key of the connection attribute.
 * @return an {@link Optional} containing the value of the connection attribute as a string if present.
 */
@NotNull
Optional<String> getAsString(@NotNull String key);

/**
 * Retrieves the value of the connection attribute with the given key for the connected client as string with the given charset.
 *
 * @param key     the key of the connection attribute.
 * @param charset the {@link Charset} of the value of the connection attribute.
 * @return an {@link Optional} containing the value of the connection attribute as a string with the given charset if present.
 */
@NotNull
Optional<String> getAsString(@NotNull String key, @NotNull Charset charset);

/**
 * Retrieves all connection attributes for the connected client.
 *
 * @return an {@link Optional} containing all connection attributes as a map of key and value pairs if present.
 */
@NotNull
Optional<ImmutableMap<String, byte[]>> getAll();

/**
 * Removes the connection attribute with the given key for the connected client.
 *
 * @param key the key of the connection attribute.
 * @return an {@link Optional} containing the value of the removed connection attribute if it was present.
 */
@NotNull
Optional<byte[]> remove(@NotNull String key);

/**
 * Clears all connection attributes for the connected client.
 */
void clear();

/**
 * Returns whether this connection attribute store is accessible.
 * The connection attribute store can only be accessed on the HiveMQ node, which the client is connected to.
 * A cluster-wide alternative is the {@link AsyncSessionAttributeStore}.
 * <p>
 * The ConnectionAttributeStore is not necessarily available if the {@link com.hivemq.spi.security.ClientData instance} was
 * not acquired via a {@link com.hivemq.spi.callback.Callback} parameter.
 * If you e.g. receive a {@link com.hivemq.spi.security.ClientData} object from calling
 * {@link AsyncClientService#getClientData(String)}, it's required to call {@link #isAccessible()} before
 * using the ConnectionAttributeStore.
 *
 * @return <code>true</code> if this connection attribute store is accessible, otherwise <code>false</code>
 */
boolean isAccessible();
Example of setting a Connection Attribute inside a callback
public class ExampleCallback implements OnAuthenticationCallback {
    @Override
    public Boolean checkCredentials(final ClientCredentialsData clientData) throws AuthenticationException {
        final ConnectionAttributeStore connectionAttributeStore = clientData.getConnectionAttributeStore(); (1)
        final String token = [...];
        connectionAttributeStore.putAsString("authentication.token", token); (2)
        return true;
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Get the Connection Attribute Store from the provided ClientData object.
2 Set the Connection Attribute.
Example of getting a Connection Attribute inside a callback
public class ExampleCallback implements AfterLoginCallback {
    @Override
    public void afterSuccessfulLogin(final ClientCredentialsData clientData) {
        final ConnectionAttributeStore connectionAttributeStore = clientData.getConnectionAttributeStore(); (1)
        final Optional<String> tokenOptional = connectionAttributeStore.getAsString("authentication.token"); (2)
        if (tokenOptional.isPresent()) { (3)
            final String token = tokenOptional.get();
            [...]
        }
    }

    @Override
    public void afterFailedLogin(AuthenticationException exception, final ClientData clientData) {
        [...]
    }
}
1 Get the Connection Attribute Store from the provided ClientData object.
2 Get the Connection Attribute.
3 The Connection Attribute is present if it was set and not removed before.

Retained Message Store

The Retained Message Store enables HiveMQ plugins to directly add and remove retained messages from HiveMQs retained message store. It’s also possible to retrieve a certain retained message or the amount of messages that are currently stored by HiveMQ.

Blocking API

Blocking Retained Message Store API
/**
 * @return all retained messages which are currently stored on this HiveMQ instance.
 */
Set<RetainedMessage> getLocalRetainedMessages();

/**
 * @return the number of all retained messages on this HiveMQ instance.
 */
Long localSize();

/**
 * Checks if a retained message is present in the retained message store, on this HiveMQ instance.
 *
 * @param topic the topic associated with the retained message
 * @return true if there's a message for the given topic
 */
boolean containsLocally(String topic);

/**
 * @return all retained messages which are currently stored
 */
Set<RetainedMessage> getRetainedMessages();

/**
 * @param topic a topic
 * @return retained message for the specific topic or <code>null</code>.
 * instance with an empty reference
 */
@Nullable
RetainedMessage getRetainedMessage(String topic);

/**
 * Removes the retained message from given topic.
 * If there isn't any retained message on the topic yet, nothing will happen.
 *
 * @param topic from which the message should be removed
 */
void remove(String topic);

/**
 * Removes all retained messages from the message store.
 */
void clear();

/**
 * This method adds or replaces a retained message
 *
 * @param retainedMessage which should be added or replaced
 */
void addOrReplace(RetainedMessage retainedMessage);

/**
 * Checks if a retained message is present in the retained message store.
 *
 * @param topic the topic associated with the retained message
 * @return true if there's a message for the given topic
 */
boolean contains(String topic);

/**
 * @return the number of all retained messages
 */
long size();

Example Usage

This section discusses common examples on how to use the Blocking Retained Message Store in your own plugins.

Inject the Message Store
import com.hivemq.spi.services.BlockingRetainedMessageStore;

import javax.inject.Inject;

public class MyPlugin {

    private final BlockingRetainedMessageStore retainedMessageStore;

    @Inject
    public MyPlugin(final BlockingRetainedMessageStore retainedMessageStore) { (1)
        this.retainedMessageStore = retainedMessageStore;
    }
}
1 The message store implementation is injected by the HiveMQ core.


Programmatically add a new Retained Message
    public void addRetainedMessage(String topic, String message){

        if(!retainedMessageStore.contains(topic)) (1)
            retainedMessageStore.addOrReplace(new RetainedMessage
                                 (topic, message.getBytes(), QoS.valueOf(1)));(2)
    }
1 Check if there is currently a retained message for a certain topic.
2 Add a retained message to the topic if there is none.


Clear the whole Retained Message Store
    public void cleanUp(){

        if(retainedMessageStore.size() >= TOO_MANY) (1)
            retainedMessageStore.clear(); (2)
    }
1 Get the amount of retained messages that are currently stored and check if there are too many retained messages.
2 Delete all retained messages.


Read and remove specific retained messages.
    public void logAndDeleteRetainedMessage(String topic){
        RetainedMessage rm = retainedMessageStore.getRetainedMessage(topic); (1)

        if(rm != null){
            logger.info(new String(rm.get().getMessage()));
            retainedMessageStore.remove(rm.get()); (2)
        }
    }
1 Get the retained message for a certain topic if there is any.
2 Remove the message.

Async API

Async Retained Message Store API
/**
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all retained messages which are currently stored on this HiveMQ instance.
 */
ListenableFuture<Set<RetainedMessage>> getLocalRetainedMessages();

/**
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the number of all retained messages on this HiveMQ instance.
 */
ListenableFuture<Long> localSize();

/**
 * Checks if a retained message is present in the retained message store, on this HiveMQ instance.
 *
 * @param topic the topic associated with the retained message
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains true if there's a message for the given topic
 */
ListenableFuture<Boolean> containsLocally(String topic);

/**
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all retained messages which are currently stored
 */
ListenableFuture<Set<RetainedMessage>> getRetainedMessages();

/**
 * @param topic a topic
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the retained message for the specific topic or <code>null</code>.
 */
ListenableFuture<RetainedMessage> getRetainedMessage(String topic);

/**
 * Removes the retained message from given topic.
 * If there isn't any retained message on the topic yet, nothing will happen.
 *
 * @param topic from which the message should be removed
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which returns after removal
 */
ListenableFuture<Void> remove(String topic);

/**
 * Removes all retained messages from the message store.
 *@return a {@link com.google.common.util.concurrent.ListenableFuture} which returns after removal
 */
ListenableFuture<Void> clear();

/**
 * This method adds or replaces a retained message
 *
 * @param retainedMessage which should be added or replaced
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which returns after adding or replacing
 */
ListenableFuture<Void> addOrReplace(RetainedMessage retainedMessage);

/**
 * Checks if a retained message is present in the retained message store.
 *
 * @param topic the topic associated with the retained message
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains true if there's a message for the given topic
 */
ListenableFuture<Boolean> contains(String topic);

/**
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the number of all retained messages
 */
ListenableFuture<Long> size();

Example Usage

This section discusses common examples on how to use the Async Retained Message Store in your own plugins.

Programmatically add a new Retained Message
public void addRetainedMessage(final String topic, final String message) {

    final ListenableFuture<Void> future = retainedMessageStore.addOrReplace(new RetainedMessage (1)
            (topic, message.getBytes(), QoS.valueOf(1)));
    Futures.addCallback(future, new FutureCallback<Void>() {
        @Override
        public void onSuccess(Void aVoid) {
            log.debug("Set new retained message for topic {}", topic); (2)
        }

        @Override
        public void onFailure(Throwable throwable) {
            log.error("Failed to set retained message for topic {}.", topic, throwable);
        }
    });
}
1 Set a retained message for a given topic.
2 Log that the retained message was set, as soon as the future succeeds.


Read and remove specific retained messages.
public void logAndDeleteRetainedMessage(final String topic) {
    final ListenableFuture<RetainedMessage> future = retainedMessageStore.getRetainedMessage(topic); (1)
    Futures.addCallback(future, new FutureCallback<RetainedMessage>() {
        @Override
        public void onSuccess(final RetainedMessage retainedMessage) {
            if (retainedMessage != null) {
                log.info("Removing retained message on topic {}.", topic);
                retainedMessageStore.remove(topic); (2)
            }
        }

        @Override
        public void onFailure(Throwable throwable) {

        }
    });
}
1 Get the retained message for a certain topic if there is any.
2 Remove the message.


Subscription Store Service

The Subscription Store is designed to programmatically interact with MQTT client subscriptions. The Subscription Store service provides methods to get, add or remove subscriptions and allows to find out which topics a certain subscriber has subscribed to. It’s also possible with the Subscription Store to find out which clients subscribed to a certain topic.

The Subscription Store allows to query for subscription information on all cluster nodes or just the local HiveMQ node.

There are separate "local" methods available in case the broker instance might be connected to a cluster and you want to gather information about this instance only. Local and cluster methods will return the same result, if the broker instance is not part of a cluster.

When a subscription for a client is added via the extension system or HiveMQ Control Center, the retained message for the topics included in the subscription will not be published to the client.

Blocking API

/**
 * This method returns all subscriptions on this HiveMQ Node as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
 * You won't receive subscriptions of connected
 * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
 * <p/>
 * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
 * negative performance effects.
 * <p/>
 * The returned Multimap is read-only and must not be modified.
 *
 * @return a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
 */
@ReadOnly
Multimap<String, Topic> getLocalSubscriptions();

/**
 * Returns all MQTT client subscriber identifiers for a given topic, for this HiveMQ instance.
 * MQTT Wildcards are allowed.
 * <p/>
 * Don't pass <code>null</code> as topic. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param topic the topic
 * @return client identifiers of all subscribers that subscribed to the topic
 */
@ReadOnly
Set<String> getLocalSubscribers(@NotNull String topic);

/**
 * Returns all topics a client is subscribed to, on this HiveMQ instance.
 * <p/>
 * If the client does not exist, an empty Set is returned.
 * <p/>
 * Don't pass <code>null</code> as clientId. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param clientID of the client
 * @return all topics the client subscribed to
 */
@ReadOnly
Set<Topic> getLocalTopics(@NotNull String clientID);

/**
 * This method adds a subscription for a certain client to a certain topic.
 * If HiveMQ is connected to a cluster, the subscription will be broadcast to all other Cluster Nodes.
 * <p/>
 * This method is lenient, so if the clientId or the topic
 * is <code>null</code>, nothing will happen.
 *
 * @param clientID client, which should be subscribed
 * @param topic    topic to which the client should be subscribed
 */
void addSubscription(@NotNull String clientID, @NotNull Topic topic);

/**
 * This method adds subscriptions for a certain client to certain topics.
 * If HiveMQ is connected to a cluster, the subscription will be broadcast to all other Cluster Nodes.
 * <p>
 *
 * @param clientID client, which should be subscribed
 * @param topics   topics to which the client should be subscribed
 * @throws RateLimitExceededException if the plugin service rate limit was exceeded.
 * @throws InvalidTopicException      if any topic is not utf-8 well-formed or empty.
 * @throws NoSuchClientIdException    if a client does not exist.
 * @throws NullPointerException       if clientID or topics is <code>null</code>.
 * @throws IllegalArgumentException   if clientID or topics is empty.
 */
void addSubscriptions(@NotNull String clientID, @NotNull Set<Topic> topics);

/**
 * This method removes a subscription for a certain client and a certain topic.
 * If HiveMQ is connected to a cluster, the subscription will be removed by other Cluster Nodes as well.
 *
 * @param clientID client, which should get unsubscribed
 * @param topic    topic from which the client should get unsubscribed
 */
void removeSubscription(@NotNull String clientID, @NotNull String topic);

/**
 * This method removes subscriptions for a certain client and certain topics.
 * If HiveMQ is connected to a cluster, the subscription will be removed by other Cluster Nodes as well.
 * <p>
 *
 * @param clientID client, which should get unsubscribed
 * @param topics   topics from which the client should get unsubscribed
 * @throws RateLimitExceededException if the plugin service rate limit was exceeded.
 * @throws NullPointerException       if clientID or topics is <code>null</code>.
 * @throws IllegalArgumentException   if clientID or topics is empty.
 */
void removeSubscriptions(@NotNull String clientID, @NotNull Set<String> topics);

/**
 * This method returns all subscriptions this HiveMQ instance and all other nodes in a HiveMQ cluster,
 * as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
 * <p/>
 * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
 * negative performance effects.
 * <p/>
 * The returned Multimap is read-only and must not be modified.
 *
 * @return a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
 */
@ReadOnly
Multimap<String, Topic> getSubscriptions();

/**
 * Returns all MQTT client subscriber identifiers for a given topic, this HiveMQ instance and all other nodes in a HiveMQ cluster.
 * MQTT Wildcards are allowed.
 * <p/>
 * Don't pass <code>null</code> as topic. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param topic the topic
 * @return client identifiers of all subscribers that subscribed to the topic
 */
@ReadOnly
Set<String> getSubscribers(@NotNull String topic);

/**
 * Returns all topics a client is subscribed to, on this HiveMQ instance and all other nodes in a HiveMQ cluster.
 * <p/>
 * If the client does not exist, an empty Set is returned.
 * <p/>
 * Don't pass <code>null</code> as clientId. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param clientID of the client
 * @return all topics the client subscribed to
 */
@ReadOnly
Set<Topic> getTopics(@NotNull String clientID);

Example Usage

This section discusses common examples on how to use the Subscription Store in your own plugins.

Inject the Subscription Store
import com.hivemq.spi.services.BlockingSubscriptionStore;

import javax.inject.Inject;

public class MyPlugin {

    private final BlockingSubscriptionStore subscriptionStore;

    @Inject
    public MyPlugin(final BlockingSubscriptionStore subscriptionStore) { (1)
        this.subscriptionStore = subscriptionStore;
    }
}
1 The subscription store implementation is injected by the HiveMQ core.


Setup a default Subscription
public class ConnackSend implements OnConnackSend {

    private final static String DEFAULT_TOPIC = "default/topic";
    private final BlockingSubscriptionStore subscriptionStore;

    @Inject
    public ConnackSend(BlockingSubscriptionStore subscriptionStore, ClientService clientService) {
        this.subscriptionStore = subscriptionStore;
    }

    @Override
    public void onConnackSend(CONNACK connack, ClientData clientData) { (1)
        subscriptionStore.addSubscription(clientData.getClientId(), new Topic(DEFAULT_TOPIC, QoS.valueOf(1))); (2)
    }

}
1 Add a subscription as soon as a client connected successfully.
2 As soon as a subscription is added, the MQTT client will receive all messages published to the topic.


Setup many default subscriptions
public class ConnackSend implements OnConnackSend {

    private final Set<Topic> defaultTopics;
    private final BlockingSubscriptionStore subscriptionStore;

    @Inject
    public ConnackSend(BlockingSubscriptionStore subscriptionStore) {
        this.subscriptionStore = subscriptionStore;
        defaultTopics = new HashSet<>();
        defaultTopics.add(new Topic("topic/a", QoS.AT_MOST_ONCE));
        defaultTopics.add(new Topic("topic/b", QoS.AT_LEAST_ONCE));
        defaultTopics.add(new Topic("topic/c", QoS.EXACTLY_ONCE));
    }

    @Override
    public void onConnackSend(CONNACK connack, ClientData clientData) { (1)
        subscriptionStore.addSubscriptions(clientData.getClientId(), defaultTopics)); (2)
    }

}
1 Add the subscriptions as soon as a client connected successfully.
2 As soon as the subscriptions are added, the MQTT client will receive all messages published to the topics.


Remove a set of topics for a client
private void removeSubscriptions(final String clientID, final Set<String> topics) {

    subscriptionStore.removeSubscriptions(clientId, topics); (1)

}
1 Remove the subscriptions for a client.


List Subscribers for a certain topic
private void logSubscribers(String topic) {

    Set<string> subscribers = subscriptionStore.getLocalSubscribers(topic); (1)
    for (String clientId : subscribers) {
        logger.info(clientId);
    }
}
1 Get all subscribers for a certain topic, on this broker instance only.


Async API

/**
 * This method returns all subscriptions on this HiveMQ Node as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
 * You won't receive subscriptions of connected
 * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
 * <p/>
 * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
 * negative performance effects.
 * <p/>
 * The returned Multimap is read-only and must not be modified.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
 */
@ReadOnly
ListenableFuture<Multimap<String, Topic>> getLocalSubscriptions();

/**
 * Returns all MQTT client subscriber identifiers for a given topic, for this HiveMQ instance.
 * MQTT Wildcards are allowed.
 * <p/>
 * Don't pass <code>null</code> as topic. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param topic the topic
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all subscribers that subscribed to the topic
 */
@ReadOnly
ListenableFuture<Set<String>> getLocalSubscribers(@NotNull String topic);

/**
 * Returns all topics a client is subscribed to, on this HiveMQ instance.
 * <p/>
 * If the client does not exist, an empty Set is returned.
 * <p/>
 * Don't pass <code>null</code> as clientId. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param clientID of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all topics the client subscribed to
 */
@ReadOnly
ListenableFuture<Set<Topic>> getLocalTopics(@NotNull String clientID);

/**
 * This method adds a subscription for a certain client to a certain topic.
 * If HiveMQ is connected to a cluster, the subscription will be broadcast to all other Cluster Nodes.
 * <p/>
 * This method is lenient, so if the clientId or the topic
 * is <code>null</code>, nothing will happen.
 *
 * @param clientID client, which should be subscribed
 * @param topic    topic to which the client should be subscribed
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} object that will succeed,
 * as soon es the subscription was added by all Cluster Nodes.
 */
ListenableFuture<Void> addSubscription(@NotNull String clientID, @NotNull Topic topic);

/**
 * This method adds subscriptions for a certain client to certain topics.
 * If HiveMQ is connected to a cluster, the subscription will be broadcast to all other Cluster Nodes.
 * <p>
 *
 * @param clientID client, which should be subscribed
 * @param topics   topics to which the client should be subscribed
 * @return a {@link ListenableFuture} object that will succeed, as soon es the subscriptions were added by all Cluster Nodes.
 * failing with a {@link RateLimitExceededException} if the plugin service rate limit was exceeded.
 * failing with a {@link InvalidTopicException} if any topic is not utf-8 well-formed or empty.
 * failing with a {@link NoSuchClientIdException} if a client does not exist.
 * <p>
 * @throws NullPointerException     if clientID or topics is <code>null</code>.
 * @throws IllegalArgumentException if clientID or topics is empty.
 */
ListenableFuture<Void> addSubscriptions(@NotNull String clientID, @NotNull Set<Topic> topics);

/**
 * This method removes a subscription for a certain client and a certain topic.
 * If HiveMQ is connected to a cluster, the subscription will be removed by other Cluster Nodes as well.
 *
 * @param clientID client, which should get unsubscribed
 * @param topic    topic from which the client should get unsubscribed
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} object that will succeed,
 * as soon es the subscription was removed by all Cluster Nodes.
 */
ListenableFuture<Void> removeSubscription(@NotNull String clientID, @NotNull String topic);

/**
 * This method removes subscriptions for a certain client and certain topics.
 * If HiveMQ is connected to a cluster, the subscriptions will be removed by other Cluster Nodes as well.
 * <p>
 *
 * @param clientID client, which should get unsubscribed
 * @param topics   topics from which the client should get unsubscribed
 * @return a {@link ListenableFuture} object that will succeed, as soon es the subscriptions were removed by all Cluster Nodes.
 * failing with a {@link RateLimitExceededException} if the plugin service rate limit was exceeded.
 * <p>
 * @throws NullPointerException     if clientID or topics is <code>null</code>.
 * @throws IllegalArgumentException if clientID or topics is empty.
 */
ListenableFuture<Void> removeSubscriptions(@NotNull String clientID, @NotNull Set<String> topics);

/**
 * This method returns all subscriptions this HiveMQ instance and all other nodes in a HiveMQ cluster,
 * as a {@link com.google.common.collect.Multimap} of client identifiers and topics.
 * <p/>
 * Please be aware that calling this method on HiveMQ instances with many subscriptions could have
 * negative performance effects.
 * <p/>
 * The returned Multimap is read-only and must not be modified.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains a {@link com.google.common.collect.Multimap} of client identifiers and their topic subscriptions
 */
@ReadOnly
ListenableFuture<Multimap<String, Topic>> getSubscriptions();

/**
 * Returns all MQTT client subscriber identifiers for a given topic, this HiveMQ instance and all other nodes in a HiveMQ cluster.
 * MQTT Wildcards are allowed.
 * <p/>
 * Don't pass <code>null</code> as topic. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param topic the topic
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all subscribers that subscribed to the topic
 */
@ReadOnly
ListenableFuture<Set<String>> getSubscribers(@NotNull String topic);

/**
 * Returns all topics a client is subscribed to, on this HiveMQ instance and all other nodes in a HiveMQ cluster.
 * <p/>
 * If the client does not exist, an empty Set is returned.
 * <p/>
 * Don't pass <code>null</code> as clientId. This method is lenient, so
 * it will just return an empty Set.
 * <p/>
 * The returned Set is read-only and must not be modified.
 *
 * @param clientID of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains which contains all topics the client subscribed to
 */
@ReadOnly
ListenableFuture<Set<Topic>> getTopics(@NotNull String clientID);

Example Usage

Remove unwanted subscriptions
public void removeTopicSubscriptions(final Topic topicToRemove) {

    Futures.addCallback(subscriptionStore.getSubscriptions(), new FutureCallback<Multimap<String, Topic>>() { (1)

        @Override
        public void onSuccess(Multimap<String, Topic> subscriptions) { (2)(3)

            for (String clientId : subscriptions.keySet()) { (4)

                if (subscriptions.get(clientId).contains(topicToRemove)) { (5)
                    subscriptionStore.removeSubscription(clientId, topicToRemove.getTopic());
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}
1 The getSubscriptions method returns a ListenableFuture.
2 The callback succeeds, as soon as the subscriptions have been collected.
3 Get a Multimap with client identifiers as keys and topic subscriptions as value.
4 Iterate all client identifiers.
5 Find all clients that subscribed to a certain topic and remove the subscriptions.


Setup many default subscriptions
public class ConnackSend implements OnConnackSend {

    private final Set<Topic> defaultTopics;
    private final AsyncSubscriptionStore subscriptionStore;

    @Inject
    public ConnackSend(AsyncSubscriptionStore subscriptionStore) {
        this.subscriptionStore = subscriptionStore;
        defaultTopics = new HashSet<>();
        defaultTopics.add(new Topic("topic/a", QoS.AT_MOST_ONCE));
        defaultTopics.add(new Topic("topic/b", QoS.AT_LEAST_ONCE));
        defaultTopics.add(new Topic("topic/c", QoS.EXACTLY_ONCE));
    }

    @Override
    public void onConnackSend(CONNACK connack, ClientData clientData) { (1)
        ListenableFuture<Void> addFuture = asyncSubscriptionStore.addSubscriptions("client", topics);  (2)

        Futures.addCallback(addFuture, new FutureCallback<Void>() { (3)
            @Override
            public void onSuccess(@Nullable Void result) {
                log.debug("successfully added subscriptions"); (4)
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("adding subscriptions failed: ", t); (5)
            }
        });
    }

}
1 Add the subscriptions as soon as a client connected successfully.
2 As soon as the subscriptions are added, the MQTT client will receive all messages published to the topics.
3 Add a callback to the add subscription future.
4 log when subscriptions were added successfully.
5 log when adding subscriptions failed.


Remove a set of topics for a client
private void removeSubscriptions(final String clientID, final Set<String> topics) {

    ListenableFuture<Void> removeFuture = asyncSubscriptionStore.removeSubscriptions("client", topics);  (1)

    Futures.addCallback(removeFuture, new FutureCallback<Void>() { (2)
        @Override
        public void onSuccess(@Nullable Void result) {
            log.debug("successfully added subscriptions"); (3)
        }

        @Override
        public void onFailure(Throwable t) {
            log.error("adding subscriptions failed: ", t); (4)
        }
    });

}
1 Remove the subscriptions for a client.
2 Add a callback to the remove subscription future.
3 log when subscriptions were removed successfully.
4 log when removing subscriptions failed.

Gotchas

  • The Multimap class is part of the com.google.common.collect API. See the documentation for further information.

  • Topic class objects are equal if their topics are equal, independent of their quality of service (QoS) levels.

  • Subscriptions added in the OnConnectCallback for the connecting clients are removed after the callback execution, if the client is requesting a clean session. Consider using the OnConnackSend callback instead.

  • Shared subscriptions with quality of service level 2 (QoS 2) are downgraded to quality of service level 1 (QoS 1).


Client Service

The Client Service allows plugins to gather information about clients, like whether a client is currently connected or not, its username, client identifier and SSL/TLS certificate, if the client is anonymous or if the client is authenticated.

The Client Service allows to query for client information on all cluster nodes or just the local HiveMQ node.

There are separate "local" methods available in case the broker instance might be connected to a cluster and you want to gather information about this instance only. In case the HiveMQ instance is not connected to a cluster, both methods will return the same results.

The Client Service can be used to set the time to live for client sessions in seconds. A time to live value of -1 results in the session never expiring. A time to live value of 0 results in the session being invalidated. The given TTL may be overwritten at client reconnection. You can also set the TTL as a restriction during the connection.

Blocking API

/**
 * Returns all identifiers of connected clients of this HiveMQ Node. You won't receive client identifiers of connected
 * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
 * <p/>
 * If you have many client connections, please note that calling this method frequently could have negative performance
 * effects.
 *
 * @return client identifiers of all connected clients
 */
Set<String> getLocalConnectedClients();

/**
 * Returns all disconnected clients which have a persistent MQTT session on this instance of HiveMQ (MQTT clean session=false).
 * <p/>
 * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
 *
 * @return all disconnected clients with a persistent MQTT session
 */
Set<String> getLocalDisconnectedClients();

/**
 * Check if a client with a given identifier is currently connected to this HiveMQ instance.
 *
 * @param clientId client, which should be checked
 * @return true, if a certain client is currently connected and false otherwise
 */
boolean isClientConnectedLocal(String clientId);

/**
 * Returns client information for clients that are connected to this broker instance.
 * <p/>
 * If the client isn't connected, <code>null</code> will be returned.
 *
 * @param clientId the client identifier of the client
 * @return {@link ClientData} for a specific client or <code>null</code> if the client is not connected
 */
@Nullable
ClientData getLocalClientData(String clientId);

/**
 * Returns all identifiers of connected clients of this HiveMQ instance and all other nodes in a HiveMQ cluster
 * <p/>
 * Calling this method frequently in a clustered environment could have negative performance effects.
 *
 * @return client identifiers of all connected clients
 */
Set<String> getConnectedClients();

/**
 * Returns all disconnected clients which have a persistent MQTT session on this broker or any other cluster node.
 * <p/>
 * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
 *
 * @return all disconnected clients with a persistent MQTT session
 */
Set<String> getDisconnectedClients();

/**
 * Check if a client with a given identifier is currently connected to this HiveMQ broker instance or any other instance in the cluster.
 *
 * @param clientId client, which should be checked
 * @return true, if a certain client is currently connected and false otherwise
 */
boolean isClientConnected(String clientId);

/**
 * Returns additional client information about a given client with a given client identifier.
 * <p/>
 * This method will also get client information from other cluster nodes if needed.
 * <p/>
 * If the client isn't connected, <code>null</code> will be returned.
 *
 * @param clientId the client identifier of the client
 * @return {@link ClientData} for a specific client or <code>null</code> if the client is not connected
 */
@Nullable
ClientData getClientData(String clientId);

/**
 * Forcefully disconnect a client with the specified clientId.
 * <p>
 * If the client specified a LWT message, it will be sent.
 * To prevent LWT messages use the {@link #disconnectClient(String, boolean)} method
 *
 * @param clientId the clientId to disconnect
 * @return true if the client has been disconnected, false if no client with that id was found
 * @since 3.2
 */
boolean disconnectClient(String clientId);

/**
 * Forcefully disconnect a client with the specified clientId.
 * <p>
 * If the client specified a LWT message, it will only be sent if the boolean parameter is false
 *
 * @param clientId          the clientId to disconnect
 * @param preventLwtMessage if true the LWT message for this client is not sent
 * @return true if the client has been disconnected, false if no client with that id was found
 * @since 3.2
 */
boolean disconnectClient(String clientId, boolean preventLwtMessage);

/**
 * Sets the time to live for the client session. A TTL value of -1 results in the session never expiring.
 * Attention: The given TTL may be overwritten at client reconnection.
 *
 * @param clientId the client identifier of the client
 * @param ttl time to live for a client session in seconds
 *
 * @throws NoSuchClientIdException if no session for the given clientId exists
 * @throws com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException if a node with a version lower than 3.4 exists in the cluster
 * @throws com.hivemq.spi.callback.exception.InvalidTTLException if the given TTL is invalid (< -1)
 * @since 3.4
 */
void setTTL(@NotNull String clientId, int ttl) throws NoSuchClientIdException;

/**
 * Returns the time to live for a client session in seconds.
 * <p/>
 * A value of -1 for the TTL means that the session never expires.
 *
 * @param clientId the client identifier of the client
 *
 * @return ttl time to live for the client session in seconds
 * @throws NoSuchClientIdException if no session for the given clientId exists
 * @throws com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4 exists in the cluster
 * @since 3.4
 */
int getTTL(@NotNull String clientId) throws NoSuchClientIdException;

/**
 * Invalidates the client session for the client with the given client identifier. If the client is currently connected, it will be disconnected as well.
 * <p/>
 * Sets time to live to 0.
 * Attention: The given TTL of 0 may be overwritten at client reconnection.
 *
 * @param clientId the client identifier of the client whose session should be invalidated
 * @throws NoSuchClientIdException if no session for the given clientId exists
 * @throws com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4 exists in the cluster
 * @since 3.4
 */
void invalidateSession(@NotNull String clientId) throws NoSuchClientIdException;

Example Usage

This section discusses common examples on how to use the Client Service in your own plugins.

Inject the Client Server
import com.hivemq.spi.services.BlockingClientService;

import javax.inject.Inject;

public class MyPlugin {

    private final BlockingClientService clientService;

    @Inject
    public MyPlugin(final BlockingClientService clientService) { (1)
        this.clientService = clientService;
    }
}
1 The client service implementation is injected by the HiveMQ core.


Check if a client is connected
final boolean clientConnected = clientService.isClientConnected(client);
if (clientConnected) {
    log.info("Client {} is connected {}.");
}


Disconnect a client programmatically
clientService.disconnectClient("clientIdentifier");


Async API

/**
 * Returns all identifiers of connected clients of this HiveMQ Node. You won't receive client identifiers of connected
 * clients from other HiveMQ nodes if HiveMQ runs in a cluster.
 * <p/>
 * If you have many client connections, please note that calling this method frequently could have negative performance
 * effects.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains all client identifiers of all connected clients
 */
ListenableFuture<Set<String>> getLocalConnectedClients();

/**
 * Returns all disconnected clients which have a persistent MQTT session on this instance of HiveMQ (MQTT clean session=false).
 * <p/>
 * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all disconnected clients with a persistent MQTT session
 */
ListenableFuture<Set<String>> getLocalDisconnectedClients();

/**
 * Check if a client with a given identifier is currently connected to this HiveMQ instance.
 *
 * @param clientId client, which should be checked
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains true, if a certain client is currently connected and false otherwise
 */
ListenableFuture<Boolean> isClientConnectedLocal(String clientId);

/**
 * Returns client information for clients that are connected to this broker instance.
 * <p/>
 * If the client isn't connected, you will receive an {@link Optional} with absent data.
 *
 * @param clientId the client identifier of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the {@link ClientData} for a specific client.
 */
ListenableFuture<ClientData> getLocalClientData(String clientId);

/**
 * Returns all identifiers of connected clients of this HiveMQ instance and all other nodes in a HiveMQ cluster
 * <p/>
 * Calling this method frequently in a clustered environment could have negative performance effects.
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the client identifiers of all connected clients
 */
ListenableFuture<Set<String>> getConnectedClients();

/**
 * Returns all disconnected clients which have a persistent MQTT session on this broker or any other cluster node.
 * <p/>
 * Disconnected MQTT clients which don't have a persistent session won't be returned by this method
 *
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the of client identifiers of all disconnected clients with a persistent MQTT session
 */
ListenableFuture<Set<String>> getDisconnectedClients();

/**
 * Check if a client with a given identifier is currently connected to this HiveMQ broker instance or any other instance in the cluster.
 *
 * @param clientId client, which should be checked
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains true, if a certain client is currently connected and false otherwise
 */
ListenableFuture<Boolean> isClientConnected(String clientId);

/**
 * Returns additional client information about a given client with a given client identifier.
 * <p/>
 * This method will also get client information from other cluster nodes if needed.
 * <p/>
 * If the client isn't connected, you will receive null.
 *
 * @param clientId the client identifier of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains the {@link ClientData} for a specific client.
 */
ListenableFuture<ClientData> getClientData(String clientId);

/**
 * Forcefully disconnect a client with the specified clientId.
 * <p>
 * If the client specified a LWT message, it will be sent.
 * To prevent LWT messages use the {@link #disconnectClient(String, boolean)} method
 *
 * @param clientId the clientId to disconnect
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains a {@link Boolean} which is true when the client has been disconnected and false if no client with that id was found.
 * @since 3.2
 */
ListenableFuture<Boolean> disconnectClient(String clientId);

/**
 * Forcefully disconnect a client with the specified clientId.
 * <p>
 * If the client specified a LWT message it will only be sent if the boolean parameter is set to false.
 *
 * @param clientId          the clientId to disconnect
 * @param preventLwtMessage if true the LWT message for this client is not published when the client gets disconnected
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which contains a {@link Boolean} which is true when the client has been disconnected and false if no client with that id was found.
 * @since 3.2
 */
ListenableFuture<Boolean> disconnectClient(String clientId, boolean preventLwtMessage);

/**
 * Sets the time to live for a client session in seconds. Session expiry can be disabled by setting the session time to live to -1.
 * Attention: The given TTL may be overwritten when a client (re-)connects.
 *
 * @param clientId the client identifier of the client
 * @param ttl time to live for the client session in seconds
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} which is returned when the TTL is set successfully,
 * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
 * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4 exists in the cluster,
 * failing with a {@link com.hivemq.spi.callback.exception.InvalidTTLException} if the given TTL is invalid (< -1).
 * @since 3.4
 */
ListenableFuture<Void> setTTL(@NotNull String clientId, int ttl);

/**
 * Returns the time to live for a client session in seconds.
 * <p/>
 * A value of -1 for the TTL means that session expiry is disabled.
 *
 * @param clientId the client identifier of the client
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} succeeding with the client session TTL in seconds,
 * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
 * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4 exists in the cluster.
 * @since 3.4
 */
ListenableFuture<Integer> getTTL(@NotNull String clientId);

/**
 * Invalidates the client session for the client with the given client identifier. If the client is currently connected, it will be disconnected as well.
 * <p/>
 * Sets time to live to 0.
 * Attention: The given TTL of 0 may be overwritten at client reconnection.
 *
 * @param clientId the client identifier of the client which session should be invalidated
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} succeeding with a {@link Boolean} which is true when the client has been actively disconnected by the broker otherwise false,
 * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the given clientId exists,
 * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.4 exists in the cluster.
 * @since 3.4
 */
ListenableFuture<Boolean> invalidateSession(@NotNull String clientId);

Example Usage

Log Client Information
private void logClientData(String clientId) {

    Futures.addCallback(clientService.getClientDataForClientId(clientId), new FutureCallback<optional<clientdata>>() {

        @Override
        public void onSuccess(ClientData result) {

            if (result != null) {
                ClientData clientData = result.get();
                logger.info(clientData.getUsername() + " " + clientData.getClientId() + " " + clientData.isAuthenticated());
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}


List Connected Clients
private void logConnectedClients() {
    Futures.addCallback(clientService.getConnectedClients(), new FutureCallback<Set<string>>() {
        @Override
        public void onSuccess(Set<string> connectedClients) {
            for (String connectedClient : connectedClients) {
                logger.info("client {} is currently connected.", connectedClient);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            t.printStackTrace();
        }
    });
}


Disconnect a client programmatically
final ListenableFuture<Boolean> disconnected = clientService.disconnectClient("clientIdentifier");
Futures.addCallback(disconnected, new FutureCallback<Boolean>() {
    @Override
    public void onSuccess(@Nullable Boolean result) {
        if (result) {
            log.info("Client disconnected successfully");
        } else {
            log.info("Client was not connected");
        }
    }

    @Override
    public void onFailure(Throwable t) {
        // :(
    }

});

Gotchas

  • If a MQTT client is connected with cleanSession=true, all information about the client will be lost as soon as the client disconnects. This means the client will not be considered as "disconnected client" for the getDisconnectedClients method.


Client Group Service

The Client Group Service enables HiveMQ plugins to organize clients in groups. The plugin system allows to add clients to groups, remove clients from groups, get all groups and to get all clients belonging to a group.

The group memberships of a client are stored for the duration of the MQTT Session of that client. When the session TTL expires, the group memberships are not removed immediately, but at a later time when HiveMQ’s internal cleanup-jobs run.

Client Groups are accessible cluster wide and are replicated for fault tolerance. They can be configured to be stored either in memory or on disk.

The Client Group Service is only available for clients which already have a session available. When a client connects, the earliest point in time to add the client to a group is the OnSessionReadyCallback.

Blocking API

Blocking Client Group Service API
public interface BlockingClientGroupService {

    /**
     * Adds a client to a group.
     *
     * @param group      the group the client will be added to.
     * @param clientData the clientData of the client which will be added to the group.
     * @throws NoSuchClientIdException            if no session for the client to the given clientData exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    void addClientToGroup(@NotNull String group, @NotNull ClientData clientData) throws NoSuchClientIdException;

    /**
     * Adds a client to a group.
     *
     * @param group            the group the client will be added to.
     * @param clientIdentifier the identifier of the client which will be added to the group.
     * @throws NoSuchClientIdException            if no session for the client with the given identifier exists.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    void addClientToGroup(@NotNull String group, @NotNull String clientIdentifier) throws NoSuchClientIdException;

    /**
     * Removes a client from a group.
     *
     * @param group            the group the client will be removed from.
     * @param clientIdentifier the identifier of the client which will be added to the group.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    void removeClientFromGroup(@NotNull String group, @NotNull String clientIdentifier);

    /**
     * Retrieves the clients for a group.
     *
     * @param group the group the clients will be retrieved of.
     * @return the clients belonging to the given group.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ImmutableSet<String> getClientsForGroup(@NotNull String group);

    /**
     * Retrieves all available groups.
     *
     * @return all available groups.
     * @throws IncompatibleHiveMQVersionException if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ImmutableSet<String> getAvailableGroups();
}

Example Usage

Get the clients belonging to a group and add a client to group.
public class ClientGroupCallback implements OnConnectCallback {

    private final BlockingClientGroupService blockingClientGroupService;
    private final PublishService publishService;

    @Inject
    public ClientGroupCallback(final BlockingClientGroupService blockingClientGroupService, (1)
                               final PublishService publishService) {
        this.blockingClientGroupService = blockingClientGroupService;
        this.publishService = publishService;
    }

    @Override
    public void onConnect(final CONNECT connect, final ClientData clientData) throws RefusedConnectionException {
        final Optional<String> group = clientData.getConnectionAttributeStore().getAsString("group");

        if (group.isPresent()) {
            final ImmutableSet<String> clientIdentifiers = blockingClientGroupService.getClientsForGroup(group.get()); (2)

            for (final String clientIdentifier : clientIdentifiers) {
                final PUBLISH publish = new PUBLISH(clientData.getClientId().getBytes(), "groupclientconnected", QoS.AT_LEAST_ONCE);
                publishService.publishtoClient(publish, clientIdentifier); (3)
            }

            try{
                blockingClientGroupService.addClientToGroup(group.get(), clientData); (4)
            } catch (NoSuchClientIdException e) {
                //handle exception case
            }
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }

}
1 Inject the BlockingClientGroupService.
2 Get the clients belonging to a group.
3 Publish to all clients of the group.
4 Add the client to a group.

Async API

Async Client Group Service API
public interface AsyncClientGroupService {

    /**
     * Adds a client to a group.
     *
     * @param group      the group the client will be added to.
     * @param clientData the clientData of the client which will be added to the group.
     * @return a {@link ListenableFuture} succeeding when the client is added to the group,
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the client to the given clientData exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Void> addClientToGroup(@NotNull String group, @NotNull ClientData clientData);

    /**
     * Adds a client to a group.
     *
     * @param group            the group the client will be added to.
     * @param clientIdentifier the identifier of the client which will be added to the group.
     * @return a {@link ListenableFuture} succeeding when the client is added to the group,
     * failing with a {@link com.hivemq.spi.services.exception.NoSuchClientIdException} if no session for the client with the given identifier exists,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Void> addClientToGroup(@NotNull String group, @NotNull String clientIdentifier);

    /**
     * Removes a client from a group.
     *
     * @param group            the group the client will be removed from.
     * @param clientIdentifier the identifier of the client which will be added to the group.
     * @return a {@link ListenableFuture} succeeding when the client is removed from the group,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<Void> removeClientFromGroup(@NotNull String group, @NotNull String clientIdentifier);

    /**
     * Retrieves the clients for a group.
     *
     * @param group the group the clients will be retrieved of.
     * @return a {@link ListenableFuture} succeeding with the clients belonging to the given group,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<ImmutableSet<String>> getClientsForGroup(@NotNull String group);

    /**
     * Retrieves all available groups.
     *
     * @return a {@link ListenableFuture} succeeding with the all available groups,
     * failing with a {@link com.hivemq.spi.services.exception.IncompatibleHiveMQVersionException} if a node with a version lower than 3.3.0 exists in the cluster.
     */
    @NotNull
    ListenableFuture<ImmutableSet<String>> getAvailableGroups();
}

Example Usage

Get the clients belonging to a group and add a client to group.
public class ClientGroupCallback implements OnConnectCallback {

    private final AsyncClientGroupService asyncClientGroupService;
    private final PublishService publishService;
    private final PluginExecutorService pluginExecutorService;

    @Inject
    public ClientGroupCallback(final AsyncClientGroupService asyncClientGroupService, (1)
                               final PublishService publishService,
                               final PluginExecutorService pluginExecutorService) {
        this.asyncClientGroupService = asyncClientGroupService;
        this.publishService = publishService;
        this.pluginExecutorService = pluginExecutorService;
    }

    @Override
    public void onConnect(final CONNECT connect, final ClientData clientData) throws RefusedConnectionException {
        final Optional<String> group = clientData.getConnectionAttributeStore().getAsString("group");

        if (group.isPresent()) {
            final ListenableFuture<ImmutableSet<String>> clientIdentifiersFuture = asyncClientGroupService.getClientsForGroup(group.get()); (2)

            Futures.addCallback(clientIdentifiersFuture, new FutureCallback<ImmutableSet<String>>() { (3)
                @Override
                public void onSuccess(final ImmutableSet<String> clientIdentifiers) {

                    for (final String clientIdentifier : clientIdentifiers) {
                        final PUBLISH publish = new PUBLISH(clientData.getClientId().getBytes(), "groupclientconnected", QoS.AT_LEAST_ONCE);
                        publishService.publishtoClient(publish, clientIdentifier); (4)
                    }

                    asyncClientGroupService.addClientToGroup(group.get(), clientData); (5)
                }

                @Override
                public void onFailure(final Throwable t) {
                    //handle exception case
                }
            }, pluginExecutorService);
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Inject the AsyncClientGroupService.
2 Get the clients belonging to a group.
3 Add a callback for handling the result of the ListenableFuture.
4 Publish to all clients of the group.
5 Add the client to a group.

Publish Service

The Publish Service is used for the automated sending of MQTT publish messages in HiveMQ plugins.

API

     /**
     * Publishes a new MQTT {@link PUBLISH} message. The standard MQTT topic matching mechanism of HiveMQ will apply.
     * <p/>
     * If the given {@link PUBLISH} or any of its information (topic,qos,message) is null, a {@link NullPointerException}
     * will be thrown
     *
     * @param publish object with topic, QoS and message, which should be published to all subscribed clients
     * @throws NullPointerException if the given object is <code>null</code> or any relevant information like topic, qos
     *                              or message is <code>null</code>
     */
    void publish(@NotNull PUBLISH publish);

    /**
     * Publishes a new MQTT {@link PUBLISH} message.
     * The PUBLISH will only be delivered to the client with the specified client identifier.
     * Also the client needs to be subscribed on the topic of the PUBLISH in order to receive it.
     * <p/>
     * If the given {@link PUBLISH} or any of its information (topic,qos,message) is null, a {@link NullPointerException}
     * will be thrown
     *
     * @param publish object with topic, QoS and message, which should be published to all subscribed clients
     * @throws NullPointerException if the given object is <code>null</code> or any relevant information like topic, qos
     *                              or message is <code>null</code>
     */
    void publishtoClient(@NotNull PUBLISH publish, @NotNull String clientId);

Example Usage

This section discusses common examples on how to use the PublishService in your own plugins.

Copy and redirect a PUBLISH message to a subtopic
    @Override
    public void onPublishReceived(PUBLISH publish, ClientData clientData)
                                        throws OnPublishReceivedException {

        PUBLISH copy = PUBLISH.copy(publish); (1)
        copy.setTopic(publish.getTopic()+"/"+subTopic);
        publishService.publish(copy); (2)
    }
1 Make a copy of the incoming message.
2 Publish the copied message another topic.
Deep copy
If you want to modify the original MQTT PUBLISH message, always make a deep copy with PUBLISH.copy of the original PUBLISH object for further modifications.


Publish a new MQTT message
    public void publishNewMessage() {
        final PUBLISH message = new PUBLISH(); (1)
        message.setQoS(QoS.EXACTLY_ONCE);
        message.setTopic("the/topic");
        message.setPayload("payload".getBytes(Charsets.UTF_8));
        message.setRetain(true);

        publishService.publish(message); (2)
    }
1 Create a new PUBLISH object.
2 Publish the message.

Time to Live (TTL)

"Time to Live" determines the lifespan in seconds for each PUBLISH object.

The default value is -1. It means, that the PUBLISH will not expire.

Please keep in mind, that memory might not be released immediately after the TTL expires. It can be released at a later time when HiveMQ’s internal cleanup-jobs run.

The TTL can be set via the constructor or by using the method setTTL.

Set TTL in the constructor
    final PUBLISH message = new PUBLISH("message".getBytes(), "topic", QoS.EXACTLY_ONCE, -1);
Set TTL in a OnPublishReceivedCallback
    @Override
    public void onPublishReceived(final PUBLISH publish, final ClientData clientData) throws OnPublishReceivedException {
        publish.setTTL(3);
    }
It’s not allowed to set the time-to-live in OnPublishSend callback.

It’s also possible to get information about the already set time-to-live and the creation timestamp of the PUBLISH. This timestamp is used by HiveMQ to check if a PUBLISH has expired.

Gotchas

  • If you don’t want to modify the original MQTT PUBLISH message, always make a deep copy with PUBLISH.copy of the original PUBLISH object for further modifications.

  • The publishToClient method is rather expensive. Don’t use this method excessively.

$SYS Topic Service

The $SYS Topic Service is used for querying, adding, removing and modifying $SYS Topics at runtime.

$SYS topic publishes are executed periodically if the HiveMQ $SYS topic plugin is installed.

The HiveMQ SYS Topic Plugin is activated by default. The source code is open source under a business friendly Apache 2 license and it’s recommended to study the source code if you want to learn how to use the SYS topic for your own plugins.

Custom $SYS topics can be hooked into the HiveMQ source with this service. When the default $SYS topic plugin is installed, clients can not publish to $SYS topics.

API

    /**
     * Returns a Collection of all {@link SYSTopicEntry}s topics available.
     * It is possible to filter the returned list by {@link Type}.
     * By default, this method will return all {@link SYSTopicEntry}s.
     *
     * @param types the {@link Type}s to filter
     * @return a Collection of all $SYS topics registered to HiveMQ of the given Types
     */
    Collection<SYSTopicEntry> getAllEntries(Type... types);

    /**
     * Checks if the SYSTopicService contains a given {@link SYSTopicEntry}.
     *
     * @param entry the {@link SYSTopicEntry} to check
     * @return <code>true</code> if the Service contains the given {@link SYSTopicEntry},
     * <code>false</code> otherwise
     */
    boolean contains(final SYSTopicEntry entry);

    /**
     * Returns the {@link SYSTopicEntry} for a given topic.
     *
     * @param topic the topic
     * @return an {@link Optional} of a {@link SYSTopicEntry}
     */
    Optional<SYSTopicEntry> getEntry(String topic);

    /**
     * Adds a new $SYS topic entry.
     * The topic of the given {@link SYSTopicEntry} must start with '$SYS'.
     *
     * @param entry a new {@link SYSTopicEntry} to add
     */
    void addEntry(final SYSTopicEntry entry);

    /**
     * Removes a {@link SYSTopicEntry}
     *
     * @param entry the entry to remove
     * @return <code>true</code> if the {@link SYSTopicEntry} could be removed, <code>false</code> otherwise
     */
    boolean removeEntry(final SYSTopicEntry entry);

    /**
     * Triggers a PUBLISH of all registered SYSTopicEntries with {@link Type} STANDARD to all clients which are subscribed to the SYSTopics
     *
     * @since 3.0
     */
    void triggerStandardSysTopicPublish();

    /**
     * Triggers a PUBLISH of all registered SYSTopicEntries with {@link Type} STATIC to a specified client.
     * The client receives all messages from topics it is subscribed on.
     *
     * @param clientId The clientid for which to trigger the publishes
     * @since 3.0
     */
    void triggerStaticSysTopicPublishToClient(final String clientId);

    /**
     * Triggers a PUBLISH of all registered SYSTopicEntries with {@link Type} STANDARD to a specified client.
     * The client receives all messages from topics it is subscribed on.
     *
     * @param clientId The clientid for which to trigger the publishes
     * @since 3.0
     */
    void triggerStandardSysTopicPublishToClient(final String clientId);

Example Usage

Add a new $SYS Topic at runtime
 sysTopicService.addEntry(new SYSTopicEntry() {
         @Override
         public String topic() {
             return "$SYS/new";
         }

         @Override
         public Supplier<byte[]> payload() {
             return Suppliers.ofInstance("test".getBytes("UTF-8"));
         }

         @Override
         public Type type() {
             return Type.STANDARD;
         }

         @Override
         public String description() {
             return "the description";
         }
 });


Remove all static $SYS topics at runtime
 final Collection<SYSTopicEntry> allStaticEntries = sysTopicService.getAllEntries(Type.STATIC);

 for (SYSTopicEntry entry : allStaticEntries) {
     sysTopicService.removeEntry(entry);
 }


Get payload for a specific $SYS topic at runtime
 final Optional<SYSTopicEntry> entry = sysTopicService.getEntry("$SYS/broker/uptime");

 if (entry.isPresent()) {
     log.info("Uptime is {}", new String(entry.get().payload().get(), Charsets.UTF_8));
 }


Send standard sys topic every minute
@Override
public void onBrokerStart() throws BrokerUnableToStartException {
    sysTopicRegistration.registerSysTopics();

    final long publishInterval = 60;
    pluginExecutorService.scheduleAtFixedRate(new TriggerDynamicSysTopic(), publishInterval, publishInterval, TimeUnit.SECONDS);

}

private class TriggerDynamicSysTopic implements Runnable {

    @Override
    public void run() {
        sysTopicService.triggerStandardSysTopicPublish();
    }
}

Gotchas

  • To add custom topics to the $SYS topic tree, these topics must start with $SYS/.

  • As long as the HiveMQ SYS Topic Plugin is located in the plugin folder, $SYS topic messages will be published automatically.

Plugin Executor Service

Many MQTT integrations depend on expensive (in terms of CPU time) operations like:

  • Calling webservices

  • Persisting or querying data from a database

  • Writing data to disk

  • Any other blocking operation

The main paradigm in HiveMQ plugins is that blocking is not allowed. But what to do if your plugin business logic depends on blocking operations? In order to make these operations asynchronous, use the HiveMQ managed PluginExecutorService. This HiveMQ-managed executor service is shared between all HiveMQ plugins and can be monitored by the standard HiveMQ monitoring system.

The PluginExecutorService is sophisticated implementation which can be used as a ListeningExecutorService. This allows to use a callback based Future handling for true non-blocking behaviour.

The plugin executor service also allows to schedule tasks periodically.

It’s important that you never create your own thread pools. This can decrease the performance of HiveMQ significantly. This is especially true for Java Cached Thread pools, since these increase Java threads without any limit and can make your system unresponsive. If you’re using a library which uses these Thread pools (like the Jersey Client library), make sure to limit the amount of threads!
Inject and use Plugin Executor Service
import com.google.inject.Inject;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnConnectCallback;
import com.hivemq.spi.callback.exception.RefusedConnectionException;
import com.hivemq.spi.message.CONNECT;
import com.hivemq.spi.security.ClientData;
import com.hivemq.spi.services.PluginExecutorService;

public class ClientConnect implements OnConnectCallback {

    private final PluginExecutorService pluginExecutorService;

    @Inject
    public ClientConnect(PluginExecutorService pluginExecutorService) {
        this.pluginExecutorService = pluginExecutorService;
    }

    @Override
    public void onConnect(CONNECT connect, ClientData clientData) throws RefusedConnectionException {
        pluginExecutorService.execute(new Runnable() {

            @Override
            public void run() {
                expensiveTask();
            }
        });
    }

    private void expensiveTask(){
        ...
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}


Log incoming publishes per minute
@Override
public void onBrokerStart() throws BrokerUnableToStartException {
    pluginExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            Meter incomingPublishRate = metricService.getHiveMQMetric(HiveMQMetrics.INCOMING_PUBLISH_RATE);
            logger.info("Incoming publishes last minute = {}", incomingPublishRate.getOneMinuteRate());
        }
    }, 1, 1, TimeUnit.MINUTES);
}


Adding a callback for return values
private void methodWithListenableFuture() {
    final ListenableFuture<Object> result = pluginExecutorService.submit(new Callable<Object>() {
          @Override
          public Object call() throws Exception {
              return expensiveTask();
          }
      });

      Futures.addCallback(result, new FutureCallback<Object>() {
          @Override
          public void onSuccess(@Nullable Object result) {
              log.info("Expensive result {}", result.toString());
          }

          @Override
          public void onFailure(Throwable t) {
              t.printStackTrace();
          }
      });
}

private Object expensiveTask() {
    ...
}

Log Service

The Log Service enables the plugin, to change the log level of internal logger of HiveMQ at runtime pragmatically.

API

public interface LogService {

    /**
     * The loglevels HiveMQ supports
     */
    enum LogLevel {

        /**
         * The TRACE log level for finest HiveMQ logging
         */
        TRACE,
        /**
         * The DEBUG log level for fine HiveMQ logging
         */
        DEBUG,
        /**
         * The INFO log level. INFO logging is the default HiveMQ log behaviour
         */
        INFO,
        /**
         * The WARN log level which only logs warnings
         */
        WARN,
        /**
         * The ERROR log level which only logs severe HiveMQ errors
         */
        ERROR
    }

    /**
     * Changes the log level of the internal HiveMQ logger
     * <p/>
     * This does not support <code>null</code> parameters. If you pass
     * <code>null</code>, this method is lenient and will ignore the parameter
     *
     * @param logLevel the new loglevel
     */
    void setLogLevel(@NotNull LogLevel logLevel);

    /**
     * Returns the current log level of the internal HiveMQ logger
     *
     * @return the current log level of the internal HiveMQ logger
     */
    LogLevel getLogLevel();
}

Example Usage

Set log level to debug if plugin is deployed
import com.google.inject.Inject;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerStart;
import com.hivemq.spi.callback.exception.BrokerUnableToStartException;
import com.hivemq.spi.services.LogService;

public class HiveMQStart implements OnBrokerStart {

    private final LogService logService;

    @Inject
    public HiveMQStart(LogService logService) {
        this.logService = logService;
    }

    @Override
    public void onBrokerStart() throws BrokerUnableToStartException {
        if (logService.getLogLevel() != LogService.LogLevel.DEBUG) {
            logService.setLogLevel(LogService.LogLevel.DEBUG);
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

Metric Service

HiveMQ 3 introduced a new, holistic monitoring model. While HiveMQ 2 recalculated metrics at specific points of time, HiveMQ 3 metrics are always current and accurate.

HiveMQ uses the de-facto standard Java metrics library Dropwizard Metrics, so plugin developers can benefit from the whole ecosystem about that library and writing custom integrations is a breeze.

The MetricService of HiveMQ can be used to gather information about pre-defined HiveMQ metrics or can be used to hook in custom metrics defined by your plugin. There’s no artificial wrapper, so you can use the feature-rich Dropwizard Metrics library directly.

For gathering pre-defined HiveMQ metrics, the constant class com.hivemq.spi.metrics.HiveMQMetrics can be used.

Blocking API

/**
 * Returns a specific HiveMQ metric. If the metric does not exist, this method will return
 * <code>null</code>.
 * <p/>
 * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
 *
 * @param metric the metric
 * @param <T>    the metric type
 * @return the metric (if available) or <code>null</code>
 */
@Nullable
<T extends Metric> T getHiveMQMetric(HiveMQMetric<T> metric);

/**
 * Returns a map which contains an entry for every node in the cluster with the given HiveMQ metric. An entry represents the name of the node (key) and the associated metric for the node (value).
 * If the metric does not exist, this method will return <code>null</code>.
 * <p/>
 * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
 *
 * @param metric the metric
 * @param <T>    the metric type
 * @return A map with node names and their sought-after metrics or <code>null</code>.
 */
@Nullable
<T extends Metric> Map<String, T> getClusterMetric(HiveMQMetric<T> metric);

/**
 * Returns the metric registry of HiveMQ.
 *
 * @return the metric registry
 */
MetricRegistry getMetricRegistry();

Example Usage

Monitor specific metrics
private void logConnectionsHistogram() {
    Histogram connections = metricService.getHiveMQMetric(HiveMQMetrics.CONNECTIONS_OVERALL_MEAN);

    final Snapshot snapshot = connections.getSnapshot();

    logger.info("Mean: {}ms", snapshot.getMean() / 1000 / 1000);
    logger.info("StdDev: {}ms", snapshot.getStdDev() / 1000 / 1000);
    logger.info("Min: {}ms", snapshot.getMin() / 1000 / 1000);
    logger.info("Max: {}ms", snapshot.getMax() / 1000 / 1000);
    logger.info("Median: {}ms", snapshot.getMedian() / 1000 / 1000);
    logger.info("75th percentile: {}ms", snapshot.get75thPercentile() / 1000 / 1000);
    logger.info("95th percentile: {}ms", snapshot.get95thPercentile() / 1000 / 1000);
    logger.info("98th percentile: {}ms", snapshot.get98thPercentile() / 1000 / 1000);
    logger.info("99th percentile: {}ms", snapshot.get99thPercentile() / 1000 / 1000);
    logger.info("999th percentile: {}ms", snapshot.get999thPercentile() / 1000 / 1000);
}
Add custom metrics
private void addCustomMetrics() {

    //Adding different metric types
    final Meter meter = metricService.getMetricRegistry().meter("my-meter");
    final Timer timer = metricService.getMetricRegistry().timer("my-timer");
    final Counter counter = metricService.getMetricRegistry().counter("my-counter");
    final Histogram histogram = metricService.getMetricRegistry().histogram("my-histogram");

    //Remove a metric
    metricService.getMetricRegistry().remove("my-meter");
}

Async API

/**
 * Returns a specific HiveMQ metric. If the metric does not exist, this method will return
 * <code>null</code>.
 * <p/>
 * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
 *
 * @param metric the metric
 * @param <T>    the metric type
 * @return a {@link com.google.common.util.concurrent.ListenableFuture} with the metric (if available) or a <code>null</code> result.
 */
<T extends Metric> ListenableFuture<T> getHiveMQMetric(HiveMQMetric<T> metric);

/**
 * Returns a map which contains an entry for every node in the cluster with the given HiveMQ metric. An entry represents the name of the node (key) and the associated metric for the node (value).
 * If the metric does not exist, this method will return <code>null</code>.
 * <p/>
 * For a list of all available metrics, refer to the {@link com.hivemq.spi.metrics.HiveMQMetrics} constant class.
 *
 * @param metric the metric
 * @param <T>    the metric type
 * @return A {@link com.google.common.util.concurrent.ListenableFuture} with a map that contains node names and their sought-after metrics or <code>null</code>.
 */
<T extends Metric> ListenableFuture<Map<String, T>> getClusterMetric(HiveMQMetric<T> metric);

/**
 * Returns the metric registry of HiveMQ.
 *
 * @return the metric registry
 */
MetricRegistry getMetricRegistry();

Example Usage

Monitor specific metrics
ListenableFuture<Counter> future = metricService
        .getHiveMQMetric(HiveMQMetrics.INCOMING_DISCONNECT_COUNT);
Futures.addCallback(future, new FutureCallback<Counter>() {
    @Override
    public void onSuccess(Counter counter) {
        log.info("Incoming disconnects = {}", counter.getCount());
    }

    @Override
    public void onFailure(Throwable throwable) {
        throwable.printStackTrace();
    }
});

Cluster Metrics

The Metric Service also provides a method to collect specific metrics from the entire cluster (getClusterMetric). Note that this method will not sum up the results. Instead you will receive one metric for each HiveMQ instance in your cluster. More specifically the method will return a map of metrics. The keys of the map are string representations of the cluster-ID of the HiveMQ instances. The values of the map are the requested metrics.

The cluster-ID of each HiveMQ instance is shown in the HiveMQ log on startup. Example: INFO - This node’s cluster-ID is O4QrP

Example Usage

Request specific cluster metrics
private void logClusterMetric() {
    Map<String, Histogram> clusterMetric = metricService.getClusterMetric(HiveMQMetrics.CONNECTIONS_OVERALL_MEAN);
    for (String nodeName : clusterMetric.keySet()) {
        Histogram histogram = clusterMetric.get(nodeName);
        double mean = histogram.getSnapshot().getMean();
        log.info("Node {} connections mean = {}", nodeName, mean);
    }
}
Some of the metrics are influenced by the replication factor of the cluster. For example if the replicate-count of the subscriptions persistence is set to one, than the sum of subscriptions in the entire cluster will be twice as much as it would be without any replication. That is because every instance is holding more data than its portion of the cluster.

Configuration Service

HiveMQ configurations are typically set at startup time via the config.xml configuration file. However, it may be desirable to change configurations at runtime. Most of the configurations of HiveMQ can be changed via the plugin system at runtime via the different configuration services.

The following table shows the available configuration services:

Table 2. Configuration Services
Name Purpose API

GeneralConfigurationService

General HiveMQ configurations

Available on Github

ListenerConfigurationService

Allows to bind new listeners at runtime

Available on Github

MqttConfigurationService

MQTT specific configurations like retryIntervals, max client identifier length, …​

Available on Github

ThrottlingConfigurationService

Configurations to throttle traffic globally or to limit the maximum amount of MQTT connections

Available on Github

In order to use one of the concrete subservices in your plugin, you can use the following approaches:

  • Inject the desired configuration subservice directly

  • Inject the ConfigurationService interface and use your desired configuration service via this interface.

API

public interface ConfigurationService {

    /**
     * Returns the configuration service for general HiveMQ configuration
     *
     * @return the general HiveMQ configuration service
     */
    GeneralConfigurationService generalConfiguration();

    /**
     * Returns the configuration service which allows to add and inspect listeners add runtime.
     *
     * @return the listener configuration service
     */
    ListenerConfigurationService listenerConfiguration();

    /**
     * Returns the configuration service for MQTT configuration
     *
     * @return the mqtt configuration service
     */
    MqttConfigurationService mqttConfiguration();

    /**
     * Returns the throttling configuration service for global throttling
     *
     * @return the global throttling service
     */
    ThrottlingConfigurationService throttlingConfiguration();
}

REST Service

The RESTService allows plugins to create accessible HTTP APIs directly within HiveMQ.

Any HTTP content can be served directly from HiveMQ. With the RESTService it’s possible to use:

The servlets and JAX-RS resources can be used to interact with HiveMQ by using other services. Interacting with HiveMQ is not necessary, you can also just use the internal HTTP server of HiveMQ to avoid setting up an external HTTP server for your existing servlets or JAX-RS resources.

By default HiveMQ will serve JAX-RS Applications or JAX-RS Resources with the root path / and serve Servlets under the root path /servlet.

Multiple Listeners

HiveMQ can serve different HTTP endpoints called listeners. These listeners can be added to HiveMQ either by configuring them in HiveMQ’s config file or by declaring them programmatically.

You can serve different content on different listeners or on all available listeners. You can even serve some content on a specific listener and some other content on all listeners.

Each listener needs at least fhe following information:

Table 3. Minimum Listener Information
Name Description

name

The unique name of the listener. This name is used to identify the listener.

bind address

The address the listener should be bound to. "0.0.0.0" means that the listener should be bound on all network interfaces.

bind port

The port the listener should be bound to. Should be > 1024.

Example config.xml
<?xml version="1.0"?>
<hivemq xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="../../hivemq-config.xsd">

    ...
    <rest-service>
        <servlet-path>/servlet</servlet-path>
        <jax-rs-path>/*</jax-rs-path>
        <listeners>
            <http-listener>
                <name>monitoring</name>
                <bind-address>192.168.1.2</bind-address>
                <port>8080</port>
            </http-listener>
        </listeners>
    </rest-service>
    ...
</hivemq>

Programmatic configuration

It’s possible to add listeners programmatically. This is useful if your plugin should be self-contained and you want to make your plugin a drop-in installation without the need to change configurations in the config file. You can add the listeners at HiveMQ start or even lazy on demand.

Lazy loading listeners
If you start listeners on demand, they will start without any resources or servlets deployed to them, even if you added some resources to all listeners before that listener started. Adding resources to all listeners only applies to running listeners.

Servlets

HiveMQ’s RESTService can serve Servlets. For use cases where JAX-RS resources may look like overkill or if you want to serve binary content (like PDF-documents or downloads), Servlets come in handy.

The Servlet 3.1 specification is fully supported, which also allows to use Async-Servlets which play very nice with HiveMQ’s Async Services

The RESTService also supports Servlet Filters. These Servlet Filters are able to use dependency injection.

By default HiveMQ will serve Servlets under the path /servlet.

No metadata needed
You don’t need to annotate your servlets with a @WebServlet annotation and there is no need for a web.xml. A servlet just needs to extend the HttpServlet class.

JAX-RS

The RESTService can be used with the JAX-RS 2.0 API, which provides a simple and well-known principle for implementing web services with Java. Standard JAX-RS resources and annotations can be used with HiveMQ.

Async JAX-RS resources are supported (and recommended) as well as standard JAX-RS resources.

The RESTService also supports more advanced JAX-RS features like ExceptionMappers, ContextResolvers and all other JAX-RS components you may need for your API.

Dependency Injection
While dependency injection in JAX-RS resources and servlets is mainly supported there is one caveat: During bootstrap, e.g. in constructors, it is not possible to use dependency injection to inject instances of classes that were bound in the HiveMQPluginModule.configure() method. After bootstrap, e.g. OnBrokerStart.onBrokerStart(), @PostConstruct, there are no limitations on the usage of dependency injection.

API

public interface RESTService {

    /**
     * Adds a new listener to the REST Service programmatically.
     * <p/>
     * Note: Typically the listeners are added via the configuration file. If adding
     * the configuration via the config file does not suit your needs, the programmatic API
     * offers a convenient way to add an arbitrary number of listeners at runtime.
     * <p/>
     * Note: If you add listeners at runtime, existing servlets or JAX-RS resource on other listeners won't be added
     * to this new listener automatically.
     *
     * @param listener the {@link Listener} implementation
     * @throws NullPointerException     if the passed listener is <code>null</code>
     * @throws IllegalArgumentException if the passed listener is not a valid listener type (like {@link HttpListener}
     */
    void addListener(@NotNull Listener listener);

    /**
     * Returns an immutable view of all listeners.
     * <p/>
     * In order to get more information you need to downcast to a specific listener type. You can do this e.g. by
     * code such as
     * <pre>
     * if (listener instanceof HttpListener) {
     *      HttpListener httpListener = (HttpListener) listener;
     * }
     * </pre>
     *
     * @return an immutable view of all listeners
     */
    @ReadOnly
    Collection<Listener> getListeners();

    /**
     * Adds a servlet instance to a specific path and adds the servlet to all available listeners.
     *
     * @param servlet the servlet to add
     * @param path    the path to bind the servlet to
     * @throws NullPointerException if the servlet or the path is null
     */
    void addServlet(@NotNull HttpServlet servlet, @NotNull String path);

    /**
     * Adds a servlet instance to a specific path and adds the servlet to all specified listeners. If the
     * collection of listeners is empty, the servlet will be added to all available listeners
     *
     * @param servlet             the servlet to add
     * @param path                the path to bind the servlet to
     * @param listenerIdentifiers a collection with identifiers of listeners
     * @throws NullPointerException if the servlet, the path or the listener identifiers collection is null
     */
    void addServlet(@NotNull HttpServlet servlet, @NotNull String path, @NotNull Collection<String> listenerIdentifiers);


    /**
     * Adds a servlet to a specific path and adds the servlet to all available listeners.
     * <p/>
     * The given servlet class will be instantiated by HiveMQ and the servlet can use dependency injection.
     * <p/>
     * The servlet will be instantiated once and not per request, so it's essentially a singleton.
     *
     * @param servlet the servlet to add
     * @param path    the path to bind the servlet to
     * @return The instantiated servlet
     * @throws NullPointerException if the servlet or the path is null
     */
    <T extends HttpServlet> T addServlet(@NotNull Class<T> servlet, @NotNull String path);

    /**
     * Adds a servlet to a specific path and adds the servlet to all specified listeners. If the
     * collection of listeners is empty, the servlet will be added to all available listeners
     * <p/>
     * The given servlet class will be instantiated by HiveMQ and the servlet can use dependency injection.
     * <p/>
     * The servlet will be instantiated once and not per request, so it's essentially a singleton.
     *
     * @param servlet             the servlet to add
     * @param path                the path to bind the servlet to
     * @param listenerIdentifiers a collection with identifiers of listeners
     * @return The instantiated servlet
     * @throws NullPointerException if the servlet or the path is null
     */
    <T extends HttpServlet> T addServlet(@NotNull Class<T> servlet, @NotNull String path, @NotNull Collection<String> listenerIdentifiers);


    /**
     * Adds a specific {@link ServletFilter} with a given path to the RESTService on all available listeners.
     *
     * @param filter the {@link ServletFilter}
     * @param path   the Path
     * @param <T>    the {@link Filter} which is contained in the {@link ServletFilter}
     * @return the concrete {@link Filter} instance
     * @throws NullPointerException if the {@link ServletFilter} or path is null
     */
    <T extends Filter> T addFilter(@NotNull ServletFilter<T> filter, @NotNull String path);

    /**
     * Adds a specific {@link ServletFilter} with a given path to the RESTService on specific listeners.
     *
     * @param filter    the {@link ServletFilter}
     * @param path      the Path
     * @param listeners a collection of listeners
     * @param <T>       the {@link Filter} which is contained in the {@link ServletFilter}
     * @return the concrete {@link Filter} instance
     * @throws NullPointerException if the {@link ServletFilter}, path or listener collection is null
     */
    <T extends Filter> T addFilter(@NotNull ServletFilter<T> filter, @NotNull String path, @NotNull Collection<String> listeners);


    /**
     * Adds a servlet instance to a specific path and adds the servlet to all available listeners.
     * <p/>
     * Additionally a servlet filter is added directly to the servlet path mapping. This is a convenient
     * method if you need a specific filter only for one servlet
     *
     * @param servlet the servlet to add
     * @param path    the path to bind the servlet to
     * @param filters an arbitrary amount of filters for this specific servlet
     * @throws NullPointerException if the servlet, the path or the filter array is null
     */
    @SuppressWarnings("unchecked")
    void addServletWithFilters(@NotNull HttpServlet servlet, @NotNull String path, @NotNull ServletFilter<? extends Filter>... filters);

    /**
     * Adds a servlet instance to a specific path and adds the servlet to all specific listeners.
     * <p/>
     * Additionally a servlet filter is added directly to the servlet path mapping. This is a convenient
     * method if you need a specific filter only for one servlet and only on specific listeners
     *
     * @param servlet             the servlet to add
     * @param path                the path to bind the servlet to
     * @param listenerIdentifiers a collection of listener identifierd
     * @param filters             an arbitrary amount of filters for this specific servlet
     * @throws NullPointerException if the servlet, the path, the listener collection or the filter array is null
     */
    @SuppressWarnings("unchecked")
    void addServletWithFilters(@NotNull HttpServlet servlet, @NotNull String path, @NotNull Collection<String> listenerIdentifiers, @NotNull ServletFilter<? extends Filter>... filters);

    /**
     * Adds a servlet to a specific path and adds the servlet to all available listeners. If the
     * collection of listeners is empty, the servlet will be added to all available listeners
     * <p/>
     * The given servlet class will be instantiated by HiveMQ and the servlet can use dependency injection.
     * <p/>
     * Additionally a servlet filter is added directly to the servlet path mapping. This is a convenient
     * method if you need a specific filter only for one servlet
     *
     * @param servlet the servlet to add
     * @param path    the path to bind the servlet to
     * @param filters an arbitrary amount of filters for this specific servlet
     * @return the instantiated servlet
     * @throws NullPointerException if the servlet, the path or the filter array is null
     */
    @SuppressWarnings("unchecked")
    <T extends HttpServlet> T addServletWithFilters(@NotNull Class<T> servlet, @NotNull String path, @NotNull ServletFilter<? extends Filter>... filters);

    /**
     * Adds a servlet to a specific path and adds the servlet to all specified listeners. If the
     * collection of listeners is empty, the servlet will be added to all available listeners
     * <p/>
     * The given servlet class will be instantiated by HiveMQ and the servlet can use dependency injection.
     * <p/>
     * Additionally a servlet filter is added directly to the servlet path mapping. This is a convenient
     * method if you need a specific filter only for one servlet
     *
     * @param servlet             the servlet to add
     * @param path                the path to bind the servlet to
     * @param listenerIdentifiers the collection of listeners
     * @param filters             an arbitrary amount of filters for this specific servlet
     * @return the instantiated servlet
     * @throws NullPointerException if the servlet, the path or the filter array is null
     */
    @SuppressWarnings("unchecked")
    <T extends HttpServlet> T addServletWithFilters(@NotNull Class<T> servlet, @NotNull String path, @NotNull Collection<String> listenerIdentifiers, @NotNull ServletFilter<? extends Filter>... filters);


    /**
     * Adds a {@link Application} to the RESTService.
     * <p/>
     * Please be aware that when using this method, only all properties, classes and singletons are
     * added to the RESTService, the {@link Application} and all annotations on the {@link} Application
     * are ignored. So essentially this is a convenient method which allows you to add a lot of resources at once
     * <p/>
     * Important: {@link javax.ws.rs.ApplicationPath} annotations are ignored.
     * <p/>
     * All resources defined in the Application will be added to all available listeners.
     *
     * @param application the {@link Application}
     * @throws NullPointerException if the passed {@link Application} is null
     */
    void addJaxRsApplication(@NotNull Application application);

    /**
     * Adds a {@link Application} to the RESTService.
     * <p/>
     * Please be aware that when using this method, only all properties, classes and singletons are
     * added to the RESTService, the {@link Application} and all annotations on the {@link} Application
     * are ignored. So essentially this is a convenient method which allows you to add a lot of resources at once
     * <p/>
     * Important: {@link javax.ws.rs.ApplicationPath} annotations are ignored.
     * <p/>
     * All resources defined in the Application will be added to all specified listeners.
     *
     * @param application         the {@link Application}
     * @param listenerIdentifiers a collection of listeners
     * @throws NullPointerException if the passed {@link Application} or the collection of listeners is null
     */
    void addJaxRsApplication(@NotNull Application application, @NotNull Collection<String> listenerIdentifiers);

    /**
     * Adds all given JAX-RS resources as singleton to all available listeners.
     * Since you have to instantiate the singleton objects on your own,
     * these singletons can't use dependency injection by HiveMQ and you have to pass your dependencies on your own
     * to the objects.
     * <p/>
     * If you want to have singletons which use dependency injection, consider using another method of the RESTService
     * which accepts classes instead of objects. You can annotate these methods with {@link javax.inject.Singleton}
     *
     * @param singletons an arbitrary number of singleton resources
     * @throws NullPointerException if the singleton array is null
     */
    void addJaxRsSingletons(@NotNull Object... singletons);

    /**
     * Adds all given JAX-RS resources as singleton to all specified listeners.
     * Since you have to instantiate the singleton objects on your own,
     * these singletons can't use dependency injection by HiveMQ and you have to pass your dependencies on your own
     * to the objects.
     * <p/>
     * If you want to have singletons which use dependency injection, consider using another method of the RESTService
     * which accepts classes instead of objects. You can annotate these methods with {@link javax.inject.Singleton}
     *
     * @param singletons          an collection of singleton resources
     * @param listenerIdentifiers a collection of listeners
     * @throws NullPointerException if the singleton array is null
     */
    void addJaxRsSingletons(@NotNull Collection<Object> singletons, @NotNull Collection<String> listenerIdentifiers);

    /**
     * Adds an arbitrary number of JAX-RS resources to all available listeners.
     * These resources can use dependency injection and HiveMQ will instantiate the resources for you at runtime
     *
     * @param resources a arbitrary number of JAX-RS resource classes
     * @throws NullPointerException if the resources array is null
     */
    void addJaxRsResources(@NotNull Class<?>... resources);

    /**
     * Adds an arbitrary number of JAX-RS resources to all specified listeners.
     * These resources can use dependency injection and HiveMQ will instantiate the resources for you at runtime
     *
     * @param resources           a collection of JAX-RS resource classes
     * @param listenerIdentifiers a collection of listeners
     * @throws NullPointerException if the resources array or the listener collection is null
     */
    void addJaxRsResources(@NotNull Collection<Class<?>> resources, @NotNull Collection<String> listenerIdentifiers);
}

Example Usage

Adding a HTTP listener programmatically at startup

The following example implements a callback that is triggered on broker start and creates a new HTTP Listener that is started automatically.

public class StartListenerOnStartup implements OnBrokerStart {

    private final RESTService restService;

    @Inject
    StartListenerOnStartup(final RESTService restService) { (1)
        this.restService = restService;
    }

    @Override
    public int priority() {
        return CallbackPriority.HIGH;
    }

    @Override
    public void onBrokerStart() throws BrokerUnableToStartException {
        restService.addListener(new HttpListener("listener-name", "0.0.0.0", 8888)); (2)
    }
}
1 Inject the REST service
2 Create and start a new listener at runtime

List all available listeners

This example logs all details of all listeners that are started currently.

final Collection<Listener> listeners = restService.getListeners();
for (Listener listener : listeners) {
    if (listener instanceof HttpListener) {
        final HttpListener httpListener = (HttpListener) listener;
        log.info("Listener with name {}, bind address {} and port {} is available",
                httpListener.getName(), httpListener.getBindAddress(), httpListener.getPort());
    }
}

Add a synchronous servlet which creates a HTML page that lists all connected MQTT clients

This example shows how to add a plain Java servlet to the RESTService. The Servlet can use the standard HiveMQ dependency injection mechanism, which allows to add HiveMQ services.

Please note that it’s not a good idea to use this example in production if you have lots of connected clients. This example is only for demonstration purposes and you might want to check out the JAX-RS resources if you plan to do something fancy.

public class PrintAllClientsServlet extends HttpServlet{
    private final BlockingClientService clientService;

    @Inject
    PrintAllClientsServlet(final BlockingClientService clientService) { (1)
        this.clientService = clientService;

    public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException
        final Set<String> connectedClients = clientService.getConnectedClients(); (2)
        final PrintWriter writer = response.getWriter();
        writer.write("<html><body><ul>");
        for (String clientId : connectedClients) { (3)
            writer.write("<li>" + clientId + "</li>");
        }
        writer.write("</ul></body></html>");
    }
}

/* ************************* */
/* Now somewhere in the code */
/* ************************* */

/* Add the servlet to all listeners */
restService.addServlet(PrintAllClientsServlet.class, "/clients");

/* Add the servlet to a specific listener*/
restService.addServlet(PrintAllClientsServlet.class, "/clients", Collections.singletonList("listener1"));

/* Adds the servlet to more specific listeners */
final List<String> listeners = new ArrayList<>();
listeners.add("listener1");
listeners.add("listener2");
restService.addServlet(PrintAllClientsServlet.class, "/clients", listeners);
1 Inject a Client Service that is blocking (which is a good fit for synchronous servlets).
2 Get all connected clients
3 Iterate and write all clients to the servlet output

Return all metrics as JSON with JAX-RS

The following example collects all HiveMQ metrics and returns them via HTTP as JSON. The BlockingMetricService gets injected via the standard HiveMQ dependency injection mechanisms.

@Path("/example")
@Produces("application/json")
public class ExampleMetricResource {

    private final BlockingMetricService metricService;

    @Inject
    ExampleMetricResource(final BlockingMetricService subscriptionStore) { (1)
        this.metricService = subscriptionStore;
    }

    @GET
    @Path("/metrics")
    public Response getMetrics() {
        final Map<String, String> results = new TreeMap<>();

        final Map<String, Metric> allMetrics = metricService.getMetricRegistry().getMetrics(); (2)

        for (Map.Entry<String, Metric> stringMetricEntry : allMetrics.entrySet()) {
            final Metric value = stringMetricEntry.getValue();
            if (value instanceof Counting) {

                results.put(stringMetricEntry.getKey(), "" + ((Counting) value).getCount());
            } else if (value instanceof Gauge) {
                results.put(stringMetricEntry.getKey(), ((Gauge) value).getValue().toString());
            }
        }
        return Response.ok(results).build(); (3)
    }
}

/* ************************* */
/* Now somewhere in the code */
/* ************************* */

restService.addJaxRsResources(ExampleMetricResource.class);
1 A BlockingMetricService gets injected
2 We iterate over all metrics of HiveMQ and convert it to a String representation
3 A HTTP response with return code 200 and the Map as JSON is returned to the client

Return all connected clients with an asynchronous servlet

This example uses the Asynchronous Servlet API which doesn’t block and is much better suited for large scale deployments than the blocking servlet API since you can fully implement the servlets together with the asynchronous HiveMQ services.

public class AsyncServlet extends HttpServlet {

    private final AsyncClientService clientService;

    @Inject
    AsyncServlet(final AsyncClientService asyncClientService) {
        this.clientService = asyncClientService;
    }

    public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException {

        final AsyncContext asyncContext = request.startAsync(request, response); (1)

        //since the information could be extremely large and will be collected from all cluster nodes
        //choose a very large timeout to handle worst-case delays.
        asyncContext.setTimeout(5000); (2)

        final ListenableFuture<Set<String>> future = clientService.getConnectedClients();

        Futures.addCallback(future, new FutureCallback<Set<String>>() { (3)

            @Override
            public void onSuccess(final Set<String> result) {
                try {
                    final PrintWriter writer = asyncContext.getResponse().getWriter();

                    writer.write("<html><body><ul>");

                    for (String clientId : result) {
                        writer.write("<li>" + clientId + "</li>");
                    }
                    writer.write("</ul></body></html>");

                } catch (IOException e) {
                    e.printStackTrace();
                } finally{
                    asyncContext.complete(); (4)
                }
            }

            @Override
            public void onFailure(final Throwable t) {
                final HttpServletResponse httpResponse = (HttpServletResponse) asyncContext.getResponse();
                try {
                    httpResponse.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                } catch (IOException e) {
                    e.printStackTrace();
                } finally{
                    asyncContext.complete();
                }
            }
        });
    }
}
1 Start the asynchronous request
2 Set the timeout so the cluster has a chance to return all clients even for large deployments
3 Add a callback that is executed when the future returns
4 Complete the async request

Return all subscriptions with an asynchronous JAX-RS resource

The following example uses the asynchronous JAX-RS API together with the asynchronous Subscription Store. Although it’s still not a good idea to gather all subscriptions in a huge scale HiveMQ cluster, using asynchronous code has tremendous advantages in performance.

@Path("/example")
@Produces("application/json")
public class ExampleResource {

    private final AsyncSubscriptionStore subscriptionStore;

    @Inject
    public ExampleResource(final AsyncSubscriptionStore subscriptionStore) {
        this.subscriptionStore = subscriptionStore;
    }

    @GET
    @Path("/subscriptions")
    public void getSubscriptions(@Suspended final AsyncResponse asyncResponse) { (1)

        //since the information could be extremely large and will be collected from all cluster nodes
        //choose a very large timeout to handle worst-case delays.
        asyncResponse.setTimeout(5, TimeUnit.SECONDS); (2)

        final ListenableFuture<Multimap<String, Topic>> future = subscriptionStore.getSubscriptions(); (3)

        Futures.addCallback(future, new FutureCallback<Multimap<String, Topic>>() {
            @Override
            public void onSuccess(final Multimap<String, Topic> result) {
                final Response response = Response.ok(result.asMap()).build();
                asyncResponse.resume(response); (4)
            }

            @Override
            public void onFailure(final Throwable t) {
                asyncResponse.resume(t);
            }
        });
    }
}
1 Get the AsyncResponse object
2 Set a timeout
3 Get a future with a Multimap of all Clients and their subscribed topics
4 Resume the response with the payload

Rate Limit

If the configured rate limit is exceeded, the service calls will fail with RateLimitExceededException. The RateLimitExceededException should be handled by the plugin.

See the throttling documentation on how to configure the rate limit.

Example Usage

Handle the service rate limit in a plugin
final ListenableFuture<Set<String>> future = subscriptionStore.getSubscribers(topic);
Futures.addCallback(future, new FutureCallback<Set<String>>() {
    @Override
    public void onSuccess(Set<String> result) {
        log.info("Subscribers = " + result);
    }

    @Override
    public void onFailure(Throwable t) {
        if(t instanceof RateLimitExceededException){
            log.info("Rate limit exceeded");
        }
    }
});