Services

HiveMQ Services provide a convenient way for extensions to interact with the HiveMQ core.

They can be accessed through the Services class.

The following table lists all available HiveMQ Services.

Table 1. Available HiveMQ Services
Service Description

Client Service

Allows extensions to get client session information, disconnect clients and remove sessions.

Subscription Store

Allows extensions to get, remove or add subscriptions for specific clients.

Retained Message Store

Allows extensions to get, remove or add retained messages for specific topics. Or delete all retained messages at once.

Publish Service

Allows extensions to send PUBLISH messages.

Managed Extension Executor

Allows extensions to use a HiveMQ-managed executor service for non-blocking operations.



Client Service

The Client Service allows extensions to gather information about clients:

  • Online status

  • Client identifier

  • Session expiry interval

The Client Service enables extensions to query for session information on all cluster nodes.

The Client Service can be leveraged to forcibly disconnect clients with the option of sending or not sending the client’s Will message.

The Client Service enables extensions to invalidate client sessions for both connected and disconnected clients. For connected clients this will result in the client being disconnected and the Will message being sent.

The JavaDoc for the Client Service can be found here.

Example Usage

This section illustrates common examples on how to use the Client Service in your own extensions.

Accessing Client Service

final ClientService clientService = Services.clientService();


Client Connected

This examples shows how to get information about the online status of a client.

The isClientConnected method returns true for online clients and false for offline clients.

CompletableFuture<Boolean> connectedFuture = clientService.isClientConnected("my-client-id");
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "client-123";
    final ClientService clientService = Services.clientService();
    CompletableFuture<Boolean> connectedFuture = clientService.isClientConnected(clientId);

    connectedFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
        @Override
        public void accept(Boolean connected, Throwable throwable) {
            if(throwable == null) {
                System.out.println("Client with id {" + clientId + "} is connected: " + connected);
            } else {
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });
}

...


Get Session Information

This examples shows how to get all session information of a client with a specific clientId.

The getSession method returns an Optional of a SessionInformation object. It will be empty if no session was found. Otherwise it contains the following information:

  • is the client connected

  • what is the session expiry interval

  • which client identifier owns the session

CompletableFuture<Optional<SessionInformation>> sessionFuture = clientService.getSession("my-client-id");
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "my-client-id";
    final ClientService clientService = Services.clientService();
    CompletableFuture<Optional<SessionInformation>> sessionFuture = clientService.getSession(clientId);

    sessionFuture.whenComplete(new BiConsumer<Optional<SessionInformation>, Throwable>() {
        @Override
        public void accept(Optional<SessionInformation> sessionInformationOptional, Throwable throwable) {
            if(throwable == null) {

                if(sessionInformationOptional.isPresent()){
                    SessionInformation information = sessionInformationOptional.get();
                    System.out.println("Session Found");
                    System.out.println("ID: " + information.getClienIdentifier());
                    System.out.println("Connected: " + information.isConnected());
                    System.out.println("Session Expiry Interval " + information.getSessionExpiryInterval());
                } else {
                    System.out.println("No session found for client id: " + clientId);
                }

            } else {
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });

}

...


Disconnect Client

This examples shows how to forcibly disconnect a client with a specific clientId. The sending of the client’s optional Will message on this disconnect can be prevented or not.

The disconnectClient method returns true if a client was online and has been disconnected. Otherwise the method returns false.

It is possible to prevent the sending of the Will message. See examples below.

Disconnect client, send Will message
clientService.disconnectClient("my-client-id");
Disconnect client, prevent Will message
clientService.disconnectClient("my-client-id", true)
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "client-123";
    final ClientService clientService = Services.clientService();
    CompletableFuture<Boolean> disconnectFuture = clientService.disconnectClient(clientId, true);

    disconnectFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
        @Override
        public void accept(Boolean disconnected, Throwable throwable) {
            if(throwable == null) {
                if(disconnected){
                    System.out.println("Client was successfully disconnected and no Will message was sent");
                } else {
                    System.out.println("Client not found");
                }
            } else {
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });

}

...


Invalidate Client Session

This examples shows how to invalidate a client session.

Invalidation means that an online client will be disconnected forcibly and its optional Will message will be sent. The client’s session information will be removed and cannot be restored.

The invalidateSession method returns true if a client was online and has been disconnected. Otherwise the method returns false.

clientService.invalidateSession("my-client-id");
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "client-123";
    final ClientService clientService = Services.clientService();
     CompletableFuture<Boolean> invalidateSessionFuture = clientService.invalidateSession(clientId);

    invalidateSessionFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
        @Override
        public void accept(Boolean disconnected, Throwable throwable) {
            if(throwable == null) {
                if(disconnected){
                    System.out.println("Client was disconnected");
                    System.out.println("Will message was sent");
                    System.out.println("Client session was removed");
                } else {
                    System.out.println("Client was offline");
                    System.out.println("Client session was removed");
                }
            } else {
                if(throwable instanceof NoSuchClientIdException){
                    System.out.println("Client not found");
                }
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });

}

...



Subscription Store

The Subscription Store allows extensions to:

The SubscriptionStore can be accessed by the Services class.

When a subscription for a client is added via the extension system or HiveMQ Control Center, the retained message for the topics included in the subscription will not be published to the client.

The JavaDoc for the Subscription Store can be found here.

Example Usage

This section illustrates common examples on how to use the Subscription Store in your own extensions.


Accessing Subscription Store

final SubscriptionStore store = Services.subscriptionStore();


Add Subscription

This example shows how to add a subscription to a specific client with the subscription store.

TopicSubscription subscription = Builders.topicSubscription()
            .topicFilter(topic)
            .qos(Qos.AT_MOST_ONCE)
            .build();

Services.subscriptionStore().addSubscription("test-client", subscription);
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String topic = "topic";
    final String clientId = "test-client";

    TopicSubscriptionBuilder subscriptionBuilder = Builders.topicSubscription()
            .topicFilter(topic)
            .noLocal(false)
            .retainAsPublished(true)
            .qos(Qos.AT_MOST_ONCE)
            .subscriptionIdentifier(1);


    CompletableFuture<Void> addFuture = Services.subscriptionStore().addSubscription(clientId, subscriptionBuilder.build());

    addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            System.out.println("Successfully added subscription for topic: " + topic + " | client: " + clientId);
        }
    });
}

...


Add Multiple Subscriptions

This example shows how to add multiple subscriptions to a specific client with the subscription store.

final Set<TopicSubscription> topicSet = new HashSet<>();

topicSet.add(Builders.topicSubscription().topicFilter("$share/group/topic1").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic2").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic3").build());

Services.subscriptionStore().addSubscriptions("test-client", topicSet);
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final SubscriptionStore subscriptionStore = Services.subscriptionStore();

    final String clientID = "test-client";

    final Set<TopicSubscription> topicSet = new HashSet<>();

    topicSet.add(Builders.topicSubscription().topicFilter("$share/group/topic1").build());
    topicSet.add(Builders.topicSubscription().topicFilter("topic2").build());
    topicSet.add(Builders.topicSubscription().topicFilter("topic3").build());

    final CompletableFuture<Void> addFuture = subscriptionStore.addSubscriptions(clientID, topicSet);

    addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(final Void result, final Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            System.out.println("Successfully added subscriptions to client: " + clientID);

        }
    });
}

...


Remove Subscription

This example shows how to remove a subscription from a specific client with the subscription store.

Services.subscriptionStore().removeSubscription("test-client", "topic/to/remove");
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String topic = "topic";
    final String clientId = "test-client";

    CompletableFuture<Void> removeFuture = Services.subscriptionStore().removeSubscription(clientId, topic);

    removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            System.out.println("Successfully removed subscription for topic: " + topic + " | client: " + clientId);
        }
    });
}

...


Remove Multiple Subscription

This example shows how to remove multiple subscriptions from a specific client with the subscription store.

final Set<String> topicSet = new HashSet<>();
topicSet.add("topic1");
topicSet.add("topic2");

Services.subscriptionStore().removeSubscriptions("test-client", topicSet);
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final SubscriptionStore subscriptionStore = Services.subscriptionStore();

    final String clientID = "test-client";

    final Set<String> topicSet = new HashSet<>();
    topicSet.add("$share/group/topic1");
    topicSet.add("topic2");
    topicSet.add("topic3");

    final CompletableFuture<Void> removeFuture = subscriptionStore.removeSubscriptions(clientID, topicSet);

    removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(final Void result, final Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            System.out.println("Successfully removed subscriptions for topics: " + topicSet + " | from client: " + clientID);

        }
    });
}

...


Get Subscriptions

This example shows how to get the subscriptions from a specific client with the subscription store.

CompletableFuture<Set<TopicSubscription>> future = Services.subscriptionStore().getSubscriptions("test-client");
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "test-client";

    CompletableFuture<Set<TopicSubscription>> getFuture = Services.subscriptionStore().getSubscriptions(clientId);

    getFuture.whenComplete(new BiConsumer<Set<TopicSubscription>, Throwable>() {
        @Override
        public void accept(Set<TopicSubscription> topicSubscriptions, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            if(topicSubscriptions.isEmpty()){
                System.out.println("Found no subscriptions for client: " + clientId);
                return;
            }

            System.out.println("Found subscriptions for client: " + clientId);

            for (TopicSubscription topicSubscription : topicSubscriptions) {
                System.out.println("---------------------");
                System.out.println("Topic :" + topicSubscription.getTopicFilter());
                System.out.println("Qos :" + topicSubscription.getQos().getQosNumber());
                System.out.println("No local :" + topicSubscription.getNoLocal());
                System.out.println("Retain as published :" + topicSubscription.getRetainAsPublished());
                System.out.println("Subscription identifier :" + topicSubscription.getSubscriptionIdentifier());
            }

        }
    });
}

...



Retained Message Store

The Retained Message Store allows extensions to:

  • Get the retained message for a specific topic

  • Add/Replace a retained message for a topic

  • Remove the retained message for a topic

  • Clear all retained messages from the HiveMQ cluster.

The RetainedMessageStore can be accessed by the Services class.

Retained messages that are added with the retained message store are not treated as publishes, as is the case with retained messages sent by clients. So clients with a subscription for the topic, where the retained message is added, don’t receive the retained message as a publish. Only if the clients resubscribe would they get the retained message.

The JavaDoc for the Retained Message Store can be found here.

Example Usage

This section illustrates common examples on how to use the Retained Message Store in your own extensions.


Accessing Retained Message Store

final RetainedMessageStore store = Services.retainedMessageStore();


Add Retained Message

This example shows how to add a retained message to a specific topic with the retained message store. Adding a retained message would overwrite the retained message for that topic, should one already exist.

RetainedPublish retainedMessage = retainedPublishBuilder
        .topic("add/message")
        .payload(ByteBuffer.wrap("test".getBytes()))
        .qos(Qos.AT_LEAST_ONCE)
        .build();

Services.retainedMessageStore().addOrReplace(retainedMessage);
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    // 1. build the retained message via the RetainedPublishBuilder
    final RetainedPublishBuilder retainedPublishBuilder = Builders.retainedPublish();

    final RetainedPublish retainedMessage = retainedPublishBuilder
        .topic("add/message")
        .payload(ByteBuffer.wrap("test".getBytes()))
        .userProperty("reason","message-update")
        .qos(Qos.AT_LEAST_ONCE)
        .build();

    // 2. add the retained message (if a retained message already exists for the topic, it will be overwritten)
    final CompletableFuture<Void> addFuture = Services.retainedMessageStore().addOrReplace(retainedMessage);

    addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            // 3. log when the message was successfully added/replaced
            System.out.println("Successfully added retained message for topic: " + topic);
        }
    });
}

...


Remove Retained Message

This example shows how to remove a retained message from a specific topic with the retained message store.

Services.retainedMessageStore().remove("topic/to/remove");
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String topic = "topic";

    // 1. remove the retained message from the given topic
    final CompletableFuture<Void> removeFuture = Services.retainedMessageStore().remove(topic);

    removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            // 2. log when the message was successfully removed (also happens when no retained message for that topic)
            System.out.println("Successfully removed retained message for topic: " + topic);
        }
    });
}

...


Get Retained Message

This example shows how to get the retained message from a specific topic with the retained message store.

CompletableFuture<Optional<RetainedPublish>> future = Services.retainedMessageStore().getRetainedMessage("topic/to/get");
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String topic = "topic";

    // 1. request retained message for topic
    final CompletableFuture<Optional<RetainedPublish>> getFuture = Services.retainedMessageStore().getRetainedMessage(topic);

    getFuture.whenComplete(new BiConsumer<Optional<RetainedPublish>, Throwable>() {
        @Override
        public void accept(Optional<RetainedPublish> retainedPublishOptional, Throwable throwable) {

            if (throwable != null) {
                throwable.printStackTrace();
                return;
            }

            // 2. check if a retained message exists for that topic
            if (!retainedPublishOptional.isPresent()) {
                System.out.println("Found no retained message for topic: " + topic);
                return;
            }

            // 3. log some information about the retained message
            final RetainedPublish retainedPublish = retainedPublishOptional.get();

            System.out.println("Found retained message for topic: " + topic);
            System.out.println("---------------------");
            System.out.println("Topic :" + retainedPublish.getTopic());
            System.out.println("Qos :" + retainedPublish.getQos().getQosNumber());

        }
    });
}

...


Clear All Retained Messages

This example shows how to remove all retained messages from the HiveMQ cluster with the retained message store.

Services.retainedMessageStore().clear()
Full Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    // 1. request to delete all retained messages from the HiveMQ cluster
    final CompletableFuture<Void> clearFuture = Services.retainedMessageStore().clear();

    clearFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            // 2. log when all retained messages were removed
            System.out.println("Successfully removed all retained messages");
        }
    });
}

...



Publish Service

The Publish Service enables extensions to send PUBLISH messages. These messages can also be sent to a specific client only.

A PUBLISH that is sent via the Publish Service is processed identical to a PUBLISH sent by a client. All MQTT 3 and MQTT 5 features for PUBLISH messages are supported. The limits that are configured in the config.xml as part of the MQTT entity are validated for the PUBLISH messages sent via the service.

PUBLISH messages that are sent to a specific client have some unique behavior and requirements.

  • The topic of the PUBLISH must match at least one subscription of the client otherwise the PUBLISH is not forwarded to the client

  • If the client has a shared subscription that matches the topic of the PUBLISH, the message will be sent to the client but is never sent to other clients with the same shared subscription

The JavaDoc for the Publish Service can be found here.

Example Usage

This section illustrates common examples on how to use the Publish Service in your own extensions.

Accessing Publish Service

final PublishService publishService = Services.publishService();


Publish

This example shows how to send a regular PUBLISH message.

Example Code
Publish message = Builders.publish()
    .topic("topic")
    .qos(Qos.AT_LEAST_ONCE)
    .payload(payload)
    .build();

Services.publishService().publish(message);
Full Example Code
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    // Create a publish builder
    final PublishBuilder publishBuilder = Builders.publish();
    final ByteBuffer payload = ByteBuffer.wrap("message".getBytes());
    // Build the publish
    publishBuilder.topic("topic").qos(Qos.AT_LEAST_ONCE).payload(payload);
    // Access the Publish Service
    final PublishService publishService = Services.publishService();
    // Asynchronously sent PUBLISH
    final CompletableFuture<Void> future = publishService.publish(publishBuilder.build());

    future.whenComplete((aVoid, throwable) -> {
        if(throwable == null) {
            System.out.println("Publish sent successfully");
        } else {
            //please use more sophisticated logging
            throwable.printStackTrace();
        }
    });
}

...


Publish to Client

This example shows how to send a PUBLISH to a client with a specific client id.

Example Code
Publish message = Builders.publish()
    .topic("topic")
    .qos(Qos.AT_LEAST_ONCE)
    .payload(payload)
    .build();

Services.publishService().publishToClient(message, "test-client");
Full Example Code
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    // Create a publish builder
    final PublishBuilder publishBuilder = Builders.publish();
    final ByteBuffer payload = ByteBuffer.wrap("message".getBytes());
    // Build the publish
    publishBuilder.topic("topic").qos(Qos.AT_LEAST_ONCE).payload(payload);
    // Access the Publish Service
    final PublishService publishService = Services.publishService();
    // Asynchronously sent PUBLISH
    final CompletableFuture<Void> future = publishService.publish(publishBuilder.build());
    final String clientId = "client";
    final CompletableFuture<PublishToClientResult> future = publishService.publishToClient(publishBuilder.build(), clientId);

    future.whenComplete((result, throwable) -> {

        if (throwable == null) {
            if (result == PublishToClientResult.NOT_SUBSCRIBED) {
                System.out.println("Publish was not sent to client ("+clientId+
                        ") because it is not subscribed to a matching topic");
            } else {
                System.out.println("Publish sent successfully");
            }
        } else {
            //please use more sophisticated logging
            throwable.printStackTrace();
        }
    });
}

...



Managed Extension Executor Service

Many MQTT integrations depend on expensive (in terms of CPU time) operations like:

  • Calling webservices

  • Persisting or querying data from a database

  • Writing data to disk

  • Any other blocking operation

The main paradigm in HiveMQ extensions is that blocking is not allowed. When your business requires the use of blocking operations, the ManagedExtensionExecutorService can be used to enable asynchronous calls to these operations. This HiveMQ-managed executor service is shared between all HiveMQ extensions and can be monitored by the standard HiveMQ monitoring system.

The ManagedExtensionExecutorService is a sophisticated implementation, which can be used as a ScheduledExecutorService. This allows the use of a callback based Future handling for true non-blocking behaviour.

The extension executor service also allows the scheduling of tasks periodically.

It’s important that you never create your own thread pools. This can decrease the performance of HiveMQ significantly. This is especially true for Java Cached Thread pools, since these increase Java threads without any limit and can make your system unresponsive. If you’re using a library which uses these thread pools (like the Jersey Client library), make sure to limit the amount of threads!

The thread-pool of this executor service is dependent on the available cores of the JVM HiveMQ runs in.

When extensionStop() is executed, no additional tasks can be submitted. At the same time schedulers are canceled by HiveMQ and therefore no longer submit new tasks.

After shut down HiveMQ will proceed to execute already submitted tasks for a grace period of 3 minutes. When these 3 minutes have passed the executor service will be shut down ungracefully. Any possibly remaining tasks will not be executed.

Example Usage

The ManagedExtensionExecutorService object can be accessed by extensions through Services.extensionExecutorService().

Accessing Managed Extension Executor Service

final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();


Log Incoming Publishes

This example shows how to log incoming publishes per minute with the ManagedExtensionExecutorService.

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();
    final MetricRegistry metricRegistry = Services.metricRegistry();

    executorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            Meter incomingPublishRate = metricService.getHiveMQMetric(HiveMQMetrics.INCOMING_PUBLISH_RATE);
            logger.info("Incoming publishes last minute = {}", incomingPublishRate.getOneMinuteRate());
        }
    }, 1, 1, TimeUnit.MINUTES);

}


Adding A Callback

This example shows how to add a callback to the submitted task to receive the return value when the future is completed.

private void methodWithCompletableFuture() {

    final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

    final CompletableFuture<String> result = extensionExecutorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "Test";
        }
    });

    result.whenComplete(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(final String resultString, final Throwable throwable) {
            if(throwable != null){
                //please use more sophisticated logging
                throwable.printStackTrace();
                return;
            }
            if(resultString != null){
                System.out.println(resultString);
            }
        }
    });
}