Interceptors

HiveMQ Interceptors provide a convenient way for extensions to intercept and modify MQTT messages.

Based on the type of interceptor, you can register a ClientInitializer or use the GlobalInterceptorRegistry to add the interceptor that you want.

The following table lists all available HiveMQ Interceptors:

Table 1. Available HiveMQ Interceptors

Interceptor                     Description
Connect Inbound Interceptor     Enables extensions to modify incoming CONNECT messages.
Connack Outbound Interceptor    Enables extensions to intercept and modify outbound CONNACK messages.
Publish Inbound Interceptor     Enables extensions to intercept inbound PUBLISH messages, allowing modification or prevention of onward delivery.
Publish Outbound Interceptor    Enables extensions to intercept outgoing PUBLISH messages, allowing modification or prevention of onward delivery.
Subscribe Inbound Interceptor   Enables extensions to intercept and modify inbound SUBSCRIBE messages.



Connect Inbound Interceptor

On the ConnectInboundInterceptor interface you can modify incoming MQTT CONNECT messages with the ConnectInboundInput and ConnectInboundOutput parameters that HiveMQ provides. Extension input and output principles apply.

When an asynchronous ConnectInboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, a CONNACK with the UNSPECIFIED_ERROR (128) reason code is sent to the client and the client is disconnected.
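The timeout rule above can be illustrated with a small model built only on JDK classes. This is a sketch, not HiveMQ code: ConnectTimeoutSketch, its Outcome enum, and the await method are hypothetical names, and the model covers only the FAILURE fallback described in the text.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK model of the timeout rule; none of these names are HiveMQ API.
final class ConnectTimeoutSketch {

    enum Outcome { RESUMED_IN_TIME, CONNACK_128_THEN_DISCONNECT }

    // Waits for the asynchronous work up to the timeout and applies the FAILURE fallback.
    static Outcome await(final CompletableFuture<?> asyncWork, final Duration timeout) {
        try {
            asyncWork.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return Outcome.RESUMED_IN_TIME;
        } catch (final TimeoutException e) {
            // FAILURE fallback: CONNACK with UNSPECIFIED_ERROR (128), then disconnect
            return Outcome.CONNACK_128_THEN_DISCONNECT;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (final ExecutionException e) {
            // failed asynchronous work is outside the scope of this model
            throw new IllegalStateException("async work failed", e);
        }
    }

    public static void main(final String[] args) {
        // work that finished before the timeout
        System.out.println(await(CompletableFuture.completedFuture("done"), Duration.ofMillis(100)));
        // work that never finishes, so the timeout fires
        System.out.println(await(new CompletableFuture<String>(), Duration.ofMillis(100)));
    }
}
```

In a real extension the waiting is done by HiveMQ; the interceptor only has to call resume() on the Async object before the timeout elapses.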

If multiple extensions provide interceptors, the interceptors are called sequentially, beginning with the extension that has the highest priority.

You can set up the ConnectInboundInterceptor with a provider that is added to the GlobalInterceptorRegistry.

The JavaDoc for the ConnectInboundInterceptor can be found here.

It is important to note that changes made to the CONNECT packet are usually not expected by the client implementation. For example, if a client connects with a client ID for which it expects a session to be present and the client ID is modified, the client may disconnect because the server does not set the session present flag.

When an interceptor makes changes to the CONNECT message of a client, the client is not notified. To provide feedback to the client, you can use the ConnackOutboundInterceptor to add user properties to the CONNACK.

Set Up Connect Inbound Interceptor

The following example shows how to set up a ConnectInboundInterceptor.

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

    try {
        final ConnectInboundInterceptorA interceptorA = new ConnectInboundInterceptorA();
        final ConnectInboundInterceptorB interceptorB = new ConnectInboundInterceptorB();
        Services.interceptorRegistry().setConnectInboundInterceptorProvider(input -> {
            final String clientId = input.getClientInformation().getClientId();
            if (clientId.startsWith("a")) {
                return interceptorA;
            } else {
                return interceptorB;
            }
        });

    } catch (Exception e) {
        log.error("Exception thrown at extension start: ", e);
    }
}

...

Modify a CONNECT Message

The following example ensures that a client that sends a user property named "clean" connects with a clean start and a session expiry of zero.

public class ModifyConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        final ModifiableUserProperties userProperties = connectPacket.getUserProperties();
        // only force a clean session when the client sent a "clean" user property
        if (userProperties.getFirst("clean").isPresent()) {
            connectPacket.setSessionExpiryInterval(0);
            connectPacket.setCleanStart(true);
        }
    }
}

The following example requests a new ID for a connecting client and sets the new ID asynchronously.

public class ModifyConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final Async<ConnectInboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
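        // modifyClientId is a placeholder for an asynchronous lookup (method not provided)
        final CompletableFuture<String> modifiedClientIdFuture = modifyClientId(connectPacket.getClientId());
        modifiedClientIdFuture.whenComplete((modifiedClientId, throwable) -> {
            if (modifiedClientId != null) {
                connectPacket.setClientId(modifiedClientId);
                async.resume();
            }
            // if the lookup fails, resume() is never called and the FAILURE fallback applies after the timeout
        });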
    }
}

Modify a Last Will Message

The will of a CONNECT message can be modified or removed. If the CONNECT has no will message, a new will can be created via the WillPublishBuilder and set as will for the CONNECT.

public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        if (connectPacket.getModifiableWillPublish().isEmpty()) {
            final String clientId = connectPacket.getClientId();
            final ByteBuffer payload = StandardCharsets.UTF_8.encode(clientId + " disconnected");
            final WillPublishBuilder builder = Builders.willPublish();
            final WillPublishPacket will = builder.willDelay(0)
                    .topic(clientId + "/will")
                    .payload(payload)
                    .build();
            connectPacket.setWillPublish(will);
        }
    }
}

To remove a last-will message, set the will message to null, as shown in the following example:

public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        if (connectPacket.getModifiableWillPublish().isPresent()) {
            connectPacket.setWillPublish(null);
        }
    }
}

The following example shows how to modify an existing will message:

public class ModifyWillConnectInboundInterceptor implements ConnectInboundInterceptor {
    @Override
    public void onConnect(final @NotNull ConnectInboundInput input, final @NotNull ConnectInboundOutput output) {
        final ModifiableConnectPacket connectPacket = output.getConnectPacket();
        if (connectPacket.getModifiableWillPublish().isPresent()) {
            final ModifiableWillPublish modifiableWill = connectPacket.getModifiableWillPublish().get();
            modifiableWill.setTopic(connectPacket.getClientId() + "/will");
        }
    }
}



Connack Outbound Interceptor

On the ConnackOutboundInterceptor interface, you can intercept an MQTT CONNACK at the moment the message is sent to any connecting client. For this task, HiveMQ provides ConnackOutboundInput and ConnackOutboundOutput parameters. Extension input and output principles apply.

You can use a ConnackOutboundInterceptor to modify outbound CONNACK messages.

The ConnackOutboundOutput parameter can easily be used for asynchronous processing.

The interceptor is set up in the GlobalInterceptorRegistry.

When an asynchronous ConnackOutboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, the connection of the client is closed without sending a CONNACK.

Multiple interceptors are called sequentially, beginning with the interceptor that has the highest priority. The ConnackOutboundOutput and ConnackOutboundInput parameters are updated after each interceptor.
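As a rough illustration of the ordering rule, the following plain-JDK sketch runs two interceptors in descending priority order and lets the second one observe the change made by the first. All names (ConnackChainSketch, PrioritizedInterceptor, runChain) are hypothetical and not part of the HiveMQ API, and the CONNACK is reduced to a list of user property strings.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Plain-JDK model of sequential interception; none of these names are HiveMQ API.
final class ConnackChainSketch {

    static final class PrioritizedInterceptor {
        final int priority;
        final Consumer<List<String>> action;

        PrioritizedInterceptor(final int priority, final Consumer<List<String>> action) {
            this.priority = priority;
            this.action = action;
        }
    }

    // Calls the interceptors one after another, highest priority first.
    // Every interceptor works on the state left behind by the previous one.
    static List<String> runChain(final List<PrioritizedInterceptor> interceptors) {
        final List<PrioritizedInterceptor> ordered = new ArrayList<>(interceptors);
        ordered.sort(Comparator.comparingInt((PrioritizedInterceptor i) -> i.priority).reversed());
        final List<String> userProperties = new ArrayList<>();
        for (final PrioritizedInterceptor interceptor : ordered) {
            interceptor.action.accept(userProperties);
        }
        return userProperties;
    }

    static List<String> demo() {
        final List<PrioritizedInterceptor> registered = new ArrayList<>();
        // registered first, but called second because of its lower priority
        registered.add(new PrioritizedInterceptor(1,
                props -> props.add("low saw " + props.size() + " earlier change(s)")));
        registered.add(new PrioritizedInterceptor(10, props -> props.add("high")));
        return runChain(registered);
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

The registration order does not matter in this model; only the priority decides which interceptor runs first.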

The following parameters can be modified:

  • Reason code

  • Reason string

  • Server reference

  • Response information

  • User properties

Interception of a CONNACK message has the following limitations:

  • You cannot change a successful reason code to an unsuccessful reason code (or vice versa).

  • It is not possible to send response information to a client that did not request response information.

  • If the reason code is "SUCCESS", you cannot set a reason string.

  • It is not possible to prevent sending a CONNACK message to a client.
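The first three limitations can be expressed as simple checks. The sketch below is a plain-JDK model under stated assumptions: ConnackLimitsSketch, its reduced ReasonCode enum, and the violations method are hypothetical names, and the sketch makes no claim about how HiveMQ itself reacts when a limitation is violated.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of the CONNACK modification limits; none of these names are HiveMQ API.
final class ConnackLimitsSketch {

    // Reduced, illustrative set of reason codes.
    enum ReasonCode {
        SUCCESS, UNSPECIFIED_ERROR, NOT_AUTHORIZED;

        boolean isSuccess() {
            return this == SUCCESS;
        }
    }

    // Returns a description of every limitation that the requested modification would break.
    static List<String> violations(
            final ReasonCode original,
            final ReasonCode modified,
            final String reasonString,
            final boolean responseInformationRequested,
            final String responseInformation) {

        final List<String> result = new ArrayList<>();
        if (original.isSuccess() != modified.isSuccess()) {
            result.add("reason code must not switch between successful and unsuccessful");
        }
        if (modified.isSuccess() && reasonString != null) {
            result.add("a reason string cannot be set when the reason code is SUCCESS");
        }
        if (!responseInformationRequested && responseInformation != null) {
            result.add("the client did not request response information");
        }
        return result;
    }

    public static void main(final String[] args) {
        // changing one error code to another with a reason string is fine
        System.out.println(violations(ReasonCode.UNSPECIFIED_ERROR, ReasonCode.NOT_AUTHORIZED, "denied", false, null));
        // turning a success into an error is not
        System.out.println(violations(ReasonCode.SUCCESS, ReasonCode.UNSPECIFIED_ERROR, null, false, null));
    }
}
```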

If you need to know whether response information was requested, you can use the ConnectInboundInterceptor to get the information as shown in the Modify the Response Information example.

ConnackOutboundInput

The input parameter contains the following unmodifiable information:

  • MQTT CONNACK packet

  • MQTT Version

  • Client ID

  • IP Address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS

The JavaDoc can be found here.

ConnackOutboundOutput

The output parameter contains a modifiable CONNACK packet.

The JavaDoc can be found here.

Example Usage

This section provides examples of how to implement different ConnackOutboundInterceptor implementations and add them through the GlobalInterceptorRegistry.


Set Up In GlobalInterceptorRegistry

This example shows how to set up a ConnackOutboundInterceptor through a ConnackOutboundInterceptorProvider that is registered with the GlobalInterceptorRegistry:

...

@Override
public void extensionStart(@NotNull final ExtensionStartInput extensionStartInput, @NotNull final ExtensionStartOutput extensionStartOutput) {

    final ConnackOutboundInterceptorProvider interceptorProvider = new ConnackOutboundInterceptorProvider() {
        @Override
        public @NotNull ConnackOutboundInterceptor getConnackOutboundInterceptor(final @NotNull ConnackOutboundProviderInput connackOutboundProviderInput) {
            return new ConnackOutboundInterceptor() {
                @Override
                public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
                    //do something with the connack eg. logging
                    System.out.println("Connack intercepted for client: " + connackOutboundInput.getClientInformation().getClientId());
                }
            };
        }
    };

    Services.interceptorRegistry().setConnackOutboundInterceptorProvider(interceptorProvider);
}

...


Modify a CONNACK Message

This example shows how to implement a ConnackOutboundInterceptor that modifies parameters of an outbound CONNACK message:

public class ModifyingConnackOutboundInterceptor implements ConnackOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(ModifyingConnackOutboundInterceptor.class);

    @Override
    public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {
        //get the modifiable connack object from the output
        final ModifiableConnackPacket connackPacket = connackOutboundOutput.getConnackPacket();

        // modify / overwrite parameters of a connack packet.

        try {
            if (connackPacket.getReasonCode() != ConnackReasonCode.SUCCESS) {
                //Set reason string and reason code if reason code is not success.
                connackPacket.setReasonString("The reason for the not successful connack");
                connackPacket.setReasonCode(ConnackReasonCode.UNSPECIFIED_ERROR);
            }

            connackPacket.setServerReference("server reference");
            connackPacket.getUserProperties().addUserProperty("my-prop", "some value");
            connackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

        } catch (Exception e){
            log.debug("Connack outbound interception failed:", e);
        }
    }
}


Modify a CONNACK Message Asynchronously

This example shows how to implement a ConnackOutboundInterceptor that uses the Managed Extension Executor Service to asynchronously modify an outbound CONNACK message.

The default timeout strategy is failure. If the task takes more than 10 seconds, the connection of the client is closed without sending a CONNACK.

public class AsyncModifyingConnackOutboundInterceptor implements ConnackOutboundInterceptor {

    private static final Logger log = LoggerFactory.getLogger(AsyncModifyingConnackOutboundInterceptor.class);

    @Override
    public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {

        //make output object async with a duration of 10 seconds
        final Async<ConnackOutboundOutput> async = connackOutboundOutput.async(Duration.ofSeconds(10));

        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {

                //get the modifiable connack object from the output
                final ModifiableConnackPacket connackPacket = connackOutboundOutput.getConnackPacket();
                // modify / overwrite parameters of a connack packet.

                try {
                    if (connackPacket.getReasonCode() != ConnackReasonCode.SUCCESS) {
                        //Set reason string and reason code if reason code is not success.
                        connackPacket.setReasonString("The reason for the not successful connack");
                        connackPacket.setReasonCode(ConnackReasonCode.UNSPECIFIED_ERROR);
                    }

                    connackPacket.setServerReference("server reference");
                    connackPacket.getUserProperties().addUserProperty("my-prop", "some value");
                    connackPacket.getUserProperties().addUserProperty("my-prop-2", "other value");

                } catch (Exception e){
                    log.debug("Connack outbound interception failed:", e);
                }
            }
        });

        //wait for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object o, final @Nullable Throwable throwable) {
                if(throwable != null){
                    log.debug("Connack outbound interception failed:", throwable);
                }

                //resume output to tell HiveMQ it's done.
                async.resume();
            }
        });

    }
}


Modify the Response Information

This example shows how to implement a ConnackOutboundInterceptor that sets the response information. A ConnectInboundInterceptor records whether each client requested response information, and the ConnackOutboundInterceptor sets the response information only for clients that requested it:

public class ModifyResponseInformationMain implements ExtensionMain {

    private static final Logger log = LoggerFactory.getLogger(ModifyResponseInformationMain.class);

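    // interceptors can be called from multiple threads, so use a thread-safe map (java.util.concurrent.ConcurrentHashMap)
    private final Map<String, Boolean> clientIdResponseInformationRequestedMap = new ConcurrentHashMap<>();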

    @Override
    public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput, final @NotNull ExtensionStartOutput extensionStartOutput) {

        final ConnectInboundInterceptor connectInboundInterceptor = new ConnectInboundInterceptor() {
            @Override
            public void onConnect(final @NotNull ConnectInboundInput connectInboundInput, final @NotNull ConnectInboundOutput connectInboundOutput) {

                try {
                    //get connect packet from input
                    final ConnectPacket connectPacket = connectInboundInput.getConnectPacket();

                    //store client id if response information is requested.
                    clientIdResponseInformationRequestedMap.put(connectPacket.getClientId(), connectPacket.getRequestResponseInformation());

                } catch (final Exception e){
                    log.debug("Connect inbound interception failed: ", e);
                }
            }
        };

        final ConnackOutboundInterceptor connackOutboundInterceptor = new ConnackOutboundInterceptor() {
            @Override
            public void onOutboundConnack(final @NotNull ConnackOutboundInput connackOutboundInput, final @NotNull ConnackOutboundOutput connackOutboundOutput) {

                try {
                    //get client id from input
                    final String clientId = connackOutboundInput.getClientInformation().getClientId();

                    //check if response information is requested
                    final Boolean responseInformationRequested = clientIdResponseInformationRequestedMap.get(clientId);
                    if (responseInformationRequested != null && responseInformationRequested) {
                        connackOutboundOutput.getConnackPacket().setResponseInformation("Some response information");
                    }

                    //remove from map since it is not needed anymore
                    clientIdResponseInformationRequestedMap.remove(clientId);
                } catch (final Exception e){
                    log.debug("Connack outbound interception failed: ", e);
                }
            }
        };

        // set connect inbound interceptor provider with interceptor
        Services.interceptorRegistry().setConnectInboundInterceptorProvider(new ConnectInboundInterceptorProvider() {
            @Override
            public ConnectInboundInterceptor getConnectInboundInterceptor(final @NotNull ConnectInboundProviderInput input) {
                return connectInboundInterceptor;
            }
        });

        // set connack outbound interceptor provider with interceptor
        Services.interceptorRegistry().setConnackOutboundInterceptorProvider(new ConnackOutboundInterceptorProvider() {
            @Override
            public ConnackOutboundInterceptor getConnackOutboundInterceptor(final @NotNull ConnackOutboundProviderInput input) {
                return connackOutboundInterceptor;
            }
        });

    }

    @Override
    public void extensionStop(final @NotNull ExtensionStopInput extensionStopInput, final @NotNull ExtensionStopOutput extensionStopOutput) {

    }
}



Publish Inbound Interceptor

The PublishInboundInterceptor interface is used to perform interception at the moment an MQTT PUBLISH message is received from any publishing client. For this purpose, HiveMQ provides PublishInboundInput and PublishInboundOutput parameters. Extension input and output principles apply.

You can use the PublishInboundInterceptor to modify inbound PUBLISH messages or to prevent onward delivery.

The PublishInboundOutput is blocking, but it can easily be used to create an asynchronous PublishInboundOutput object.

The interceptor can be set up in the ClientInitializer. For more information, see Initializer Registry.

When the delivery of a PUBLISH message is prevented, the message is dropped.

In the HiveMQ Control Center, you can see the number of dropped messages from every PublishInboundInterceptor.

When delivery is prevented with an AckReasonCode other than SUCCESS, MQTT 3 clients are disconnected. However, an MQTT 5 client receives the PUBACK or PUBREC with the provided reason code and optional reason string.

Based on the quality of service level (QoS), when the AckReasonCode is SUCCESS an acknowledgement is sent (PUBACK or PUBREC).

When an async PublishInboundOutput times out with a TimeoutFallback.FAILURE fallback strategy, delivery of the PUBLISH message is prevented and the same acknowledgement rules apply.
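The acknowledgement rules for a prevented PUBLISH can be summarized as a small decision function. The following plain-JDK sketch uses hypothetical names (InboundPreventAckSketch, onPrevented) and deliberately models only QoS 1 and QoS 2, because the text above does not describe the QoS 0 case.

```java
// Plain-JDK model of the acknowledgement rules; none of these names are HiveMQ API.
final class InboundPreventAckSketch {

    enum MqttVersion { V3, V5 }

    enum Action { SEND_PUBACK, SEND_PUBREC, DISCONNECT }

    // Decides what the publishing client observes when delivery is prevented.
    static Action onPrevented(final MqttVersion version, final int qos, final boolean reasonCodeIsSuccess) {
        if (qos != 1 && qos != 2) {
            throw new IllegalArgumentException("only QoS 1 and QoS 2 are modeled");
        }
        // MQTT 3 cannot carry a reason code, so a non-SUCCESS code leads to a disconnect
        if (!reasonCodeIsSuccess && version == MqttVersion.V3) {
            return Action.DISCONNECT;
        }
        // QoS 1 is acknowledged with PUBACK, QoS 2 with PUBREC
        return qos == 1 ? Action.SEND_PUBACK : Action.SEND_PUBREC;
    }

    public static void main(final String[] args) {
        System.out.println(onPrevented(MqttVersion.V5, 1, false));
        System.out.println(onPrevented(MqttVersion.V3, 2, false));
        System.out.println(onPrevented(MqttVersion.V3, 2, true));
    }
}
```

For an MQTT 5 client, the PUBACK or PUBREC in this model would carry the reason code and optional reason string that the interceptor provided.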

Multiple interceptors are called sequentially, beginning with the one with the highest priority. The PublishInboundOutput and the PublishInboundInput are updated after each interceptor.

If any interceptor prevents delivery of a PUBLISH message, no additional interceptors are called.
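The short-circuit rule can be pictured as a loop that stops at the first interceptor that prevents delivery. This is a minimal plain-JDK sketch with hypothetical names (PreventShortCircuitSketch, interceptorsCalled); each interceptor is reduced to a predicate that returns true when it prevents delivery for a given topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-JDK model of the short-circuit rule; none of these names are HiveMQ API.
final class PreventShortCircuitSketch {

    // Returns how many interceptors were called before the chain ended.
    static int interceptorsCalled(final List<Predicate<String>> inPriorityOrder, final String topic) {
        int called = 0;
        for (final Predicate<String> preventsDelivery : inPriorityOrder) {
            called++;
            if (preventsDelivery.test(topic)) {
                // delivery is prevented, so the remaining interceptors are skipped
                break;
            }
        }
        return called;
    }

    static int demo(final String topic) {
        final List<Predicate<String>> chain = new ArrayList<>();
        chain.add(t -> false);
        chain.add(t -> t.equals("topic/to/prevent"));
        chain.add(t -> false);
        return interceptorsCalled(chain, topic);
    }

    public static void main(final String[] args) {
        System.out.println(demo("topic/to/prevent"));
        System.out.println(demo("other/topic"));
    }
}
```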

PublishInboundInput

The input parameter contains the following unmodifiable information:

  • MQTT PUBLISH packet

  • MQTT Version

  • Client ID

  • IP Address

  • Listener with port, bind address and type

  • ConnectionAttributeStore

  • TLS

The JavaDoc can be found here.

PublishInboundOutput

The output parameter contains a modifiable PUBLISH packet.

The JavaDoc can be found here.

Example Usage

This section provides examples of how to implement different PublishInboundInterceptor implementations and add them to a ClientInitializer via the InitializerRegistry.


Set Up a PublishInboundInterceptor in the ClientInitializer

This example shows how to set up a PublishInboundInterceptor with the ClientInitializer.

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {

    // create a new publish inbound interceptor
    final PublishInboundInterceptor publishInboundInterceptor = new PublishInboundInterceptor() {
        @Override
        public void onInboundPublish(
                final @NotNull PublishInboundInput publishInboundInput,
                final @NotNull PublishInboundOutput publishInboundOutput) {
            // do something with the publish, for example, logging
            System.out.println("Inbound publish message intercepted from client: "
                    + publishInboundInput.getClientInformation().getClientId());
        }
    };

    // create a new client initializer
    final ClientInitializer clientInitializer = new ClientInitializer() {
        @Override
        public void initialize(
                final @NotNull InitializerInput initializerInput,
                final @NotNull ClientContext clientContext) {
            // add the interceptor to the context of the connecting client
            clientContext.addPublishInboundInterceptor(publishInboundInterceptor);
        }
    };

    //register the client initializer
    Services.initializerRegistry().setClientInitializer(clientInitializer);
}

...


Modify a PUBLISH Message

This example shows how to implement a PublishInboundInterceptor that modifies all parameters of an inbound PUBLISH message.

public class ModifyingPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // get the modifiable publish packet from the output
        final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();

        // modify / overwrite any parameter of the publish packet.
        publish.setContentType("modified contentType");
        publish.setCorrelationData(ByteBuffer.wrap("modified correlation data".getBytes(StandardCharsets.UTF_8)));
        publish.setMessageExpiryInterval(120L);
        publish.setPayload(ByteBuffer.wrap("modified payload".getBytes(StandardCharsets.UTF_8)));
        publish.setPayloadFormatIndicator(PayloadFormatIndicator.UNSPECIFIED);
        publish.setQos(Qos.AT_MOST_ONCE);
        publish.setTopic("modified topic");
        publish.setResponseTopic("modified response topic");
        publish.setRetain(false);
        publish.getUserProperties().clear();
        publish.getUserProperties().addUserProperty("mod-key", "mod-val");
    }
}


Modify a PUBLISH Message Asynchronously

This example shows how to implement a PublishInboundInterceptor that modifies an inbound PUBLISH message asynchronously with the Managed Extension Executor Service.

The timeout strategy is failure, which means that onward delivery of the PUBLISH message will be prevented if the task takes longer than 10 seconds.

public class ModifyingAsyncPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // make the output object async with a timeout duration of 10 seconds and the timeout fallback failure
        final Async<PublishInboundOutput> asyncOutput =
                publishInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);

        //submit external task to extension executor service
        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                // get the modifiable publish packet from the output
                final ModifiablePublishPacket publish = publishInboundOutput.getPublishPacket();

                // call external task that modifies the publish packet (method not provided)
                callExternalTask(publish);
            }
        });

        // add a callback for completion of the task
        taskFuture.whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(final @Nullable Object object, final @Nullable Throwable throwable) {
                if (throwable != null) {
                    throwable.printStackTrace(); // please use more sophisticated logging
                }

                // resume output to tell HiveMQ that asynchronous processing is done
                asyncOutput.resume();
            }
        });
    }
}


Prevent Delivery of a PUBLISH Message

This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH messages that come from clients with IDs that contain "prevent" when the topic is "topic/to/prevent".

public class PreventingPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // check some parameters if publish delivery must be prevented, for example, client id and topic.
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            // prevent publish delivery on the output object
            publishInboundOutput.preventPublishDelivery();
        }
    }
}


Prevent Delivery of a PUBLISH Message with Reason Code

This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH messages that come from clients with IDs that contain "prevent" when the topic is "topic/to/prevent".

When the quality of service level of the PUBLISH is greater than zero and the client is an MQTT 5 client, the client receives a PUBACK or PUBREC with reason code "TOPIC_NAME_INVALID".

Reason codes for PUBACK or PUBREC are an MQTT 5 feature. If the reason code is not "SUCCESS", MQTT 3 clients are disconnected ungracefully.

public class PreventAndAckPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // check some parameters if publish delivery must be prevented, for example client id and topic.
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            // prevent publish delivery with a reason code on the output object
            publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID);
        }
    }
}


Prevent Delivery of a PUBLISH Message with Reason String

This example shows how to implement a PublishInboundInterceptor that prevents the delivery of PUBLISH messages that come from clients with IDs that contain "prevent" when the topic is "topic/to/prevent".

When the quality of service level of the PUBLISH is greater than zero and the client is an MQTT 5 client, the client receives a PUBACK or PUBREC with the reason code "TOPIC_NAME_INVALID" and the reason string "It is not allowed to publish to topic: topic/to/prevent".

Reason codes for PUBACK or PUBREC are an MQTT 5 feature. If the reason code is not "SUCCESS", MQTT 3 clients are disconnected ungracefully.

public class PreventWithReasonPublishInboundInterceptor implements PublishInboundInterceptor {

    @Override
    public void onInboundPublish(
            final @NotNull PublishInboundInput publishInboundInput,
            final @NotNull PublishInboundOutput publishInboundOutput) {

        // check some parameters if publish delivery must be prevented, for example, client id and topic.
        final String clientId = publishInboundInput.getClientInformation().getClientId();
        final String topic = publishInboundInput.getPublishPacket().getTopic();

        if (clientId.contains("prevent") && topic.equals("topic/to/prevent")) {

            // prevent publish delivery with a reason code and reason string on the output object
            publishInboundOutput.preventPublishDelivery(AckReasonCode.TOPIC_NAME_INVALID, "It is not allowed to publish to topic: " + topic);
        }

    }
}


Publish Outbound Interceptor

On the PublishOutboundInterceptor interface, you can modify an MQTT PUBLISH at the moment that the message is sent to a subscribed client or prevent the delivery of the publish completely. For this task a PublishOutboundInput and a PublishOutboundOutput are provided by HiveMQ. Extension input and extension output principles apply.

The interceptor can be set up in the ClientInitializer. See Initializer Registry.

When an asynchronous PublishOutboundOutput parameter with a TimeoutFallback.FAILURE fallback strategy times out, delivery is prevented.

If multiple extensions provide interceptors, the interceptors are called sequentially, beginning with the extension that has the highest priority.

The JavaDoc for the PublishOutboundInterceptor can be found here.

Example Usage

This section provides examples of how to implement different PublishOutboundInterceptor implementations and add them to a ClientInitializer via the InitializerRegistry.


Set up a PublishOutboundInterceptor in the ClientInitializer

...
@Override
public void extensionStart(final @NotNull ExtensionStartInput extensionStartInput,
                           final @NotNull ExtensionStartOutput extensionStartOutput) {

    try {

        Services.initializerRegistry().setClientInitializer(new ClientInitializer() {
            @Override
            public void initialize(@NotNull InitializerInput initializerInput, @NotNull ClientContext clientContext) {
                clientContext.addPublishOutboundInterceptor(new MyPublishOutboundInterceptor());
            }
        });

    } catch (Exception e) {
        log.error("Exception thrown at extension start: ", e);
    }
}
...


Prevent Delivery of a PUBLISH Message to a Client

You can use the PublishOutboundOutput parameter to prevent an outgoing publish. This method causes the message to be dropped and not sent to the subscriber. Use of the parameter in this way has no effect on other subscribers for the same message. The com.hivemq.messages.dropped.extension-prevented.count metric tracks the number of messages that are dropped because the extension system prevented delivery.

public class PreventPublishOutboundInterceptor implements PublishOutboundInterceptor {
    @Override
    public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
        final String clientId = input.getClientInformation().getClientId();
        final String topic = input.getPublishPacket().getTopic();
        if (!topic.startsWith(clientId)) {
            output.preventPublishDelivery();
        }
    }
}


Modify Outgoing PUBLISH Message

Nearly all aspects of a PUBLISH message can be modified in the interceptor except for the packet ID and the duplicate-delivery flag. It is important to note that these modifications only affect the way the subscriber receives the PUBLISH. There is no impact on the way that HiveMQ processes the message.

This affects the following properties:

Property Description

Topic

Only the subscribers that have matching subscriptions for the original topic receive the PUBLISH, regardless of the changes.

Retained

The original state of the retain flag determines whether or not the PUBLISH is retained by the broker.

QoS

Changes to the QoS level do not change the delivery guarantees.

Expiry Interval

The original interval is still used to decide if a message is expired.

Modify Topic Example

The following example modifies the topic of an outgoing PUBLISH, adds a response topic, and stores the original topic as a user property:

public class ModifyPublishOutboundInterceptor implements PublishOutboundInterceptor {
    @Override
    public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
        final String clientId = input.getClientInformation().getClientId();
        final ModifiableOutboundPublish publish = output.getPublishPacket();
        final String originalTopic = publish.getTopic();

        publish.setTopic(originalTopic + "/" + clientId);
        publish.getUserProperties().addUserProperty("original-topic", originalTopic);

        publish.setResponseTopic("response/" + clientId);
    }
}

Modify Payload Example

The following example decodes the payload of an outgoing PUBLISH, alters the string, and sets the altered string as the new payload asynchronously:

public class ModifyPublishOutboundInterceptor implements PublishOutboundInterceptor {
    @Override
    public void onOutboundPublish(final @NotNull PublishOutboundInput input, final @NotNull PublishOutboundOutput output) {
        final Async<PublishOutboundOutput> async = output.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);
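        Services.extensionExecutorService().submit(() -> {
            try {
                final ModifiableOutboundPublish publish = output.getPublishPacket();
                // only rewrite the payload if the PUBLISH actually carries one
                publish.getPayload().ifPresent(payload -> {
                    final String payloadString = StandardCharsets.UTF_8.decode(payload).toString();
                    final String newPayload = payloadString.toLowerCase();
                    publish.setPayload(StandardCharsets.UTF_8.encode(newPayload));
                });
            } finally {
                // always resume, otherwise the output waits until the timeout is reached
                async.resume();
            }
        });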
    }
}
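The payload transformation itself does not depend on the interceptor and can be kept in a small helper that is easy to test in isolation. The following is a minimal sketch, assuming that the payload is available as an Optional of ByteBuffer (as the get() call in the example suggests). The PayloadTransformer class is hypothetical and not part of the HiveMQ Extension SDK. It uses Locale.ROOT because toLowerCase() without an argument depends on the default locale of the JVM.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Optional;

// Hypothetical helper for illustration, not part of the HiveMQ Extension SDK.
final class PayloadTransformer {

    private PayloadTransformer() {
    }

    // Decodes the payload as UTF-8, lower-cases it independently of the JVM locale,
    // and encodes the result again. Returns an empty Optional if there is no payload.
    static Optional<ByteBuffer> toLowerCase(final Optional<ByteBuffer> payload) {
        return payload.map(buffer -> {
            // duplicate() leaves the position of the caller's buffer untouched
            final String text = StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
            return StandardCharsets.UTF_8.encode(text.toLowerCase(Locale.ROOT));
        });
    }
}
```

Inside the asynchronous task, the result could then be applied with PayloadTransformer.toLowerCase(publish.getPayload()).ifPresent(publish::setPayload).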



Subscribe Inbound Interceptor

To intercept and modify SUBSCRIBE messages that are sent by MQTT clients, you can use a SubscribeInboundInterceptor. When an MQTT client sends a SUBSCRIBE message, HiveMQ calls the onInboundSubscribe method of registered Subscribe Inbound Interceptors and provides SubscribeInboundInput and SubscribeInboundOutput parameters to the method. The input and output parameters follow the general principles.

A ClientInitializer that is registered through the Initializer Registry must be used to add the SubscribeInboundInterceptor.

The SubscribeInboundOutput parameter supports asynchronous processing. By default, when the asynchronous processing times out, the SUBSCRIBE is rejected and a SUBACK message with the UNSPECIFIED_ERROR reason code is sent to the client.

Multiple interceptors are called sequentially. Interceptors that are added by different extensions are executed in order of the extension priority. Changes from the SubscribeInboundOutput parameter are reflected in the SubscribeInboundInput parameter of the next interceptor.
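The call order described above can be pictured with a small stand-alone model. This is only a sketch of the principle, not HiveMQ code: the InterceptorChainModel and Entry types are hypothetical, a topic filter string stands in for the SUBSCRIBE packet, and the model assumes that the extension with the highest priority runs first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified, hypothetical model of the call order; these are not HiveMQ SDK types.
final class InterceptorChainModel {

    // One registered interceptor: the priority of its extension and the change it applies.
    static final class Entry {
        final int extensionPriority;
        final UnaryOperator<String> change;

        Entry(final int extensionPriority, final UnaryOperator<String> change) {
            this.extensionPriority = extensionPriority;
            this.change = change;
        }
    }

    private InterceptorChainModel() {
    }

    // Applies every change to the topic filter, highest extension priority first.
    // The result of one step is the input of the next step.
    static String apply(final List<Entry> entries, final String topicFilter) {
        final List<Entry> ordered = new ArrayList<>(entries);
        ordered.sort(Comparator.comparingInt((Entry e) -> e.extensionPriority).reversed());
        String current = topicFilter;
        for (final Entry entry : ordered) {
            current = entry.change.apply(current);
        }
        return current;
    }
}
```

With one entry of priority 10 that appends "/high" and one entry of priority 1 that appends "/low", the filter "a" becomes "a/high/low": the second interceptor already sees the change made by the first.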


SubscribeInboundInput

The input parameter contains the unmodifiable SUBSCRIBE packet that the subscribing client sent.

The JavaDoc can be found here.


SubscribeInboundOutput

The output parameter contains a modifiable SUBSCRIBE packet.

The JavaDoc can be found here.


Example Usage


Set Up a SubscribeInboundInterceptor in a ClientInitializer

...

@Override
public void extensionStart(final @NotNull ExtensionStartInput input, final @NotNull ExtensionStartOutput output) {

    // create a new subscribe inbound interceptor
    final SubscribeInboundInterceptor subscribeInboundInterceptor = new SubscribeInboundInterceptor() {
        @Override
        public void onInboundSubscribe(
                final @NotNull SubscribeInboundInput subscribeInboundInput,
                final @NotNull SubscribeInboundOutput subscribeInboundOutput) {
            // do something with the subscribe, for example logging
            System.out.println("Inbound subscribe message intercepted from client: "
                    + subscribeInboundInput.getClientInformation().getClientId());
        }
    };

    // create a new client initializer
    final ClientInitializer clientInitializer = new ClientInitializer() {
        @Override
        public void initialize(
                final @NotNull InitializerInput initializerInput,
                final @NotNull ClientContext clientContext) {
            // add the interceptor to the context of the connecting client
            clientContext.addSubscribeInboundInterceptor(subscribeInboundInterceptor);
        }
    };

    // register the client initializer
    Services.initializerRegistry().setClientInitializer(clientInitializer);
}

...


Modify a SUBSCRIBE Message

This example shows how to implement a SubscribeInboundInterceptor that modifies all parameters of an inbound SUBSCRIBE message:

public class ModifyingSubscribeInboundInterceptor implements SubscribeInboundInterceptor {

    @Override
    public void onInboundSubscribe(
            final @NotNull SubscribeInboundInput subscribeInboundInput,
            final @NotNull SubscribeInboundOutput subscribeInboundOutput) {

        // get the modifiable subscribe packet from the output
        final ModifiableSubscribePacket subscribe = subscribeInboundOutput.getSubscribePacket();

        // modify / overwrite any parameter of the subscribe packet
        final ModifiableSubscription subscription = subscribe.getSubscriptions().get(0);
        subscription.setTopicFilter("modified-topic");
        subscription.setQos(Qos.AT_LEAST_ONCE);
        subscription.setRetainHandling(RetainHandling.DO_NOT_SEND);
        subscription.setRetainAsPublished(true);
        subscription.setNoLocal(true);
        subscribe.getUserProperties().addUserProperty("additional", "value");
    }
}


Modify a SUBSCRIBE Message Asynchronously

This example shows how to implement a SubscribeInboundInterceptor that asynchronously modifies an inbound SUBSCRIBE message with the Managed Extension Executor Service.

public class ModifyingAsyncSubscribeInboundInterceptor implements SubscribeInboundInterceptor {

    @Override
    public void onInboundSubscribe(
            final @NotNull SubscribeInboundInput subscribeInboundInput,
            final @NotNull SubscribeInboundOutput subscribeInboundOutput) {

        // make output asynchronous with a timeout duration of 10 seconds and the timeout fallback failure
        final Async<SubscribeInboundOutput> asyncOutput =
                subscribeInboundOutput.async(Duration.ofSeconds(10), TimeoutFallback.FAILURE);

        // submit external task to the extension executor service
        final CompletableFuture<?> taskFuture = Services.extensionExecutorService().submit(() -> {
            // get the modifiable subscribe packet from the output
            final ModifiableSubscribePacket subscribe = subscribeInboundOutput.getSubscribePacket();

            // call external task that modifies the subscribe packet (method not provided)
            callExternalTask(subscribe);
        });

        // add a callback for completion of the task
        taskFuture.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace(); // please use more sophisticated logging
            }

            // resume output to tell HiveMQ that asynchronous processing is done
            asyncOutput.resume();
        });
    }
}