Services
HiveMQ Services provide a convenient way for extensions to interact with the HiveMQ core.
They can be accessed through the Services class.
The following table lists all available HiveMQ Services.
Service | Description |
---|---|
Client Service | Allows extensions to get client session information, disconnect clients, and remove sessions. |
Subscription Store | Allows extensions to get, remove, or add subscriptions for specific clients. |
Retained Message Store | Allows extensions to get, remove, or add retained messages for specific topics, or delete all retained messages at once. |
Publish Service | Allows extensions to send PUBLISH messages. |
Managed Extension Executor Service | Allows extensions to use a HiveMQ-managed executor service for non-blocking operations. |
Admin Service | Allows extensions to get information about the broker instance. |
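All of these services are obtained through static accessors on the Services class. The following sketch lists the accessors that are used throughout the examples in this chapter:
final ClientService clientService = Services.clientService();
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
final RetainedMessageStore retainedMessageStore = Services.retainedMessageStore();
final PublishService publishService = Services.publishService();
final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();
final AdminService adminService = Services.adminService();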
Client Service
The Client Service allows extensions to gather information about clients:
- Online status
- Client identifier
- Session expiry interval
The Client Service enables extensions to do the following:
- Query for session information on all cluster nodes.
- Forcibly disconnect clients, with the option to send or suppress the last-will message of the client.
- Invalidate client sessions for both connected and disconnected clients. For connected clients, invalidation disconnects the client and sends the last-will message.
- Iterate over all clients and their session information.
The JavaDoc for the Client Service can be found here.
Query Client Connection
This example shows how to get information about the online status of a client.
The isClientConnected method returns true for online clients and false for offline clients.
CompletableFuture<Boolean> connectedFuture = clientService.isClientConnected("my-client-id");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "client-123";
final ClientService clientService = Services.clientService();
CompletableFuture<Boolean> connectedFuture = clientService.isClientConnected(clientId);
connectedFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
@Override
public void accept(Boolean connected, Throwable throwable) {
if(throwable == null) {
System.out.println("Client with id {" + clientId + "} is connected: " + connected);
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
}
});
}
...
Get Session Information
This example shows how to get all session information of a client with a specific clientId.
The getSession method returns an Optional of a SessionInformation object. The Optional is empty if no session was found.
Otherwise it contains the following information:
- Whether the client is connected
- The session expiry interval
- The client identifier that owns the session
CompletableFuture<Optional<SessionInformation>> sessionFuture = clientService.getSession("my-client-id");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "my-client-id";
final ClientService clientService = Services.clientService();
CompletableFuture<Optional<SessionInformation>> sessionFuture = clientService.getSession(clientId);
sessionFuture.whenComplete(new BiConsumer<Optional<SessionInformation>, Throwable>() {
@Override
public void accept(Optional<SessionInformation> sessionInformationOptional, Throwable throwable) {
if(throwable == null) {
if(sessionInformationOptional.isPresent()){
SessionInformation information = sessionInformationOptional.get();
System.out.println("Session Found");
System.out.println("ID: " + information.getClienIdentifier());
System.out.println("Connected: " + information.isConnected());
System.out.println("Session Expiry Interval " + information.getSessionExpiryInterval());
} else {
System.out.println("No session found for client id: " + clientId);
}
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
}
});
}
...
Disconnect Client
This example shows how to forcibly disconnect a client with a specific clientId.
The disconnectClient method returns true if a client was online and has been disconnected. Otherwise the method returns false.
It is possible to prevent the sending of the client’s optional Will message, as shown in the examples below.
clientService.disconnectClient("my-client-id");
clientService.disconnectClient("my-client-id", true)
clientService.disconnectClient("my-client-id", true, DisconnectReasonCode.NORMAL_DISCONNECTION, "my-reason-string")
There are deprecated DisconnectReasonCodes that lead to exceptions if used. These include CLIENT_IDENTIFIER_NOT_VALID, DISCONNECT_WITH_WILL_MESSAGE, and BAD_AUTHENTICATION_METHOD.
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "client-123";
final ClientService clientService = Services.clientService();
CompletableFuture<Boolean> disconnectFuture = clientService.disconnectClient(clientId, true);
disconnectFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
@Override
public void accept(Boolean disconnected, Throwable throwable) {
if(throwable == null) {
if(disconnected){
System.out.println("Client was successfully disconnected and no Will message was sent");
} else {
System.out.println("Client not found");
}
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
}
});
}
...
Invalidate Client Session
This example shows how to invalidate a client session.
Invalidation means that an online client is forcibly disconnected and its optional Will message is sent. The client’s session information is removed and cannot be restored.
The invalidateSession method returns true if a client was online and has been disconnected. Otherwise the method returns false.
clientService.invalidateSession("my-client-id");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "client-123";
final ClientService clientService = Services.clientService();
CompletableFuture<Boolean> invalidateSessionFuture = clientService.invalidateSession(clientId);
invalidateSessionFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
@Override
public void accept(Boolean disconnected, Throwable throwable) {
if(throwable == null) {
if(disconnected){
System.out.println("Client was disconnected");
System.out.println("Will message was sent");
System.out.println("Client session was removed");
} else {
System.out.println("Client was offline");
System.out.println("Client session was removed");
}
} else {
if(throwable instanceof NoSuchClientIdException){
System.out.println("Client not found");
}
//please use more sophisticated logging
throwable.printStackTrace();
}
}
});
}
...
Iterate All Clients
You can use the Client Service to iterate the session information of all clients. This includes all connected clients and all disconnected clients whose session has not expired yet.
The callback passed to the iterateAllClients method is called once for each client.
By default, the callback is executed in the Managed Extension Executor Service. However, you can also pass your own executor.
The session information is not provided to the callback in any particular order.
clientService.iterateAllClients(new IterationCallback<SessionInformation>() {
@Override
public void iterate(IterationContext context, SessionInformation sessionInformation) {
// this callback is called for every client with its session information
}
});
Iteration over all clients can be used in large scale deployments, but it is a very expensive operation. Do not call the method in short time intervals.
Iteration over all clients only works if all nodes in a HiveMQ cluster run at least version 4.2.0.
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final ClientService clientService = Services.clientService();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final Pattern pattern = Pattern.compile("client-[1-9]+");
CompletableFuture<Void> iterationFuture = clientService.iterateAllClients(
new IterationCallback<SessionInformation>() {
@Override
public void iterate(IterationContext context, SessionInformation sessionInformation) {
final String clientIdentifier = sessionInformation.getClientIdentifier();
if (pattern.matcher(clientIdentifier).matches()) {
System.out.println("Found client for pattern " + clientIdentifier);
// abort the iteration if you are not interested in the remaining information as this saves resources
context.abortIteration();
}
}
}, executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Iterated all clients");
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final ClientService clientService = Services.clientService();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = clientService.iterateAllClients(
new IterationCallback<SessionInformation>() {
@Override
public void iterate(IterationContext context, SessionInformation sessionInformation) {
if (sessionInformation.isConnected()) {
counter.incrementAndGet();
}
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Connected clients: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
If the topology of the cluster changes during the iteration (for example, when a network split occurs or a node leaves or joins the cluster), the iteration is canceled. The following example shows how topology changes can be handled:
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
iterate(0);
}
public void iterate(final int attempts) {
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = Services.clientService().iterateAllClients(
new IterationCallback<SessionInformation>() {
@Override
public void iterate(IterationContext context, SessionInformation sessionInformation) {
if (sessionInformation.isConnected()) {
counter.incrementAndGet();
}
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Connected clients: " + counter.get());
// in case the cluster topology changes during iteration, an IterationFailedException is thrown
} else if (throwable instanceof IterationFailedException) {
// only retry 3 times
if (attempts < 3) {
final int newAttemptCount = attempts + 1;
Services.extensionExecutorService().schedule(() ->
iterate(newAttemptCount), newAttemptCount * 10, TimeUnit.SECONDS); // schedule retry with delay in case topology change is not over, else we would get another IterationFailedException
} else {
System.out.println("Could not fully iterate all clients.");
}
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Subscription Store
The Subscription Store allows extensions to:
- Get all subscriptions from a client
- Add a subscription for a client
- Remove a subscription from a client
- Add multiple subscriptions for a client
- Remove multiple subscriptions from a client
- Iterate all subscribers with subscriptions to a specified topic filter
- Iterate all subscribers with subscriptions matching a specified topic
The SubscriptionStore can be accessed through the Services class.
When a subscription for a client is added via the extension system or HiveMQ Control Center, the retained messages for the topics included in the subscription will not be published to the client.
The JavaDoc for the Subscription Store can be found here.
Add Subscription
This example shows how to add a subscription to a specific client with the subscription store.
TopicSubscription subscription = Builders.topicSubscription()
.topicFilter(topic)
.qos(Qos.AT_MOST_ONCE)
.build();
Services.subscriptionStore().addSubscription("test-client", subscription);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String topic = "topic";
final String clientId = "test-client";
TopicSubscriptionBuilder subscriptionBuilder = Builders.topicSubscription()
.topicFilter(topic)
.noLocal(false)
.retainAsPublished(true)
.qos(Qos.AT_MOST_ONCE)
.subscriptionIdentifier(1);
CompletableFuture<Void> addFuture = Services.subscriptionStore().addSubscription(clientId, subscriptionBuilder.build());
addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
System.out.println("Successfully added subscription for topic: " + topic + " | client: " + clientId);
}
});
}
...
Add Multiple Subscriptions
This example shows how to add multiple subscriptions to a specific client with the subscription store.
final Set<TopicSubscription> topicSet = new HashSet<>();
topicSet.add(Builders.topicSubscription().topicFilter("$share/group/topic1").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic2").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic3").build());
Services.subscriptionStore().addSubscriptions("test-client", topicSet);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
final String clientID = "test-client";
final Set<TopicSubscription> topicSet = new HashSet<>();
topicSet.add(Builders.topicSubscription().topicFilter("$share/group/topic1").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic2").build());
topicSet.add(Builders.topicSubscription().topicFilter("topic3").build());
final CompletableFuture<Void> addFuture = subscriptionStore.addSubscriptions(clientID, topicSet);
addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(final Void result, final Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
System.out.println("Successfully added subscriptions to client: " + clientID);
}
});
}
...
Remove Subscription
This example shows how to remove a subscription from a specific client with the subscription store.
Services.subscriptionStore().removeSubscription("test-client", "topic/to/remove");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String topic = "topic";
final String clientId = "test-client";
CompletableFuture<Void> removeFuture = Services.subscriptionStore().removeSubscription(clientId, topic);
removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
System.out.println("Successfully removed subscription for topic: " + topic + " | client: " + clientId);
}
});
}
...
Remove Multiple Subscriptions
This example shows how to remove multiple subscriptions from a specific client with the subscription store.
final Set<String> topicSet = new HashSet<>();
topicSet.add("topic1");
topicSet.add("topic2");
Services.subscriptionStore().removeSubscriptions("test-client", topicSet);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
final String clientID = "test-client";
final Set<String> topicSet = new HashSet<>();
topicSet.add("$share/group/topic1");
topicSet.add("topic2");
topicSet.add("topic3");
final CompletableFuture<Void> removeFuture = subscriptionStore.removeSubscriptions(clientID, topicSet);
removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(final Void result, final Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
System.out.println("Successfully removed subscriptions for topics: " + topicSet + " | from client: " + clientID);
}
});
}
...
Get Subscriptions
This example shows how to get the subscriptions from a specific client with the subscription store.
CompletableFuture<Set<TopicSubscription>> future = Services.subscriptionStore().getSubscriptions("test-client");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String clientId = "test-client";
CompletableFuture<Set<TopicSubscription>> getFuture = Services.subscriptionStore().getSubscriptions(clientId);
getFuture.whenComplete(new BiConsumer<Set<TopicSubscription>, Throwable>() {
@Override
public void accept(Set<TopicSubscription> topicSubscriptions, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
if(topicSubscriptions.isEmpty()){
System.out.println("Found no subscriptions for client: " + clientId);
return;
}
System.out.println("Found subscriptions for client: " + clientId);
for (TopicSubscription topicSubscription : topicSubscriptions) {
System.out.println("---------------------");
System.out.println("Topic :" + topicSubscription.getTopicFilter());
System.out.println("Qos :" + topicSubscription.getQos().getQosNumber());
System.out.println("No local :" + topicSubscription.getNoLocal());
System.out.println("Retain as published :" + topicSubscription.getRetainAsPublished());
System.out.println("Subscription identifier :" + topicSubscription.getSubscriptionIdentifier());
}
}
});
}
...
Iterate All Subscriptions
You can use the Subscription Store to iterate all subscriptions of all clients. This includes the subscriptions of all connected clients and all disconnected clients whose session has not expired yet.
The callback passed to the iterateAllSubscriptions method is called once for each client. All subscriptions of the respective client are provided per method call.
By default, the callback is executed in the Managed Extension Executor Service. However, you can also pass your own executor.
The subscriptions are not provided to the callback in any particular order.
subscriptionStore.iterateAllSubscriptions(new IterationCallback<SubscriptionsForClientResult>() {
@Override
public void iterate(IterationContext context, SubscriptionsForClientResult subscriptionsForClient) {
// this callback is called for every client with its subscriptions
final String clientId = subscriptionsForClient.getClientId();
final Set<TopicSubscription> subscriptions = subscriptionsForClient.getSubscriptions();
}
});
Iteration over all clients can be used in large scale deployments, but it is a very expensive operation. Do not call the method in short time intervals.
Iteration over all clients only works if all nodes in a HiveMQ cluster run at least version 4.2.0.
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final Pattern pattern = Pattern.compile("client-[1-9]+");
CompletableFuture<Void> iterationFuture = subscriptionStore.iterateAllSubscriptions(
new IterationCallback<SubscriptionsForClientResult>() {
@Override
public void iterate(IterationContext context, SubscriptionsForClientResult subscriptionsForClient) {
final String clientIdentifier = subscriptionsForClient.getClientId();
final Set<TopicSubscription> subscriptions = subscriptionsForClient.getSubscriptions();
if (pattern.matcher(clientIdentifier).matches()) {
System.out.println("Found client with subscriptions " + subscriptions);
// abort the iteration if you are not interested in the remaining information as this saves resources
context.abortIteration();
}
}
},
executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Iterated all subscriptions");
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = subscriptionStore.iterateAllSubscriptions(
new IterationCallback<SubscriptionsForClientResult>() {
@Override
public void iterate(IterationContext context, SubscriptionsForClientResult subscriptionsForClient) {
counter.addAndGet(subscriptionsForClient.getSubscriptions().size());
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Number of subscriptions: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
If the topology of the cluster changes during the iteration (for example, when a network split occurs or a node leaves or joins the cluster), the iteration is canceled. The following example shows how topology changes can be handled:
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
iterate(0);
}
public void iterate(final int attempts) {
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = Services.subscriptionStore().iterateAllSubscriptions(
new IterationCallback<SubscriptionsForClientResult>() {
@Override
public void iterate(IterationContext context, SubscriptionsForClientResult subscriptionsForClient) {
counter.addAndGet(subscriptionsForClient.getSubscriptions().size());
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Number of subscriptions: " + counter.get());
// in case the cluster topology changes during iteration, an IterationFailedException is thrown
} else if (throwable instanceof IterationFailedException) {
// only retry 3 times
if (attempts < 3) {
final int newAttemptCount = attempts + 1;
Services.extensionExecutorService().schedule(() ->
iterate(newAttemptCount), newAttemptCount * 10, TimeUnit.SECONDS); // schedule retry with delay in case topology change is not over, else we would get another IterationFailedException
} else {
System.out.println("Could not fully iterate all clients.");
}
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Iterate All Subscribers with Subscriptions to a Specified Topic Filter
You can use the Subscription Store to iterate all subscribers that have subscriptions with a specified topic filter. Use this filtered iteration instead of iterating all subscriptions if you are only interested in subscribers with a specific topic filter, because it is much more resource-efficient. It is also possible to filter the subscriptions even more precisely by specifying whether to include only shared or only non-shared (individual) subscriptions.
The callback passed to the iterateAllSubscribersWithTopicFilter method is called once for each client that has a subscription with the specified topic filter.
By default, the callback is executed in the Managed Extension Executor Service. However, you can also pass your own executor.
The subscribers are not provided to the callback in any particular order.
Example:
For the topic filter example/#, the iteration covers all clients that have a subscription with the exact same topic filter: example/#
It will not cover subscriptions with the following topic filters:
- example/topic
- example/+
- +/#
- and other wildcard matches
The method iterateAllSubscribersWithTopicFilter only provides subscribers that have subscribed with the exact same topic filter as specified. Use iterateAllSubscribersForTopic if you want to query subscriptions with the usual topic filter matching algorithm.
subscriptionStore.iterateAllSubscribersWithTopicFilter("example/topic",
new IterationCallback<SubscriberWithFilterResult>() {
@Override
public void iterate(IterationContext context, SubscriberWithFilterResult subscriberWithFilter) {
// this callback is called for every client that has a subscription with the specified topic filter
final String clientId = subscriberWithFilter.getClientId();
}
});
Iteration over all clients can be used in large scale deployments, but it is a very expensive operation. Do not call the method in short time intervals.
Iteration over all clients only works if all nodes in a HiveMQ cluster run at least version 4.2.0.
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = subscriptionStore.iterateAllSubscribersWithTopicFilter(
"example/topic",
SubscriptionType.INDIVIDUAL,
new IterationCallback<SubscriberWithFilterResult>() {
@Override
public void iterate(IterationContext context, SubscriberWithFilterResult subscriberWithFilter) {
counter.incrementAndGet();
}
},
executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Number of subscribers with specified topic filter: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Iterate All Subscribers with Subscriptions Matching a Specified Topic
You can use the Subscription Store to iterate all subscribers that have subscriptions matching a specified topic. Use this filtered iteration instead of iterating all subscriptions if you are only interested in subscribers whose subscriptions match a specific topic, because it is much more resource-efficient. It is also possible to filter the subscriptions even more precisely by specifying whether to include only shared or only non-shared (individual) subscriptions.
The callback passed to the iterateAllSubscribersForTopic method is called once for each client that has a subscription matching the specified topic.
By default, the callback is executed in the Managed Extension Executor Service. However, you can also pass your own executor.
The subscribers are not provided to the callback in any particular order.
Example:
For the topic example/topic, the iteration covers all clients that have a subscription with the following topic filters:
- example/topic
- example/#
- example/+
- +/#
- and other wildcard matches
subscriptionStore.iterateAllSubscribersForTopic("example/topic",
new IterationCallback<SubscriberWithFilterResult>() {
@Override
public void iterate(IterationContext context, SubscriberWithFilterResult subscriberWithFilter) {
// this callback is called for every client that has a subscription matching the specified topic
final String clientId = subscriberWithFilter.getClientId();
}
});
Iteration over all clients can be used in large scale deployments, but it is a very expensive operation. Do not call the method in short time intervals.
Iteration over all clients only works if all nodes in a HiveMQ cluster run at least version 4.2.0.
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final SubscriptionStore subscriptionStore = Services.subscriptionStore();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = subscriptionStore.iterateAllSubscribersForTopic(
"example/topic",
SubscriptionType.INDIVIDUAL,
new IterationCallback<SubscriberWithFilterResult>() {
@Override
public void iterate(IterationContext context, SubscriberWithFilterResult subscriberWithFilter) {
counter.incrementAndGet();
}
},
executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Number of subscribers for specified topic: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Retained Message Store
The Retained Message Store enables extensions to interact with retained messages in the following ways:
- Get the retained message for a specific topic
- Add or replace the retained message for a topic
- Remove the retained message for a topic
- Clear all retained messages from the HiveMQ cluster
- Iterate over all stored retained messages in the HiveMQ cluster
The RetainedMessageStore can be accessed through the Services class.
The retained messages that the Retained Message Store adds are processed differently than retained messages that clients send. Clients that are currently subscribed to the topic where the retained message is added do not receive the retained message from the Retained Message Store as a publish. The newly added retained message is only available to clients that subscribe or resubscribe to the topic after the Retained Message Store adds the message.
For more information, see JavaDoc for the Retained Message Store.
Access Retained Message Store
final RetainedMessageStore store = Services.retainedMessageStore();
To avoid errors such as an IterationFailedException, verify that your HiveMQ instance has started successfully before you call methods in your extension start. For more information, see Admin Service.
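A minimal sketch of such a check, combining the Admin Service and the Managed Extension Executor Service that are described later in this chapter. The method name scheduleRetainedMessageWork, the topic some/topic, and the 10-second delay are illustrative only:
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
    scheduleRetainedMessageWork();
}
private void scheduleRetainedMessageWork() {
    Services.extensionExecutorService().schedule(() -> {
        // only touch the Retained Message Store once the broker reports a successful start
        if (Services.adminService().getCurrentStage() == LifecycleStage.STARTED_SUCCESSFULLY) {
            Services.retainedMessageStore().getRetainedMessage("some/topic")
                    .whenComplete((retainedPublishOptional, throwable) -> {
                        // handle the retained message or the throwable here
                    });
        } else {
            // the broker is still starting, check again later
            scheduleRetainedMessageWork();
        }
    }, 10, TimeUnit.SECONDS);
}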
Add Retained Message
This example shows how to add a retained message to a specific topic with the Retained Message Store.
When you add a retained message to a specific topic with the Retained Message Store, the added message overwrites any existing retained message on the selected topic.
RetainedPublish retainedMessage = Builders.retainedPublish()
.topic("add/message")
.payload(ByteBuffer.wrap("test".getBytes()))
.qos(Qos.AT_LEAST_ONCE)
.build();
Services.retainedMessageStore().addOrReplace(retainedMessage);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
// 1. build the retained message via the RetainedPublishBuilder
final RetainedPublishBuilder retainedPublishBuilder = Builders.retainedPublish();
final RetainedPublish retainedMessage = retainedPublishBuilder
.topic("add/message")
.payload(ByteBuffer.wrap("test".getBytes()))
.userProperty("reason","message-update")
.qos(Qos.AT_LEAST_ONCE)
.build();
// 2. add the retained message (if a retained message already exists for the topic, it will be overwritten)
final CompletableFuture<Void> addFuture = Services.retainedMessageStore().addOrReplace(retainedMessage);
addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
// 3. log when the message was successfully added/replaced
System.out.println("Successfully added retained message for topic: " + topic);
}
});
}
...
Remove Retained Message
This example shows how to remove a retained message from a specific topic with the Retained Message Store.
Services.retainedMessageStore().remove("topic/to/remove");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String topic = "topic";
// 1. remove the retained message from the given topic
final CompletableFuture<Void> removeFuture = Services.retainedMessageStore().remove(topic);
removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
// 2. log when the message was successfully removed (this also completes when no retained message exists for that topic)
System.out.println("Successfully removed retained message for topic: " + topic);
}
});
}
...
Get Retained Message
This example shows how to get the retained message from a specific topic with the Retained Message Store.
CompletableFuture<Optional<RetainedPublish>> future = Services.retainedMessageStore().getRetainedMessage("topic/to/get");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final String topic = "topic";
// 1. request retained message for topic
final CompletableFuture<Optional<RetainedPublish>> getFuture = Services.retainedMessageStore().getRetainedMessage(topic);
getFuture.whenComplete(new BiConsumer<Optional<RetainedPublish>, Throwable>() {
@Override
public void accept(Optional<RetainedPublish> retainedPublishOptional, Throwable throwable) {
if (throwable != null) {
throwable.printStackTrace();
return;
}
// 2. check if a retained message exists for that topic
if (!retainedPublishOptional.isPresent()) {
System.out.println("Found no retained message for topic: " + topic);
return;
}
// 3. log some information about the retained message
final RetainedPublish retainedPublish = retainedPublishOptional.get();
System.out.println("Found retained message for topic: " + topic);
System.out.println("---------------------");
System.out.println("Topic :" + retainedPublish.getTopic());
System.out.println("Qos :" + retainedPublish.getQos().getQosNumber());
}
});
}
...
Clear All Retained Messages
This example shows how to remove all retained messages from a HiveMQ cluster with the Retained Message Store.
Services.retainedMessageStore().clear()
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
// 1. request to delete all retained messages from the HiveMQ cluster
final CompletableFuture<Void> clearFuture = Services.retainedMessageStore().clear();
clearFuture.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if(throwable != null){
throwable.printStackTrace();
return;
}
// 2. log when all retained messages were removed
System.out.println("Successfully removed all retained messages");
}
});
}
...
Iterate All Retained Messages
You can use the Retained Message Store to iterate over all stored retained messages in HiveMQ.
The callback passed to the iterateAllRetainedMessages method is called once for each retained message. Each call provides all information for the retained message in the form of a RetainedPublish, as well as an IterationContext that can be used to abort the iteration prematurely if desired.
By default, the Managed Extension Executor Service executes the callback. However, you can also pass your own executor. The retained message information is not provided to the callback in any particular order.
Services.retainedMessageStore().iterateAllRetainedMessages(new IterationCallback<RetainedPublish>() {
@Override
public void iterate(final @NotNull IterationContext context, final @NotNull RetainedPublish retainedPublish) {
// this callback is called for every stored retained message
}
});
If you have large numbers of retained messages, iteration over all retained messages can be resource intensive. Avoid calling this method in short time intervals.
To iterate over all retained messages, all nodes in the HiveMQ cluster must run HiveMQ version 4.4.0 or higher.
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final RetainedMessageStore retainedMessageStore = Services.retainedMessageStore();
// this is the default executor but used as executor argument for demonstration purposes
final Executor executor = Services.extensionExecutorService();
final Pattern pattern = Pattern.compile("sensor/id-[1-9]+");
CompletableFuture<Void> iterationFuture = retainedMessageStore.iterateAllRetainedMessages(
new IterationCallback<RetainedPublish>() {
@Override
public void iterate(final @NotNull IterationContext context, final @NotNull RetainedPublish retainedPublish) {
final String retainedPublishTopic = retainedPublish.getTopic();
if (pattern.matcher(retainedPublishTopic).matches()) {
System.out.println("Found retained messages with topic matching pattern " + retainedPublishTopic);
// abort the iteration if you are not interested in the remaining retained messages as this saves resources
context.abortIteration();
}
}
}, executor);
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Iteration over retained messages complete"); // this will also be called if iteration is aborted manually
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final RetainedMessageStore retainedMessageStore = Services.retainedMessageStore();
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = retainedMessageStore.iterateAllRetainedMessages(
new IterationCallback<RetainedPublish>() {
@Override
public void iterate(final @NotNull IterationContext context, final @NotNull RetainedPublish retainedPublish) {
if (retainedPublish.getQos() == Qos.EXACTLY_ONCE) {
counter.incrementAndGet();
}
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Retained messages with QoS level 2: " + counter.get());
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
If the topology of the cluster changes during the iteration (for example, when a network split occurs or a node leaves or joins the cluster), the iteration is canceled.
The following example shows how topology changes can be handled:
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
iterate(0);
}
public void iterate(final int attempts) {
final AtomicInteger counter = new AtomicInteger();
CompletableFuture<Void> iterationFuture = Services.retainedMessageStore().iterateAllRetainedMessages(
new IterationCallback<RetainedPublish>() {
@Override
public void iterate(final @NotNull IterationContext context, final @NotNull RetainedPublish retainedPublish) {
if (retainedPublish.getQos() == Qos.EXACTLY_ONCE) {
counter.incrementAndGet();
}
}
});
iterationFuture.whenComplete((ignored, throwable) -> {
if (throwable == null) {
System.out.println("Retained messages with QoS level 2: " + counter.get());
// in case the cluster topology changes during iteration, an IterationFailedException is thrown
} else if (throwable instanceof IterationFailedException) {
// only retry 3 times
if (attempts < 3) {
final int newAttemptCount = attempts + 1;
Services.extensionExecutorService().schedule(() ->
iterate(newAttemptCount), newAttemptCount * 10, TimeUnit.SECONDS); // schedule retry with delay in case topology change is not over, else we would get another IterationFailedException
} else {
System.out.println("Could not fully iterate all retained messages.");
}
} else {
throwable.printStackTrace(); // please use more sophisticated logging
}
});
}
...
Publish Service
The Publish Service enables extensions to send PUBLISH messages. These messages can also be sent to a specific client only.
A PUBLISH that is sent via the Publish Service is processed identically to a PUBLISH sent by a client.
All MQTT 3 and MQTT 5 features for PUBLISH messages are supported.
The limits that are configured in the config.xml as part of the MQTT entity are also validated for PUBLISH messages sent via the service.
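For example, MQTT 5 specific PUBLISH properties are set on the same builder that is used in the examples below. The retain, messageExpiryInterval, and userProperty setters in this sketch are assumptions about the PublishBuilder API (userProperty also appears in the retained message example in this chapter); check the PublishBuilder JavaDoc for the exact method names:
final Publish publish = Builders.publish()
    .topic("topic")
    .qos(Qos.AT_LEAST_ONCE)
    .payload(ByteBuffer.wrap("message".getBytes()))
    .retain(true) // retained flag (assumed setter name)
    .messageExpiryInterval(60) // MQTT 5 message expiry in seconds (assumed setter name)
    .userProperty("origin", "my-extension") // MQTT 5 user property (assumed setter name)
    .build();
Services.publishService().publish(publish);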
PUBLISH messages that are sent to a specific client have some unique behavior and requirements.
- The topic of the PUBLISH must match at least one subscription of the client; otherwise, the PUBLISH is not forwarded to the client.
- If the client has a shared subscription that matches the topic of the PUBLISH, the message is sent to that client but is never sent to other clients with the same shared subscription.
The JavaDoc for the Publish Service can be found here.
Publish
This example shows how to send a regular PUBLISH message.
Publish message = Builders.publish()
.topic("topic")
.qos(Qos.AT_LEAST_ONCE)
.payload(payload)
.build();
Services.publishService().publish(message);
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
// Create a publish builder
final PublishBuilder publishBuilder = Builders.publish();
final ByteBuffer payload = ByteBuffer.wrap("message".getBytes());
// Build the publish
publishBuilder.topic("topic").qos(Qos.AT_LEAST_ONCE).payload(payload);
// Access the Publish Service
final PublishService publishService = Services.publishService();
// Asynchronously send the PUBLISH
final CompletableFuture<Void> future = publishService.publish(publishBuilder.build());
future.whenComplete((aVoid, throwable) -> {
if(throwable == null) {
System.out.println("Publish sent successfully");
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
});
}
...
Publish to Client
This example shows how to send a PUBLISH to a client with a specific client id.
Publish message = Builders.publish()
.topic("topic")
.qos(Qos.AT_LEAST_ONCE)
.payload(payload)
.build();
Services.publishService().publishToClient(message, "test-client");
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
// Create a publish builder
final PublishBuilder publishBuilder = Builders.publish();
final ByteBuffer payload = ByteBuffer.wrap("message".getBytes());
// Build the publish
publishBuilder.topic("topic").qos(Qos.AT_LEAST_ONCE).payload(payload);
// Access the Publish Service
final PublishService publishService = Services.publishService();
final String clientId = "client";
// Asynchronously send the PUBLISH to the specified client
final CompletableFuture<PublishToClientResult> future = publishService.publishToClient(publishBuilder.build(), clientId);
future.whenComplete((result, throwable) -> {
if (throwable == null) {
if (result == PublishToClientResult.NOT_SUBSCRIBED) {
System.out.println("Publish was not sent to client ("+clientId+
") because it is not subscribed to a matching topic");
} else {
System.out.println("Publish sent successfully");
}
} else {
//please use more sophisticated logging
throwable.printStackTrace();
}
});
}
...
Managed Extension Executor Service
Many MQTT integrations depend on expensive (in terms of CPU time) operations like:
- Calling webservices
- Persisting or querying data from a database
- Writing data to disk
- Any other blocking operation
The main paradigm in HiveMQ extensions is that blocking is not allowed.
When your business logic requires the use of blocking operations, the ManagedExtensionExecutorService can be used to call these operations asynchronously.
This HiveMQ-managed executor service is shared between all HiveMQ extensions and can be monitored by the standard HiveMQ monitoring system.
The ManagedExtensionExecutorService is a sophisticated implementation that can be used as a ScheduledExecutorService. This allows callback-based Future handling for true non-blocking behaviour.
The extension executor service also allows the scheduling of tasks periodically.
It’s important that you never create your own thread pools. This can decrease the performance of HiveMQ significantly. This is especially true for Java cached thread pools, because these spawn Java threads without any limit and can make your system unresponsive. If you’re using a library that uses such thread pools (like the Jersey Client library), make sure to limit the number of threads!
The thread pool of this executor service depends on the number of cores available to the JVM in which HiveMQ runs.
When extensionStop() is executed, no additional tasks can be submitted. At the same time, schedulers are canceled by HiveMQ and therefore no longer submit new tasks.
After shutdown, HiveMQ continues to execute already submitted tasks for a grace period of 3 minutes. When these 3 minutes have passed, the executor service is shut down ungracefully and any remaining tasks are not executed.
Access the Managed Extension Executor Service
final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();
Log Incoming Publishes
This example shows how to use the ManagedExtensionExecutorService to log the rate of incoming publishes once per minute.
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {
final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();
final MetricRegistry metricRegistry = Services.metricRegistry();
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// NOTE: the metric name below is an assumption; check the HiveMQ metrics reference for the exact name
final Meter incomingPublishRate = metricRegistry.meter("com.hivemq.messages.incoming.publish.rate");
System.out.println("Incoming publish one-minute rate = " + incomingPublishRate.getOneMinuteRate()); // please use more sophisticated logging
}
}, 1, 1, TimeUnit.MINUTES);
}
Add a Callback
This example shows how to add a callback to the submitted task to receive the return value when the future is completed.
private void methodWithCompletableFuture() {
final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();
final CompletableFuture<String> result = extensionExecutorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "Test";
}
});
result.whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(final String resultString, final Throwable throwable) {
if(throwable != null){
//please use more sophisticated logging
throwable.printStackTrace();
return;
}
if(resultString != null){
System.out.println(resultString);
}
}
});
}
Admin Service
At runtime, you often need to get information about the broker instance without a triggering event such as a client connect. For this purpose the extension SDK offers the Admin Service, which provides information about:
- The current lifecycle stage
- The license used by the broker
- The broker node with the ServerInformation object
Access the Admin Service
Extensions can access the AdminService object through Services.adminService().
final @NotNull AdminService adminService = Services.adminService();
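The license information mentioned above can be read from the same service. The getLicenseInformation() accessor and the getEdition() getter in this sketch are assumptions; check the AdminService JavaDoc for the exact names:
// sketch only: accessor and getter names are assumptions, see the AdminService JavaDoc
final LicenseInformation licenseInformation = adminService.getLicenseInformation();
System.out.println("License edition: " + licenseInformation.getEdition());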
Lifecycle Stage
The Admin Service provides information on the current status of the broker lifecycle. There are two states in which the broker can be:
- STARTING: The broker is in this state from the moment the JVM starts until the start procedure of the broker concludes.
- STARTED_SUCCESSFULLY: The broker is in this state after the start procedure concludes. This means that the extension system is started, the persistences are running, the listeners are up, a cluster is joined, and the Control Center is accessible.
Before you announce that a HiveMQ instance is ready, we recommend that you verify the lifecycle state of the broker to ensure that HiveMQ has successfully completed startup:
private final @NotNull AdminService adminService;
private final @NotNull ManagedExtensionExecutorService executorService;
public void schedulePublishing() {
executorService.schedule(() -> {
// check if broker is ready
if (adminService.getCurrentStage() == LifecycleStage.STARTED_SUCCESSFULLY) {
// do action
publishListenerInformation();
} else {
// schedule next check
schedulePublishing();
}
}, 10, TimeUnit.SECONDS);
}
Server Information
In the Admin Service and at different places in the Extension SDK, HiveMQ provides a ServerInformation object that contains runtime information about the broker node to which the extension is attached.
The following information is provided:
- The HiveMQ version
- The folder structure:
  - The home folder (for example, set by the HIVEMQ_HOME environment variable)
  - The data folder (for example, set by the HIVEMQ_DATA_FOLDER environment variable)
  - The log folder (for example, set by the HIVEMQ_LOG_FOLDER environment variable)
  - The extensions folder (for example, set by the HIVEMQ_EXTENSION_FOLDER environment variable)
- The active MQTT listeners
For example, if you want to publish the available listeners to an external directory service:
private void publishListenerInformation() {
final ServerInformation serverInformation = adminService.getServerInformation();
for (final Listener listener : serverInformation.getListener()) {
// publishes listeners to a registry
publishListener(listener);
}
}
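The version and folder information from the list above can be read in a similar way. This sketch assumes the standard ServerInformation getters (getVersion and folder getters that return File objects); check the ServerInformation JavaDoc for the exact signatures:
private void logServerInformation() {
    final ServerInformation serverInformation = adminService.getServerInformation();
    // version of the broker node this extension is attached to
    System.out.println("HiveMQ version: " + serverInformation.getVersion());
    // the folder getters are assumed to return java.io.File objects
    System.out.println("Home folder: " + serverInformation.getHomeFolder().getAbsolutePath());
    System.out.println("Data folder: " + serverInformation.getDataFolder().getAbsolutePath());
    System.out.println("Log folder: " + serverInformation.getLogFolder().getAbsolutePath());
    System.out.println("Extensions folder: " + serverInformation.getExtensionsFolder().getAbsolutePath());
}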