Services

HiveMQ Services provide a convenient way for extensions to interact with the HiveMQ core.

They can be accessed through the Services class.

The following table lists all available HiveMQ Services.

Table 1. Available HiveMQ Services
Service Description

Client Service

Allows extensions to get client session information, disconnect clients and remove sessions.

Initializer Registry

Allows extensions to set a client initializer for every MQTT client.

Subscription Store

Allows extensions to get, remove or add subscriptions for specific clients.

Retained Message Store

Allows extensions to get, remove, or add retained messages for specific topics, or to delete all retained messages at once.

Cluster Service

Enables extensions to dynamically discover HiveMQ cluster nodes.

Managed Extension Executor

Allows extensions to use a HiveMQ-managed executor service for non-blocking operations.

Publish Service

Allows extensions to send PUBLISH messages.

Security Registry

Allows extensions to define the authentication and authorization of MQTT clients.

Event Registry

Allows extensions to set a client lifecycle event listener for MQTT clients.



Client Service

The Client Service allows extensions to gather information about clients:

  • Online status

  • Client identifier

  • Session expiry interval

The Client Service enables extensions to query for session information on all cluster nodes.

The Client Service can be leveraged to forcibly disconnect clients with the option of sending or not sending the client’s Will message.

The Client Service enables extensions to invalidate client sessions for both connected and disconnected clients. For connected clients this will result in the client being disconnected and the Will message being sent.

The JavaDoc for the Client Service can be found here.

Example Usage

This section illustrates common examples on how to use the Client Service in your own extensions.

Accessing Client Service

final ClientService clientService = Services.clientService();


Client Connected

This example shows how to get information about the online status of a client.

The isClientConnected method returns true for online clients and false for offline clients.

Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "client-123";
    final ClientService clientService = Services.clientService();
    CompletableFuture<Boolean> connectedFuture = clientService.isClientConnected(clientId);

    connectedFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
        @Override
        public void accept(Boolean connected, Throwable throwable) {
            if(throwable == null) {
                System.out.println("Client with id {" + clientId + "} is connected: " + connected);
            } else {
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });
}

...
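When the online status of several clients is needed at once, the individual futures can be combined instead of being handled one by one. The following is a minimal sketch using only the JDK. The class ConnectedClientsCheck and the lookup parameter are hypothetical names introduced for illustration; lookup stands in for a method reference such as clientService::isClientConnected.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ConnectedClientsCheck {

    private ConnectedClientsCheck() {}

    // 'lookup' is an assumption of this sketch: it stands in for clientService::isClientConnected
    static CompletableFuture<Map<String, Boolean>> checkAll(
            final List<String> clientIds,
            final Function<String, CompletableFuture<Boolean>> lookup) {

        // start one asynchronous lookup per client id, keeping the input order
        final Map<String, CompletableFuture<Boolean>> pending = new LinkedHashMap<>();
        for (final String clientId : clientIds) {
            pending.put(clientId, lookup.apply(clientId));
        }

        // allOf completes once every lookup has completed; no thread blocks while waiting
        return CompletableFuture
                .allOf(pending.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    final Map<String, Boolean> result = new LinkedHashMap<>();
                    // join() returns immediately here because all futures are already complete
                    pending.forEach((id, future) -> result.put(id, future.join()));
                    return result;
                });
    }
}
```

In an extension, a call could look like ConnectedClientsCheck.checkAll(ids, Services.clientService()::isClientConnected), followed by whenComplete on the returned future. If any single lookup fails, the combined future completes exceptionally.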


Get Session Information

This example shows how to get all session information of a client with a specific clientId.

The getSession method returns an Optional of a SessionInformation object. It will be empty if no session was found. Otherwise it contains the following information:

  • is the client connected

  • what is the session expiry interval

  • which client identifier owns the session

Example Code
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "client-123";
    final ClientService clientService = Services.clientService();
    CompletableFuture<Optional<SessionInformation>> sessionFuture = clientService.getSession(clientId);

    sessionFuture.whenComplete(new BiConsumer<Optional<SessionInformation>, Throwable>() {
        @Override
        public void accept(Optional<SessionInformation> sessionInformationOptional, Throwable throwable) {
            if(throwable == null) {

                if(sessionInformationOptional.isPresent()){
                    SessionInformation information = sessionInformationOptional.get();
                    System.out.println("Session Found");
                    System.out.println("ID: " + information.getClientIdentifier());
                    System.out.println("Connected: " + information.isConnected());
                    System.out.println("Session Expiry Interval " + information.getSessionExpiryInterval());
                } else {
                    System.out.println("No session found for client id: " + clientId);
                }

            } else {
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });

}

...


Disconnect Client

This example shows how to forcibly disconnect a client with a specific clientId. Whether the client’s optional Will message is sent on this disconnect can be controlled by the caller.

The disconnectClient method returns true if a client was online and has been disconnected. Otherwise the method returns false.

It is possible to prevent the sending of the Will message. See examples below.

Disconnect client, send Will message
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "client-123";
    final ClientService clientService = Services.clientService();
    CompletableFuture<Boolean> disconnectFuture = clientService.disconnectClient(clientId);

    disconnectFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
        @Override
        public void accept(Boolean disconnected, Throwable throwable) {
            if(throwable == null) {
                if(disconnected){
                    System.out.println("Client was successfully disconnected and optional Will message was sent");
                } else {
                    System.out.println("Client not found");
                }
            } else {
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });

}

...


Disconnect client, prevent Will message
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "client-123";
    final ClientService clientService = Services.clientService();
    CompletableFuture<Boolean> disconnectFuture = clientService.disconnectClient(clientId, true);

    disconnectFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
        @Override
        public void accept(Boolean disconnected, Throwable throwable) {
            if(throwable == null) {
                if(disconnected){
                    System.out.println("Client was successfully disconnected and no Will message was sent");
                } else {
                    System.out.println("Client not found");
                }
            } else {
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });

}

...


Invalidate Client Session

This example shows how to invalidate a client session.

Invalidation means that an online client will be disconnected forcibly and its optional Will message will be sent. The client’s session information will be removed and cannot be restored.

The invalidateSession method returns true if a client was online and has been disconnected. Otherwise the method returns false.

Example Usage
...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "client-123";
    final ClientService clientService = Services.clientService();
    CompletableFuture<Boolean> invalidateSessionFuture = clientService.invalidateSession(clientId);

    invalidateSessionFuture.whenComplete(new BiConsumer<Boolean, Throwable>() {
        @Override
        public void accept(Boolean disconnected, Throwable throwable) {
            if(throwable == null) {
                if(disconnected){
                    System.out.println("Client was disconnected");
                    System.out.println("Will message was sent");
                    System.out.println("Client session was removed");
                } else {
                    System.out.println("Client was offline");
                    System.out.println("Client session was removed");
                }
            } else {
                if(throwable instanceof NoSuchClientIdException){
                    System.out.println("Client not found");
                }
                //please use more sophisticated logging
                throwable.printStackTrace();
            }
        }
    });

}

...
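An instanceof check on the throwable passed to whenComplete only works if the exception arrives unwrapped. With the JDK's CompletableFuture, a failure that passed through a dependent stage or through supplyAsync is delivered wrapped in a CompletionException. Whether this applies to the futures returned by the Client Service is not stated here, so the following helper is a defensive sketch only; the class name FutureErrors is hypothetical.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class FutureErrors {

    private FutureErrors() {}

    // Strips CompletionException / ExecutionException wrappers and returns the underlying cause.
    // A throwable that is not wrapped is returned unchanged.
    static Throwable unwrap(final Throwable throwable) {
        Throwable current = throwable;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
```

Inside a callback, the check would then read FutureErrors.unwrap(throwable) instanceof NoSuchClientIdException, which behaves the same for wrapped and unwrapped exceptions.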



Initializer Registry

The initializer registry allows every extension to set a ClientInitializer.


Client Initializer

With the client initializer set up in the extension start method, it is possible to add:

  • Interceptors

  • Client lifecycle listeners

  • Default topic permissions

To use a client initializer, it must be implemented by the extension developer. See example below.

Multiple extensions can set a client initializer. Those initializers are called sequentially, starting with the client initializer of the extension with the highest priority. Extensions with the same priority have an undefined order of calling initialize.

Every client calls the same initialize method once.

The context of the client initializer will be added to every connected and connecting MQTT client after extension start.

When the extension stops, the context of its client initializer will be removed from every client automatically.
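The calling order described above can be pictured with a small model. This is not HiveMQ's implementation; the class InitializerOrderSketch and its Registered type are hypothetical stand-ins that only illustrate "highest priority first".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class InitializerOrderSketch {

    private InitializerOrderSketch() {}

    // Hypothetical stand-in for an extension that has set a client initializer
    static final class Registered {
        final String extensionId;
        final int priority;

        Registered(final String extensionId, final int priority) {
            this.extensionId = extensionId;
            this.priority = priority;
        }
    }

    // Returns the extension ids in the order in which their initializers would be called
    static List<String> callOrder(final List<Registered> registered) {
        final List<Registered> sorted = new ArrayList<>(registered);
        // highest priority first
        sorted.sort(Comparator.comparingInt((Registered r) -> r.priority).reversed());

        final List<String> order = new ArrayList<>();
        for (final Registered r : sorted) {
            order.add(r.extensionId);
        }
        return order;
    }
}
```

The sort used in this sketch is stable, so equal priorities keep their registration order here. HiveMQ leaves that order undefined, so an extension should not rely on it.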

Example Usage

This section illustrates examples on how to set a ClientInitializer via the InitializerRegistry.


Accessing Initializer Registry

This example shows how to access the InitializerRegistry via the Services class.

final InitializerRegistry initializerRegistry = Services.initializerRegistry();


Add Interceptors And Set Initializer

This example shows how to create a ClientInitializer, add an Interceptor and set the ClientInitializer to the registry.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    //create shareable interceptor
    final PublishInboundInterceptor shareAbleInterceptor = new PublishInboundInterceptor() {
        @Override
        public void onInboundPublish(PublishInboundInput publishInboundInput, PublishInboundOutput publishInboundOutput) {
            System.out.println("Publish incoming for topic: " + publishInboundInput.getPublishPacket().getTopic());
        }
    };

    //create client initializer
    final ClientInitializer initializer = new ClientInitializer() {

        @Override
        public void initialize(InitializerInput initializerInput, ClientContext clientContext) {

            //add shareable interceptor
            clientContext.addPublishInboundInterceptor(shareAbleInterceptor);

            //add backend client only interceptor
            if(initializerInput.getClientInformation().getClientId().equals("backend")) {
                clientContext.addSubscribeInboundInterceptor(
                        (subscribeInboundInput, subscribeInboundOutput) -> System.out.println("Inbound backend subscribe")
                );
            }
        }

    };

    //get registry from Services
    final InitializerRegistry initializerRegistry = Services.initializerRegistry();

    //set client initializer
    initializerRegistry.setClientInitializer(initializer);

}

...



Subscription Store

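The Subscription Store allows extensions to:

  • Get the subscriptions of a specific client

  • Add one or more subscriptions for a specific client

  • Remove one or more subscriptions from a specific client

The SubscriptionStore can be accessed through the Services class.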

When a subscription for a client is added via the extension system or the HiveMQ Control Center, the retained messages for the topics included in the subscription are not published to the client.

The JavaDoc for the Subscription Store can be found here.

Example Usage

This section illustrates common examples on how to use the Subscription Store in your own extensions.


Accessing Subscription Store

final SubscriptionStore store = Services.subscriptionStore();


Add Subscription

This example shows how to add a subscription to a specific client with the subscription store.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String topic = "topic";
    final String clientId = "test-client";

    TopicSubscriptionBuilder subscriptionBuilder = Builders.topicSubscription()
            .topicFilter(topic)
            .noLocal(false)
            .retainAsPublished(true)
            .qos(Qos.AT_MOST_ONCE)
            .subscriptionIdentifier(1);


    CompletableFuture<Void> addFuture = Services.subscriptionStore().addSubscription(clientId, subscriptionBuilder.build());

    addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            System.out.println("Successfully added subscription for topic: " + topic + " | client: " + clientId);
        }
    });
}

...


Add Multiple Subscriptions

This example shows how to add multiple subscriptions to a specific client with the subscription store.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final SubscriptionStore subscriptionStore = Services.subscriptionStore();

    final String clientID = "test-client";

    final Set<TopicSubscription> topicSet = new HashSet<>();

    topicSet.add(Builders.topicSubscription().topicFilter("$share/group/topic1").build());
    topicSet.add(Builders.topicSubscription().topicFilter("topic2").build());
    topicSet.add(Builders.topicSubscription().topicFilter("topic3").build());

    final CompletableFuture<Void> addFuture = subscriptionStore.addSubscriptions(clientID, topicSet);

    addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(final Void result, final Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            System.out.println("Successfully added subscriptions to client: " + clientID);

        }
    });
}

...


Remove Subscription

This example shows how to remove a subscription from a specific client with the subscription store.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String topic = "topic";
    final String clientId = "test-client";

    CompletableFuture<Void> removeFuture = Services.subscriptionStore().removeSubscription(clientId, topic);

    removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            System.out.println("Successfully removed subscription for topic: " + topic + " | client: " + clientId);
        }
    });
}

...


Remove Multiple Subscriptions

This example shows how to remove multiple subscriptions from a specific client with the subscription store.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final SubscriptionStore subscriptionStore = Services.subscriptionStore();

    final String clientID = "test-client";

    final Set<String> topicSet = new HashSet<>();
    topicSet.add("$share/group/topic1");
    topicSet.add("topic2");
    topicSet.add("topic3");

    final CompletableFuture<Void> removeFuture = subscriptionStore.removeSubscriptions(clientID, topicSet);

    removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(final Void result, final Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            System.out.println("Successfully removed subscriptions for topics: " + topicSet + " | from client: " + clientID);

        }
    });
}

...


Get Subscriptions

This example shows how to get the subscriptions from a specific client with the subscription store.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String clientId = "test-client";

    CompletableFuture<Set<TopicSubscription>> getFuture = Services.subscriptionStore().getSubscriptions(clientId);

    getFuture.whenComplete(new BiConsumer<Set<TopicSubscription>, Throwable>() {
        @Override
        public void accept(Set<TopicSubscription> topicSubscriptions, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            if(topicSubscriptions.isEmpty()){
                System.out.println("Found no subscriptions for client: " + clientId);
                return;
            }

            System.out.println("Found subscriptions for client: " + clientId);

            for (TopicSubscription topicSubscription : topicSubscriptions) {
                System.out.println("---------------------");
                System.out.println("Topic :" + topicSubscription.getTopicFilter());
                System.out.println("Qos :" + topicSubscription.getQos().getQosNumber());
                System.out.println("No local :" + topicSubscription.getNoLocal());
                System.out.println("Retain as published :" + topicSubscription.getRetainAsPublished());
                System.out.println("Subscription identifier :" + topicSubscription.getSubscriptionIdentifier());
            }

        }
    });
}

...



Retained Message Store

The Retained Message Store allows extensions to:

  • Get the retained message for a specific topic

  • Add/Replace a retained message for a topic

  • Remove the retained message for a topic

  • Clear all retained messages from the HiveMQ cluster.

The RetainedMessageStore can be accessed by the Services class.

Unlike retained messages sent by clients, retained messages that are added with the Retained Message Store are not treated as publishes. Clients that already have a subscription for the topic therefore do not receive the newly added retained message as a publish. They receive it only when they resubscribe.

The JavaDoc for the Retained Message Store can be found here.

Example Usage

This section illustrates common examples on how to use the Retained Message Store in your own extensions.


Accessing Retained Message Store

final RetainedMessageStore store = Services.retainedMessageStore();


Add Retained Message

This example shows how to add a retained message to a specific topic with the retained message store. Adding a retained message would overwrite the retained message for that topic, should one already exist.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    // 1. build the retained message via the RetainedPublishBuilder
    final String topic = "add/message";
    final RetainedPublishBuilder retainedPublishBuilder = Builders.retainedPublish();

    final RetainedPublish retainedMessage = retainedPublishBuilder
        .topic(topic)
        .payload(ByteBuffer.wrap("test".getBytes()))
        .userProperty("reason","message-update")
        .qos(Qos.AT_LEAST_ONCE)
        .build();

    // 2. add the retained message (if a retained message already exists for the topic, it will be overwritten)
    final CompletableFuture<Void> addFuture = Services.retainedMessageStore().addOrReplace(retainedMessage);

    addFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            // 3. log when the message was successfully added/replaced
            System.out.println("Successfully added retained message for topic: " + topic);
        }
    });
}

...


Remove Retained Message

This example shows how to remove a retained message from a specific topic with the retained message store.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String topic = "topic";

    // 1. remove the retained message from the given topic
    final CompletableFuture<Void> removeFuture = Services.retainedMessageStore().remove(topic);

    removeFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            // 2. log when the message was successfully removed (also happens when no retained message for that topic)
            System.out.println("Successfully removed retained message for topic: " + topic);
        }
    });
}

...


Get Retained Message

This example shows how to get the retained message from a specific topic with the retained message store.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final String topic = "topic";

    // 1. request retained message for topic
    final CompletableFuture<Optional<RetainedPublish>> getFuture = Services.retainedMessageStore().getRetainedMessage(topic);

    getFuture.whenComplete(new BiConsumer<Optional<RetainedPublish>, Throwable>() {
        @Override
        public void accept(Optional<RetainedPublish> retainedPublishOptional, Throwable throwable) {

            if (throwable != null) {
                throwable.printStackTrace();
                return;
            }

            // 2. check if a retained message exists for that topic
            if (!retainedPublishOptional.isPresent()) {
                System.out.println("Found no retained message for topic: " + topic);
                return;
            }

            // 3. log some information about the retained message
            final RetainedPublish retainedPublish = retainedPublishOptional.get();

            System.out.println("Found retained message for topic: " + topic);
            System.out.println("---------------------");
            System.out.println("Topic :" + retainedPublish.getTopic());
            System.out.println("Qos :" + retainedPublish.getQos().getQosNumber());

        }
    });
}

...


Clear All Retained Messages

This example shows how to remove all retained messages from the HiveMQ cluster with the retained message store.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    // 1. request to delete all retained messages from the HiveMQ cluster
    final CompletableFuture<Void> clearFuture = Services.retainedMessageStore().clear();

    clearFuture.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {

            if(throwable != null){
                throwable.printStackTrace();
                return;
            }

            // 2. log when all retained messages were removed
            System.out.println("Successfully removed all retained messages");
        }
    });
}

...



Cluster Service

The Cluster Service enables extensions to dynamically discover HiveMQ cluster nodes.

The ClusterService object can be accessed by extensions through Services.clusterService().

Example usage of the ClusterService
public class MyExtensionMain implements ExtensionMain {

    private final MyClusterDiscoveryCallback myCallback;

    public MyExtensionMain() {
        myCallback = new MyClusterDiscoveryCallback();
    }

    @Override
    public void extensionStart(
            final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {

        Services.clusterService().addDiscoveryCallback(myCallback);
    }

    @Override
    public void extensionStop(
            final @NotNull ExtensionStopInput input, final @NotNull ExtensionStopOutput output) {

        Services.clusterService().removeDiscoveryCallback(myCallback);
    }
}

Cluster Discovery

An extension can implement discovery of HiveMQ cluster nodes by implementing a ClusterDiscoveryCallback and adding it via the ClusterService.

Extension discovery is only used by HiveMQ if <discovery> is set to <extension> in the <cluster> section of the HiveMQ config file.
HiveMQ config for extension cluster discovery
<?xml version="1.0"?>
<hivemq xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    ...
    <cluster>
        ...
        <discovery>
            <extension></extension>
        </discovery>
        ...
    </cluster>
    ...
</hivemq>

The lifecycle of a ClusterDiscoveryCallback consists of 3 methods:

  • init: called once by HiveMQ when the callback is added. It can be used to register the HiveMQ instance on which the extension is running with some kind of central registry, or to save it to a database or file server.

  • reload: called regularly by HiveMQ to discover all currently available HiveMQ cluster nodes. The interval between calls to this method is 60 seconds by default and can be overridden by an individual ClusterDiscoveryCallback.

  • destroy: called once by HiveMQ in one of the following cases:

    • the callback is removed

    • the extension which added the callback is stopped

    • the HiveMQ instance is shut down.

    It can be used to unregister the HiveMQ instance on which the extension is running from a central registry.

If an exception is thrown inside one of these methods, the provided output is ignored.

Example implementation of a ClusterDiscoveryCallback
public class MyClusterDiscoveryCallback implements ClusterDiscoveryCallback {

    private final MyClusterNodesService myService = ...

    @Override
    public void init(
            final @NotNull ClusterDiscoveryInput clusterDiscoveryInput,
            final @NotNull ClusterDiscoveryOutput clusterDiscoveryOutput) {

        myService.registerHiveMQNode(clusterDiscoveryInput.getOwnAddress());
        final List<ClusterNodeAddress> hiveMQNodes = myService.getHiveMQNodes();
        clusterDiscoveryOutput.provideCurrentNodes(hiveMQNodes);
        final int nextReloadInterval = myService.getNextReloadInterval();
        clusterDiscoveryOutput.setReloadInterval(nextReloadInterval);
    }

    @Override
    public void reload(
            final @NotNull ClusterDiscoveryInput clusterDiscoveryInput,
            final @NotNull ClusterDiscoveryOutput clusterDiscoveryOutput) {

        final List<ClusterNodeAddress> hiveMQNodes = myService.getHiveMQNodes();
        clusterDiscoveryOutput.provideCurrentNodes(hiveMQNodes);
        final int nextReloadInterval = myService.getNextReloadInterval();
        clusterDiscoveryOutput.setReloadInterval(nextReloadInterval);
    }

    @Override
    public void destroy(final @NotNull ClusterDiscoveryInput clusterDiscoveryInput) {
        myService.unregisterHiveMQNode(clusterDiscoveryInput.getOwnAddress());
    }
}
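The MyClusterNodesService used above is left open in the example. For local experiments, a purely in-memory stand-in could look like the following sketch. Everything in it is hypothetical: the class name InMemoryClusterNodes, the use of "host:port" strings in place of ClusterNodeAddress objects, and the fixed reload interval. A real deployment would need a registry that all cluster nodes can reach, such as a database or a shared file store, because an in-memory set is only visible inside one JVM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class InMemoryClusterNodes {

    // "host:port" strings stand in for ClusterNodeAddress objects (assumption of this sketch)
    private final Set<String> nodes = ConcurrentHashMap.newKeySet();
    private final int reloadIntervalSeconds;

    InMemoryClusterNodes(final int reloadIntervalSeconds) {
        this.reloadIntervalSeconds = reloadIntervalSeconds;
    }

    // called from init(): announce this node
    void registerHiveMQNode(final String hostAndPort) {
        nodes.add(hostAndPort);
    }

    // called from destroy(): withdraw this node
    void unregisterHiveMQNode(final String hostAndPort) {
        nodes.remove(hostAndPort);
    }

    // called from init() and reload(): snapshot of all currently known nodes
    List<String> getHiveMQNodes() {
        return new ArrayList<>(nodes);
    }

    // interval in seconds until the next reload should happen
    int getNextReloadInterval() {
        return reloadIntervalSeconds;
    }
}
```

The thread-safe set is chosen because register, unregister, and reload may be invoked from different threads.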



Managed Extension Executor Service

Many MQTT integrations depend on expensive (in terms of CPU time) operations like:

  • Calling webservices

  • Persisting or querying data from a database

  • Writing data to disk

  • Any other blocking operation

The main paradigm in HiveMQ extensions is that blocking is not allowed. When your business requires the use of blocking operations, the ManagedExtensionExecutorService can be used to enable asynchronous calls to these operations. This HiveMQ-managed executor service is shared between all HiveMQ extensions and can be monitored by the standard HiveMQ monitoring system.

The ManagedExtensionExecutorService can be used as a ScheduledExecutorService. In addition, it supports callback-based Future handling, which allows truly non-blocking behaviour.

The extension executor service also allows the scheduling of tasks periodically.
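Because the managed executor can be used as a ScheduledExecutorService, periodic work can be written against that JDK interface. The sketch below is a minimal illustration; the class PeriodicTaskSketch and its method are hypothetical, and the executor is passed in so that no extra thread pool is created. It also guards against a detail of scheduleAtFixedRate in the JDK: an exception that escapes the task suppresses all later executions.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class PeriodicTaskSketch {

    private PeriodicTaskSketch() {}

    // Runs 'task' immediately and then every 'periodMillis' on the given executor.
    // The returned handle can be used to cancel the schedule.
    static ScheduledFuture<?> schedulePeriodically(
            final ScheduledExecutorService executor,
            final Runnable task,
            final long periodMillis) {

        return executor.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (final RuntimeException e) {
                // an uncaught exception would suppress all subsequent executions,
                // so it is reported here instead of being rethrown
                e.printStackTrace();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

In an extension, the first argument would be Services.extensionExecutorService().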

It’s important that you never create your own thread pools. This can decrease the performance of HiveMQ significantly. This is especially true for Java cached thread pools, since these create new threads without any limit and can make your system unresponsive. If you’re using a library which uses these thread pools (like the Jersey Client library), make sure to limit the number of threads!
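If a third-party library insists on being given its own executor, a bounded pool is one way to keep the thread count under control. The following sketch uses only JDK classes; the class name BoundedPoolSketch and the chosen limits are illustrative assumptions, not HiveMQ recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolSketch {

    private BoundedPoolSketch() {}

    // Creates a pool that never uses more than 'maxThreads' threads and never queues
    // more than 'queueCapacity' tasks. Further submissions are rejected with a
    // RejectedExecutionException (the default AbortPolicy) instead of growing without limit.
    static ThreadPoolExecutor newBoundedPool(final int maxThreads, final int queueCapacity) {
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,
                30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        // let idle threads terminate so that an unused pool holds no threads
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }
}
```

Unlike Executors.newCachedThreadPool(), this pool has a hard upper bound on both threads and queued work. Callers need to handle the rejection case.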

The size of the thread pool of this executor service depends on the number of cores available to the JVM in which HiveMQ runs.

When extensionStop() is executed, no additional tasks can be submitted. At the same time schedulers are canceled by HiveMQ and therefore no longer submit new tasks.

After shutdown, HiveMQ continues to execute already submitted tasks for a grace period of 3 minutes. When these 3 minutes have passed, the executor service is shut down ungracefully and any remaining tasks are not executed.
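The two-phase behaviour described above (stop accepting work, wait for a grace period, then stop hard) corresponds to a common JDK pattern. The sketch below shows that general pattern on a plain ExecutorService. It is not HiveMQ's internal code, and extensions are not expected to shut down the managed executor themselves; the class name GracefulShutdownSketch is hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class GracefulShutdownSketch {

    private GracefulShutdownSketch() {}

    // Shuts the executor down in two phases and returns the number of queued tasks
    // that were never started because the grace period ran out.
    static int shutdownWithGracePeriod(
            final ExecutorService executor, final long gracePeriod, final TimeUnit unit) {

        // phase 1: reject new tasks, keep executing the ones already submitted
        executor.shutdown();
        try {
            if (executor.awaitTermination(gracePeriod, unit)) {
                return 0;
            }
        } catch (final InterruptedException e) {
            // restore the interrupt flag and fall through to the hard shutdown
            Thread.currentThread().interrupt();
        }

        // phase 2: interrupt running tasks and drop everything still queued
        final List<Runnable> neverStarted = executor.shutdownNow();
        return neverStarted.size();
    }
}
```

For tasks submitted to the managed executor, the practical consequence is that long-running work should react to interruption and should not assume it will always run to completion.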

Example Usage

The ManagedExtensionExecutorService object can be accessed by extensions through Services.extensionExecutorService().

Accessing Managed Extension Executor Service

final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();


Log Incoming Publishes

This example shows how to log incoming publishes per minute with the ManagedExtensionExecutorService.

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    final ManagedExtensionExecutorService executorService = Services.extensionExecutorService();
    final MetricRegistry metricRegistry = Services.metricRegistry();

    executorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
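            // the metric name below is an assumption; check the HiveMQ metrics list for the exact name
            Meter incomingPublishRate = metricRegistry.meter("com.hivemq.messages.incoming.publish.rate");
            // logger is assumed to be a logger field (for example SLF4J) of the extension class
            logger.info("Incoming publishes last minute = {}", incomingPublishRate.getOneMinuteRate());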
        }
    }, 1, 1, TimeUnit.MINUTES);

}


Adding A Callback

This example shows how to add a callback to the submitted task to receive the return value when the future is completed.

private void methodWithCompletableFuture() {

    final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

    final CompletableFuture<String> result = extensionExecutorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "Test";
        }
    });

    result.whenComplete(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(final String resultString, final Throwable throwable) {
            if(throwable != null){
                //please use more sophisticated logging
                throwable.printStackTrace();
                return;
            }
            if(resultString != null){
                System.out.println(resultString);
            }
        }
    });
}



Publish Service

The Publish Service enables extensions to send PUBLISH messages. These messages can also be sent to a specific client only.

A PUBLISH that is sent via the Publish Service is processed identically to a PUBLISH sent by a client. All MQTT 3 and MQTT 5 features for PUBLISH messages are supported. The limits that are configured in the config.xml as part of the MQTT entity are validated for the PUBLISH messages sent via the service.

PUBLISH messages that are sent to a specific client have some unique behavior and requirements.

  • The topic of the PUBLISH must match at least one subscription of the client otherwise the PUBLISH is not forwarded to the client

  • If the client has a shared subscription that matches the topic of the PUBLISH, the message will be sent to the client but is never sent to other clients with the same shared subscription
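The first requirement depends on MQTT topic filter matching. The following is a minimal sketch of the wildcard rules from the MQTT specification (`+` matches exactly one topic level, `#` matches all remaining levels); it is not HiveMQ's internal implementation. The class `TopicFilterMatcher` and its method are hypothetical names, the filter is assumed to be syntactically valid, and special cases such as topics starting with `$` or shared subscription prefixes are left out.

```java
final class TopicFilterMatcher {

    private TopicFilterMatcher() {
    }

    // Returns true if the topic filter of a subscription matches the topic of a PUBLISH.
    // Assumes a valid filter, i.e. "#" only appears as the last level.
    static boolean matches(final String topicFilter, final String topic) {
        // A negative limit keeps trailing empty levels, e.g. "a/" has the levels "a" and "".
        final String[] filterLevels = topicFilter.split("/", -1);
        final String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            final String filterLevel = filterLevels[i];
            if (filterLevel.equals("#")) {
                // multi-level wildcard: matches the parent level and everything below it
                return true;
            }
            if (i >= topicLevels.length) {
                // the filter has more levels than the topic
                return false;
            }
            if (!filterLevel.equals("+") && !filterLevel.equals(topicLevels[i])) {
                // neither a single-level wildcard nor an exact match
                return false;
            }
        }
        // all filter levels matched; the topic must not have additional levels
        return filterLevels.length == topicLevels.length;
    }

    public static void main(final String[] args) {
        System.out.println(matches("sensors/+/temperature", "sensors/kitchen/temperature"));
        System.out.println(matches("sensors/#", "actuators/kitchen"));
    }
}
```

A check like this can help an extension predict whether publishToClient is likely to report NOT_SUBSCRIBED, but the result returned by the Publish Service remains the authoritative answer.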

The JavaDoc for the Publish Service can be found here.

Example Usage

This section illustrates common examples on how to use the Publish Service in your own extensions.

Accessing Publish Service

final PublishService publishService = Services.publishService();


Publish

This example shows how to send a regular PUBLISH message.

Example Code
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    // Create a publish builder
    final PublishBuilder publishBuilder = Builders.publish();
    final ByteBuffer payload = ByteBuffer.wrap("message".getBytes());
    // Build the publish
    publishBuilder.topic("topic").qos(Qos.AT_LEAST_ONCE).payload(payload);
    // Access the Publish Service
    final PublishService publishService = Services.publishService();
    // Send the PUBLISH asynchronously
    final CompletableFuture<Void> future = publishService.publish(publishBuilder.build());

    future.whenComplete((aVoid, throwable) -> {
        if(throwable == null) {
            System.out.println("Publish sent successfully");
        } else {
            //please use more sophisticated logging
            throwable.printStackTrace();
        }
    });
}

...


Publish to Client

This example shows how to send a PUBLISH to a client with a specific client id.

Example Code
...
@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    // Create a publish builder
    final PublishBuilder publishBuilder = Builders.publish();
    final ByteBuffer payload = ByteBuffer.wrap("message".getBytes());
    // Build the publish
    publishBuilder.topic("topic").qos(Qos.AT_LEAST_ONCE).payload(payload);
    // Access the Publish Service
    final PublishService publishService = Services.publishService();
    // Send the PUBLISH asynchronously to the client with the given client id
    final String clientId = "client";
    final CompletableFuture<PublishToClientResult> future = publishService.publishToClient(publishBuilder.build(), clientId);

    future.whenComplete((result, throwable) -> {

        if (throwable == null) {
            if (result == PublishToClientResult.NOT_SUBSCRIBED) {
                System.out.println("Publish was not sent to client ("+clientId+
                        ") because it is not subscribed to a matching topic");
            } else {
                System.out.println("Publish sent successfully");
            }
        } else {
            //please use more sophisticated logging
            throwable.printStackTrace();
        }
    });
}

...



Security Registry

The Security Registry allows extensions to define the authentication and authorization of MQTT clients.

The SecurityRegistry object can be accessed by extensions through Services.securityRegistry().

Each extension can set one AuthenticatorProvider and one AuthorizerProvider through the Security Registry.

The providers are removed when the extension stops.


Authenticator Provider

The AuthenticatorProvider provides Authenticators that are used for authenticating connecting MQTT clients. An AuthenticatorProvider is given a client-specific AuthenticatorProviderInput object and should return an Authenticator. The AuthenticatorProviderInput contains information about the MQTT client, its connection and the state of the broker. The returned authenticator is valid for the duration of the client’s connection.

An AuthenticatorProvider must return an implementation of a SimpleAuthenticator or null. See example.

Authorizer Provider

The AuthorizerProvider provides Authorizers that are used for authorizing MQTT PUBLISH and SUBSCRIBE packets. An AuthorizerProvider is given a client-specific AuthorizerProviderInput object and should return an Authorizer. The AuthorizerProviderInput contains information about the MQTT client, its connection and the state of the broker. The returned authorizer is valid for the duration of the client’s connection.

An AuthorizerProvider must return an implementation of a PublishAuthorizer, a SubscriptionAuthorizer or null. See example.


Authenticator

There is one sub-interface of Authenticator available for the development of HiveMQ extensions, the SimpleAuthenticator.


Simple Authenticator

The SimpleAuthenticator interface is used to perform authentication when HiveMQ receives an MQTT CONNECT message from a newly connecting client. For this task, HiveMQ provides a SimpleAuthInput and a SimpleAuthOutput. The general concepts of extension input and extension output apply. The SimpleAuthOutput itself is blocking, but it can easily be used to create an asynchronous SimpleAuthOutput object.

The SimpleAuthOutput provides three important methods:

  • authenticateSuccessfully() finishes the authentication process for the client and allows it to connect to HiveMQ. Authenticators of extensions with a lower priority are not called.

  • failAuthentication() finishes the authentication process for the client and disconnects the client. Authenticators of extensions with a lower priority are not called.

  • nextExtensionOrDefault() does not decide the authentication state of the client. Authenticators of extensions with a lower priority are called, or the default behavior is applied.

The default behavior is to disconnect the client with reason code "NOT_AUTHORIZED" and reason string "authentication failed by extension". This behavior also applies when none of the methods above is called.
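The interplay of the three methods and the extension priority can be pictured as a decision chain. The following is a minimal sketch in plain Java, not HiveMQ's implementation: the `Decision` enum, the `PrioritizedAuthenticator` class and the `authenticate` method are hypothetical stand-ins, and an authenticator is reduced to a function from client id to decision.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

final class AuthenticationChainSketch {

    // Hypothetical stand-ins for the three outcomes an authenticator can choose.
    enum Decision { SUCCESS, FAIL, NEXT_EXTENSION_OR_DEFAULT }

    // Hypothetical pairing of an extension priority with its authenticator logic.
    static final class PrioritizedAuthenticator {
        final int priority;
        final Function<String, Decision> authenticator;

        PrioritizedAuthenticator(final int priority, final Function<String, Decision> authenticator) {
            this.priority = priority;
            this.authenticator = authenticator;
        }
    }

    // Returns true if the client may connect.
    static boolean authenticate(final String clientId, final List<PrioritizedAuthenticator> authenticators) {
        final List<PrioritizedAuthenticator> ordered = new ArrayList<>(authenticators);
        // the authenticator of the extension with the highest priority is asked first
        ordered.sort(Comparator.comparingInt((PrioritizedAuthenticator a) -> a.priority).reversed());

        for (final PrioritizedAuthenticator entry : ordered) {
            final Decision decision = entry.authenticator.apply(clientId);
            if (decision == Decision.SUCCESS) {
                return true;  // lower priorities are not called
            }
            if (decision == Decision.FAIL) {
                return false; // lower priorities are not called
            }
            // NEXT_EXTENSION_OR_DEFAULT: continue with the next lower priority
        }
        // default behavior: nobody decided, so the client is not authorized
        return false;
    }

    public static void main(final String[] args) {
        final List<PrioritizedAuthenticator> chain = Arrays.asList(
                new PrioritizedAuthenticator(1, clientId -> Decision.FAIL),
                new PrioritizedAuthenticator(10, clientId -> clientId.startsWith("trusted")
                        ? Decision.SUCCESS : Decision.NEXT_EXTENSION_OR_DEFAULT));
        System.out.println(authenticate("trusted-1", chain));
        System.out.println(authenticate("other", chain));
    }
}
```

The sketch does not model the case in which an authenticator calls none of the three methods, which the text above describes as leading to the same default behavior.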

MQTT 3 and MQTT 5 differ in that an MQTT 3 client does not receive a reason string and can receive significantly fewer CONNACK reason codes than an MQTT 5 client after a failed authentication.

Table 2. CONNACK Reason Codes
MQTT 5 Reason Code MQTT 3 Reason Code

NOT_AUTHORIZED

REFUSED_NOT_AUTHORIZED

BAD_USER_NAME_OR_PASSWORD

REFUSED_BAD_USERNAME_OR_PASSWORD

UNSUPPORTED_PROTOCOL_VERSION

REFUSED_UNACCEPTABLE_PROTOCOL_VERSION

CLIENT_IDENTIFIER_NOT_VALID

REFUSED_IDENTIFIER_REJECTED

SERVER_UNAVAILABLE

REFUSED_SERVER_UNAVAILABLE

UNSPECIFIED_ERROR

-

MALFORMED_PACKET

-

PROTOCOL_ERROR

-

IMPLEMENTATION_SPECIFIC_ERROR

-

SERVER_BUSY

-

BANNED

-

BAD_AUTHENTICATION_METHOD

-

TOPIC_NAME_INVALID

-

PACKET_TOO_LARGE

-

QUOTA_EXCEEDED

-

PAYLOAD_FORMAT_INVALID

-

RETAIN_NOT_SUPPORTED

-

QOS_NOT_SUPPORTED

-

USE_ANOTHER_SERVER

-

SERVER_MOVED

-

CONNECTION_RATE_EXCEEDED

-

The output of the Authenticator can also be used to set the Default Permissions for the client.

Example Usage


Accessing Security Registry

final SecurityRegistry securityRegistry = Services.securityRegistry();


Implement Authenticator Provider

This example shows how to implement an AuthenticatorProvider.

You only need to return an Authenticator in the getAuthenticator method.

public class MyAuthenticatorProvider implements AuthenticatorProvider {

    final @NotNull Map<String, String> usernamePasswordMap;

    public MyAuthenticatorProvider(final @NotNull Map<String, String> usernamePasswordMap) {
        this.usernamePasswordMap = usernamePasswordMap;
    }

    @Nullable
    @Override
    public Authenticator getAuthenticator(final @NotNull AuthenticatorProviderInput authenticatorProviderInput) {

        //A new instance must be returned, because the authenticator has state.
        return new MySimpleAuthenticator(usernamePasswordMap);

    }
}


Implement Authenticator Provider With Shareable Authenticator

This example shows how to implement an AuthenticatorProvider with a shareable authenticator.

This authenticator must either be stateless or thread-safe.

You only need to return an Authenticator in the getAuthenticator method.

public class MyAuthenticatorProvider implements AuthenticatorProvider {

    final @NotNull SimpleAuthenticator authenticator;

    public MyAuthenticatorProvider(final @NotNull SimpleAuthenticator authenticator) {
        this.authenticator = authenticator;
    }

    @Nullable
    @Override
    public Authenticator getAuthenticator(final @NotNull AuthenticatorProviderInput authenticatorProviderInput) {

        //return a shareable authenticator which must be thread-safe / stateless
        return authenticator;

    }
}
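A shared authenticator instance may be called for many clients concurrently, so any state it keeps has to be thread-safe. As a minimal sketch of what that can look like, the hypothetical helper `FailedAttemptCounter` below (not part of the HiveMQ API) keeps per-client counters in a ConcurrentHashMap and updates them atomically with `merge`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class FailedAttemptCounter {

    // ConcurrentHashMap allows concurrent access from multiple threads without external locking
    private final Map<String, Integer> failuresByClientId = new ConcurrentHashMap<>();

    // Atomically increments the failure count of the client and returns the new value.
    int recordFailure(final String clientId) {
        return failuresByClientId.merge(clientId, 1, Integer::sum);
    }

    // Returns the number of recorded failures, or 0 if none were recorded.
    int failuresOf(final String clientId) {
        return failuresByClientId.getOrDefault(clientId, 0);
    }

    public static void main(final String[] args) {
        final FailedAttemptCounter counter = new FailedAttemptCounter();
        counter.recordFailure("client");
        counter.recordFailure("client");
        System.out.println(counter.failuresOf("client"));
    }
}
```

A shared authenticator could hold one such counter as a field and call recordFailure before failing the authentication. A plain HashMap in the same place would not be safe to share between threads.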


Implement Authorizer Provider

This example shows how to implement an AuthorizerProvider.

You need to return a SubscriptionAuthorizer or a PublishAuthorizer in the getAuthorizer() method.

It is possible to implement both a SubscriptionAuthorizer and a PublishAuthorizer in the same class by implementing both interfaces.

public class MyAuthorizerProvider implements AuthorizerProvider {

    //create a shared instance of SubscriptionAuthorizer
    private final MySubscriptionAuthorizer subscriptionAuthorizer = new MySubscriptionAuthorizer();

    @Override
    public @Nullable Authorizer getAuthorizer(final @NotNull AuthorizerProviderInput authorizerProviderInput) {
        //always return the shared instance.
        //It is also possible to create a new instance here for each client
        return subscriptionAuthorizer;
    }

}


Implement Simple Authenticator

This example shows an implementation of a SimpleAuthenticator with username and password validation.

public class MySimpleAuthenticator implements SimpleAuthenticator {

    final @NotNull Map<String, String> usernamePasswordMap;

    //public, so that the AuthenticatorProvider can create instances
    public MySimpleAuthenticator(final @NotNull Map<String, String> usernamePasswordMap) {
        this.usernamePasswordMap = usernamePasswordMap;
    }

    @Override
    public void onConnect(final @NotNull SimpleAuthInput input, final @NotNull SimpleAuthOutput output) {

        //get connect packet from input object
        final ConnectPacket connectPacket = input.getConnectPacket();

        //validate the username and password combination
        if (validate(connectPacket.getUserName(), connectPacket.getPassword())) {

            //successful authentication if username and password are correct
            output.authenticateSuccessfully();

        } else {

            //failed authentication if username or password are incorrect
            output.failAuthentication(ConnackReasonCode.BAD_USER_NAME_OR_PASSWORD, "wrong password");

        }
    }


    private boolean validate(final @NotNull Optional<String> usernameOptional, final @NotNull Optional<ByteBuffer> passwordOptional) {

        //if no username or password is set, validation fails
        if(!usernameOptional.isPresent() || !passwordOptional.isPresent()){
            return false;
        }

        final String username = usernameOptional.get();
        final byte[] bytes = getBytesFromBuffer(passwordOptional.get());
        final String password = new String(bytes, StandardCharsets.UTF_8);

        //get password for username from map.
        final String passwordFromMap = usernamePasswordMap.get(username);

        //return true if passwords are equal, else false
        return password.equals(passwordFromMap);

    }

    private byte[] getBytesFromBuffer(final ByteBuffer byteBuffer) {
        final byte[] bytes = new byte[byteBuffer.remaining()];
        byteBuffer.get(bytes);
        return bytes;
    }

}
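The validate method above compares passwords with String.equals and reads directly from the ByteBuffer it is given. The following is a hedged sketch of two possible refinements using only the JDK: MessageDigest.isEqual for a comparison that does not stop at the first differing byte, and ByteBuffer.duplicate() so that the position of the original buffer is left untouched. The class `PasswordCheckSketch` and its method are hypothetical, and the plain-text password is kept only to mirror the example; real deployments would typically compare hashed credentials.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

final class PasswordCheckSketch {

    private PasswordCheckSketch() {
    }

    // Compares the password bytes of a CONNECT packet with the expected password.
    static boolean passwordMatches(final ByteBuffer passwordBuffer, final String expectedPassword) {
        if (passwordBuffer == null || expectedPassword == null) {
            return false;
        }
        // duplicate() shares the content but has its own position, so the caller's buffer is not consumed
        final ByteBuffer copy = passwordBuffer.duplicate();
        final byte[] provided = new byte[copy.remaining()];
        copy.get(provided);

        final byte[] expected = expectedPassword.getBytes(StandardCharsets.UTF_8);
        // isEqual does not return early at the first differing byte
        return MessageDigest.isEqual(provided, expected);
    }

    public static void main(final String[] args) {
        final ByteBuffer buffer = ByteBuffer.wrap("password_1".getBytes(StandardCharsets.UTF_8));
        System.out.println(passwordMatches(buffer, "password_1"));
        System.out.println(passwordMatches(buffer, "password_2"));
    }
}
```

Because the buffer is duplicated, the same ByteBuffer can be checked more than once, which the second call in main relies on.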


Implement Async Simple Authenticator

This example shows an async implementation of a SimpleAuthenticator with external validation.

public class MyAsyncSimpleAuthenticator implements SimpleAuthenticator {

    @Override
    public void onConnect(final @NotNull SimpleAuthInput input, final @NotNull SimpleAuthOutput output) {


        //access managed extension executor service
        final ManagedExtensionExecutorService extensionExecutorService = Services.extensionExecutorService();

        //make the output async with a timeout of 10 seconds; if the operation times out, authentication fails
        final Async<SimpleAuthOutput> asyncOutput = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);

        //submit external task to managed extension executor service
        extensionExecutorService.submit(new Runnable() {
            @Override
            public void run() {

                //validate client via external service call. This method is not provided by the HiveMQ extension system
                final boolean successful = externalServiceCallForValidation();

                if(successful){
                    asyncOutput.getOutput().authenticateSuccessfully();
                } else {
                    asyncOutput.getOutput().failAuthentication();
                }

                //resume output to tell HiveMQ auth is complete
                asyncOutput.resume();
            }
        });

    }

}
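The idea behind an async output with a failure fallback can also be illustrated with plain JDK classes. The following sketch is an analogy, not the HiveMQ API: the hypothetical method `withFailureFallback` turns an asynchronous validation result into one that resolves to false when the validation throws or does not finish within the timeout. It assumes Java 9 or later for CompletableFuture.completeOnTimeout.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class TimeoutFallbackSketch {

    private TimeoutFallbackSketch() {
    }

    // Returns a future that completes with the validation result,
    // or with false if the validation fails exceptionally or exceeds the timeout.
    static CompletableFuture<Boolean> withFailureFallback(final CompletableFuture<Boolean> validation,
                                                          final Duration timeout) {
        return validation
                // exceptionally() creates a new future, so the caller's future is not modified
                .exceptionally(throwable -> false)
                // complete with false if no result is available after the timeout
                .completeOnTimeout(false, timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(final String[] args) {
        final CompletableFuture<Boolean> finished = CompletableFuture.completedFuture(true);
        System.out.println(withFailureFallback(finished, Duration.ofSeconds(10)).join());

        final CompletableFuture<Boolean> neverFinishes = new CompletableFuture<>();
        System.out.println(withFailureFallback(neverFinishes, Duration.ofMillis(100)).join());
    }
}
```

Failing closed on timeout mirrors the TimeoutFallback.FAILURE choice in the example above: a slow or unavailable external service then results in a rejected client rather than an unauthenticated one.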


Set Authenticator Provider

This example shows how to set an AuthenticatorProvider in the Security Registry.

public class AuthExtensionMain implements ExtensionMain{

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

        //access security registry
        final SecurityRegistry securityRegistry = Services.securityRegistry();

        //create username_password map
        final Map<String, String> usernamePasswordMap = new HashMap<>();
        usernamePasswordMap.put("test_name_1", "password_1");
        usernamePasswordMap.put("test_name_2", "password_2");
        usernamePasswordMap.put("test_name_3", "password_3");

        //create provider
        final MyAuthenticatorProvider provider = new MyAuthenticatorProvider(usernamePasswordMap);

        //set provider
        securityRegistry.setAuthenticatorProvider(provider);

    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {

    }
}

Set Authorizer Provider

This example shows how to set an AuthorizerProvider in the Security Registry.

public class AuthorizerExtensionMain implements ExtensionMain{

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

        //access security registry
        final SecurityRegistry securityRegistry = Services.securityRegistry();

        //create provider
        final MyAuthorizerProvider provider = new MyAuthorizerProvider();

        //set provider
        securityRegistry.setAuthorizerProvider(provider);
    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {

    }
}



Event Registry

The event registry allows every extension to register a ClientLifecycleEventListenerProvider, which must return an implementation of a ClientLifecycleEventListener.

The ClientLifecycleEventListenerProvider.getClientLifecycleEventListener() method is called once per client, as soon as the first event for that client is fired.
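One way to picture this once-per-client behavior is a per-client cache in front of the provider. The sketch below is an illustration under that assumption and says nothing about how HiveMQ implements it internally: `ListenerCacheSketch` is a hypothetical class, the provider is reduced to a function from client id to listener, and removal of entries on disconnect is not modeled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ListenerCacheSketch<L> {

    private final Map<String, L> listenersByClientId = new ConcurrentHashMap<>();
    private final Function<String, L> provider;

    ListenerCacheSketch(final Function<String, L> provider) {
        this.provider = provider;
    }

    // Called for every event. The provider only runs for the first event of a client,
    // later events reuse the stored listener. A provider that returns null stores nothing.
    L listenerFor(final String clientId) {
        return listenersByClientId.computeIfAbsent(clientId, provider);
    }

    public static void main(final String[] args) {
        final AtomicInteger providerCalls = new AtomicInteger();
        final ListenerCacheSketch<String> cache = new ListenerCacheSketch<>(clientId -> {
            providerCalls.incrementAndGet();
            return "listener-for-" + clientId;
        });
        cache.listenerFor("client-a"); // first event: provider is called
        cache.listenerFor("client-a"); // second event: cached listener is reused
        System.out.println(providerCalls.get());
    }
}
```

A practical consequence for extension code is that per-client setup can be done in the provider method, while per-event work belongs in the listener methods.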

The EventRegistry can be accessed through the Services class.

Client Lifecycle Event Listener

With a ClientLifecycleEventListener set up in an extension, it is possible to listen to these client lifecycle events:

Table 3. Available ClientLifecycleEventListener Methods
Method Input Description

onMqttConnectionStart

ConnectionStartInput

This event is fired as soon as HiveMQ receives a valid MQTT connect packet.

onAuthenticationSuccessful

AuthenticationSuccessfulInput

This event is fired when a client’s authentication succeeds.

onAuthenticationFailedDisconnect

AuthenticationFailedInput

This event is fired when a client’s authentication failed.

onClientInitiatedDisconnect

ClientInitiatedDisconnectInput

This event is fired when a client disconnects gracefully (sends a disconnect packet).

onConnectionLost

ConnectionLostInput

This event is fired when a client disconnects ungracefully (closed connection without disconnect packet sent).

onServerInitiatedDisconnect

ServerInitiatedDisconnectInput

This event is fired when HiveMQ disconnects the client.

onDisconnect

DisconnectEventInput

This event is fired when any kind of disconnect occurs.

To use a ClientLifecycleEventListener, the extension developer must implement it. See example below.

Multiple extensions can each set their own ClientLifecycleEventListener. These listeners are called sequentially, starting with the listener of the extension with the highest priority. For extensions with the same priority, the order in which the listeners are called is undefined.

As soon as an extension is started, every client uses the listener to fire its lifecycle events.

When an extension stops, the listener is removed from every client automatically.


Client Lifecycle Events

Client lifecycle event methods are called by HiveMQ when a client connects, authenticates or disconnects.

The input parameters of these methods are all immutable and for informational purposes only.


onMqttConnectionStart

As soon as a CONNECT packet is decoded and validated by HiveMQ, the ClientLifecycleEventListener.onMqttConnectionStart method is called with a ConnectionStartInput.

This method must be overridden in the implementation of the ClientLifecycleEventListener.

The input contains the following information:

  • MQTT CONNECT Packet

  • MQTT Version

  • Client ID

  • IP Address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS


onAuthenticationSuccessful

When a client’s authentication succeeds, the ClientLifecycleEventListener.onAuthenticationSuccessful method is called with an AuthenticationSuccessfulInput.

This method must be overridden in the implementation of the ClientLifecycleEventListener.

The input contains the following information:

  • MQTT Version

  • Client ID

  • IP Address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS


onAuthenticationFailedDisconnect

When a client’s authentication fails, the ClientLifecycleEventListener.onAuthenticationFailedDisconnect method is called with an AuthenticationFailedInput.

If this method is called, no other disconnect method is called.

When this method is not overridden in the ClientLifecycleEventListener, onDisconnect is called.

The input contains the following information:

  • Optional disconnect reason code

  • Optional reason string

  • Optional user properties

  • MQTT version

  • Client ID

  • IP address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS


onClientInitiatedDisconnect

When a client sends a DISCONNECT packet (disconnects gracefully), the ClientLifecycleEventListener.onClientInitiatedDisconnect method is called with a ClientInitiatedDisconnectInput.

If this method is called, no other disconnect method is called.

When this method is not overridden in the ClientLifecycleEventListener, onDisconnect is called.

The input contains the following information:

  • Optional disconnect reason code

  • Optional reason string

  • Optional user properties

  • MQTT version

  • Client ID

  • IP address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS


onConnectionLost

When a client disconnects without sending a DISCONNECT packet (disconnects ungracefully), the ClientLifecycleEventListener.onConnectionLost method is called with a ConnectionLostInput.

If this method is called, no other disconnect method is called.

When this method is not overridden in the ClientLifecycleEventListener, onDisconnect is called.

The input contains the following information:

  • Optional disconnect reason code

  • Optional reason string

  • Optional user properties

  • MQTT version

  • Client ID

  • IP address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS


onServerInitiatedDisconnect

When HiveMQ disconnects a client, the ClientLifecycleEventListener.onServerInitiatedDisconnect method is called with a ServerInitiatedDisconnectInput.

If this method is called, no other disconnect method is called.

When this method is not overridden in the ClientLifecycleEventListener, onDisconnect is called.

The input contains the following information:

  • Optional disconnect reason code

  • Optional reason string

  • Optional user properties

  • MQTT version

  • Client ID

  • IP address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS


onDisconnect

For every specific disconnect method that is not overridden in the ClientLifecycleEventListener implementation, the ClientLifecycleEventListener.onDisconnect method is called with a DisconnectEventInput.

This method is never called when every specific disconnect method is overridden in the ClientLifecycleEventListener.

The specific methods are:

  • onAuthenticationFailedDisconnect

  • onClientInitiatedDisconnect

  • onConnectionLost

  • onServerInitiatedDisconnect

This method must be overridden in the implementation of the ClientLifecycleEventListener.

The input contains the following information:

  • Optional disconnect reason code

  • Optional reason string

  • Optional user properties

  • MQTT version

  • Client ID

  • IP address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS
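The fallback from the specific disconnect methods to onDisconnect maps naturally onto Java default methods. The following sketch shows that pattern with a simplified, hypothetical interface `DisconnectListenerSketch`, in which the input objects are reduced to a client id and only two of the specific methods are shown; whether the SDK interface is built exactly this way is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class DisconnectFallbackDemo {

    // Fires one overridden and one non-overridden event and records which method handled each.
    static List<String> run() {
        final List<String> calls = new ArrayList<>();
        final DisconnectListenerSketch listener = new DisconnectListenerSketch() {
            @Override
            public void onDisconnect(final String clientId) {
                calls.add("onDisconnect:" + clientId);
            }

            @Override
            public void onConnectionLost(final String clientId) {
                calls.add("onConnectionLost:" + clientId);
            }
        };
        listener.onConnectionLost("client-1");            // overridden: handled by the specific method
        listener.onClientInitiatedDisconnect("client-2"); // not overridden: falls back to onDisconnect
        return calls;
    }

    public static void main(final String[] args) {
        run().forEach(System.out::println);
    }
}

// Hypothetical, simplified listener interface.
interface DisconnectListenerSketch {

    // the only disconnect method that must be implemented
    void onDisconnect(String clientId);

    // specific methods delegate to onDisconnect unless they are overridden
    default void onConnectionLost(final String clientId) {
        onDisconnect(clientId);
    }

    default void onClientInitiatedDisconnect(final String clientId) {
        onDisconnect(clientId);
    }
}
```

With this structure, a listener that only cares about disconnects in general implements onDisconnect alone, while a listener that overrides every specific method never sees onDisconnect invoked.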


Example Usage

This section illustrates examples on how to set a ClientLifecycleEventListenerProvider via the EventRegistry.


Accessing Event Registry

This example shows how to access the EventRegistry via the Services class.

final EventRegistry eventRegistry = Services.eventRegistry();


Implement A Simple ClientLifecycleEventListener

This example shows how to implement a very simple ClientLifecycleEventListener.

Only mandatory methods have been overridden in this example.

//create a class implementing ClientLifecycleEventListener
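public class SimpleEventListener implements ClientLifecycleEventListener {

    //logger used by the methods below (assumes SLF4J: org.slf4j.Logger and org.slf4j.LoggerFactory)
    private static final Logger log = LoggerFactory.getLogger(SimpleEventListener.class);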

    //override mandatory onMqttConnectionStart method
    @Override
    public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
        final MqttVersion version = connectionStartInput.getConnectionInformation().getMqttVersion();
        switch (version){
            case V_5:
                log.info("MQTT 5 client connected with id: {} ", connectionStartInput.getClientInformation().getClientId());
                break;
            case V_3_1_1:
                log.info("MQTT 3.1.1 client connected with id: {} ", connectionStartInput.getClientInformation().getClientId());
                break;
            case V_3_1:
                log.info("MQTT 3.1 client connected with id: {} ", connectionStartInput.getClientInformation().getClientId());
                break;
        }
    }

    //override mandatory onAuthenticationSuccessful method
    @Override
    public void onAuthenticationSuccessful(final @NotNull AuthenticationSuccessfulInput authenticationSuccessfulInput) {
        log.info("Authentication succeeded for client: {}", authenticationSuccessfulInput.getClientInformation().getClientId());
    }

    //override mandatory onDisconnect method
    @Override
    public void onDisconnect(final @NotNull DisconnectEventInput disconnectEventInput) {
        log.info("Client disconnected: {}", disconnectEventInput.getClientInformation().getClientId());
    }
}


Implement A Full ClientLifecycleEventListener

This example shows how to implement a ClientLifecycleEventListener with all methods overridden.

//create a class implementing ClientLifecycleEventListener
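public class FullEventListener implements ClientLifecycleEventListener {

    //logger used by the methods below (assumes SLF4J: org.slf4j.Logger and org.slf4j.LoggerFactory)
    private static final Logger log = LoggerFactory.getLogger(FullEventListener.class);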

    //override mandatory onMqttConnectionStart method
    @Override
    public void onMqttConnectionStart(final @NotNull ConnectionStartInput connectionStartInput) {
        final MqttVersion version = connectionStartInput.getConnectPacket().getMqttVersion();
        switch (version){
            case V_5:
                log.info("MQTT 5 client connected with id: {} ", connectionStartInput.getClientInformation().getClientId());
                break;
            case V_3_1_1:
                log.info("MQTT 3.1.1 client connected with id: {} ", connectionStartInput.getClientInformation().getClientId());
                break;
            case V_3_1:
                log.info("MQTT 3.1 client connected with id: {} ", connectionStartInput.getClientInformation().getClientId());
                break;
        }
    }

    //override mandatory onAuthenticationSuccessful method
    @Override
    public void onAuthenticationSuccessful(final @NotNull AuthenticationSuccessfulInput authenticationSuccessfulInput) {
        log.info("Authentication succeeded for client: {}", authenticationSuccessfulInput.getClientInformation().getClientId());
    }

    //override optional onAuthenticationFailedDisconnect method
    @Override
    public void onAuthenticationFailedDisconnect(final  @NotNull AuthenticationFailedInput authenticationFailedInput) {
        log.info("Authentication failed for client: {}", authenticationFailedInput.getClientInformation().getClientId());
    }

    //override optional onConnectionLost method
    @Override
    public void onConnectionLost(final  @NotNull ConnectionLostInput connectionLostInput) {
        log.info("Ungraceful disconnect from client: {}", connectionLostInput.getClientInformation().getClientId());
    }

    //override optional onClientInitiatedDisconnect method
    @Override
    public void onClientInitiatedDisconnect(final  @NotNull ClientInitiatedDisconnectInput clientInitiatedDisconnectInput) {
        log.info("Graceful disconnect from client: {}", clientInitiatedDisconnectInput.getClientInformation().getClientId());
    }

    //override optional onServerInitiatedDisconnect method
    @Override
    public void onServerInitiatedDisconnect(final  @NotNull ServerInitiatedDisconnectInput serverInitiatedDisconnectInput) {
        log.info("Server disconnected client: {}", serverInitiatedDisconnectInput.getClientInformation().getClientId());
    }

    //override mandatory onDisconnect method
    @Override
    public void onDisconnect(final @NotNull DisconnectEventInput disconnectEventInput) {
        //as every other disconnect method is overridden this one will never be called from HiveMQ
        log.info("Client disconnected: {}", disconnectEventInput.getClientInformation().getClientId());
    }
}


Register A ClientLifecycleEventListenerProvider

This example shows how to implement and register a ClientLifecycleEventListenerProvider.

It uses the two ClientLifecycleEventListener implementations shown above.

...

@Override
public void extensionStart(final ExtensionStartInput extensionStartInput, final ExtensionStartOutput extensionStartOutput) {

    //create new SimpleEventListener
    final ClientLifecycleEventListener simpleEventListener = new SimpleEventListener();

    //create new FullEventListener
    final ClientLifecycleEventListener fullEventListener = new FullEventListener();

    final ClientLifecycleEventListenerProvider eventListenerProvider = new ClientLifecycleEventListenerProvider() {

        //override mandatory getClientLifecycleEventListener method
        @Override
        public @Nullable ClientLifecycleEventListener getClientLifecycleEventListener(final @NotNull ClientLifecycleEventListenerProviderInput clientLifecycleEventListenerProviderInput) {

            //check client id for returning either simple or full listener
            if (clientLifecycleEventListenerProviderInput.getClientInformation().getClientId().contains("simple")) {
                return simpleEventListener;
            } else {
                return fullEventListener;
            }
        }
    };

    //call Services.eventRegistry() to set the provider.
    Services.eventRegistry().setClientLifecycleEventListener(eventListenerProvider);
}


...