Modify Your MQTT Data In-Flight with Data Transformation

by Stefan Frehse
Typical IoT deployments have diverse data sets, either because of the variety of devices or because these devices are continuously gaining capabilities. For instance, IoT devices might transmit data in different formats (Protobuf vs. JSON), units (metric vs. imperial), or even date formats. Organizations need a way to transform this data into the format that they need.

Data transformation means modifying or converting data, in this case from IoT devices, into a predefined format before it is forwarded or stored in a database, often to improve its compatibility or usability for downstream processes. For instance, imagine a U.S. pharmaceutical company that imports equipment from Europe that reports temperatures in Celsius instead of Fahrenheit. The company cannot modify the machine’s firmware because of FDA restrictions. In this situation, the company can write custom scripts that transform the data, i.e., convert it from Celsius to Fahrenheit, so that it integrates into their operations while complying with regulatory restrictions.

In this article, we will discuss stateless data transformations, a recently added feature in HiveMQ’s Data Hub. This feature was designed to streamline data transformation for our customers and improve their product development cycles. Transformation scripts can be uploaded directly to the broker, without involving an operations team. As a result, IT and OT engineering teams can iterate on applications more quickly and achieve better results in fewer cycles.

Scripting

The HiveMQ Platform Release 4.23 provides an Early Access (EA) version of Data Hub’s data transformation functionality. This release introduces JavaScript as the language for developing stateless transformation functions. JavaScript is one of the most widely adopted programming languages, which lets teams iterate on applications quickly while relying on well-established engineering practices. It is dynamically typed and comes with a rich set of built-in functions.

Example: Unit Conversions

We introduce Data Hub’s data transformation capabilities by working through a simple example. In this example, a device sends the following data in JSON format:

{
  "fahrenheit": 10,
  "timestamp": 1701172211
}

As you can see, the device sends a temperature value in Fahrenheit. However, in our simple example, the consuming application, such as a fan, expects the temperature in Celsius. The following formula converts Fahrenheit into Celsius:

C = 5/9 × (F - 32)

Where C is the temperature in Celsius and F is the temperature in Fahrenheit. Translated into JavaScript, the formula looks as follows:

function convert(fahrenheit) {
    return Math.floor((fahrenheit - 32) * 5 / 9);
}

The function convert applies the formula introduced above to the provided argument. The result is rounded down to the nearest integer using the JavaScript function Math.floor, and the value in Celsius is returned.
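As a quick sanity check of the arithmetic, the sketch below runs convert on a few inputs. It also includes a hypothetical variant, convertToOneDecimal, which is not part of the original script and only illustrates what changes if one decimal place is kept instead of rounding down.

```javascript
// Conversion from the article: rounds down to the nearest integer.
function convert(fahrenheit) {
    return Math.floor((fahrenheit - 32) * 5 / 9);
}

// Hypothetical variant (not in the original script): keeps one decimal place.
function convertToOneDecimal(fahrenheit) {
    return Math.round((fahrenheit - 32) * 5 / 9 * 10) / 10;
}

console.log(convert(212));             // 100
console.log(convert(32));              // 0
console.log(convert(10));              // -13, because -12.22... is rounded down
console.log(convertToOneDecimal(10));  // -12.2
```

Note that Math.floor always rounds toward negative infinity, so negative results move away from zero. Whether that is acceptable depends on the precision the consuming application needs.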

As a next step, we define the entry function for the data transformation. In the context of MQTT, payloads are sent via the MQTT PUBLISH packet, which carries the relevant data, in our case the data point from a temperature sensor. The transformation function is called for each of these packets. Data Hub’s policy engine passes the MQTT packet to the transformation function together with additional context information. The packet consists of the following fields:

Field | Type | Description
payload | JSON Object | Represents the deserialized MQTT payload
QoS | Number | The original QoS (0, 1, 2)
topic | String | The MQTT topic
userProperties | Array of JSON Object { name, value } | The MQTT User Properties

The JavaScript function transform is shown below. It takes two arguments, publish and context: publish is the packet described above, and context carries the additional context information. The context object isn’t used in this function. For more information, please read HiveMQ Data Hub Transformations.

function transform(publish, context) {
    publish.payload = {
        "celsius": convert(publish.payload.fahrenheit),
        "timestamp": publish.payload.timestamp
    }

    return publish;
}

The function replaces the payload with a newly created JSON object containing the fields celsius and timestamp. The convert function introduced earlier is called with the fahrenheit field of the incoming packet, and the result is assigned to the celsius field.
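Before uploading anything, the function can be exercised locally, for example with Node.js. The sketch below is an assumption about how such a local check might look: the samplePublish object is hand-built, the topic name is hypothetical, and an empty object stands in for the context that the policy engine would normally provide.

```javascript
function convert(fahrenheit) {
    return Math.floor((fahrenheit - 32) * 5 / 9);
}

function transform(publish, context) {
    publish.payload = {
        "celsius": convert(publish.payload.fahrenheit),
        "timestamp": publish.payload.timestamp
    };
    return publish;
}

// Hand-built stand-in for the packet the policy engine would pass in.
// Only a subset of the fields from the table is filled in here.
const samplePublish = {
    topic: "device/sensor-1",            // hypothetical topic
    userProperties: [],
    payload: { fahrenheit: 10, timestamp: 1701172211 }
};

const result = transform(samplePublish, {});  // empty object as placeholder context
console.log(JSON.stringify(result.payload));
// {"celsius":-13,"timestamp":1701172211}
```

The topic and user properties pass through unchanged because the script only reassigns publish.payload.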

The complete source code is stored in the file script.js:

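// Converts a temperature from Fahrenheit to Celsius, rounded down to an integer.
function convert(fahrenheit) {
    return Math.floor((fahrenheit - 32) * 5 / 9);
}

// Entry point called by Data Hub for each matching MQTT PUBLISH packet.
function transform(publish, context) {
    publish.payload = {
        "celsius": convert(publish.payload.fahrenheit),
        "timestamp": publish.payload.timestamp
    };

    return publish;
}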

Data Policy

Data policies in HiveMQ Data Hub offer a declarative way to describe data validation and data transformation pipelines for MQTT payloads. Starting with the data validation step, MQTT payloads may be validated against a pre-defined schema. For a more detailed introduction, please read the article Getting Started with MQTT Data Validation Using HiveMQ Data Hub by Kudzai Manditereza. 

In a data policy pipeline, further functions can be defined that run as additional processing steps. Basic built-in functions are available, such as logging a message, adding user properties, dropping messages, or even redirecting payloads to a different topic. In addition, users can define their own custom scripts using JavaScript as introduced above.

Upload 

There is one prerequisite for using transformation functions in data policies: the source file of the function needs to be uploaded to the HiveMQ Broker. Today, there are two options available:

  1. REST API 

  2. the mqtt-cli

In the following, we will make use of the mqtt-cli. 

$ mqtt hivemq script create --id=fahrenheit-to-celsius --file=script.js --type=transformation

The command above uploads the file script.js with type transformation and identifier fahrenheit-to-celsius to the broker. Updating a function works with the same command, and the broker versions functions automatically. The following command gets the latest version:

$ mqtt hivemq script get --id=fahrenheit-to-celsius

and outputs:

{
  "createdAt": "2023-11-29T15:58:24.673Z",
  "functionType": "TRANSFORMATION",
  "id": "fahrenheit-to-celsius",
  "source": "ZnVuY3Rpb24gY29udmVydChmYWhyZW5oZWl0KSB7CiAgICByZXR1cm4gTWFoLmZsb29yKChmYWhyZW5oZWl0IC0gMzIpICogNS85KTsKfQoKZnVuY3Rpb24gdHJhbnNmb3JtKHB1Ymxpc2gsIGNvbnRleHQpIHsKICAgIGxldCBwYXlsb2FkID0gewogICAgICAgICJjZWxzaXVzIjogY29udmVydChwdWJsaXNoLnBheWxvYWQuZmFocmVuaGVpdCksCiAgICAgICAgInRpbWVzdGFtcCI6IHB1Ymxpc2gucGF5bG9hZC50aW1lc3RhbXAKICAgIH0KCiAgICByZXR1cm4gcHVibGlzaDsKfQo=",
  "version": 1
}

The source field contains the source code encoded as base64. 
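To read the uploaded script back, the source field can be decoded from base64. The following is a minimal Node.js sketch rather than part of the Data Hub tooling: it relies on Node's Buffer global, and the response object, its identifier, and the short script are made up for illustration, mirroring only the field names from the output above.

```javascript
// Node.js sketch: Buffer is a Node.js global and is used here only on the client side.
const script = "function transform(publish, context) { return publish; }";

// Encode the script the way the "source" field represents it ...
const response = {
    id: "example-script",            // hypothetical identifier
    functionType: "TRANSFORMATION",
    source: Buffer.from(script, "utf8").toString("base64"),
    version: 1
};

// ... and decode it back into readable JavaScript.
const decoded = Buffer.from(response.source, "base64").toString("utf8");
console.log(decoded === script);  // true
console.log(decoded);
```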

Apply the function

Once the script has been uploaded to the HiveMQ Broker, it can be used in a data policy. The following example requires two schemas: 1) a schema that describes the data sent by the device, which contains Fahrenheit values, and 2) a schema for the result of the transformation function, which contains Celsius values.

{
  "type": "object",
  "properties":{
    "fahrenheit":{
      "type":"number"
    },
    "timestamp":{
      "type":"number"
    }
  },
  "required":[
    "fahrenheit",
    "timestamp"
  ]
}

The schema above, created under the id schema-from-sensor, is a JSON Schema with two required fields, fahrenheit and timestamp, both typed as numbers. This schema will be used for incoming data from the device.

{
  "type": "object", 
  "required":[
    "celsius",
    "timestamp"
  ],
  "properties":{
    "celsius":{
      "type":"number"
    },
    "timestamp":{
      "type":"number"
    }
  }
}

The schema above, created under the id schema-for-fan, is almost identical to the previous schema but contains the celsius field instead of fahrenheit.
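To make concrete which payloads these two schemas accept, the sketch below re-implements their rules by hand: the payload must be an object, and every required field must be present and be a number. It is a stand-in for illustration only, not a JSON Schema implementation and not how Data Hub validates; the helper names are hypothetical.

```javascript
// Hand-written stand-in for the two schemas: every required field must be
// present and must be a number. This is not a JSON Schema validator.
function hasRequiredNumbers(payload, requiredFields) {
    if (payload === null || typeof payload !== "object" || Array.isArray(payload)) {
        return false;
    }
    return requiredFields.every(
        (field) => typeof payload[field] === "number" && Number.isFinite(payload[field])
    );
}

const matchesSensorSchema = (p) => hasRequiredNumbers(p, ["fahrenheit", "timestamp"]);
const matchesFanSchema = (p) => hasRequiredNumbers(p, ["celsius", "timestamp"]);

console.log(matchesSensorSchema({ fahrenheit: 10, timestamp: 1701172211 }));   // true
console.log(matchesSensorSchema({ fahrenheit: "10", timestamp: 1701172211 })); // false
console.log(matchesSensorSchema({ fahrenheit: 10 }));                          // false
console.log(matchesFanSchema({ celsius: -13, timestamp: 1701172211 }));        // true
```

Neither schema forbids additional properties, so a payload with extra fields still passes both the schemas and this sketch.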

Next, we define the data policy that combines the pieces defined above:

{
  "id":"convert-fahrenheit-into-celsius",
  "matching":{
    "topicFilter":"device/#"
  },
  "validation":{
    "validators":[
      {
        "type":"schema",
        "arguments":{
          "strategy":"ALL_OF",
          "schemas":[
            {
              "schemaId":"schema-from-sensor",
              "version":"latest"
            }
          ]
        }
      }
    ]
  },
  "onSuccess":{
    "pipeline":[
      {
        "id":"operation-2eng0",
        "functionId":"Serdes.deserialize",
        "arguments":{
          "schemaVersion":"latest",
          "schemaId":"schema-from-sensor"
        }
      },
      {
        "id":"operation-ek2Mx",
        "functionId":"fn:fahrenheit-to-celsius:latest",
        "arguments":{}
      },
      {
        "id":"operation-4DBF3",
        "functionId":"Serdes.serialize",
        "arguments":{
          "schemaVersion":"latest",
          "schemaId":"schema-for-fan"
        }
      }
    ]
  },
  "onFailure":{
    "pipeline":[
      {
        "id":"operation-aMNNx",
        "functionId":"Mqtt.drop",
        "arguments":{
          "reasonString":"Your client ${clientId} sent invalid data according to the schema: ${validationResult}."
        }
      }
    ]
  }
}

The data policy executes the following steps:

  1. For all incoming data published to topics matching device/#, JSON schema validation against the schema schema-from-sensor is executed.

  2. onSuccess: If the data is valid according to the schema, i.e., the data contains fahrenheit and timestamp as numbers in JSON format, the onSuccess pipeline is executed. The pipeline consists of three functions:

    1. Serdes.deserialize, which deserializes the data according to the schema and makes the MQTT payload accessible to the transformation function.

    2. The actual transformation script referenced by fn:fahrenheit-to-celsius:latest is executed, and the result of that function is passed to the next stage of the pipeline. The latest tag refers to the latest version of the function. Fixed versions can be referenced here too.

    3. Finally, the transformed data is serialized using the function Serdes.serialize and the schema schema-for-fan.

  3. onFailure: If the incoming data is not valid according to the schema, the onFailure pipeline is executed. It uses the Mqtt.drop function, which drops the MQTT message so that it does not reach any consumer.
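The control flow of these steps can be modeled locally in a few lines of JavaScript. The sketch below is only a simulation under simplifying assumptions and does not reflect how the broker implements policies: applyPolicy and isValidSensorPayload are hypothetical helpers, JSON.parse and JSON.stringify stand in for Serdes.deserialize and Serdes.serialize, a hand-written check replaces schema validation, and topic matching against device/# is left out.

```javascript
function convert(fahrenheit) {
    return Math.floor((fahrenheit - 32) * 5 / 9);
}

function transform(publish, context) {
    publish.payload = {
        "celsius": convert(publish.payload.fahrenheit),
        "timestamp": publish.payload.timestamp
    };
    return publish;
}

// Simplified stand-in for validation against schema-from-sensor.
function isValidSensorPayload(value) {
    return value !== null && typeof value === "object" && !Array.isArray(value) &&
        typeof value.fahrenheit === "number" && typeof value.timestamp === "number";
}

// Hypothetical local model of the policy for one message on a matching topic.
function applyPolicy(topic, rawPayload) {
    let parsed;
    try {
        parsed = JSON.parse(rawPayload);
    } catch (err) {
        return { delivered: false };                        // onFailure: not valid JSON
    }
    if (!isValidSensorPayload(parsed)) {
        return { delivered: false };                        // onFailure: message is dropped
    }
    const publish = { topic: topic, payload: parsed };      // stands in for Serdes.deserialize
    const transformed = transform(publish, {});             // the uploaded script
    return {
        delivered: true,
        topic: transformed.topic,
        payload: JSON.stringify(transformed.payload)        // stands in for Serdes.serialize
    };
}

const accepted = applyPolicy("device/1", '{"fahrenheit":10,"timestamp":1701172211}');
const dropped = applyPolicy("device/1", '{"fahrenheit":"warm"}');
console.log(accepted.delivered, accepted.payload);
console.log(dropped.delivered);
```

In this model, a valid message leaves the pipeline with a Celsius payload, while an invalid one never reaches the transformation step.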

Once the data policy is created, the following animation demonstrates the outcome. The upper terminal shows the incoming data, published with the temperature in Fahrenheit. The console below shows the transformed data, converted into Celsius.

 HiveMQ Data Hub Data Policy in action

Conclusion

In summary, dealing with diverse data from IoT devices is a common challenge for organizations. Converting this data into a standardized format is crucial, especially as device capabilities evolve. This is where the Data Transformation feature, introduced in HiveMQ Platform version 4.23, helps.

This feature simplifies data transformation by making it easy to upload scripts written in JavaScript to the broker, and it simplifies operations because HiveMQ Data Hub fully automates the execution of the scripts. This speeds up development cycles, enabling IT and OT engineering teams to quickly iterate on applications and achieve better results.

Stefan Frehse

Stefan Frehse is Engineering Manager at HiveMQ. He earned a Ph.D. in Computer Science from the University of Bremen and has worked in software engineering and in C-level management positions for 10 years. He has written many academic papers and spoken on topics including formal verification of fault-tolerant systems, debugging, and synthesis of reversible logic.
