
Build Stateful IoT Data Pipelines with HiveMQ Data Hub

by Kamiel Straatman
16 min read

HiveMQ Data Hub, a part of the HiveMQ Platform, is an integrated policy and data transformation engine designed to ensure data integrity and quality across your MQTT deployment. It enables businesses to enforce data policies, transform schemas, and validate incoming messages, ensuring that IoT data is accurate, standardized, and ready for decision-making.

Data Hub represents a major departure from the traditional MQTT approach, where brokers have typically been fully data-agnostic. With Data Hub being an integral part of the HiveMQ Platform, we can validate payload message structures and—based on its findings—correct, re-route, drop, or even disconnect the producing client. This allows you to implement declarative policies that check whether your data sources are sending data in the expected format, and correct or enhance it if necessary.

Stateful Data Transformations with HiveMQ Data Hub 4.38+

Data Hub now supports stateful data transformations, allowing the broker to remember and act on past messages—not just process them in isolation. This unlocks richer processing patterns like running averages, time-based aggregations, or event sequence recognition. For example, you can now calculate a moving average of the last 10 temperature readings, directly in the broker.

Stateless processing is like speaking to a stranger: you have to reintroduce yourself every single time, because there is no memory or context. That might be fine once, but it gets frustrating over time and leads nowhere. Stateful processing is like talking to a friend who remembers what you said yesterday, so the conversation can build on itself meaningfully.

With the stateful Data Hub, you can now implement data pipelines with cumulative aggregation, anomaly detection across time windows, or pattern recognition across multiple message streams.

Stateful operations are powered by a combination of streaming logic and persistent memory. You define how much history to retain, for how long, and on what conditions—offering fine-grained control over how your transformations behave.

We’re excited to see how our users will leverage stateful processing to build smarter, more context-aware applications with HiveMQ. And if you haven’t tried HiveMQ Data Hub yet, we offer a free five-hour trial option to explore all its features.

Example Highlighting Core Capabilities of HiveMQ Data Hub 

Let’s walk you through an example that highlights three core capabilities of Data Hub:

  • Contextualization

  • Conversion

  • Stateful message handling

Contextualization in MQTT means enriching messages with extra information (like device type, location, or timestamps) to make them more meaningful. It helps subscribers interpret raw data correctly without needing separate lookups. This reduces the number of backend dependencies.

So we use Data Hub to add a timestamp, information the simple sensor does not have, to produce a consumer-ready data frame.
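For illustration, the enrichment might look like this (hypothetical payload values; the timestamp field name matches the output shown later in this post):

Sensor publishes:  { "temperature": 68 }
Broker delivers:   { "temperature": 68, "timestamp": 1718000000000 }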

Conversion is the process of transforming source data that is not in the format expected by the backend. For example, we simulate a device that reports temperature in Fahrenheit, while the backend needs to consume it in degrees Celsius.

So we use Data Hub to convert from one unit to another. This prevents downstream systems from having to clean or adjust incoming data.
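The conversion itself is the standard formula °C = (°F − 32) × 5/9. As a one-line JavaScript helper of the kind a transformation script might use (the function name here is illustrative):

// Convert a Fahrenheit reading to Celsius: C = (F - 32) * 5/9
const fahrenheitToCelsius = (f) => (f - 32) * 5 / 9;

fahrenheitToCelsius(68); // 20 °C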

We use the statefulness feature of HiveMQ Data Hub to calculate a moving average over a series of incoming temperature values. A moving average smooths out fluctuations in data by averaging values over a sliding window. It is used to reveal trends by reducing the impact of short-term variations. A moving average therefore needs to take into account not only the current value but also a number of previously transmitted values.

So we use the HiveMQ Data Hub statefulness feature to correlate incoming values into a single moving-average value over time. With a window of 3, for instance, the readings 5, 10, 15 produce averages of 5, 7.5, and 10; our example will use a window of 6. The result smooths out short-term fluctuations, highlights longer-term trends, and reduces noise in time-series data for clearer analysis.

The result is shown in the graph below.

[Graph: the moving average computed by the Data Hub statefulness feature as incoming values arrive over time]

Putting It All Together

HiveMQ Data Hub needs a number of objects in order to work. First, you define your input and output data schemas using JSON (or Protobuf). These schemas validate incoming data and enforce structure. If you want to alter the payload, you need a transformation script that defines these modifications. Lastly, you need a policy that defines what to do depending on the schema evaluation: drop the message, do nothing, redirect it, or execute the script.

In the implementation example, we use two JSON schemas. The input schema only requires a temperature field of type number.

The output schema requires an additional field containing the moving average of the temperature from the last 6 incoming MQTT messages.
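The actual files are temp_schema.json and temp_avg_schema.json in the GitHub repository; minimal versions might look like the following, where the output field name average is an assumption:

Input schema (temp_schema.json):

{
  "type": "object",
  "properties": {
    "temperature": { "type": "number" }
  },
  "required": ["temperature"]
}

Output schema (temp_avg_schema.json):

{
  "type": "object",
  "properties": {
    "temperature": { "type": "number" },
    "average": { "type": "number" }
  },
  "required": ["temperature", "average"]
}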

To calculate the moving average, we write a JavaScript script that is executed by a data policy. Through this script, a stateful dataset containing the 6 previous values is created and updated as new data comes in. The duration of the state is bound to an MQTT session. You can declare any JavaScript data type as state and update its value for every MQTT message.

The following example demonstrates the general mechanics of the feature:

  • A client publishes temperature values to the broker on a specific topic.

  • A transformation script collects the last 10 values and computes an average value.

The diagram shows the state progression when a client sends temperature values to a topic over time:

[Diagram: the state progression as a client publishes temperature values to a topic over time]

Code Description

At the top of the diagram, you see the state over time: previousValues, initially an empty array. Once the client publishes the first message with a temperature of 5 (yellow box), it is appended to the list of the last 10 temperature values, resulting in an array of [5] and an average value of 5 being published. As more data is sent to the broker, temperature values are added to the array, and the state is updated so that an average is output for every message.

In a Data Hub transformation script, you can use the addClientConnectionState function to create a state for a client connection. In this example, a JavaScript object { previousValues: [], average: 0 } is created and initialized. Once an MQTT client connects to the broker, the state is initialized with the defined values. When the client disconnects from the broker, the state is cleared from the memory.
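A minimal sketch of such a script, assuming the documented init/transform entry points, a state named movingAvg exposed via context.clientConnectionStates, and the window of 10 values used in the diagram (the repository's moving_avg.js uses a window of 6):

// Runs when a client connects: register the per-connection state.
function init(initContext) {
  initContext.addClientConnectionState('movingAvg', { previousValues: [], average: 0 });
}

// Runs for every PUBLISH that the data policy routes to this script.
function transform(publish, context) {
  const state = context.clientConnectionStates.movingAvg;

  // Maintain a sliding window of the most recent temperature readings.
  state.previousValues.push(publish.payload.temperature);
  if (state.previousValues.length > 10) {
    state.previousValues.shift(); // drop the oldest reading
  }

  // Recompute the moving average over the current window.
  state.average =
    state.previousValues.reduce((sum, v) => sum + v, 0) / state.previousValues.length;

  // Enrich the outgoing payload: average, debug window, and a timestamp.
  publish.payload.average = state.average;
  publish.payload.previousValues = state.previousValues;
  publish.payload.timestamp = Date.now();
  return publish;
}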

In Action

To see this in action, we created a GitHub repository containing all the necessary files. This repository includes files for defining schemas for data validation, scripts for data transformation, and policies to tie everything together. Simply clone the repository into a working directory of your choice.

To test the stateful Data Hub yourself, download an unlicensed version of the HiveMQ Enterprise Broker, which can run on any laptop. The HiveMQ Broker can be installed by following the instructions here.

Start your broker and make sure the REST API is enabled. Detailed information on how to do this can be found here. You can also refer to the readme.md in the GitHub repo.

Now enable the broker’s Data Hub functionality for testing by executing:

curl -X POST localhost:8888/api/v1/data-hub/management/start-trial

The HiveMQ Broker Control Center, available at http://localhost:8080, gives you great insight into the inner workings of Data Hub. In the Control Center, you can find the Data Hub context menus on the left side of the screen. Alternatively, you can enable Data Hub by selecting the 5-hour trial option in the Data Hub menus of the Control Center.

Get the Resources in HiveMQ Data Hub

With the HiveMQ CLI (see https://github.com/hivemq/mqtt-cli), you upload the following schemas, script, and policy definition to the Data Hub of your broker:

  • mqtt hivemq schema create --id=temp_schema  --file temp_schema.json --type json

  • mqtt hivemq schema create --id=temp_avg_schema  --file temp_avg_schema.json --type json

  • mqtt hivemq script create -i moving_avg --file moving_avg.js --type transformation

  • mqtt hivemq data-policy create --file data-policy-Calculate-mov-avg.json

This can also be done manually with the GUI if you prefer.
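For orientation, a data policy that ties the pieces together generally has the following shape. This is a sketch of the Data Hub policy format rather than the repository's exact file (data-policy-Calculate-mov-avg.json): validate against the input schema, deserialize, run the script, and serialize against the output schema, dropping messages that fail validation:

{
  "id": "calculate-moving-average",
  "matching": { "topicFilter": "#" },
  "validation": {
    "validators": [{
      "type": "schema",
      "arguments": {
        "strategy": "ALL_OF",
        "schemas": [{ "schemaId": "temp_schema", "version": "latest" }]
      }
    }]
  },
  "onSuccess": {
    "pipeline": [
      { "id": "deserialize", "functionId": "Serdes.deserialize",
        "arguments": { "schemaId": "temp_schema", "schemaVersion": "latest" } },
      { "id": "moving-average", "functionId": "fn:moving_avg:latest", "arguments": {} },
      { "id": "serialize", "functionId": "Serdes.serialize",
        "arguments": { "schemaId": "temp_avg_schema", "schemaVersion": "latest" } }
    ]
  },
  "onFailure": {
    "pipeline": [{ "id": "drop", "functionId": "Mqtt.drop", "arguments": {} }]
  }
}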

Now, in one terminal, execute mqtt sub -t "#" to view the (now Data Hub-modified) data produced by the broker.

In another terminal, publish some random temperature values: open the HiveMQ CLI in shell mode (mqtt shell), connect to the broker by issuing connect, and send a sequence of pub commands as shown in the animated GIF below:

pub -t test -m '{ "temperature": 20}'

[Animated GIF: uploading the schemas, script, and policy definition to the Data Hub of your broker, then publishing temperature values]

Results

The Data Hub policy and script add each incoming temperature value from the JSON input (matched by "topicFilter": "#") to an in-memory list containing a maximum of six values. The broker-based script then calculates the average of these values and adds it to the processed data.

For debugging purposes, the array of previous values is also included in the output, although this is usually redundant information.

As stated earlier, a moving average is a statistical method that calculates the average of a data set over a specified number of periods, continuously updating as new data is added and smoothing out peaks in the incoming data.

You may have also noticed that the output data is contextualized by adding a timestamp field.
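Putting those pieces together, publishing { "temperature": 20 } might come back looking like this (field names as assumed earlier in this post; the exact numbers depend on the readings sent before):

{
  "temperature": 20,
  "previousValues": [18, 19, 20],
  "average": 19,
  "timestamp": 1718000000000
}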

Conclusion

HiveMQ Data Hub now empowers you to take control of your MQTT data at the source—validating, transforming, and enriching it in real time. With the addition of stateful processing, you can go beyond simple payload checks to implement rolling averages, detect anomalies, and correlate events over time—all without complex backend logic.

This shift simplifies your architecture, improves data quality, and accelerates time-to-insight—making every MQTT message more valuable to your business. 

Want to get started fast? Try this example today using HiveMQ Data Hub with our free five-hour trial and see what’s possible when your data pipeline gets smarter at the edge.

Kamiel Straatman

Kamiel is part of the Solutions Engineering team at HiveMQ. Based in the Netherlands, Kamiel is an IoT enthusiast with many years of experience in the data center domain and in Azure and AWS cloud architectures. He specializes in ICT infrastructure consulting with a strong focus on cloud, cloud automation, data center transformation, security, storage, and networking.

  • Contact Kamiel Straatman via e-mail