Callbacks

A core concept of HiveMQ plugin development is using the callbacks HiveMQ provides to execute custom business logic on events when they occur.

To hook your callback implementations into HiveMQ, you can use the Callback Registry.

Example of Registering a Plugin Callback
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.lowlevel.OnPingCallback;
import com.hivemq.spi.security.ClientData;

import javax.annotation.PostConstruct;

public class TestPlugin extends PluginEntryPoint {

    @PostConstruct
    public void postConstruct() {
        getCallbackRegistry().addCallback(new OnPingCallback() { // (1)
            @Override
            public void onPingReceived(ClientData clientData) {
                System.out.println("Ping received");
            }
        });
    }
}
1 Registering the anonymous inner callback class
Same priorities on callbacks
If callbacks have the same priority, the execution order of these callbacks is not predictable.

Callback Types

The Callback Hierarchy

Callback Hierarchy

Every callback interface that can be implemented is either an Asynchronous or a Synchronous callback.

For plugin implementors, the only difference that matters is that Synchronous Callbacks have priorities. Priorities define the order of callback execution when there is more than one implementation of a callback. Use the constants from the com.hivemq.spi.callback.CallbackPriority class or define your own numerical return value. Lower values mean higher priority.
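The ordering rule can be illustrated with a small, self-contained sketch. It does not use the HiveMQ SPI: the PrioritizedCallback interface, the callback names, and the numeric priority values are hypothetical stand-ins chosen for illustration. It only shows that sorting by ascending priority value runs the callback with the lowest value first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PriorityOrderSketch {

    // Hypothetical stand-in for a synchronous callback; not part of the HiveMQ SPI.
    interface PrioritizedCallback {
        String name();
        int priority(); // lower value = higher priority
    }

    // Builds a callback with a fixed name and priority (illustrative helper).
    static PrioritizedCallback callback(final String name, final int priority) {
        return new PrioritizedCallback() {
            @Override public String name() { return name; }
            @Override public int priority() { return priority; }
        };
    }

    // Returns the callback names in the order they would be executed.
    static List<String> executionOrder(final List<PrioritizedCallback> callbacks) {
        final List<PrioritizedCallback> sorted = new ArrayList<>(callbacks);
        sorted.sort(Comparator.comparingInt(PrioritizedCallback::priority));
        final List<String> names = new ArrayList<>();
        for (final PrioritizedCallback cb : sorted) {
            names.add(cb.name());
        }
        return names;
    }

    public static void main(final String[] args) {
        final List<PrioritizedCallback> registered = new ArrayList<>();
        registered.add(callback("audit", 100));
        registered.add(callback("validation", 1));
        registered.add(callback("enrichment", 50));
        // The callback with the lowest priority value is listed first.
        System.out.println(executionOrder(registered));
    }
}
```

The stable sort in this sketch keeps registration order for equal priority values. HiveMQ gives no such guarantee, so callbacks that must run in a fixed order should use distinct priorities.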

Synchronous callbacks are executed in order because they typically allow you to interfere with the message processing, authentication, or authorization mechanisms of HiveMQ. Because the callbacks run one after another, it is very important to avoid blocking in Synchronous Callbacks.

Synchronous Callback Execution


Asynchronous Callbacks are executed in parallel. HiveMQ synchronizes after the parallel execution. This means that a slow callback can block HiveMQ even though all other callbacks have already finished.
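The effect of synchronizing after parallel execution can be sketched with plain java.util.concurrent classes. This is not how HiveMQ is implemented internally. The class name, task names, and sleep durations are assumptions made for illustration. The sketch only demonstrates that waiting for all parallel tasks takes roughly as long as the slowest one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelCallbackSketch {

    // Simulates a callback that takes the given number of milliseconds.
    static Callable<String> sleepingTask(final String name, final long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Runs all tasks in parallel and returns only after every task has finished.
    // The returned value is the time the caller had to wait, in milliseconds.
    static long runAllAndWaitMillis(final List<Callable<String>> tasks) {
        final ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            final long start = System.nanoTime();
            pool.invokeAll(tasks); // blocks until all tasks are complete
            return (System.nanoTime() - start) / 1_000_000L;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for callbacks", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(final String[] args) {
        final List<Callable<String>> tasks = Arrays.asList(
                sleepingTask("fast-1", 10),
                sleepingTask("fast-2", 10),
                sleepingTask("slow", 300));
        // The wait is dominated by the slow task, although the fast ones finish early.
        System.out.println("Waited about " + runAllAndWaitMillis(tasks) + " ms for all callbacks");
    }
}
```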

Asynchronous Callback Execution


Overview of all Callbacks

Table 1. Callbacks
Name | Type | Description

Broker Event Callbacks
OnBrokerStart | Synchronous | Called when the broker starts up. This happens before bootstrapping functionality like binding network interfaces.
OnBrokerReady | Synchronous | Called when the broker is ready. This is after all the bootstrapping is done and after a possible cluster join has been attempted.
OnBrokerStop | Synchronous | Called when the broker stops.

MQTT Message Callbacks
OnConnectCallback | Synchronous | Called when a CONNECT message arrives.
OnDisconnectCallback | Asynchronous | Called when a DISCONNECT message arrives or a TCP connection loss occurs.
OnPublishReceivedCallback | Synchronous | Called when a PUBLISH MQTT message arrives.
OnPublishSend | Asynchronous | Called when the broker is sending a PUBLISH MQTT message.
OnSubscribeCallback | Synchronous | Called when a MQTT SUBSCRIBE message arrives.
OnTopicSubscriptionCallback | Synchronous | Called for every topic a client wants to subscribe to.
OnUnsubscribeCallback | Asynchronous | Called when a MQTT UNSUBSCRIBE message arrives.
OnConnackSend | Asynchronous | Called when the broker is sending a MQTT CONNACK message.
OnPingCallback | Asynchronous | Called when a MQTT PINGREQ message arrives.
OnPubackReceived | Asynchronous | Called when a MQTT PUBACK message arrives.
OnPubackSend | Asynchronous | Called when the broker sent a MQTT PUBACK message.
OnPubcompReceived | Asynchronous | Called when a MQTT PUBCOMP message arrives.
OnPubcompSend | Asynchronous | Called when the broker sent a MQTT PUBCOMP message.
OnPubrecReceived | Asynchronous | Called when a MQTT PUBREC message arrives.
OnPubrecSend | Asynchronous | Called when the broker sent a MQTT PUBREC message.
OnPubrelReceived | Asynchronous | Called when a MQTT PUBREL message arrives.
OnPubrelSend | Asynchronous | Called when the broker sent a MQTT PUBREL message.
OnSessionReadyCallback | Synchronous | Called when a MQTT Session for the connecting client is available.
OnSubackSend | Asynchronous | Called when the broker sent a MQTT SUBACK message.
OnUnsubackSend | Asynchronous | Called when the broker sent a MQTT UNSUBACK message.
BeforePublishSendCallback | Synchronous | Called before the broker sends a MQTT PUBLISH message.

Security Callbacks
AfterLoginCallback | Asynchronous | Called when a client made a successful or unsuccessful login attempt.
OnAuthenticationCallback | Synchronous | Called after a CONNECT message arrived to check the credentials.
OnAuthorizationCallback | Synchronous | Returns MqttTopicPermissions when a client publishes or subscribes.
OnInsufficientPermissionDisconnect | Asynchronous | Called when a client was disconnected due to insufficient permissions when publishing or subscribing.
RestrictionsAfterLoginCallback | Synchronous | Executed after a CONNECT message arrives and the client was authenticated successfully.

Other Callbacks
ClusterDiscoveryCallback | Asynchronous | Called periodically by HiveMQ to discover other HiveMQ cluster nodes.
ScheduledCallback | Asynchronous | Executed periodically based on a quartz-style cron configuration.
WebUIAuthenticationCallback | Synchronous | Called when a Web UI login occurs.

OnBrokerStart

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.broker.OnBrokerStart callback is useful for implementing custom startup logic and verifications. A common use case is for example verifying that a database connection can be obtained or that expected files are available and valid.

Interfering with HiveMQ
It is possible to throw a com.hivemq.spi.callback.exception.BrokerUnableToStartException. When the onBrokerStart() method of the OnBrokerStart callback implementation throws this exception, HiveMQ refuses to start.

OnBrokerStart Example
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerStart;
import com.hivemq.spi.callback.exception.BrokerUnableToStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveMQStart implements OnBrokerStart {

    private static final Logger log = LoggerFactory.getLogger(HiveMQStart.class);

    @Override
    public void onBrokerStart() throws BrokerUnableToStartException {
        log.info("HiveMQ is starting");

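        if (checkStartupSuccessful()) {
            log.info("Startup check was successful");
        } else {
            throw new BrokerUnableToStartException("Could not start HiveMQ :("); // (1)
        }
    }

    // Placeholder for a plugin-specific check, e.g. that a database is reachable.
    private boolean checkStartupSuccessful() {
        return true;
    }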

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Throwing the BrokerUnableToStartException prevents HiveMQ from starting.

OnBrokerReady

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.broker.OnBrokerReady callback is useful for all tasks for which OnBrokerStart is too early. Common use cases are verifying that the cluster view meets expectations or adding custom subscriptions.

Interfering with HiveMQ
It is possible to throw a com.hivemq.spi.callback.exception.IllegalBrokerStateException. When the onBrokerReady() method of the OnBrokerReady callback implementation throws this exception, HiveMQ performs an immediate shutdown. All incoming client connections are suspended until the last registered com.hivemq.spi.callback.events.broker.OnBrokerReady callback is successfully executed.

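OnBrokerReady Example
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerReady;
import com.hivemq.spi.callback.exception.IllegalBrokerStateException;

public class BrokerReadyCallback implements OnBrokerReady {
    @Override
    public void onBrokerReady() throws IllegalBrokerStateException {
        // checkRunningConditions() and addNecessarySubscriptions() are plugin-specific helpers (not shown).
        if (checkRunningConditions()) {
            addNecessarySubscriptions();
        } else {
            throw new IllegalBrokerStateException("Cluster is too small"); // (1)
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}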
1 Throwing the IllegalBrokerStateException causes HiveMQ to stop.

OnBrokerStop

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.broker.OnBrokerStop callback is useful for implementing custom shutdown logic. A common use case is for example shutting down a database connection.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnBrokerStopCallback Example
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.broker.OnBrokerStop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.io.FileOutputStream;
import java.io.IOException;

public class HiveMQStop implements OnBrokerStop {

    final Logger log = LoggerFactory.getLogger(HiveMQStop.class);
    final FileOutputStream fileOutputStream;

    @Inject
    public HiveMQStop(final FileOutputStream fileOutputStream) {
        this.fileOutputStream = fileOutputStream;
    }

    @Override
    public void onBrokerStop() {
        try {
            fileOutputStream.close();
        } catch (IOException e) {
            log.warn("Output stream could not be closed on broker shutdown.", e);
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

OnConnectCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.OnConnectCallback is useful for performing custom logic when a MQTT CONNECT message arrives. A common use case is logging when a client connects. The callback is also useful in edge cases where you need to verify parts of the CONNECT message, such as the LWT part.

Interfering with HiveMQ
It is possible to throw a com.hivemq.spi.callback.exception.RefusedConnectionException to disconnect a client with a specific return code for the CONNACK message. A CONNECT message object and a ClientData object — which contains information about the credentials and the optional client authentication certificate — are passed as parameters to the onConnect method.

Although possible for scenarios with only one authentication resource, this callback is not designed to perform authentication. Please use the OnAuthenticationCallback for this purpose.
OnConnectCallback Example
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnConnectCallback;
import com.hivemq.spi.callback.exception.RefusedConnectionException;
import com.hivemq.spi.message.CONNECT;
import com.hivemq.spi.message.ReturnCode;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientConnect implements OnConnectCallback {

    Logger log = LoggerFactory.getLogger(ClientConnect.class);

    @Override
    public void onConnect(CONNECT connect, ClientData clientData) throws RefusedConnectionException {
        if (clientData.getClientId().equals("bad id")) {
            throw new RefusedConnectionException(ReturnCode.REFUSED_IDENTIFIER_REJECTED);
        }
        log.info("Client {} is connecting", clientData.getClientId());
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

OnDisconnectCallback

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.events.OnDisconnectCallback is useful for performing custom logic when a client disconnects due to a MQTT DISCONNECT message or a TCP connection loss. The getDisconnectTimestamp() method can be used to determine the exact time of the disconnect.

If the client disconnected due to a connection loss, the parameter abruptDisconnect is true. If the client disconnected gracefully, the parameter is false.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback. The abruptDisconnect parameter indicates if there was a TCP connection loss (abruptDisconnect = true) or a graceful disconnect with a MQTT DISCONNECT message.

OnDisconnectCallback Example
import com.hivemq.spi.callback.events.OnDisconnectCallback;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientDisconnect implements OnDisconnectCallback {

    final Logger log = LoggerFactory.getLogger(ClientDisconnect.class);

    @Override
    public void onDisconnect(ClientData clientData, boolean abruptDisconnect) {
        if (abruptDisconnect) {
            log.warn("Connection to client {}, with ip {}, was lost abruptly.", clientData.getClientId(), clientData.getInetAddress());
        } else {
            log.info("Client {} is disconnected.", clientData.getClientId());
        }
    }
}

OnPublishReceivedCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.OnPublishReceivedCallback gets called when a PUBLISH MQTT message arrives. This callback is useful for validating PUBLISH messages, e.g. verifying that the message has a specific payload format.

Don’t use this callback for Topic based permissions.
This callback gets called very often. Please make sure you use proper caching and that you don’t block.

Interfering with HiveMQ
It’s possible to throw a com.hivemq.spi.callback.exception.OnPublishReceivedException when the published message is not valid from a business logic point of view. It’s possible to disconnect the publishing client by passing true as parameter to the OnPublishReceivedException.

This callback is not meant to be used for topic restrictions. Please see the Client Authorization Chapter for more details. Also, if more than one plugin throws an OnPublishReceivedException for the same callback, only the first one will be handled. Therefore the first exception decides whether the client gets disconnected or not.
OnPublishReceivedCallback Example
import com.google.common.base.Charsets;
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnPublishReceivedCallback;
import com.hivemq.spi.callback.exception.OnPublishReceivedException;
import com.hivemq.spi.message.PUBLISH;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishReceived implements OnPublishReceivedCallback {

    final Logger logger = LoggerFactory.getLogger(PublishReceived.class);

    @Override
    public void onPublishReceived(final PUBLISH publish, final ClientData clientData) throws OnPublishReceivedException {
        final String clientID = clientData.getClientId();
        final String topic = publish.getTopic();
        final String message = new String(publish.getPayload(), Charsets.UTF_8);
        logger.debug("Client {} sent a message to topic {}: {}", clientID, topic, message);
        if (topic.equals("database/import") && message.contains("DROP TABLE")) {
            throw new OnPublishReceivedException(true);
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.LOW;
    }
}

OnPublishSend

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.events.OnPublishSend callback gets called when an outgoing MQTT PUBLISH is going to be sent to a subscribing client.

This callback gets called very often. Please make sure you use proper caching and that you don’t block.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPublishSend Example
import com.google.common.base.Charsets;
import com.hivemq.spi.callback.events.OnPublishSend;
import com.hivemq.spi.message.PUBLISH;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PublishSend implements OnPublishSend {

    final Logger logger = LoggerFactory.getLogger(PublishSend.class);

    @Override
    public void onPublishSend(final PUBLISH publish, final ClientData clientData) {
        if (logger.isDebugEnabled()) {
            final String clientID = clientData.getClientId();
            final String topic = publish.getTopic();
            final String message = new String(publish.getPayload(), Charsets.UTF_8);

            logger.debug("Client {} received a message on topic {}: {}", clientID, topic, message);
        }
    }
}

OnSubscribeCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.OnSubscribeCallback callback gets called when a MQTT SUBSCRIBE message arrives. Useful for validating SUBSCRIBE messages or logging subscriptions. This callback is not designed to be used for Authorization. Please see the Authorization chapter for more details.

Interfering with HiveMQ
It’s possible to throw a com.hivemq.spi.callback.exception.InvalidSubscriptionException when the SUBSCRIBE message is invalid. The client gets disconnected when the exception is thrown.

OnSubscribeCallback Example
import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.OnSubscribeCallback;
import com.hivemq.spi.callback.exception.InvalidSubscriptionException;
import com.hivemq.spi.message.SUBSCRIBE;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Subscribe implements OnSubscribeCallback {

    final Logger logger = LoggerFactory.getLogger(Subscribe.class);

    @Override
    public void onSubscribe(final SUBSCRIBE message, final ClientData clientData) throws InvalidSubscriptionException {
        logger.debug("Client {} is now subscribed to the topics: {}.", clientData.getClientId(), message.getTopics());
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

OnTopicSubscriptionCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.OnTopicSubscriptionCallback callback gets called for every topic in an arrived MQTT SUBSCRIBE message. It is used to manually set the return codes in the following MQTT SUBACK message.

Breaking the MQTT protocol
It is possible to break the MQTT specification if you return codes in the SUBACK message that are not permitted by the specification. This is especially the case if you return a higher quality of service than the client requested.
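One way to stay within the specification is to never grant more than the client requested. The following self-contained sketch works on plain integers instead of the HiveMQ SubackReturnCode type. The class name, the method name, and the maxAllowedQos policy parameter (with a negative value meaning "deny") are assumptions made for illustration. The return code values 0, 1, 2 and 0x80 (failure) are those defined by MQTT 3.1.1.

```java
class SubackQosSketch {

    // SUBACK failure return code as defined by MQTT 3.1.1.
    static final int FAILURE = 0x80;

    // Returns a SUBACK return code that never exceeds the QoS the client requested.
    // maxAllowedQos is a hypothetical plugin policy; a negative value denies the subscription.
    static int grantedReturnCode(final int requestedQos, final int maxAllowedQos) {
        if (requestedQos < 0 || requestedQos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2");
        }
        if (maxAllowedQos < 0) {
            return FAILURE;
        }
        // Downgrading is allowed, upgrading is not.
        return Math.min(requestedQos, Math.min(maxAllowedQos, 2));
    }

    public static void main(final String[] args) {
        System.out.println(grantedReturnCode(2, 1));  // requested QoS 2, policy allows at most QoS 1
        System.out.println(grantedReturnCode(0, 2));  // requested QoS 0 is never upgraded
        System.out.println(grantedReturnCode(1, -1)); // subscription denied by policy
    }
}
```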

OnTopicSubscriptionCallback Example
public class DenyQoS1 implements OnTopicSubscriptionCallback {
    @Override
    public SubackReturnCode getSubackReturnCodeForClient(final Topic topic, final SubackReturnCode authorizationResult, final ClientData clientData) {
        if (SubackReturnCode.SUCCESS_QOS_1.equals(authorizationResult)) {
            return SubackReturnCode.SUCCESS_QOS_0;
        }
        return authorizationResult;
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}

OnUnsubscribeCallback

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.events.OnUnsubscribeCallback callback gets called when a MQTT UNSUBSCRIBE message arrives.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnUnsubscribeCallback Example
import com.hivemq.spi.callback.events.OnUnsubscribeCallback;
import com.hivemq.spi.message.UNSUBSCRIBE;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnSubscribe implements OnUnsubscribeCallback {

    final Logger logger = LoggerFactory.getLogger(UnSubscribe.class);

    @Override
    public void onUnsubscribe(final UNSUBSCRIBE message, final ClientData clientData) {
        logger.debug("Client {} is no longer subscribed to the topics: {}.", clientData.getClientId(), message.getTopics());
    }
}

OnConnackSend

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnConnackSend callback gets called when the broker is sending a MQTT CONNACK message. Useful to check if a client was actually connected successfully or if the connection failed for some reason.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnConnackSend Example
import com.hivemq.spi.callback.lowlevel.OnConnackSend;
import com.hivemq.spi.message.CONNACK;
import com.hivemq.spi.message.ReturnCode;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnackSent implements OnConnackSend {

    final Logger logger = LoggerFactory.getLogger(ConnackSent.class);

    @Override
    public void onConnackSend(CONNACK connack, ClientData clientData) {
        if (connack.getReturnCode() == ReturnCode.ACCEPTED) {
            logger.debug("Client {} connected successfully.", clientData.getClientId());
        } else {
            logger.warn("Client {} failed to connect. Return code = {}", clientData.getClientId(), connack.getReturnCode().name());
        }
    }
}

OnPingCallback

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPingCallback callback gets called when a MQTT PINGREQ message arrives. The callback is useful to check if a client is still active. Note that PINGREQ messages are only sent by clients if no other messages are sent during the keep alive period.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPingCallback Example
import com.hivemq.spi.callback.lowlevel.OnPingCallback;
import com.hivemq.spi.security.ClientData;

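import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PingReceived implements OnPingCallback {

    // Thread-safe map, because asynchronous callbacks may be executed in parallel.
    private final Map<String, Date> clientActivity = new ConcurrentHashMap<>();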

    @Override
    public void onPingReceived(final ClientData clientData) {
        clientActivity.put(clientData.getClientId(), new Date());
    }
}
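
A map of last-seen timestamps like the one above becomes useful once it is queried. The following self-contained sketch shows one possible way to list clients that have been idle for longer than a chosen limit. The class name, method names, and the idle limit are hypothetical and not part of the HiveMQ SPI. Because PINGREQ messages only arrive when a client sends nothing else, a real plugin would presumably record activity from other callbacks as well.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class ClientActivitySketch {

    // Last time each client was seen; thread-safe for parallel callback execution.
    private final Map<String, Instant> lastSeen = new ConcurrentHashMap<>();

    // Would be called from a callback such as onPingReceived (hypothetical wiring).
    void recordActivity(final String clientId, final Instant when) {
        lastSeen.put(clientId, when);
    }

    // Returns the ids of all clients idle for longer than maxIdle, sorted by id.
    List<String> inactiveSince(final Instant now, final Duration maxIdle) {
        return lastSeen.entrySet().stream()
                .filter(e -> Duration.between(e.getValue(), now).compareTo(maxIdle) > 0)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final ClientActivitySketch activity = new ClientActivitySketch();
        final Instant start = Instant.parse("2020-01-01T00:00:00Z");
        activity.recordActivity("client-a", start);
        activity.recordActivity("client-b", start.plusSeconds(50));
        // At start + 90s with a 60s limit, only client-a has been idle too long.
        System.out.println(activity.inactiveSince(start.plusSeconds(90), Duration.ofSeconds(60)));
    }
}
```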

OnPubackReceived

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPubackReceived callback gets called when a MQTT PUBACK message arrives. The callback can be used to implement business logic that should not be processed until a message was received by the client at least once.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPubackReceived Example
import com.hivemq.spi.callback.lowlevel.OnPubackReceived;
import com.hivemq.spi.message.PUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubackReceived implements OnPubackReceived {

    final Logger logger = LoggerFactory.getLogger(PubackReceived.class);

    @Override
    public void onPubackReceived(PUBACK puback, ClientData clientData) {
        logger.debug("Client {} received a PUBLISH message with id {}", clientData.getClientId(), puback.getMessageId());
    }
}

OnPubackSend

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPubackSend callback gets called when the broker is sending a MQTT PUBACK message.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPubackSend Example
import com.hivemq.spi.callback.lowlevel.OnPubackSend;
import com.hivemq.spi.message.PUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubackSend implements OnPubackSend {

    final Logger logger = LoggerFactory.getLogger(PubackSend.class);

    @Override
    public void onPubackSend(PUBACK puback, ClientData clientData) {
        logger.debug("A PUBACK message with id {} was sent to client {}.", puback.getMessageId(), clientData.getClientId());
    }
}

OnPubcompReceived

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPubcompReceived callback gets called when a MQTT PUBCOMP message arrives. The callback is executed as soon as the transfer of a PUBLISH message with QoS 2 to a subscriber is completed.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPubcompReceived Example
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.hivemq.spi.callback.lowlevel.OnPubcompReceived;
import com.hivemq.spi.message.PUBCOMP;
import com.hivemq.spi.security.ClientData;

public class PubcompReceived implements OnPubcompReceived {

    final Multimap<String, Integer> incompleteMessageTransfers = HashMultimap.create();

    @Override
    public void onPubcompReceived(final PUBCOMP pubcomp, final ClientData clientData) {
        incompleteMessageTransfers.remove(clientData.getClientId(), pubcomp.getMessageId());
    }
}

OnPubcompSend

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPubcompSend callback gets called when the broker is sending a MQTT PUBCOMP message. The callback is executed as soon as the transfer of a PUBLISH message with QoS 2 to the broker is completed.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPubcompSend Example
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.hivemq.spi.callback.lowlevel.OnPubcompSend;
import com.hivemq.spi.message.PUBCOMP;
import com.hivemq.spi.security.ClientData;

public class PubcompSend implements OnPubcompSend {

    final Multimap<String, Integer> incompleteMessageTransfers = HashMultimap.create();

    @Override
    public void onPubcompSend(final PUBCOMP pubcomp, final ClientData clientData) {
        incompleteMessageTransfers.remove(clientData.getClientId(), pubcomp.getMessageId());
    }
}
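
The OnPubcompReceived and OnPubcompSend examples only remove entries from the set of incomplete transfers. The following self-contained sketch shows how both sides of that bookkeeping could fit together. It uses only the JDK instead of Guava and the HiveMQ SPI. The class and method names are hypothetical, and which callback would call started() (for example one that sees the PUBLISH message) is an assumption left to the plugin.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

class InFlightTrackerSketch {

    // Message ids of QoS 2 transfers that have started but not yet completed, per client.
    private final Map<String, Set<Integer>> inFlight = new ConcurrentHashMap<>();

    // Would be called when a QoS 2 PUBLISH is seen (hypothetical wiring).
    void started(final String clientId, final int messageId) {
        inFlight.computeIfAbsent(clientId, id -> ConcurrentHashMap.newKeySet()).add(messageId);
    }

    // Would be called when the matching PUBCOMP is seen.
    // Returns true if the transfer was known and is now marked complete.
    boolean completed(final String clientId, final int messageId) {
        final Set<Integer> ids = inFlight.get(clientId);
        return ids != null && ids.remove(messageId);
    }

    // Returns a sorted snapshot of the message ids still in flight for a client.
    Set<Integer> pending(final String clientId) {
        final Set<Integer> ids = inFlight.get(clientId);
        return ids == null ? new TreeSet<Integer>() : new TreeSet<Integer>(ids);
    }

    public static void main(final String[] args) {
        final InFlightTrackerSketch tracker = new InFlightTrackerSketch();
        tracker.started("client-1", 7);
        tracker.started("client-1", 9);
        tracker.completed("client-1", 7);
        // Only message id 9 is still waiting for its PUBCOMP.
        System.out.println(tracker.pending("client-1"));
    }
}
```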

OnPubrecReceived

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPubrecReceived callback gets called when a MQTT PUBREC message arrives.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPubrecReceived Example
import com.hivemq.spi.callback.lowlevel.OnPubrecReceived;
import com.hivemq.spi.message.PUBREC;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubrecReceived implements OnPubrecReceived {

    final Logger logger = LoggerFactory.getLogger(PubrecReceived.class);

    @Override
    public void onPubrecReceived(final PUBREC pubrec, final ClientData clientData) {
        logger.debug("Client {} received a PUBLISH message with id {}", clientData.getClientId(), pubrec.getMessageId());
    }
}

OnPubrecSend

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPubrecSend callback gets called when the broker is sending a MQTT PUBREC message.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPubrecSend Example
import com.hivemq.spi.callback.lowlevel.OnPubrecSend;
import com.hivemq.spi.message.PUBREC;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubrecSend implements OnPubrecSend {

    final Logger logger = LoggerFactory.getLogger(PubrecSend.class);

    @Override
    public void onPubrecSend(final PUBREC pubrec, final ClientData clientData) {
        logger.debug("A PUBLISH message with id {} sent by publisher {} was received.", pubrec.getMessageId(), clientData.getClientId());
    }
}

OnPubrelReceived

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPubrelReceived callback gets called when a MQTT PUBREL message arrives.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPubrelReceived Example
import com.hivemq.spi.callback.lowlevel.OnPubrelReceived;
import com.hivemq.spi.message.PUBREL;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubrelReceived implements OnPubrelReceived {

    Logger logger = LoggerFactory.getLogger(PubrelReceived.class);

    @Override
    public void onPubrelReceived(final PUBREL pubrel, final ClientData clientData) {
        logger.debug("A PUBREL message with id {} was sent by publisher {}.", pubrel.getMessageId(), clientData.getClientId());
    }
}

OnPubrelSend

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnPubrelSend callback gets called when the broker is sending a MQTT PUBREL message.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnPubrelSend Example
import com.hivemq.spi.callback.lowlevel.OnPubrelSend;
import com.hivemq.spi.message.PUBREL;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubrelSend implements OnPubrelSend {

    Logger logger = LoggerFactory.getLogger(PubrelSend.class);

    @Override
    public void onPubrelSend(final PUBREL pubrel, final ClientData clientData) {
        logger.debug("A PUBREL message with id {} was sent to subscriber {}.", pubrel.getMessageId(), clientData.getClientId());
    }
}

OnSessionReadyCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.OnSessionReadyCallback callback gets called when the node where the client is connected is sure that a MQTT Session is available for the client. It is useful for performing session scoped actions such as using the Session Attribute Store or the Client Group Service.

Interfering with HiveMQ
HiveMQ will wait for the last registered OnSessionReadyCallback to return before sending a MQTT CONNACK message to the connecting client.

OnSessionReadyCallback Example
public class SessionReady implements OnSessionReadyCallback {

    private static final Logger log = LoggerFactory.getLogger(SessionReady.class);

    private static final String TOKEN = "token";
    private final BlockingSessionAttributeStore attributeStore;

    @Inject
    SessionReady(final BlockingSessionAttributeStore attributeStore) { // (1)
        this.attributeStore = attributeStore;
    }

    @Override
    public void onSessionReady(final ClientData clientData) {

        final Optional<String> tokenValue = clientData.getConnectionAttributeStore().getAsString(TOKEN); // (2)

        if (tokenValue.isPresent()) {
            final String clientId = clientData.getClientId();
            try {
                attributeStore.putAsString(clientId, TOKEN, tokenValue.get());
                log.info("Successfully set token for client {}.", clientId); // (3)
            } catch (final Exception exception) {
                log.error("Failed to set token for client {}.", clientId, exception); // (3)
            }
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}
1 Inject necessary Dependencies.
2 Perform Session scoped actions.
3 Log error or success.

OnSubackSend

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnSubackSend callback gets called when the broker is sending a MQTT SUBACK message. Useful to check if a client was subscribed successfully. Note that a client which sent an invalid SUBSCRIBE message is usually disconnected immediately.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnSubackSend Example
import com.hivemq.spi.callback.lowlevel.OnSubackSend;
import com.hivemq.spi.message.SUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubackSend implements OnSubackSend {

    final Logger logger = LoggerFactory.getLogger(SubackSend.class);

    @Override
    public void onSubackSend(final SUBACK suback, final ClientData clientData) {
        logger.debug("The subscription of client {} with id {} was successful.", clientData.getClientId(), suback.getMessageId());
    }
}

OnUnsubackSend

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.lowlevel.OnUnsubackSend callback gets called when the broker is sending a MQTT UNSUBACK message. Useful to check if a client was unsubscribed successfully.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

OnUnsubackSend Example
import com.hivemq.spi.callback.lowlevel.OnUnsubackSend;
import com.hivemq.spi.message.UNSUBACK;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnsubackSend implements OnUnsubackSend {

    final Logger logger = LoggerFactory.getLogger(UnsubackSend.class);

    @Override
    public void onUnsubackSend(final UNSUBACK unsuback, final ClientData clientData) {
        logger.debug("The unsubscribe of client {} with id {} was successful.", clientData.getClientId(), unsuback.getMessageId());
    }
}

BeforePublishSendCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.events.BeforePublishSendCallback gets called just before an MQTT PUBLISH message is sent to a client. Some properties of the outgoing message, like topic and payload, can be changed. This alters the message in question, but has no effect on the internal logic of the broker. For example, a client subscribed to topic1 will still receive a message published to topic1 even if you change the topic to topic2 in the BeforePublishSendCallback.

Interfering with HiveMQ
In addition to changing the content of the PUBLISH message, this callback can be used to tell HiveMQ to drop this message for the intended client. This can be achieved by throwing a com.hivemq.spi.callback.exception.BeforePublishSendException.

BeforePublishSend Example
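import com.hivemq.spi.callback.CallbackPriority;
import com.hivemq.spi.callback.events.BeforePublishSendCallback;
import com.hivemq.spi.callback.exception.BeforePublishSendException;
import com.hivemq.spi.message.ModifiablePUBLISH;
import com.hivemq.spi.security.ClientData;

import java.nio.charset.StandardCharsets;

public class PublishSend implements BeforePublishSendCallback {

    @Override
    public void beforePublishSend(final ModifiablePUBLISH publish, final ClientData clientData) throws BeforePublishSendException {
        final String clientId = clientData.getClientId();
        if ("client1".equals(clientId)) {
            // Decode and encode with an explicit charset instead of the platform default
            final String newPayload = new String(publish.getPayload(), StandardCharsets.UTF_8) + "<important Information>"; (1)
            publish.setPayload(newPayload.getBytes(StandardCharsets.UTF_8));
        }
        if ("client2".equals(clientId)) {
            throw BeforePublishSendException.getInstance(); (2)
        }
    }

    @Override
    public int priority() {
        return CallbackPriority.MEDIUM;
    }
}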
1 Change all the payloads to client1 to contain some important information.
2 Drop all PUBLISH messages that would have gone to client2.

AfterLoginCallback

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.security.AfterLoginCallback gets called when a client made a successful or unsuccessful login attempt. This happens when the client sent a CONNECT message and authentication took place. The callback offers the method afterSuccessfulLogin for successful logins and a method afterFailedLogin for unsuccessful login attempts.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

AfterLoginCallback Example
import com.hivemq.spi.callback.exception.AuthenticationException;
import com.hivemq.spi.callback.security.AfterLoginCallback;
import com.hivemq.spi.security.ClientData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AfterLogin implements AfterLoginCallback {

    final Logger logger = LoggerFactory.getLogger(AfterLogin.class);

    @Override
    public void afterSuccessfulLogin(final ClientData clientData) {
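        // Fall back to the client id, since the username is optional in a CONNECT message
        logger.debug("Client {} logged in successfully.", clientData.getUsername().or(clientData.getClientId()));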
    }

    @Override
    public void afterFailedLogin(final AuthenticationException exception, final ClientData clientData) {
        logger.warn("Client {} failed to log in. Return code: {}",
                clientData.getUsername().or(clientData.getClientId()), exception.getReturnCode().name());
    }
}

OnAuthenticationCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.security.OnAuthenticationCallback gets called after a CONNECT message arrives and handles the authentication of the client. The username and password from the MQTT CONNECT message can be used to authenticate the client. When client certificate authentication is used on the transport layer, the client certificate can also be used to authenticate on the application layer.

If your scenario allows this, you should use Caching.

Interfering with HiveMQ
The checkCredentials method must return either true or false, depending on whether the authentication was successful. When the authentication wasn’t successful and you want to control the return code of the MQTT CONNACK message, you can throw an AuthenticationException. This has the side effect that all other plugins which implement authentication are ignored once the exception is thrown.

OnAuthenticationCallback Example

See username/password authentication.


OnAuthorizationCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.security.OnAuthorizationCallback is responsible for returning a list of com.hivemq.spi.topic.MqttTopicPermission. Every time a specific action on a topic like publishing or subscribing is done by a client, the callback is executed to check if the client is allowed to do this.

This callback gets called very often, so make sure you use proper Caching.
The OnAuthorizationCallback is also called when a client connects with a CONNECT message containing a last will. In this case a call to clientData.isAuthenticated() will return false, because the client is only marked as authenticated after the permissions for the LWT have been checked. See also the Authentication and Permissions chapter.

Interfering with HiveMQ
HiveMQ uses the returned MqttTopicPermissions to check if a client is allowed to do a specific action.

OnAuthorizationCallback Example

See authorization example.
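
Checking an action against a permission comes down to comparing the topic of the action with the topic of the permission. The following is a minimal sketch of MQTT topic filter matching with the + and # wildcards, assuming permission topics may contain such wildcards. It is not HiveMQ’s implementation, the class name is hypothetical, and it ignores the special handling of topics that start with $.

```java
public final class TopicFilterMatch {

    /** Returns true if the concrete topic is covered by the topic filter. */
    public static boolean matches(final String filter, final String topic) {
        // A limit of -1 keeps trailing empty levels, e.g. "a/" has two levels
        final String[] f = filter.split("/", -1);
        final String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // The multi-level wildcard is only valid as the last level and
                // also covers the parent level itself ("a/#" matches "a")
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false;
            }
            // The single-level wildcard matches exactly one level
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }

    public static void main(final String[] args) {
        System.out.println(matches("home/+/temperature", "home/kitchen/temperature"));
        System.out.println(matches("home/+", "home/kitchen/temperature"));
    }
}
```

Because the callback is called very often, the result of such a comparison is a natural candidate for caching per client and topic.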


OnInsufficientPermissionsCallback

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.security.OnInsufficientPermissionDisconnect gets called when a client was disconnected due to insufficient permissions when publishing or subscribing. By the time the callback is executed the client has already been disconnected, so this is a purely informational callback.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.


RestrictionsAfterLoginCallback

Type
Synchronous

Purpose
The com.hivemq.spi.callback.security.RestrictionsAfterLoginCallback gets executed after a CONNECT message arrives and the client was authenticated successfully. This callback provides restrictions which affect only a specific client, e.g. throttling or maximum allowed MQTT message sizes. HiveMQ version 3.3.0 and later allows configuring offline client queue capacity and discard strategy individually for each client using the MAX_QUEUED_MESSAGES and DISCARD_STRATEGY restriction types.

Interfering with HiveMQ
The callback returns a set of restrictions which are applied to the specific client.

RestrictionsAfterLoginCallback Example
import com.hivemq.spi.callback.security.RestrictionsAfterLoginCallback;
import com.hivemq.spi.security.ClientData;
import com.hivemq.spi.security.Restriction;
import com.hivemq.spi.security.RestrictionType;

import java.util.HashSet;
import java.util.Set;

public class RestrictionsAfterLogin implements RestrictionsAfterLoginCallback {

    @Override
    public Set<Restriction> getRestrictions(final ClientData clientData) {

        final Set<Restriction> restrictions = new HashSet<>();

        restrictions.add(new Restriction(RestrictionType.MAX_PUBLISH_MESSAGE_SIZE, 128L)); (1)
        restrictions.add(new Restriction(RestrictionType.MAX_OUTGOING_BYTES_SEC, 128L)); (2)
        restrictions.add(new Restriction(RestrictionType.MAX_INCOMING_BYTES, 128L)); (3)
        restrictions.add(new Restriction(RestrictionType.WRITE_BUFFER_HIGH_THRESHOLD, 128L)); (4)
        restrictions.add(new Restriction(RestrictionType.WRITE_BUFFER_LOW_THRESHOLD, 128L)); (5)
        restrictions.add(new Restriction(RestrictionType.MAX_QUEUED_MESSAGES, 128L)); (6)
        restrictions.add(new Restriction(RestrictionType.DISCARD_STRATEGY, QueuedMessageStrategy.DISCARD)); (7)
        restrictions.add(new Restriction(RestrictionType.CLIENT_SESSION_TTL, 300L)); (8)
        restrictions.add(new Restriction(RestrictionType.INFLIGHT_QUEUE_SIZE, 1000L)); (9)

        return restrictions;
    }
}
1 Limit the maximum size of a message the client can send.
2 Add throttling for sending messages out to the client.
3 Add throttling for receiving messages from the client.
4 Set the high fill state of the write buffer for this client.
5 Set the low fill state of the write buffer for this client.
6 Set the maximum number of messages that are queued for the client if clean session is false.
7 Set the strategy used to discard messages when the queue limit is reached: DISCARD or DISCARD_OLDEST.
8 Set the Time To Live or TTL (in seconds) for the session of a client. If the client is disconnected for longer than the time defined in CLIENT_SESSION_TTL, the session will be removed. Only affects clients that connect with clean session set to false.
9 The maximum size of the in-flight message queue for this client. Only affects online clients.
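
To illustrate how MAX_QUEUED_MESSAGES and DISCARD_STRATEGY interact, here is a minimal sketch of a bounded queue. It assumes that DISCARD drops the newly arriving message and DISCARD_OLDEST drops the oldest queued message once the limit is reached. The class below is hypothetical and only models the behaviour; it is not how HiveMQ implements its queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public final class BoundedClientQueue {

    /** Hypothetical stand-in for the two discard strategies. */
    public enum Strategy { DISCARD, DISCARD_OLDEST }

    private final Deque<String> queue = new ArrayDeque<>();
    private final int capacity;
    private final Strategy strategy;

    public BoundedClientQueue(final int capacity, final Strategy strategy) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
        this.strategy = strategy;
    }

    /** Queues a message and returns the message that was dropped, or null if none was. */
    public String offer(final String message) {
        if (queue.size() < capacity) {
            queue.addLast(message);
            return null;
        }
        if (strategy == Strategy.DISCARD) {
            // Queue is full: the incoming message is dropped
            return message;
        }
        // Queue is full: the oldest message makes room for the incoming one
        final String oldest = queue.removeFirst();
        queue.addLast(message);
        return oldest;
    }

    /** Returns the queued messages from oldest to newest. */
    public List<String> snapshot() {
        return new ArrayList<>(queue);
    }
}
```

With a capacity of 2 and the messages a, b and c offered in that order, DISCARD leaves a and b in the queue, while DISCARD_OLDEST leaves b and c.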

ClusterDiscoveryCallback

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.cluster.ClusterDiscoveryCallback is used to discover other nodes to form a cluster with. Nodes are discovered by calling getNodeAddresses, which is called periodically by HiveMQ. The interval at which getNodeAddresses is called can be configured in HiveMQ’s configuration file. The callback also provides you with the means to register and unregister this HiveMQ node with the methods init and destroy.

For example, the ClusterDiscoveryCallback can be used to implement discovery via a database, via shared files or via an external registry like etcd or Consul.

Like any other callback, you have to manually add a concrete ClusterDiscoveryCallback to the Callback Registry at plugin startup time or at runtime.

Interfering with HiveMQ
The callback returns a list of ClusterNodeAddress objects which HiveMQ uses to form a cluster. A ClusterNodeAddress essentially consists of a hostname/IP and a port. Every available node for this cluster must be represented by one ClusterNodeAddress in the returned list.

If more than one ClusterDiscoveryCallback is registered at a time, the results of all plugins are used to form the cluster.

ClusterDiscoveryCallback Example
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.hivemq.spi.callback.cluster.ClusterDiscoveryCallback;
import com.hivemq.spi.callback.cluster.ClusterNodeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class ClusterDiscovery implements ClusterDiscoveryCallback {

    private static final Logger log = LoggerFactory.getLogger(ClusterDiscovery.class);

    private final ExternalRegistry externalRegistry; (1)

    private String clusterId;

    @Inject
    public ClusterDiscovery(final ExternalRegistry externalRegistry) {
        this.externalRegistry = externalRegistry;
    }

    @Override
    public void init(final String clusterId, final ClusterNodeAddress ownAddress) { (2)
        this.clusterId = clusterId;

        log.info("Registering HiveMQ with clusterId {} with central registry", clusterId);
        externalRegistry.put(clusterId, ownAddress.getHost(), ownAddress.getPort());
    }

    @Override
    public ListenableFuture<List<ClusterNodeAddress>> getNodeAddresses() { (3)
        final ListenableFuture<List<ClusterNodeAddress>> allClusterNodesFuture = externalRegistry.getAllNodes();
        return allClusterNodesFuture;
    }

    @Override
    public void destroy() { (4)
        log.info("Removing HiveMQ with clusterId {} from central registry", clusterId);
        externalRegistry.remove(clusterId);
    }
}
1 Some kind of central registry which holds the information for all your cluster nodes.
2 Gets called once when HiveMQ starts its cluster.
3 Gets called periodically to discover new cluster nodes at runtime.
4 Gets called once when HiveMQ shuts down.
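
The ExternalRegistry used above is left undefined. As a rough sketch of the contract it has to fulfil, the following in-memory stand-in offers the same three operations using only the JDK. All names are hypothetical, Node stands in for ClusterNodeAddress, and getAllNodes returns a plain list where the example above returns a ListenableFuture. A registry held in the memory of one JVM cannot discover other nodes, so a real implementation would sit on top of a shared store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class InMemoryRegistry {

    /** Host and port of one node; hypothetical stand-in for ClusterNodeAddress. */
    public static final class Node {
        public final String host;
        public final int port;

        public Node(final String host, final int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Keyed by cluster id, so a node that re-registers replaces its old entry
    private final Map<String, Node> nodes = new ConcurrentHashMap<>();

    /** Registers a node, as done in init. */
    public void put(final String clusterId, final String host, final int port) {
        nodes.put(clusterId, new Node(host, port));
    }

    /** Unregisters a node, as done in destroy. */
    public void remove(final String clusterId) {
        nodes.remove(clusterId);
    }

    /** Returns a copy of all currently registered nodes, as needed by getNodeAddresses. */
    public List<Node> getAllNodes() {
        return new ArrayList<>(nodes.values());
    }
}
```

In a plugin, an already available list can be wrapped for getNodeAddresses with Guava’s Futures.immediateFuture.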

ScheduledCallback

Type
Asynchronous

Purpose
The com.hivemq.spi.callback.schedule.ScheduledCallback gets executed periodically. See the Scheduled Callback Execution chapter for more information.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback. When the callback is added to the Callback Registry, a validation of the quartz-style cron expression will take place. If the expression is invalid, the callback won’t be executed.

ScheduledCallback Example

See scheduled callback example.

WebUIAuthenticationCallback

The WebUIAuthenticationCallback can be added to the CallbackRegistry to authenticate Web UI users.

When a callback is added, the configured users and passwords from config.xml are ignored and the authentication is delegated to the callback.

The user configuration of the config.xml is explained in detail in the access control section of the HiveMQ Web UI documentation.

When multiple callbacks are added, only the one with the highest priority is used.

If the credentials check could run indefinitely, for example because it keeps retrying an unreachable authorization endpoint, implement a timeout mechanism that fails the login once the timeout expires.

Type
Synchronous

Purpose
The com.hivemq.spi.callback.webui.WebUIAuthenticationCallback is responsible for authenticating Web UI users and gets called every time a Web UI login attempt occurs.

Interfering with HiveMQ
It’s not possible to interfere with HiveMQ directly with this callback.

This example shows how to implement a simple authentication callback which checks whether the username is "admin" and the password is "hivemq".

Simple WebUIAuthenticationCallback Example
public class WebUIAuthentication implements WebUIAuthenticationCallback {

    @NotNull
    @Override
    public AuthenticationState checkCredentials(@NotNull final String username, @NotNull final String password) {
        final boolean valid = validate(username, password);
        if(valid){
            return AuthenticationState.SUCCESS;
        } else {
            return AuthenticationState.FAILED;
        }
    }

    @Override
    public int priority() {
        return 0;
    }

    private boolean validate(@NotNull final String username, @NotNull final String password) {
        // Hard-coded credentials, for demonstration only
        return "admin".equals(username) && "hivemq".equals(password);
    }

}


This example shows how to add a WebUIAuthenticationCallback to the CallbackRegistry.

Add WebUIAuthenticationCallback To Registry Example
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.registry.CallbackRegistry;

import javax.annotation.PostConstruct;

public class MyPlugin extends PluginEntryPoint {

    @PostConstruct
    public void postConstruct() {

        final WebUIAuthentication webUIAuthentication = new WebUIAuthentication(); (1)

        final CallbackRegistry callbackRegistry = getCallbackRegistry(); (2)

        callbackRegistry.addCallback(webUIAuthentication); (3)

    }
}
1 Instantiate Web UI auth callback.
2 Get callback registry.
3 Add callback to registry.
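
The timeout recommended earlier in this section can be built with the JDK’s ExecutorService. The following is a minimal sketch, assuming the actual credentials check is passed in as a Callable; the class and method names are hypothetical. A timeout, an exception in the check and an interruption are all treated as a failed login.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class TimedCredentialCheck {

    /** Runs the check and returns false if it fails or does not finish in time. */
    public static boolean checkWithTimeout(final Callable<Boolean> check, final long timeoutMillis) {
        // One executor per call keeps the sketch short; a plugin would reuse a shared one
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final Future<Boolean> future = executor.submit(check);
        try {
            return Boolean.TRUE.equals(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (final TimeoutException | ExecutionException e) {
            // Too slow, or the check itself threw: treat as failed login
            return false;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Interrupt a check that is still running and release the thread
            future.cancel(true);
            executor.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println(checkWithTimeout(() -> true, 1000));
    }
}
```

Inside checkCredentials, a true result would map to AuthenticationState.SUCCESS and a false result to AuthenticationState.FAILED.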

Scheduled Callback Execution

While most of the callbacks for HiveMQ plugins are used for reacting to certain events, it is sometimes desirable to run tasks on a periodic schedule.

With ScheduledCallback it’s easy to implement plugin behaviour that occurs periodically, based on quartz-style cron expressions. This lets your plugins run at regular intervals, like every minute or every day at midnight. Even non-trivial schedules like "every last Friday of every month during the years from 2002 to 2017 at 15:15 and 20:30" can be represented. See the Quartz-Style Cron Expressions chapter for more information.

The scheduled callback executions are especially useful for tasks like:

  • Maintenance tasks

  • Watchdog services

  • Reconfiguration services

  • Integration with other systems (especially in conjunction with the Publish Service)

  • Backup services

Usage

A ScheduledCallback is a callback which gets executed asynchronously. Like any other callback, you have to manually add a concrete ScheduledCallback to the Callback Registry at plugin startup time or at runtime.

A ScheduledCallback consists of two methods:

  • cronExpression(): This method returns the quartz-style cron expression to schedule the callback.

  • execute(): This method executes the actual code to run on every interval.

Example of adding a scheduled callback
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.registry.CallbackRegistry;
import com.hivemq.spi.callback.schedule.ScheduledCallback;

import javax.annotation.PostConstruct;

class CallbackSchedulerMainClass extends PluginEntryPoint {

    @PostConstruct
    public void postConstruct() {

        final CallbackRegistry callbackRegistry = getCallbackRegistry();

        callbackRegistry.addCallback(new ScheduledCallback() {
            @Override
            public void execute() {
                System.out.println("Executing");
            }

            @Override
            public String cronExpression() {
                // Every second
                return "* * * * * ?";
            }
        });
    }
}
The cronExpression() method is called only once, when the callback is added to the Callback Registry. If you need to change the expression at runtime or if it is created dynamically, you have to reload it at runtime.

The cron expression will be evaluated when adding it to the Callback Registry and all callbacks with invalid expressions will be rejected.

Quartz-Style Cron Expressions

To allow maximum flexibility for scheduling callbacks, HiveMQ recognizes quartz-style cron expressions. This allows scheduling based on time intervals and dates.

Table 2. Examples of quartz-style cron expressions

Expression             Meaning
0/5 * * * * ?          every 5 seconds
0 0 0/2 * * ?          every 2 hours
0 0 0 * * ?            at midnight, every day
0 0/5 15 * * ?         every 5 minutes starting at 3:00 PM and ending at 3:55 PM, every day
0 15 11 ? * MON-FRI    every Monday, Tuesday, Wednesday, Thursday and Friday at 11:15 AM
0 15 10 ? * 6L         on the last Friday of every month at 10:15 AM

Learn more about Quartz Cron Expressions in the Quartz documentation.
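
A quartz-style cron expression consists of six mandatory fields (seconds, minutes, hours, day-of-month, month, day-of-week) and an optional seventh field for the year. As a reading aid, the following small sketch splits an expression and labels each field. It only checks the number of fields and does not validate the expression; the class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class CronFields {

    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    /** Maps each field of the expression to its name, in field order. */
    public static Map<String, String> label(final String expression) {
        final String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("expected 6 or 7 fields, got " + parts.length);
        }
        final Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(final String[] args) {
        // prints {seconds=0, minutes=15, hours=10, day-of-month=?, month=*, day-of-week=6L}
        System.out.println(label("0 15 10 ? * 6L"));
    }
}
```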

Cron expression changes at runtime

The return value of the cronExpression() method of a ScheduledCallback is evaluated only once, when the callback is added to the Callback Registry. Static cron expressions are not always enough, though: it can be desirable to (re-)load cron expressions on demand from databases, configuration files, web services or other sources.

A reload of the cron expression of a ScheduledCallback can be triggered manually with the reloadScheduledCallbackExpression(ScheduledCallback scheduledCallback) method of the CallbackRegistry.

Example of reloading a ScheduledCallback
import com.hivemq.spi.PluginEntryPoint;
import com.hivemq.spi.callback.registry.CallbackRegistry;
import com.hivemq.spi.callback.schedule.ScheduledCallback;

import javax.annotation.PostConstruct;

class CallbackSchedulerMainClass extends PluginEntryPoint {

    @PostConstruct
    public void postConstruct() {

        final CallbackRegistry callbackRegistry = getCallbackRegistry();

        callbackRegistry.addCallback(new ScheduledCallback() {
            @Override
            public void execute() {

                System.out.println("Business Logic Stuff");

                callbackRegistry.reloadScheduledCallbackExpression(this); (1)
            }

            @Override
            public String cronExpression() {
                String expression = dynamicExpressionFromDatabase(); (2)

                return expression;
            }
        });
    }
}
1 Trigger a manual reload of the expression, which calls the cronExpression() method after executing this callback.
2 Load a dynamic expression. The dynamicExpressionFromDatabase() helper is a placeholder for your own lookup code.
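
The dynamicExpressionFromDatabase() helper is not defined in the example above. One possible shape for such a helper, sketched here for a properties source instead of a database, reads the expression under a given key and falls back to a default when the key is missing or the source cannot be read. The class name, the key and the default expression are assumptions for illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

public final class CronExpressionSource {

    /** Used when no expression is configured: every 5 minutes. */
    public static final String DEFAULT_EXPRESSION = "0 0/5 * * * ?";

    /** Reads the cron expression stored under the given key, or returns the default. */
    public static String load(final Reader source, final String key) {
        final Properties props = new Properties();
        try {
            props.load(source);
        } catch (final IOException e) {
            // An unreadable source should not break scheduling
            return DEFAULT_EXPRESSION;
        }
        final String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT_EXPRESSION;
        }
        return value.trim();
    }

    public static void main(final String[] args) {
        // prints 0/30 * * * * ?
        System.out.println(load(new StringReader("cron=0/30 * * * * ?"), "cron"));
    }
}
```

Falling back to a known-good expression matters here because a callback with an invalid expression is not scheduled.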

Utils

For convenience, HiveMQ offers some utilities for working with quartz-style cron expressions. The following utility classes are worth a look if you are working with cron expressions:

  • com.hivemq.spi.callback.schedule.ScheduleExpressions: A utility class which offers constants for common cron expressions and some utility methods for calculating expressions.