HiveMQ & Google Cloud - Enabling Bi-directional MQTT Data Movement

Written by Florian Limpöck

Category: HiveMQ IoT Extension Google

Published: September 29, 2022


We’re excited to share that we have just released the HiveMQ Enterprise Extension for Google Cloud Pub/Sub.

This new extension enables the bi-directional movement of MQTT data between HiveMQ and Google Cloud. There are two methods to accomplish this:

  • Through Mappings, and
  • Through Transformers.

Each method has its advantages and should be selected according to the use case requirements.

Mappings are helpful for more straightforward use cases, for instance, when the intended content and destination of the message are already known at configuration time.

On the other hand, Transformers can dynamically decide the content and destination of a message at runtime.
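To make the distinction concrete, here is a minimal plain-Java sketch. It does not use the extension's API: the class RoutingSketch, its methods, and the topic names are hypothetical and only illustrate the difference between a destination fixed at configuration time and one computed per message.

```java
import java.util.function.Function;

// Hypothetical illustration only: none of these names belong to the HiveMQ extension API.
final class RoutingSketch {

    // Mapping style: the Pub/Sub destination is fixed when the rule is created.
    static Function<String, String> staticMapping(final String pubSubTopic) {
        return mqttTopic -> pubSubTopic;
    }

    // Transformer style: the destination is computed for each message at runtime,
    // here from the first level of the MQTT topic.
    static Function<String, String> dynamicRouting() {
        return mqttTopic -> "telemetry-" + mqttTopic.split("/")[0];
    }

    public static void main(final String[] args) {
        System.out.println(staticMapping("my-pubsub-topic").apply("plant-a/temperature"));
        System.out.println(dynamicRouting().apply("plant-a/temperature"));
    }
}
```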

In this article, you will learn how to transform incoming MQTT PUBLISH messages and forward them to Google Cloud Pub/Sub. From Google Cloud Pub/Sub, you can use backend data storage and analysis services to further process messages.

You will also learn how to read messages from Google Cloud Pub/Sub and forward them to subscribed MQTT clients.

The following example walks you through a hypothetical round-trip scenario involving Transformers.

The first Transformer takes the Quality of Service, Message Expiry, and the Retained Flag properties of an MQTT PUBLISH and converts them to Pub/Sub message attributes.

The second Transformer reads these attributes from the Pub/Sub message and sets them as the Quality of Service, Message Expiry, and Retained Flag properties of an MQTT PUBLISH.
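Because Pub/Sub attributes are string key-value pairs, the round trip amounts to serializing the MQTT properties to strings and parsing them back. The following plain-Java sketch shows that idea in isolation. The attribute keys match the ones used later in this article; the class AttributeRoundTrip and its methods are hypothetical and are not part of the extension.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch: the attribute keys follow the article, all other names are hypothetical.
final class AttributeRoundTrip {

    // MQTT side to Pub/Sub side: every value becomes a string attribute.
    static Map<String, String> toAttributes(final int qos, final boolean retained, final Long expirySeconds) {
        final Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("qos", String.valueOf(qos));
        attributes.put("retained", String.valueOf(retained));
        if (expirySeconds != null) { // the message expiry interval is optional
            attributes.put("message-expiry", String.valueOf(expirySeconds));
        }
        return attributes;
    }

    // Pub/Sub side back to MQTT side: parse the strings into typed values again.
    static int qosOf(final Map<String, String> attributes) {
        return Integer.parseInt(attributes.get("qos"));
    }

    static boolean retainedOf(final Map<String, String> attributes) {
        return Boolean.parseBoolean(attributes.get("retained"));
    }

    public static void main(final String[] args) {
        final Map<String, String> attributes = toAttributes(1, true, 1000L);
        System.out.println(attributes + " -> qos=" + qosOf(attributes) + ", retained=" + retainedOf(attributes));
    }
}
```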

Prerequisites

Before you can start implementing these transformers, check these prerequisites:

The easiest way to develop transformers is via the HiveMQ Google Cloud Pub/Sub Hello World Customization repository on GitHub:

  1. To start, open your IDE and create a new project from version control.

    Create Project

  2. Paste the Hello World Customization GitHub URL git@github.com:hivemq/hivemq-google-cloud-pubsub-hello-world-customization.git into the URL field.

    Paste Github URL

  3. Remove the Hello World code from the java packages of the src/main and src/test directories.

    Remove Hello World Code

  4. (Optional) Change the group and description in build.gradle.kts.

    Change Group

  5. (Optional) Adapt the manifest attributes in build.gradle.kts. (At least the vendor makes sense in most cases.)

    Adapt Manifest

  6. (Optional) Set the rootProject.name in settings.gradle.kts.

    Set Project

Now we are all set to implement our first transformer.

MQTT to Pub/Sub Transformer

In the src/main/java directory, create an ExampleMqttToPubSubTransformer class in the example.pubsub.customizations package that implements the MqttToPubSubTransformer interface.

Remember the package and the class name. We need them later when we configure the extension.

Override init()

The first questions we must ask ourselves are: “Do we need some information from the configuration file, and do we want to register some metrics?”

If the answer to either question is yes, we need to override the init method of the transformer interface. In this example, we want a metric that counts every failed transformation, so we register it in init.

To ensure that we can use the transformer even if the metric creation fails, we surround the creation of the metric with a try-catch block.

Override transformMqttToPubSub()

  1. To transform the message, we override the transformMqttToPubSub method,
  2. Retrieve the PublishPacket from the MqttToPubSubInput, and
  3. Call newOutboundPubSubMessageBuilder() on the MqttToPubSubOutput object.
  4. Next, we check whether there is a message payload. If the payload doesn’t exceed the maximum length that Google Cloud Pub/Sub allows, we set it to the data field of the OutboundPubSubMessageBuilder.
  5. If the bytes in the payload exceed the maximum length, we log an error, increase the failed metric, and do not send the message by providing an empty list to the MqttToPubSubOutput.
  6. If the payload size is fine, we add the Quality of Service (QoS), the Message Expiry Interval, and the Retained Flag to the OutboundPubSubMessageBuilder as attributes.
  7. Finally, we set the desired destination topic and provide the built message to the MqttToPubSubOutput.
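The size check in steps 4 and 5 can be looked at in isolation. The sketch below is plain Java with a hypothetical class name, PayloadSizeCheck. It reuses the 10,000,000-byte limit and the strict less-than comparison from the example implementation in this article, and relies on ByteBuffer.remaining(), which reports the bytes between the buffer's position and its limit.

```java
import java.nio.ByteBuffer;

// Hypothetical helper that isolates the payload size check described in steps 4 and 5.
final class PayloadSizeCheck {

    // Limit used by the article's example implementation.
    static final int MAX_DATA_LENGTH = 10_000_000;

    // remaining() counts the bytes between position and limit,
    // which are the bytes that would end up in the Pub/Sub data field.
    static boolean fits(final ByteBuffer payload) {
        return payload.remaining() < MAX_DATA_LENGTH;
    }

    public static void main(final String[] args) {
        System.out.println(fits(ByteBuffer.wrap("Hello Blogpost".getBytes())));
        System.out.println(fits(ByteBuffer.allocate(MAX_DATA_LENGTH)));
    }
}
```

Note that a payload of exactly 10,000,000 bytes is rejected by this comparison, which leaves some room for the topic name and attributes that also count toward the request size.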

Example Implementation

A complete implementation of the ExampleMqttToPubSubTransformer looks like this:

package example.pubsub.customizations;

import com.codahale.metrics.Counter;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.packets.publish.PublishPacket;
import com.hivemq.extensions.gcp.pubsub.api.builders.OutboundPubSubMessageBuilder;
import com.hivemq.extensions.gcp.pubsub.api.model.OutboundPubSubMessage;
import com.hivemq.extensions.gcp.pubsub.api.transformers.MqttToPubSubInitInput;
import com.hivemq.extensions.gcp.pubsub.api.transformers.MqttToPubSubInput;
import com.hivemq.extensions.gcp.pubsub.api.transformers.MqttToPubSubOutput;
import com.hivemq.extensions.gcp.pubsub.api.transformers.MqttToPubSubTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;

public class ExampleMqttToPubSubTransformer implements MqttToPubSubTransformer {

    private static final @NotNull Logger LOG = LoggerFactory.getLogger(ExampleMqttToPubSubTransformer.class);

    private static final @NotNull String FAILED_TRANSFORMATION_COUNTER = "example.mqtt-to-pubsub.transformation.failed.count";
    private static final int GOOGLE_CLOUD_PUBSUB_DATA_MAX_LENGTH = 10_000_000;
    public static final @NotNull String ATTRIBUTE_KEY_QOS = "qos";
    public static final @NotNull String ATTRIBUTE_KEY_MESSAGE_EXPIRY = "message-expiry";
    public static final @NotNull String ATTRIBUTE_KEY_RETAINED = "retained";
    private @Nullable Counter failedTransformationCounter;

    @Override
    public void init(final @NotNull MqttToPubSubInitInput transformerInitInput) {
        try {
            failedTransformationCounter = transformerInitInput.getMetricRegistry().counter(FAILED_TRANSFORMATION_COUNTER);
        } catch (final Exception e) {
            LOG.error("MQTT publish to Google Cloud Pub/Sub transformer initialisation failed: " , e);
        }
    }

    //1
    @Override
    public void transformMqttToPubSub(final @NotNull MqttToPubSubInput mqttToPubSubInput,
                                      final @NotNull MqttToPubSubOutput mqttToPubSubOutput) {
        try {
            //2
            final PublishPacket publishPacket = mqttToPubSubInput.getPublishPacket();
            //3
            final OutboundPubSubMessageBuilder pubSubMessageBuilder = mqttToPubSubOutput.newOutboundPubSubMessageBuilder();

            final Optional<ByteBuffer> payloadOptional = publishPacket.getPayload();

            //4 set the payload as data if the size is less than 10_000_000 bytes
            if(payloadOptional.isPresent()) {
                final ByteBuffer payload = payloadOptional.get();
                if(payload.remaining() < GOOGLE_CLOUD_PUBSUB_DATA_MAX_LENGTH) {
                    pubSubMessageBuilder.data(payload);
                } else {
                    //5
                    LOG.error("MQTT publish to Google Cloud Pub/Sub message transformation failed: Payload of {} bytes is too large for pubsub messages", payload.remaining());
                    if(failedTransformationCounter != null) {
                        failedTransformationCounter.inc();
                    }
                    mqttToPubSubOutput.setOutboundPubSubMessages(List.of());
                    return;
                }
            }

            //6 set MQTT specific data
            pubSubMessageBuilder.attribute(ATTRIBUTE_KEY_QOS, String.valueOf(publishPacket.getQos().getQosNumber()));
            pubSubMessageBuilder.attribute(ATTRIBUTE_KEY_RETAINED, String.valueOf(publishPacket.getRetain()));
            publishPacket.getMessageExpiryInterval().ifPresent(
                    expiry -> pubSubMessageBuilder.attribute(ATTRIBUTE_KEY_MESSAGE_EXPIRY, String.valueOf(expiry))
            );

            //7 hard code the topic here. you can also use the CustomSettings from the configuration xml file.
            pubSubMessageBuilder.topicName("my-pubsub-topic");

            final OutboundPubSubMessage pubSubMessage = pubSubMessageBuilder.build();
            mqttToPubSubOutput.setOutboundPubSubMessages(List.of(pubSubMessage));

        } catch (final IllegalStateException e) {
            // since the total size of the request to Google Cloud Pub/Sub also includes topicName, attributes and other metadata,
            // and the builder.build() validates the size for us, we just catch that exception explicitly and do not log the stacktrace
            LOG.error("MQTT publish to Google Cloud Pub/Sub message transformation failed: {}", e.getMessage());
            if (failedTransformationCounter != null) {
                failedTransformationCounter.inc();
            }
        } catch (final Exception e) {
            LOG.error("MQTT publish to Google Cloud Pub/Sub message transformation failed: " , e);
            if(failedTransformationCounter != null) {
                failedTransformationCounter.inc();
            }
        }
    }
}

Now that we have a working transformer for messaging from HiveMQ to Google Cloud Pub/Sub, the next step is building a transformer for the opposite direction.

Pub/Sub to MQTT Transformer

In the src/main/java directory, create an ExamplePubSubToMqttTransformer class in the example.pubsub.customizations package that implements the PubSubToMqttTransformer interface.

Remember the package and the class name. We need them later when we configure the extension.

Override init()

Again, we override the init() method to register a metric for every failed transformation in the same way we did in the MqttToPubSubTransformer.

Override transformPubSubToMqtt()

  1. To transform the message, we override the transformPubSubToMqtt method,
  2. Retrieve the InboundPubSubMessage from the PubSubToMqttInput, and
  3. Call newPublishBuilder() on the PubSubToMqttOutput object.
  4. Next, we convert Quality of Service (QoS), Message Expiry Interval, and Retained Flag from the Pub/Sub message attributes to the respective MQTT properties and set them to the PublishBuilder.
  5. Then, we convert the Pub/Sub message data to an MQTT payload and set it to the PublishBuilder.
  6. Finally, we set the desired destination topic and provide the built message to the PubSubToMqttOutput.
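Step 4 has to cope with attributes that are missing or malformed, because any publisher can write to the Pub/Sub topic. The following plain-Java sketch shows one defensive way to parse them. The class AttributeParsing is hypothetical, and the range checks (QoS 0 to 2, expiry within an unsigned 32-bit value) are an assumption that goes slightly beyond the helper methods in the example implementation.

```java
// Hypothetical helper: parses Pub/Sub string attributes and falls back to defaults.
final class AttributeParsing {

    // Largest value of a four-byte unsigned integer: 4_294_967_295.
    static final long MAX_MESSAGE_EXPIRY_INTERVAL = 0xFFFF_FFFFL;

    // Returns the parsed QoS, or the fallback if the attribute is absent, not a number, or not 0, 1, or 2.
    static int parseQos(final String attribute, final int fallback) {
        if (attribute == null) {
            return fallback;
        }
        try {
            final int qos = Integer.parseInt(attribute.trim());
            return (qos >= 0 && qos <= 2) ? qos : fallback;
        } catch (final NumberFormatException e) {
            return fallback;
        }
    }

    // Returns the parsed expiry in seconds, or the maximum if the attribute is absent or unusable.
    static long parseExpiry(final String attribute) {
        if (attribute == null) {
            return MAX_MESSAGE_EXPIRY_INTERVAL;
        }
        try {
            final long expiry = Long.parseLong(attribute.trim());
            return (expiry >= 0 && expiry <= MAX_MESSAGE_EXPIRY_INTERVAL) ? expiry : MAX_MESSAGE_EXPIRY_INTERVAL;
        } catch (final NumberFormatException e) {
            return MAX_MESSAGE_EXPIRY_INTERVAL;
        }
    }

    public static void main(final String[] args) {
        System.out.println(parseQos("2", 1) + " " + parseQos("seven", 1) + " " + parseExpiry("1000"));
    }
}
```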

Example Implementation

A complete implementation of the ExamplePubSubToMqttTransformer looks like this:

package example.pubsub.customizations;

import com.codahale.metrics.Counter;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.packets.general.Qos;
import com.hivemq.extension.sdk.api.services.builder.PublishBuilder;
import com.hivemq.extension.sdk.api.services.publish.Publish;
import com.hivemq.extensions.gcp.pubsub.api.model.InboundPubSubMessage;
import com.hivemq.extensions.gcp.pubsub.api.transformers.PubSubToMqttInitInput;
import com.hivemq.extensions.gcp.pubsub.api.transformers.PubSubToMqttInput;
import com.hivemq.extensions.gcp.pubsub.api.transformers.PubSubToMqttOutput;
import com.hivemq.extensions.gcp.pubsub.api.transformers.PubSubToMqttTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;

public class ExamplePubSubToMqttTransformer implements PubSubToMqttTransformer {

    private static final @NotNull Logger LOG = LoggerFactory.getLogger(ExamplePubSubToMqttTransformer.class);
    private static final @NotNull String FAILED_TRANSFORMATION_COUNTER = "example.pubsub-to-mqtt.transformation.failed.count";
    private static final long MAX_MESSAGE_EXPIRY_INTERVAL = 0xFFFF_FFFFL;

    private @Nullable Counter failedTransformationCounter;

    @Override
    public void init(final @NotNull PubSubToMqttInitInput transformerInitInput) {
        try {
            failedTransformationCounter = transformerInitInput.getMetricRegistry().counter(FAILED_TRANSFORMATION_COUNTER);
        } catch (final Exception e) {
            LOG.error("Google Cloud Pub/Sub to MQTT publish transformer initialisation failed: ", e);
        }
    }

    //1
    @Override
    public void transformPubSubToMqtt(final @NotNull PubSubToMqttInput pubSubToMqttInput,
                                      final @NotNull PubSubToMqttOutput pubSubToMqttOutput) {

        try {
            //2
            final InboundPubSubMessage pubSubMessage = pubSubToMqttInput.getInboundPubSubMessage();
            //3
            final PublishBuilder publishBuilder = pubSubToMqttOutput.newPublishBuilder();

            //4 read MQTT specific data from pubsub attributes
            final String qosAttribute = pubSubMessage.getAttributes().get(ExampleMqttToPubSubTransformer.ATTRIBUTE_KEY_QOS);
            final String messageExpiryAttribute = pubSubMessage.getAttributes().get(ExampleMqttToPubSubTransformer.ATTRIBUTE_KEY_MESSAGE_EXPIRY);
            final String retainedAttribute = pubSubMessage.getAttributes().get(ExampleMqttToPubSubTransformer.ATTRIBUTE_KEY_RETAINED);
            final Qos qos = getQosFromAttribute(qosAttribute);
            final long messageExpiry = getMessageExpiryFromAttribute(messageExpiryAttribute);
            final boolean retained = getRetainedFlagFromAttribute(retainedAttribute);

            //set the data to the publish builder
            publishBuilder.qos(qos);
            publishBuilder.messageExpiryInterval(messageExpiry);
            publishBuilder.retain(retained);

            //5
            final Optional<ByteBuffer> data = pubSubMessage.getData();
            //since the publish builder requires to set a payload we must at least set an empty payload if no data exist.
            data.ifPresentOrElse(
                    publishBuilder::payload,
                    () -> publishBuilder.payload(ByteBuffer.wrap(new byte[0]))
            );

            //6 hard code the topic here. you can also use the CustomSettings from the configuration xml file.
            publishBuilder.topic("from/pubsub");

            final Publish publish = publishBuilder.build();
            pubSubToMqttOutput.setPublishes(List.of(publish));

        } catch (final Exception e) {
            LOG.error("Google Cloud Pub/Sub to MQTT publish message transformation failed: ", e);
            if (failedTransformationCounter != null) {
                failedTransformationCounter.inc();
            }
        }
    }

    private boolean getRetainedFlagFromAttribute(final @Nullable String retainedAttribute) {
        if (retainedAttribute == null) {
            LOG.debug("Could not find retained flag attribute at Pub/Sub message. Defaulting to false");
            return false;
        }
        return Boolean.parseBoolean(retainedAttribute);

    }

    private long getMessageExpiryFromAttribute(final @Nullable String messageExpiryAttribute) {
        if (messageExpiryAttribute == null) {
            LOG.debug("Could not find message expiry interval attribute at Pub/Sub message. Defaulting to maximum message expiry 4.294.967.295s");
            return MAX_MESSAGE_EXPIRY_INTERVAL; //4_294_967_295
        }
        try {
            return Long.parseLong(messageExpiryAttribute);
        } catch (NumberFormatException e) {
            LOG.debug("Could not parse message expiry interval from Pub/Sub message attribute `{}`. Defaulting to maximum message expiry 4.294.967.295s", messageExpiryAttribute);
            return MAX_MESSAGE_EXPIRY_INTERVAL; //4_294_967_295
        }
    }

    private @NotNull Qos getQosFromAttribute(final @Nullable String qosAsString) {
        if (qosAsString == null) {
            LOG.debug("Could not find QoS attribute at Pub/Sub message. Defaulting to QoS 1");
            return Qos.AT_LEAST_ONCE;
        }
        try {
            return Qos.valueOf(Integer.parseInt(qosAsString));
        } catch (NumberFormatException e) {
            LOG.debug("Could not parse QoS from Pub/Sub message attribute `{}`. Defaulting to QoS 1", qosAsString);
            return Qos.AT_LEAST_ONCE;
        }
    }
}

Once we have a working transformer for messaging from Google Cloud Pub/Sub to HiveMQ, we are ready to install our transformers.

Installation

  1. Run the ./gradlew jar Gradle task to build the custom transformers.

    Run Jar Task

  2. Move the build/libs/example-pubsub-customization-4.9.0.jar file to the HIVEMQ_HOME/extensions/hivemq-google-cloud-pubsub-extension/customizations directory.

    Run Jar Task

  3. Adapt the google-cloud-pubsub-configuration.xml configuration.

    • Remove the mapping configurations.
    • Set the <google-cloud-project-id> in the <pubsub-connection> tag to match your Google Cloud Pub/Sub project ID.
    • Set the <file-path> in the <service-account> tag to the absolute or relative path of your service-account-key.json file.
    • Create an <mqtt-to-pubsub-transformer> and a <pubsub-to-mqtt-transformer> as shown in the example below.
    • Set the <pubsub-subscription> name in the <pubsub-to-mqtt-transformer> tag to match the correct Google Cloud Pub/Sub subscription.
<?xml version="1.0" encoding="UTF-8" ?>
<hivemq-google-cloud-pubsub-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                                      xsi:noNamespaceSchemaLocation="google-cloud-pubsub-configuration.xsd">

    <pubsub-connections>
        <pubsub-connection>
            <id>con-1</id>
            <google-cloud-project-id>my-pubsub-project</google-cloud-project-id>
            <authentication>
                <service-account>
                    <file-path>service-account.json</file-path>
                </service-account>
            </authentication>
        </pubsub-connection>
    </pubsub-connections>

    <mqtt-to-pubsub-transformers>
        <mqtt-to-pubsub-transformer>
            <transformer>example.pubsub.customizations.ExampleMqttToPubSubTransformer</transformer>
            <id>m2p-t-1</id>
            <pubsub-connection>con-1</pubsub-connection>
            <mqtt-topic-filters>
                <mqtt-topic-filter>to/pubsub</mqtt-topic-filter>
            </mqtt-topic-filters>
        </mqtt-to-pubsub-transformer>
    </mqtt-to-pubsub-transformers>
    
    <pubsub-to-mqtt-transformers>
        <pubsub-to-mqtt-transformer>
            <transformer>example.pubsub.customizations.ExamplePubSubToMqttTransformer</transformer>
            <id>p2m-t-1</id>
            <pubsub-connection>con-1</pubsub-connection>
            <pubsub-subscriptions>
                <pubsub-subscription>
                    <name>my-pubsub-subscription</name>
                </pubsub-subscription>
            </pubsub-subscriptions>
        </pubsub-to-mqtt-transformer>
    </pubsub-to-mqtt-transformers>

</hivemq-google-cloud-pubsub-extension>

Testing

Now that we have a running HiveMQ with the Google Cloud Pub/Sub extension that uses two transformers for messaging between HiveMQ and Google Cloud, it’s time to do some tests.

The open-source HiveMQ MQTT CLI tool included in the tools directory of our HiveMQ installation makes it easy to test our deployment. We can use the MQTT CLI to send an MQTT PUBLISH message with the topic “to/pubsub” and subscribe an MQTT client to the “from/pubsub” topic.

The message then travels as follows:

  • The PUBLISH message goes from HiveMQ to the Google Cloud Pub/Sub extension.
  • In the extension, the first transformer (m2p-t-1) converts it to a Pub/Sub message, enriches it with Pub/Sub attributes, and sends it to the configured Pub/Sub topic (my-pubsub-topic).
  • Google Cloud Pub/Sub delivers the message to the subscription (my-pubsub-subscription).
  • Because the second transformer (p2m-t-1) subscribes to this Pub/Sub subscription, the extension pulls the message from Google Cloud Pub/Sub.
  • The second transformer creates an MQTT PUBLISH message with the given MQTT properties and sends it to the topic “from/pubsub”.

Message Flow Diagram

Follow these steps to achieve and verify this message flow:

  1. Open a bash console such as Terminal, connect an MQTT client to HiveMQ, and subscribe to the topic from/pubsub.

    `HIVEMQ_HOME/tools/mqtt-cli-4.9.0/bin/mqtt sub -t "from/pubsub" -p 1883 -h localhost --verbose`

  2. Open another bash console and send an MQTT PUBLISH message with Quality of Service level 1, a retained flag, and a message expiry interval of 1000 to the “to/pubsub” topic:

    `HIVEMQ_HOME/tools/mqtt-cli-4.9.0/bin/mqtt pub -t "to/pubsub" -p 1883 -h localhost -q 1 -r -e 1000 -m "Hello Blogpost"`

  3. Send another MQTT PUBLISH message with Quality of Service level 0, a retained flag, and a message expiry interval of 1000 to the “to/pubsub” topic:

    `HIVEMQ_HOME/tools/mqtt-cli-4.9.0/bin/mqtt pub -t "to/pubsub" -p 1883 -h localhost -q 0 -r -e 1000 -m "Hello Blogpost"`

  4. Check the output of the subscribed MQTT client. The client should have received two PUBLISH messages, one with QoS 0 and one with QoS 1, both with the retained flag set to false and a message expiry interval of 1000. The retained flag is false because, unless the subscription sets the retain-as-published option, the broker clears the flag when it forwards a message to an already established subscription.

    Hello Blogpost

  5. Disconnect your subscriber, then reconnect and subscribe again. This should print the following output with retain set to true:

    Retained Message

  6. If you subscribe to “to/pubsub,” you should get the same output.

  7. You can also check your retained messages in the HiveMQ Control Center. There should be one for the topic “to/pubsub” and the same for the topic “from/pubsub”.

    CC Retained Messages
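The retained flag values seen in steps 4 and 5 follow from the MQTT 5 forwarding rules. As a rough sketch, those rules can be written as a single pure function. The class RetainFlagRule and its parameter names are hypothetical; the function models only the flag a subscriber sees, not how HiveMQ implements delivery.

```java
// Hypothetical model of the RETAIN flag a subscriber sees on a delivered PUBLISH.
final class RetainFlagRule {

    static boolean deliveredRetainFlag(final boolean publishedWithRetain,
                                       final boolean retainAsPublished,
                                       final boolean deliveredOnNewSubscription) {
        if (deliveredOnNewSubscription) {
            // A stored retained message sent in response to a new subscription carries RETAIN = 1.
            return true;
        }
        // Forwarding to an established subscription keeps the flag only with retain-as-published.
        return retainAsPublished && publishedWithRetain;
    }

    public static void main(final String[] args) {
        // Step 4: live delivery without retain-as-published.
        System.out.println(deliveredRetainFlag(true, false, false));
        // Step 5: the retained message delivered after re-subscribing.
        System.out.println(deliveredRetainFlag(true, false, true));
    }
}
```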

About Florian Limpöck

Florian Limpöck has been a software engineer at HiveMQ since 2013. He is interested in distributed systems, the MQTT protocol, random numbers, and his dog, Karl, with whom he shares an apartment in Landshut, Germany, the birthplace of HiveMQ.
