Interceptors

HiveMQ Interceptors provide a convenient way for extensions to intercept and modify MQTT messages.

They can be added by registering a ClientInitializer.

The following table lists all available HiveMQ Interceptors.

Table 1. Available HiveMQ Interceptors
Interceptor | Description
Publish Inbound Interceptor | Enables extensions to intercept inbound PUBLISH messages, allowing modification or prevention of onward delivery.



Publish Inbound Interceptor

The PublishInboundInterceptor interface is used to perform interception at the moment of receiving an MQTT PUBLISH message from any publishing client. For this task a PublishInboundInput and a PublishInboundOutput are provided by HiveMQ. Extension input and extension output principles apply.

A PublishInboundInterceptor can be used to modify inbound PUBLISH messages or prevent them from onward delivery.

The PublishInboundOutput itself is blocking, but it can easily be used to create an asynchronous PublishInboundOutput object.

The interceptor can be set up in the ClientInitializer. See Initializer Registry.

When the delivery of a PUBLISH message is prevented, the message will be dropped.

The number of messages dropped by any PublishInboundInterceptor is shown in the HiveMQ Control Center.

When delivery is prevented with an AckReasonCode other than SUCCESS, an MQTT 3 client is disconnected, whereas an MQTT 5 client receives the PUBACK or PUBREC with the provided reason code and optional reason string.

When the AckReasonCode is SUCCESS, an acknowledgement (PUBACK or PUBREC) is sent, depending on the quality of service (QoS) level.

The same happens when an async PublishInboundOutput times out with the fallback strategy TimeoutFallback.FAILURE.
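
The acknowledgement rules above can be condensed into a small decision function. The following is a minimal, self-contained sketch that does not use the HiveMQ extension SDK: the class PreventedPublishModel, its enums, and the outcome method are hypothetical names introduced only for illustration. It covers QoS 1 and QoS 2 only, because the text above does not spell out the QoS 0 case.

```java
/** Hypothetical model of the acknowledgement rules; not part of the HiveMQ SDK. */
final class PreventedPublishModel {

    /** Simplified protocol version of the publishing client (assumed name). */
    enum Version { MQTT_3, MQTT_5 }

    /** What the publishing client observes after delivery was prevented (assumed name). */
    enum Outcome { DISCONNECT, PUBACK, PUBREC }

    static Outcome outcome(final Version version, final int qos, final boolean reasonCodeIsSuccess) {
        if (qos != 1 && qos != 2) {
            throw new IllegalArgumentException("this sketch only models QoS 1 and QoS 2");
        }
        // reason codes in PUBACK/PUBREC are an MQTT 5 feature,
        // so an MQTT 3 client is disconnected on a non-SUCCESS code
        if (version == Version.MQTT_3 && !reasonCodeIsSuccess) {
            return Outcome.DISCONNECT;
        }
        // otherwise the acknowledgement type follows the QoS level
        return qos == 1 ? Outcome.PUBACK : Outcome.PUBREC;
    }

    public static void main(final String[] args) {
        System.out.println(outcome(Version.MQTT_3, 1, false)); // DISCONNECT
        System.out.println(outcome(Version.MQTT_5, 1, false)); // PUBACK
        System.out.println(outcome(Version.MQTT_5, 2, true));  // PUBREC
    }
}
```

For an MQTT 5 client the reason code and optional reason string travel inside the PUBACK or PUBREC; the sketch leaves them out and models only which packet, if any, is sent.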

Multiple interceptors are called sequentially, beginning with the one with the highest priority. The PublishInboundOutput and the PublishInboundInput are updated after each interceptor.

When the extension task queue is exceeded at the moment of receiving an inbound PUBLISH, interceptors are ignored and a warning will be logged.

If any interceptor prevents delivery of a PUBLISH message, no more interceptors will be called.
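
The calling order described above can be illustrated with a small stand-alone model. This is a sketch under stated assumptions, not HiveMQ's implementation: InterceptorChainModel, Message, Step, and Registered are hypothetical types, and the priority is assumed to be a plain integer where a higher value runs first. Each interceptor sees the state left behind by the previous one, and the loop stops as soon as delivery is prevented.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model of sequential interception; not part of the HiveMQ SDK. */
final class InterceptorChainModel {

    /** Stand-in for the PUBLISH state that is passed from interceptor to interceptor. */
    static final class Message {
        String topic;
        boolean prevented;

        Message(final String topic) {
            this.topic = topic;
        }
    }

    /** Stand-in for an interceptor callback. */
    interface Step {
        void intercept(Message message);
    }

    /** An interceptor together with an assumed integer priority (higher runs first). */
    static final class Registered {
        final int priority;
        final String name;
        final Step step;

        Registered(final int priority, final String name, final Step step) {
            this.priority = priority;
            this.name = name;
            this.step = step;
        }
    }

    /** Runs the chain and returns the names of the interceptors that were called. */
    static List<String> run(final List<Registered> interceptors, final Message message) {
        // sort a copy so the caller's list is left untouched
        final List<Registered> ordered = new ArrayList<>(interceptors);
        ordered.sort(Comparator.comparingInt((Registered r) -> r.priority).reversed());

        final List<String> called = new ArrayList<>();
        for (final Registered registered : ordered) {
            called.add(registered.name);
            // the same message object carries the modifications of earlier interceptors
            registered.step.intercept(message);
            if (message.prevented) {
                // once delivery is prevented, no further interceptor is called
                break;
            }
        }
        return called;
    }
}
```

With three registered steps of priority 10 (appends to the topic), 5 (prevents delivery), and 1, the model calls only the first two, and the step with priority 5 already sees the modified topic.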

PublishInboundInput

The input parameter contains the following unmodifiable information:

  • MQTT PUBLISH packet

  • MQTT Version

  • Client ID

  • IP Address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS

The JavaDoc can be found here.

PublishInboundOutput

The output parameter contains a modifiable PUBLISH packet.

The JavaDoc can be found here.

Example Usage

This section provides examples of how to implement different PublishInboundInterceptor variants and add them to a ClientInitializer via the InitializerRegistry.

Examples:

Set Up In ClientInitializer

This example shows how to set up a PublishInboundInterceptor with the ClientInitializer.

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {

    //create new publish inbound interceptor
    final PublishInboundInterceptor publishInboundInterceptor = new PublishInboundInterceptor() {
        @Override
        public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {
            //do something with the publish, e.g. logging
            System.out.println("Inbound publish message intercepted from client: " + publishInboundInput.getClientInformation().getClientId());
        }
    };

    //create new client initializer
    final ClientInitializer clientInitializer = new ClientInitializer() {
        @Override
        public void initialize(final @NotNull InitializerInput initializerInput, final @NotNull ClientContext clientContext) {
            //add the interceptor to the client's context
            clientContext.addPublishInboundInterceptor(publishInboundInterceptor);
        }
    };

    //register the client initializer
    Services.initializerRegistry().setClientInitializer(clientInitializer);

}

...


Modify PUBLISH

This example shows how to implement a PublishInboundInterceptor which modifies all parameters of an inbound PUBLISH message.

public class ModifyingPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        //get the modifiable publish object from the output
        final ModifiablePublishPacket publish = publishInboundOutput.getInboundPublish();

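        // modify / overwrite any parameter of a publish packet
        // (StandardCharsets is java.nio.charset.StandardCharsets; an explicit charset avoids platform-dependent encoding)
        publish.setContentType("modified contentType");
        publish.setCorrelationData(ByteBuffer.wrap("modified correlation data".getBytes(StandardCharsets.UTF_8)));
        publish.setMessageExpiryInterval(120L);
        publish.setPayload(ByteBuffer.wrap("modified payload".getBytes(StandardCharsets.UTF_8)));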
        publish.setPayloadFormatIndicator(PayloadFormatIndicator.UNSPECIFIED);
        publish.setQos(Qos.AT_MOST_ONCE);
        publish.setTopic("modified topic");
        publish.setResponseTopic("modified response topic");
        publish.setRetain(false);
        publish.getUserProperties().clear();
        publish.getUserProperties().addUserProperty("mod-key", "mod-val");

    }
}


Modify PUBLISH Asynchronously

This example shows how to implement a PublishInboundInterceptor which asynchronously modifies an inbound PUBLISH message in an external task, using the Managed Extension Executor Service.

The timeout fallback strategy is TimeoutFallback.FAILURE, which means that onward delivery of the PUBLISH message is prevented if the task takes longer than 10 seconds.

public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        //make output object async with a duration of 10 seconds and timeout fallback failure
        final Async<PublishInboundOutput> asyncOutput = publishInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);

        //submit external task to extension executor service
        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                //get the modifiable publish object from the output
                final ModifiablePublishPacket publish = publishInboundOutput.getInboundPublish();

                //call external publish modification (method not provided)
                callExternalTask(publish);
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
                if(throwable != null){
                    //please use more sophisticated logging
                    throwable.printStackTrace();
                }

                //resume output to tell HiveMQ it's done.
                asyncOutput.resume();
            }
        });

    }
}
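
The effect of the timeout fallback in the example above can be sketched without the HiveMQ SDK. The following model is an illustration only: TimeoutFallbackModel, its enums, and awaitResume are hypothetical names, the SUCCESS branch is an assumption about the counterpart strategy that the text does not describe, and the sketch blocks the calling thread purely for simplicity, which is not a statement about how HiveMQ works internally. A CountDownLatch stands in for the call to asyncOutput.resume().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a timeout fallback; not part of the HiveMQ SDK. */
final class TimeoutFallbackModel {

    /** Assumed fallback strategies; only FAILURE is described in the text. */
    enum Fallback { FAILURE, SUCCESS }

    /** What happens to the PUBLISH once waiting ends (assumed names). */
    enum Result { CONTINUE_DELIVERY, PREVENT_DELIVERY }

    /**
     * Waits until the latch is counted down (the stand-in for resume())
     * or the timeout elapses, then applies the fallback strategy.
     */
    static Result awaitResume(final CountDownLatch resumed, final long timeoutMillis, final Fallback fallback) {
        boolean resumedInTime;
        try {
            resumedInTime = resumed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            // restore the interrupt flag and treat the wait as timed out
            Thread.currentThread().interrupt();
            resumedInTime = false;
        }
        if (resumedInTime) {
            return Result.CONTINUE_DELIVERY;
        }
        // timed out: FAILURE prevents onward delivery
        return fallback == Fallback.FAILURE ? Result.PREVENT_DELIVERY : Result.CONTINUE_DELIVERY;
    }
}
```

A latch that has already been counted down yields CONTINUE_DELIVERY, while a latch that is never counted down yields PREVENT_DELIVERY under Fallback.FAILURE once the timeout elapses.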


Prevent PUBLISH Delivery

This example shows how to implement a PublishInboundInterceptor which prevents the delivery of a PUBLISH message when the ID of the publishing client contains "prevent" and the topic is "topic/to/prevent".

public class PreventingPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        //check whether publish delivery must be prevented, e.g. based on client id and topic
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            //prevent publish delivery on the output object
            publishInboundOutput.preventPublishDelivery();
        }

    }
}


Prevent PUBLISH Delivery With Reason Code

This example shows how to implement a PublishInboundInterceptor which prevents the delivery of a PUBLISH message when the ID of the publishing client contains "prevent" and the topic is "topic/to/prevent".

When the quality of service level of the PUBLISH is greater than zero and the client is an MQTT 5 client, the client receives a PUBACK or PUBREC with the reason code "TOPIC_NAME_INVALID".

Reason codes for PUBACK or PUBREC are an MQTT 5 feature, so MQTT 3 clients will be disconnected ungracefully if the reason code is not "SUCCESS".

public class PreventAndAckPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        //check whether publish delivery must be prevented, e.g. based on client id and topic
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            //prevent publish delivery with reason code on the output object
            publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID);
        }

    }
}


Prevent PUBLISH Delivery With Reason String

This example shows how to implement a PublishInboundInterceptor which prevents the delivery of a PUBLISH message when the ID of the publishing client contains "prevent" and the topic is "topic/to/prevent".

When the quality of service level of the PUBLISH is greater than zero and the client is an MQTT 5 client, the client receives a PUBACK or PUBREC with the reason code "TOPIC_NAME_INVALID" and the reason string "It is not allowed to publish to topic: topic/to/prevent".

Reason codes for PUBACK or PUBREC are an MQTT 5 feature, so MQTT 3 clients will be disconnected ungracefully if the reason code is not "SUCCESS".

public class PreventWithReasonPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(final @NotNull PublishInboundInput publishInboundInput, final @NotNull PublishInboundOutput publishInboundOutput) {

        //check whether publish delivery must be prevented, e.g. based on client id and topic
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            //prevent publish delivery with reason code and reason string on the output object
            publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID, "It is not allowed to publish to topic: " + topic);
        }

    }
}