HiveMQ Enterprise Extension for Google Cloud Pub/Sub Customization

Version 4.9.0 of HiveMQ and the Enterprise Extension for Google Cloud Pub/Sub introduces the HiveMQ Google Cloud Pub/Sub Extension Customization SDK.

HiveMQ Enterprise Extension for Google Cloud Pub/Sub Customization SDK

Our flexible new API gives you the ability to programmatically specify sophisticated custom handling of message transformations between HiveMQ and Google Cloud Pub/Sub.

Features

The HiveMQ Google Cloud Pub/Sub Extension Customization SDK gives you greater control over message content, topics, and the bidirectional flow of exchanged messages between HiveMQ and Google Cloud Pub/Sub.

Requirements

Quick Start with Customization SDK

The Customization SDK for the HiveMQ Enterprise Extension for Google Cloud Pub/Sub uses the same Input/Output principle as the HiveMQ Extension SDK.

The quickest way to learn about the new Customization SDK is to check out our Hello World Customization project on GitHub and use it as the basis for your own customization.

The hivemq-google-cloud-pubsub-hello-world-customization project provides two transformers to get you started with the following transformations in the HiveMQ Google Cloud Pub/Sub Extension Customization SDK:

  • One transformer that forwards MQTT PUBLISH messages to your Google Cloud Pub/Sub service:

    • Reads the Google Cloud Pub/Sub topic from the custom settings in the google-cloud-pubsub-configuration.xml of your HiveMQ instance.

    • Preserves the MQTT topic, retained message flag, QoS level, and MQTT 5 user properties of the MQTT message as Google Cloud Pub/Sub message attributes.

  • One transformer that forwards Google Cloud Pub/Sub messages to HiveMQ:

    • Reads the QoS level from the custom settings of the google-cloud-pubsub-configuration.xml of your HiveMQ instance.

    • Sets mqtt/topic as the destination MQTT topic.

    • Preserves all Google Cloud Pub/Sub attributes as MQTT user properties.
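
The attribute mapping that the first Hello World transformer performs can be pictured with plain JDK types. The following is a minimal sketch under stated assumptions, not the code of the Hello World project: the class name, the method signature, and attribute keys such as mqtt.topic are hypothetical, and the map of user properties is a simplification because MQTT 5 allows the same user property name to appear more than once.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class AttributeMappingSketch {

    // Hypothetical attribute keys; choose names that suit your consumers.
    static final String KEY_TOPIC = "mqtt.topic";
    static final String KEY_RETAINED = "mqtt.retained";
    static final String KEY_QOS = "mqtt.qos";
    static final String USER_PROPERTY_PREFIX = "mqtt.user-property.";

    /** Flattens MQTT PUBLISH metadata into string key/value pairs. */
    static Map<String, String> toAttributes(
            final String topic,
            final boolean retained,
            final int qos,
            final Map<String, String> userProperties) {
        final Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put(KEY_TOPIC, topic);
        attributes.put(KEY_RETAINED, Boolean.toString(retained));
        attributes.put(KEY_QOS, Integer.toString(qos));
        // Prefix user properties so they cannot overwrite the fixed keys above.
        userProperties.forEach((name, value) -> attributes.put(USER_PROPERTY_PREFIX + name, value));
        return attributes;
    }
}
```

In a real transformer, the values would come from the PublishPacket and the resulting pairs would be set as attributes on the outbound message.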

MQTT to Google Cloud Pub/Sub Customization

The MQTT to Google Cloud Pub/Sub Transformer lets you extend the capabilities of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub to meet the specific needs of your individual use case.
Implement this transformer to programmatically create customized Google Cloud Pub/Sub messages from MQTT PUBLISH messages:

  • Access data from all fields and properties of specific incoming MQTT PUBLISH messages:

    • MQTT message payload

    • MQTT QoS

    • MQTT retained message flag

    • MQTT topic

    • MQTT payload format indicator

    • MQTT message expiry

    • MQTT response topic

    • MQTT correlation data

    • MQTT content type

    • MQTT user properties

  • Set information for the target Google Cloud Pub/Sub topic as desired:

    • Google Cloud Pub/Sub attributes

    • Google Cloud Pub/Sub custom settings

  • Create multiple Google Cloud Pub/Sub messages from a single MQTT PUBLISH message.

MQTT to Google Cloud Pub/Sub Transformer Configuration

With the MQTT to Google Cloud Pub/Sub Transformer, you can add your own code to implement the transformation of MQTT messages to Google Cloud Pub/Sub messages.

To enable the use of a custom MQTT to Google Cloud Pub/Sub transformer, you extend the google-cloud-pubsub-configuration.xml file of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub.

Table 1. MQTT to Google Cloud Pub/Sub transformer configuration settings
Parameter Required Type Description

enabled

Boolean

Optional setting that defines whether the selected transformer is enabled or disabled. The default setting is true.

id

String

The unique identifier of the transformer.

pubsub-connection

String

The id of the pubsub-connection from your pubsub-connections configuration to use for the transformer.

transformer

String

The fully qualified class name of the transformer that is used.

mqtt-topic-filters

String

A list of MQTT topic filters.

  • mqtt-topic-filter: The source MQTT topics from which MQTT messages are routed to Google Cloud Pub/Sub. You can define as many individual <mqtt-topic-filter> tags as your use case requires.

custom-settings

Element

A list of the custom settings that are available in the init input method of the transformer.

  • custom-setting: The name and value pair that the custom setting of the transformer implements. The name does not need to be unique. You can configure as many custom-setting tags as your use case requires.

Example MQTT to Pub/Sub transformer configuration
<mqtt-to-pubsub-transformers>
    <mqtt-to-pubsub-transformer>
        <id>my-mqtt-to-pubsub-transformer-01</id>
        <enabled>true</enabled>
        <pubsub-connection>your-custom-connection-id</pubsub-connection>
        <mqtt-topic-filters>
            <mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
            <mqtt-topic-filter>mqtt/topic/+/a</mqtt-topic-filter>
            <mqtt-topic-filter>mqtt/topic/c/#</mqtt-topic-filter>
        </mqtt-topic-filters>
        <transformer>fully.qualified.classname.to.YourTransformer</transformer>
        <custom-settings>
            <custom-setting>
                <name>destination</name>
                <value>destination/pubsub/topic/a</value>
            </custom-setting>
            <custom-setting>
                <name>destination</name>
                <value>destination/pubsub/topic/b</value>
            </custom-setting>
            <custom-setting>
                <name>filepath</name>
                <value>path-to-file</value>
            </custom-setting>
        </custom-settings>
    </mqtt-to-pubsub-transformer>
</mqtt-to-pubsub-transformers>
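
The three filters in the example configuration use the standard MQTT wildcards: + matches exactly one topic level and # matches the current level and everything below it. The sketch below illustrates that matching rule with plain Java. It is not the matcher that HiveMQ uses, the class and method names are hypothetical, it assumes a well-formed filter (# only as the last level), and it omits the special handling of topics that start with $.

```java
final class TopicFilterSketch {

    /** Returns true if the MQTT topic matches the MQTT topic filter. */
    static boolean matches(final String filter, final String topic) {
        final String[] filterLevels = filter.split("/", -1);
        final String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: matches the parent level and all levels below it.
                return true;
            }
            if (i >= topicLevels.length) {
                // The filter has more levels than the topic.
                return false;
            }
            // Single-level wildcard matches any one level; otherwise the levels must be equal.
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        // Without a trailing #, filter and topic must have the same depth.
        return filterLevels.length == topicLevels.length;
    }
}
```

With this rule, mqtt/topic/+/a matches mqtt/topic/b/a but not mqtt/topic/b/c/a, and mqtt/topic/c/# matches both mqtt/topic/c and mqtt/topic/c/d/e.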

MQTT to Google Cloud Pub/Sub Objects and Methods

MqttToPubSubTransformer

The MqttToPubSubTransformer interface is a transformer for the programmatic creation of one or more OutboundPubSubMessages from MQTT PublishPackets.

The HiveMQ Enterprise Extension for Google Cloud Pub/Sub executes the MqttToPubSubTransformer for each MQTT PUBLISH message your HiveMQ cluster receives that matches the <mqtt-topic-filters> you configure in the <mqtt-to-pubsub-transformer> tag of your google-cloud-pubsub-configuration.xml file. The transformer can publish a virtually unlimited number of OutboundPubSubMessages via the MqttToPubSubOutput object.

Your compiled implementation (.class files) of the MqttToPubSubTransformer and all associated dependencies must be placed in a Java archive (.jar) in the customizations folder of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub. Additionally, you must configure an <mqtt-to-pubsub-transformer> that references the canonical name of the implementing class in the google-cloud-pubsub-configuration.xml file.

Multiple threads can call methods concurrently. To ensure that all threads behave properly and prevent unintended interactions, your implementation of the MqttToPubSubTransformer interface must be thread-safe. Additionally, exception handling must be done inside the methods of your MqttToPubSubTransformer implementation.
Transformer methods are not permitted to throw an Exception of any kind. If a method in your transformer throws an exception, fix and redeploy your transformer, then disable and re-enable the Google Cloud Pub/Sub extension.
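
One way to meet both requirements, thread safety and no escaping exceptions, is sketched below with plain JDK types. This is an illustrative pattern only: SafeTransformSketch is a hypothetical stand-in rather than an SDK class, its transform method merely imitates a transformer callback, and the business logic is invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.LongAdder;

final class SafeTransformSketch {

    // LongAdder can be updated from many threads at once without extra locking.
    private final LongAdder failures = new LongAdder();

    /** Stand-in for a transform callback: never throws, returns an empty list on failure. */
    List<byte[]> transform(final byte[] payload) {
        try {
            return List.of(businessLogic(payload));
        } catch (final RuntimeException e) {
            // Handle the problem here instead of letting it escape to the caller.
            failures.increment();
            return List.of();
        }
    }

    long failureCount() {
        return failures.sum();
    }

    // Hypothetical business logic that rejects empty payloads and upper-cases the rest.
    private static byte[] businessLogic(final byte[] payload) {
        if (payload.length == 0) {
            throw new IllegalArgumentException("empty payload");
        }
        final String text = new String(payload, StandardCharsets.UTF_8);
        return text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }
}
```

The only mutable state is the counter, which is safe for concurrent updates, so the same instance can be shared across threads.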

If the HiveMQ Google Cloud Pub/Sub extension cannot successfully complete a transformation, the extension drops the message, logs a warning, and increments the mqtt-to-pubsub.transformer.[transformer_id].failed.count metric.
To troubleshoot your transformer, monitor the mqtt-to-pubsub.transformer.[transformer_id].failed.count metric in the Google Cloud Pub/Sub Extension Customization SDK Metrics. An increase in the resend metric can indicate an issue with your transformer. Additionally, check your hivemq.log file for warnings related to the performance of your transformer.

Interface Description

MqttToPubSubInput

Contains the information of the MQTT PublishPacket to which the transformation is applied.

MqttToPubSubOutput

Returns the post-transformation OutboundPubSubMessages that the extension publishes to the associated Google Cloud Pub/Sub topic.

@Override
public void transformMqttToPubSub(
        final @NotNull MqttToPubSubInput mqttToPubSubInput,
        final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {
    final PublishPacket publishPacket = mqttToPubSubInput.getPublishPacket();
}

MqttToPubSubInitInput

The MqttToPubSubInitInput interface provides context for the setup of an MqttToPubSubTransformer and is used to retrieve the associated custom-settings and pubsub-connection of the selected transformer and the MetricRegistry of HiveMQ after (re)loading of the extension configuration.

Method Information

getCustomSettings()

Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty.

getPubSubConnection()

Provides the connection identifier and the Google Cloud project identifier with which the transformer is associated as defined in the extension configuration.

getMetricRegistry()

Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfil the monitoring needs of your specific business logic.

@Override
public void init(final @NotNull MqttToPubSubInitInput input) {
    final CustomSettings customSettings = input.getCustomSettings();
    final PubSubConnection pubSubConnection = input.getPubSubConnection();
    final MetricRegistry metricRegistry = input.getMetricRegistry();
}

MqttToPubSubInput

The MqttToPubSubInput object is the input parameter of the MqttToPubSubTransformer. The MqttToPubSubInput object contains the PublishPacket to which the transformation is applied.

Method Information

getPublishPacket()

Contains all the MQTT PUBLISH information about the message that triggered the transformer call.

@Override
public void transformMqttToPubSub(
        final @NotNull MqttToPubSubInput mqttToPubSubInput,
        final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {
    final PublishPacket publishPacket = mqttToPubSubInput.getPublishPacket();
}

MqttToPubSubOutput

The MqttToPubSubOutput object is the output parameter of the MqttToPubSubTransformer. The MqttToPubSubOutput object provides the following methods:

Method Description

newOutboundPubSubMessageBuilder()

Creates an empty Builder to create OutboundPubSubMessages.

setOutboundPubSubMessages()

Sets the created OutboundPubSubMessages that are sent to Google Cloud Pub/Sub.

@Override
public void transformMqttToPubSub(
        final @NotNull MqttToPubSubInput mqttToPubSubInput,
        final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {
    final PublishPacket publishPacket = mqttToPubSubInput.getPublishPacket();
    final OutboundPubSubMessageBuilder builder = mqttToPubSubOutput.newOutboundPubSubMessageBuilder();
    publishPacket.getPayload().ifPresent(builder::data);
    mqttToPubSubOutput.setOutboundPubSubMessages(List.of(builder.build()));
}

Google Cloud Pub/Sub to MQTT Customization

The Google Cloud Pub/Sub to MQTT Transformer lets you extend the capabilities of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub to meet the specific needs of your individual use case.
Implement this transformer to programmatically create customized MQTT PublishPackets from Google Cloud Pub/Sub messages.

  • Access data from all fields of a specific Google Cloud Pub/Sub message:

    • Pub/Sub data

    • Pub/Sub attributes

    • Pub/Sub ordering key

    • Pub/Sub publish time

    • Pub/Sub message ID

    • Pub/Sub subscription

  • Set information in the generated MQTT PUBLISH message as desired:

    • MQTT Payload

    • MQTT QoS

    • MQTT Retain flag

    • MQTT Topic

    • MQTT Payload format indicator

    • MQTT Message expiry

    • MQTT Response topic

    • MQTT Correlation data

    • MQTT Content type

    • MQTT User properties

  • Create multiple MQTT PUBLISH messages from a Google Cloud Pub/Sub message

Google Cloud Pub/Sub to MQTT Transformer Configuration

With the Google Cloud Pub/Sub to MQTT Transformer, you can add your own code to implement the transformation of Google Cloud Pub/Sub messages into MQTT messages.

To enable the use of a custom Google Cloud Pub/Sub to MQTT transformer, you extend the google-cloud-pubsub-configuration.xml file of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub.

Table 2. Google Cloud Pub/Sub to MQTT transformer configuration settings
Parameter Required Type Description

enabled

Boolean

Optional setting that defines whether the selected transformer is enabled or disabled. The default setting is true.

id

String

The unique identifier of the transformer.

pubsub-connection

String

The id of the pubsub-connection from your pubsub-connections configuration to use for the transformer.

transformer

String

The fully qualified class name of the transformer that is used.

pubsub-subscriptions

String

A list of the Google Cloud Pub/Sub source subscriptions.

  • pubsub-subscription: The source Google Cloud Pub/Sub subscription from which messages are forwarded to HiveMQ. You must define at least one Pub/Sub subscription.

    • name: The name of the origin Google Cloud Pub/Sub subscription.

custom-settings

Element

A list of the custom settings that are available in the init input method of the transformer.

  • custom-setting: The name and value pair that the transformer implements. The name does not need to be unique. You can configure as many custom-setting tags as your use case requires.

Example Google Cloud Pub/Sub to MQTT transformer configuration
<pubsub-to-mqtt-transformers>
    <pubsub-to-mqtt-transformer>
        <id>my-pubsub-to-mqtt-transformer-01</id>
        <enabled>true</enabled>
        <pubsub-connection>your-custom-connection-id</pubsub-connection>
        <pubsub-subscriptions>
            <pubsub-subscription>
                <name>sub-for-a</name>
            </pubsub-subscription>
            <pubsub-subscription>
                <name>sub-for-b</name>
            </pubsub-subscription>
        </pubsub-subscriptions>
        <transformer>fully.qualified.classname.to.YourTransformer</transformer>
        <custom-settings>
            <custom-setting>
                <name>destination</name>
                <value>destination/mqtt/topic/a</value>
            </custom-setting>
            <custom-setting>
                <name>destination</name>
                <value>destination/mqtt/topic/b</value>
            </custom-setting>
            <custom-setting>
                <name>filepath</name>
                <value>path-to-file</value>
            </custom-setting>
        </custom-settings>
    </pubsub-to-mqtt-transformer>
</pubsub-to-mqtt-transformers>

Google Cloud Pub/Sub to MQTT Objects and Methods

PubSubToMqttTransformer

The PubSubToMqttTransformer interface is a transformer for the programmatic creation of MQTT PublishPackets from one or more InboundPubSubMessages.

The HiveMQ Enterprise Extension for Google Cloud Pub/Sub executes the PubSubToMqttTransformer implementation for every InboundPubSubMessage that matches the <pubsub-subscriptions> you configure in the <pubsub-to-mqtt-transformer> tag of your google-cloud-pubsub-configuration.xml file. The transformer can publish a virtually unlimited number of MQTT PublishPackets via the PubSubToMqttOutput object.

Your compiled implementation (.class files) of the PubSubToMqttTransformer and all associated dependencies must be placed in a Java archive (.jar) in the customizations folder of your HiveMQ Enterprise Extension for Google Cloud Pub/Sub. Additionally, you must configure a <pubsub-to-mqtt-transformer> that references the canonical name of the implementing class in the google-cloud-pubsub-configuration.xml file.

Multiple threads can call methods concurrently. To ensure that all threads behave properly and prevent unintended interactions, your implementation of the PubSubToMqttTransformer interface must be thread-safe. Additionally, exception handling must be done inside the methods of your PubSubToMqttTransformer implementation.
Transformer methods are not permitted to throw an Exception of any kind. If a method in your transformer throws an exception, fix and redeploy your transformer, then disable and re-enable the Google Cloud Pub/Sub extension.

If the HiveMQ Google Cloud Pub/Sub extension cannot successfully complete a transformation, the extension drops the message, logs a warning, and increments the pubsub-to-mqtt.transformer.[transformer_id].failed.count metric.
To troubleshoot your transformer, monitor the pubsub-to-mqtt.transformer.[transformer_id].failed.count metric in the Google Cloud Pub/Sub Extension Customization SDK Metrics. An increase in the resend metric can indicate an issue with your transformer. Additionally, check your hivemq.log file for warnings related to the performance of your transformer.

Example warning in the hivemq.log file
Transformer with id test-transformer threw an unhandled RuntimeException with message
null while processing a record from topic test, partition test-1 with offset 1000.
Transformers are responsible for their own exception handling.

One instance of your PubSubToMqttTransformer implementation is created for each <pubsub-to-mqtt-transformer> that you reference in your google-cloud-pubsub-configuration.xml file.

Method Description

transformPubSubToMqtt

This callback is executed for every InboundPubSubMessage that the HiveMQ Enterprise Extension for Google Cloud Pub/Sub polls from Google Cloud Pub/Sub according to the configured <pubsub-subscriptions> in the <pubsub-to-mqtt-transformer> tag.

public class PubSubToMqttHelloWorldTransformer implements PubSubToMqttTransformer {

    @Override
    public void init(final @NotNull PubSubToMqttInitInput input) {
        // Insert your own business logic
    }

    @Override
    public void transformPubSubToMqtt(
            final @NotNull PubSubToMqttInput pubSubToMqttInput,
            final @NotNull PubSubToMqttOutput pubSubToMqttOutput) {
        // Insert your own business logic
    }
}

PubSubToMqttInitInput

The PubSubToMqttInitInput interface provides context for the setup of a PubSubToMqttTransformer and is used to retrieve the associated custom-settings and pubsub-connection of the selected transformer and the MetricRegistry of HiveMQ after (re)loading of the extension configuration.

Method Information

getCustomSettings()

Returns the custom settings that the user defined in the extension configuration. If no custom settings are configured, the underlying map is empty.

getPubSubConnection()

Provides the connection identifier and the Google Cloud project identifier with which the transformer is associated as defined in the extension configuration.

getMetricRegistry()

Provides the metric registry of the HiveMQ node. The registry can be used to add custom metrics that fulfil the monitoring needs of your specific business logic.

@Override
public void init(final @NotNull PubSubToMqttInitInput input) {
    final CustomSettings customSettings = input.getCustomSettings();
    final PubSubConnection pubSubConnection = input.getPubSubConnection();
    final MetricRegistry metricRegistry = input.getMetricRegistry();
}

PubSubToMqttInput

The PubSubToMqttInput object is the input parameter of the PubSubToMqttTransformer. The PubSubToMqttInput object contains the information of the InboundPubSubMessage to which the transformation is applied.

Method Description

getInboundPubSubMessage

Contains all the Google Cloud Pub/Sub message information that triggered the transformer call.

@Override
public void transformPubSubToMqtt(
        final @NotNull PubSubToMqttInput pubSubToMqttInput,
        final @NotNull PubSubToMqttOutput pubSubToMqttOutput) {
    final InboundPubSubMessage inboundPubSubMessage = pubSubToMqttInput.getInboundPubSubMessage();
}

PubSubToMqttOutput

The PubSubToMqttOutput object is the output parameter of the PubSubToMqttTransformer. The PubSubToMqttOutput object allows access to the MQTT PublishBuilder. After the PubSubToMqttTransformer transforms the PubSubToMqttInput, HiveMQ publishes the outputs in MQTT messages.

Method Description

newPublishBuilder

Returns a new PublishBuilder. The PublishBuilder can be used to create new PublishPackets as desired.

setPublishes

Defines the MQTT publish messages that HiveMQ publishes once the transformation is complete. Each call of the setPublishes method overwrites the previous call of the method.

  • publishes: The publishes parameter of the setPublishes method lists the PublishPackets to be published. The HiveMQ Enterprise Extension for Google Cloud Pub/Sub publishes these messages in the order that the messages appear in the list. The same publish message can be entered multiple times within the list.

If the publishes list or any element of the list is null, a NullPointerException is thrown.
If the publishes list contains any element that is not created via a PublishBuilder, an IllegalArgumentException is thrown.
If the publishes list is empty, HiveMQ does not publish any MQTT message.
@Override
public void transformPubSubToMqtt(
        final @NotNull PubSubToMqttInput pubSubToMqttInput,
        final @NotNull PubSubToMqttOutput pubSubToMqttOutput) {
    final InboundPubSubMessage inboundPubSubMessage = pubSubToMqttInput.getInboundPubSubMessage();
    final PublishBuilder publishBuilder = pubSubToMqttOutput.newPublishBuilder();
    inboundPubSubMessage.getData().ifPresent(publishBuilder::payload);
    pubSubToMqttOutput.setPublishes(List.of(publishBuilder.build()));
}
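
The rules listed for setPublishes (the last call wins, null is rejected, duplicates are allowed, an empty list publishes nothing) can be pictured with a toy model. The class below is a hypothetical stand-in that stores strings instead of PublishPackets. It is not the SDK implementation and does not model the PublishBuilder check.

```java
import java.util.List;
import java.util.Objects;

final class OutputSemanticsSketch {

    // Starts empty: if setPublishes is never called, nothing is published.
    private List<String> publishes = List.of();

    /** Replaces any previously set list; rejects a null list and null elements. */
    void setPublishes(final List<String> newPublishes) {
        Objects.requireNonNull(newPublishes, "publishes must not be null");
        for (final String publish : newPublishes) {
            Objects.requireNonNull(publish, "publish must not be null");
        }
        // Validation happens before assignment, so a rejected call leaves the old list in place.
        this.publishes = List.copyOf(newPublishes);
    }

    /** The messages that would be published, in list order. */
    List<String> publishes() {
        return publishes;
    }
}
```

Because each call overwrites the previous one, a transformer that builds several messages should collect them in one list and call setPublishes once.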

Customization Use Cases

MQTT to Google Cloud Pub/Sub Customization Use Case Examples

Build and send custom Google Cloud Pub/Sub messages in response to received MQTT messages: In this use case, the goal is to transform MQTT messages based on a defined business logic and output a customized Google Cloud Pub/Sub message.

Build Custom Google Cloud Pub/Sub Messages
Example implementation to send a custom Google Cloud Pub/Sub message in response to a received MQTT message
public class BusinessLogicTransformer implements MqttToPubSubTransformer {

    @Override
    public void transformMqttToPubSub(
            final @NotNull MqttToPubSubInput mqttToPubSubInput,
            final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {

        // Get the MQTT payload, or an empty buffer if the message has no payload
        final PublishPacket publishPacket = mqttToPubSubInput.getPublishPacket();
        final ByteBuffer mqttPayload = publishPacket.getPayload().orElse(ByteBuffer.allocate(0));

        // Manipulate the MQTT payload according to your business logic
        // For example, deserialize or add fields
        final byte[] data = ownBusinessLogic(mqttPayload);

        // Build the Pub/Sub message from the transformed data
        final OutboundPubSubMessage message = mqttToPubSubOutput.newOutboundPubSubMessageBuilder()
                .topicName(publishPacket.getTopic())
                .data(data)
                .build();

        // Set message as output
        mqttToPubSubOutput.setOutboundPubSubMessages(List.of(message));
    }

    private byte @NotNull [] ownBusinessLogic(final @NotNull ByteBuffer mqttPayload) {
        // Insert your own business logic; this placeholder copies the payload unchanged
        final byte[] bytes = new byte[mqttPayload.remaining()];
        mqttPayload.duplicate().get(bytes);
        return bytes;
    }
}

Multicast one MQTT message to multiple Google Cloud Pub/Sub topics: In this use case, the goal is to transform one MQTT message into multiple customized Google Cloud Pub/Sub messages that each contain a specific part of the information from the original MQTT message:

Multicast MQTT Publish Messages
Example implementation to multicast from one MQTT message to many Google Cloud Pub/Sub topics
public class MulticastTransformer implements MqttToPubSubTransformer {

    private static final String PUBSUB_TOPIC_LOCATION_SERVICE = "location-service";
    private static final String PUBSUB_TOPIC_SPEED_SERVICE = "speed-control-service";
    private static final String PUBSUB_TOPIC_FUEL_SERVICE = "fuel-control-service";

    @Override
    public void transformMqttToPubSub(
            final @NotNull MqttToPubSubInput input,
            final @NotNull MqttToPubSubOutput output) {

        // Get MQTT payload
        final ByteBuffer mqttPayload = input.getPublishPacket().getPayload().orElse(ByteBuffer.allocate(0));

        // Create the Pub/Sub message for the location information
        final byte[] location = extractLocation(mqttPayload);
        final OutboundPubSubMessage locationMessage = output.newOutboundPubSubMessageBuilder()
                .topicName(PUBSUB_TOPIC_LOCATION_SERVICE)
                .data(location)
                .build();
        // Create the Pub/Sub message for the speed information
        final byte[] speed = extractSpeed(mqttPayload);
        final OutboundPubSubMessage speedMessage = output.newOutboundPubSubMessageBuilder()
                .topicName(PUBSUB_TOPIC_SPEED_SERVICE)
                .data(speed)
                .build();
        // Create the Pub/Sub message for the fuel information
        final byte[] fuel = extractFuel(mqttPayload);
        final OutboundPubSubMessage fuelMessage = output.newOutboundPubSubMessageBuilder()
                .topicName(PUBSUB_TOPIC_FUEL_SERVICE)
                .data(fuel)
                .build();

        // Set the 3 PubSub messages as output
        output.setOutboundPubSubMessages(List.of(locationMessage, speedMessage, fuelMessage));
    }

    private byte[] extractFuel(final @NotNull ByteBuffer mqttPayload) {
        // Placeholder: replace with custom code that extracts the fuel data
        return new byte[0];
    }

    private byte[] extractSpeed(final @NotNull ByteBuffer mqttPayload) {
        // Placeholder: replace with custom code that extracts the speed data
        return new byte[0];
    }

    private byte[] extractLocation(final @NotNull ByteBuffer mqttPayload) {
        // Placeholder: replace with custom code that extracts the location data
        return new byte[0];
    }
}
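
The extract helpers in the multicast example are placeholders. As one possible way to fill them in, the sketch below assumes a purely hypothetical payload format, a UTF-8 string with semicolon-separated fields such as location;speed;fuel, and returns one field as a byte array. The class name, method name, and payload format are all assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PayloadFieldSketch {

    /** Returns the field at the given index of a semicolon-separated UTF-8 payload, or an empty array if absent. */
    static byte[] extractField(final ByteBuffer payload, final int index) {
        // Decode a duplicate so that the position of the caller's buffer stays untouched.
        final String text = StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
        final String[] fields = text.split(";", -1);
        if (index < 0 || index >= fields.length) {
            // Return an empty array rather than null so that callers never hit a NullPointerException.
            return new byte[0];
        }
        return fields[index].getBytes(StandardCharsets.UTF_8);
    }
}
```

Under this assumed format, extractLocation could delegate to extractField(mqttPayload, 0), extractSpeed to index 1, and extractFuel to index 2.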

Use custom logic to drop specific MQTT messages: In this use case, the goal is to filter out received MQTT messages that have empty message payloads and to create Google Cloud Pub/Sub messages only for received MQTT messages with message payloads that contain data.

Skip Specific MQTT Publish Messages
Example implementation with custom logic to drop specific MQTT messages
public class DroppingTransformer implements MqttToPubSubTransformer {

    @Override
    public void transformMqttToPubSub(
            final @NotNull MqttToPubSubInput input,
            final @NotNull MqttToPubSubOutput output) {
        if (input.getPublishPacket().getPayload().isEmpty()) {
            output.setOutboundPubSubMessages(List.of());
            return;
        }

        final OutboundPubSubMessage message = output.newOutboundPubSubMessageBuilder()
                .topicName(input.getPublishPacket().getTopic())
                .data(input.getPublishPacket().getPayload().get())
                .build();

        output.setOutboundPubSubMessages(List.of(message));
    }
}

Use custom settings to transform MQTT PUBLISH messages: In this use case, the goal is to use custom settings from the configuration XML file to transform MQTT publishes.

Use Custom Settings to Transform MQTT Messages
Example configuration to transform specific MQTT publishes with custom settings
<mqtt-to-pubsub-transformers>
    <mqtt-to-pubsub-transformer>
        <enabled>true</enabled>
        <id>my-mqtt-to-pubsub-transformer-01</id>
        <pubsub-connection>your-custom-connection-id</pubsub-connection>
        <transformer>fully.qualified.classname.to.CustomSettingsTransformer</transformer>
        <mqtt-topic-filters>
            <mqtt-topic-filter>mqtt/topic/a</mqtt-topic-filter>
        </mqtt-topic-filters>
        <custom-settings>
            <custom-setting>
                <name>destination-topic</name>
                <value>destination/pubsub/topic/a</value>
            </custom-setting>
        </custom-settings>
    </mqtt-to-pubsub-transformer>
</mqtt-to-pubsub-transformers>
Example implementation to transform specific MQTT publishes with custom settings
public class CustomSettingsTransformer implements MqttToPubSubTransformer {

    private @NotNull Optional<String> destinationTopic = Optional.empty();

    @Override
    public void init(@NotNull MqttToPubSubInitInput transformerInitInput) {
        final CustomSettings customSettings = transformerInitInput.getCustomSettings();
        destinationTopic = customSettings.getFirst("destination-topic");
    }

    @Override
    public void transformMqttToPubSub(
            final @NotNull MqttToPubSubInput input,
            final @NotNull MqttToPubSubOutput output) {
        final OutboundPubSubMessage message = output.newOutboundPubSubMessageBuilder()
                .topicName(destinationTopic.orElse("default/destination/topic"))
                .data(input.getPublishPacket().getPayload().orElse(ByteBuffer.allocate(0)))
                .build();
        output.setOutboundPubSubMessages(List.of(message));
    }
}
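
Because the name of a custom-setting does not need to be unique, a lookup such as getFirst has to pick one of possibly several values. The sketch below models that idea with a plain map of lists. SettingsLookupSketch is a hypothetical stand-in and not the SDK's CustomSettings type; its add and getAll methods are names invented for this illustration, and only getFirst mirrors a call that appears in the example above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SettingsLookupSketch {

    // One name can map to several values, kept in configuration order.
    private final Map<String, List<String>> values = new LinkedHashMap<>();

    /** Records one name/value pair, as one custom-setting element would. */
    void add(final String name, final String value) {
        values.computeIfAbsent(name, key -> new ArrayList<>()).add(value);
    }

    /** Returns the first value configured for the name, if any. */
    Optional<String> getFirst(final String name) {
        final List<String> all = values.get(name);
        return all == null || all.isEmpty() ? Optional.empty() : Optional.of(all.get(0));
    }

    /** Returns every value configured for the name, or an empty list. */
    List<String> getAll(final String name) {
        return List.copyOf(values.getOrDefault(name, List.of()));
    }
}
```

Returning an Optional from getFirst lets the transformer fall back to a default, as the orElse call in the example implementation does.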

Google Cloud Pub/Sub to MQTT Customization Use Case Examples

Build custom MQTT topic from incoming Google Cloud Pub/Sub messages: In this use case, the goal is to extract selected information from an incoming Google Cloud Pub/Sub message and publish this information to a specific MQTT topic based on your own custom business logic.

Extract Information from Google Cloud Pub/Sub Messages
Example implementation to build a custom MQTT topic from an incoming Google Cloud Pub/Sub message
public class CustomTransformer implements PubSubToMqttTransformer {

    @Override
    public void transformPubSubToMqtt(
            final @NotNull PubSubToMqttInput input,
            final @NotNull PubSubToMqttOutput output) {

        // Get the PubSub message
        final InboundPubSubMessage pubSubMessage = input.getInboundPubSubMessage();

        final byte[] mqttPayload = ownBusinessLogic(pubSubMessage.getDataAsByteArray().orElse(new byte[0]));

        // Build MQTT publish
        final Publish publish = output.newPublishBuilder()
                .topic(pubSubMessage.getSubscriptionName())
                .qos(Qos.AT_LEAST_ONCE)
                .payload(ByteBuffer.wrap(mqttPayload))
                .build();

        // Set MQTT publish as output
        output.setPublishes(List.of(publish));
    }

    private @NotNull byte[] ownBusinessLogic(final byte @NotNull [] mqttPayload) {
        // Insert your own business logic
        return mqttPayload;
    }
}
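
When the MQTT topic is derived from external data such as a subscription name, it can help to check the candidate before building the publish. MQTT topic names in PUBLISH messages must not be empty, must not contain the wildcard characters + and #, must not contain the null character, and are limited to 65,535 bytes of UTF-8. The sketch below applies those rules. The class name, method names, and fallback topic are hypothetical and not part of the SDK.

```java
import java.nio.charset.StandardCharsets;

final class PublishTopicCheckSketch {

    // Hypothetical fallback topic for candidates that are not valid publish topics.
    static final String FALLBACK_TOPIC = "pubsub/unroutable";

    /** Checks the basic MQTT rules for a topic name used in a PUBLISH message. */
    static boolean isValidPublishTopic(final String topic) {
        if (topic == null || topic.isEmpty()) {
            return false;
        }
        // Wildcards are only allowed in subscriptions; the null character is never allowed.
        if (topic.indexOf('+') >= 0 || topic.indexOf('#') >= 0 || topic.indexOf('\0') >= 0) {
            return false;
        }
        // MQTT strings carry a two-byte length prefix, so at most 65,535 bytes of UTF-8.
        return topic.getBytes(StandardCharsets.UTF_8).length <= 65_535;
    }

    /** Returns the candidate if it is usable as a publish topic, otherwise the fallback. */
    static String topicOrFallback(final String candidate) {
        return isValidPublishTopic(candidate) ? candidate : FALLBACK_TOPIC;
    }
}
```

A transformer could pass the result of topicOrFallback to the topic method of the PublishBuilder, or skip the message entirely when the check fails.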

Use custom logic to skip specific Google Cloud Pub/Sub messages: In this use case, the goal is to filter out incoming Google Cloud Pub/Sub messages that are empty and to create MQTT messages only for received messages that contain data.

Skip Specific Google Cloud Pub/Sub Messages
Example implementation to skip specific Google Cloud Pub/Sub messages
public class SkippingTransformer implements PubSubToMqttTransformer {

    @Override
    public void transformPubSubToMqtt(
            final @NotNull PubSubToMqttInput input,
            final @NotNull PubSubToMqttOutput output) {
        // Get the PubSub message
        final InboundPubSubMessage message = input.getInboundPubSubMessage();

        // Skip the PubSub message if the data is missing
        if (message.getData().isEmpty()) {
            output.setPublishes(List.of());
            return;
        }

        // Build MQTT publish
        final Publish publish = output.newPublishBuilder()
                .topic(message.getSubscriptionName())
                .payload(message.getData().get())
                .build();
        output.setPublishes(List.of(publish));
    }
}

Use custom settings to transform Google Cloud Pub/Sub messages: In this use case, the goal is to use custom settings from the configuration XML file to transform Google Cloud Pub/Sub messages.

Transform Google Cloud Pub/Sub Messages
Example configuration to transform specific Google Cloud Pub/Sub messages with custom settings
<pubsub-to-mqtt-transformers>
    <pubsub-to-mqtt-transformer>
        <id>my-pubsub-to-mqtt-transformer-01</id>
        <enabled>true</enabled>
        <pubsub-connection>your-custom-connection-id</pubsub-connection>
        <pubsub-subscriptions>
            <pubsub-subscription>
                <name>your-subscription</name>
            </pubsub-subscription>
        </pubsub-subscriptions>
        <transformer>fully.qualified.classname.to.CustomSettingsTransformer</transformer>
        <custom-settings>
            <custom-setting>
                <name>destination-topic</name>
                <value>destination/mqtt/topic/a</value>
            </custom-setting>
        </custom-settings>
    </pubsub-to-mqtt-transformer>
</pubsub-to-mqtt-transformers>
Example implementation to transform specific Google Cloud Pub/Sub messages with custom settings
public class CustomSettingsTransformer implements PubSubToMqttTransformer {

    private @NotNull Optional<String> destinationTopic = Optional.empty();

    @Override
    public void init(@NotNull PubSubToMqttInitInput transformerInitInput) {
        final CustomSettings customSettings = transformerInitInput.getCustomSettings();
        destinationTopic = customSettings.getFirst("destination-topic");
    }

    @Override
    public void transformPubSubToMqtt(
            final @NotNull PubSubToMqttInput input,
            final @NotNull PubSubToMqttOutput output) {
        // Get the PubSub message
        final InboundPubSubMessage message = input.getInboundPubSubMessage();

        // Build MQTT publish
        final Publish publish = output.newPublishBuilder()
                .topic(destinationTopic.orElse("default/destination/topic"))
                .payload(message.getData().orElse(ByteBuffer.allocate(0)))
                .build();
        output.setPublishes(List.of(publish));
    }
}

Deploy Customization

You can deploy your customization with three simple steps:

  1. Configure your google-cloud-pubsub-configuration.xml to add your MQTT to Google Cloud Pub/Sub or Google Cloud Pub/Sub to MQTT transformer.

  2. Run the ./gradlew jar task from your Gradle project to build your customization, for example, the jar task of the Hello World project.

  3. Move the jar file from your local build/libs/ folder to the HIVEMQ_HOME/extensions/hivemq-google-cloud-pubsub-extension/customizations directory of your HiveMQ installation.

SDK Metrics

When you implement a transformer, the HiveMQ Google Cloud Pub/Sub Extension Customization SDK adds useful metrics to your HiveMQ Enterprise Extension for Google Cloud Pub/Sub. Monitor the metrics to gain valuable insights into the behavior of your applications over time.

In addition to the default metrics, the Metric Registry that the Google Cloud Pub/Sub Extension Customization SDK exposes gives you the ability to create your own metrics to measure specific business aspects of your individual use case.

For more information, see PubSubToMqttInitInput.