Flagging Bad Actors At Scale in IoT and MQTT Deployments
The HiveMQ Data Hub is an integrated policy engine in the broker that can validate, enforce, and manipulate data in motion. It helps organizations that have a large amount of devices transmitting MQTT data to ensure data quality and integrity, ultimately empowering organizations to maximize the business value of the data being transported by the HiveMQ platform. Even more specifically, the Data Hub prevents bad actors from potential outages that bring down infrastructures by quickly identifying the source and mitigating the issue.
What is a bad actor? MQTT is a data-agnostic protocol, meaning it can transmit any arbitrary byte sequence. However, producers and consumers want to understand each other. For that reason, contracts in the form of schema definitions are exchanged. These definitions describe how the data is laid out in the sequence of bytes. Commonly known schema definitions are, e.g., JSONschema, and Protobuf.
An often desired behavior is that consumers only read MQTT messages, which conform to a defined schema. That means clients publishing MQTT messages that do not match the schema shouldn’t reach the consumers and will require special attention. These clients are called bad actors since they send garbage data that is not precisely usable in downstreaming services. Even more, they can intermittently risk the reliable functioning of these services.
Why do they need special attention? Suppose MQTT messages are stored in a data lake for further investigation, e.g., to answer specific data science questions. Bad actors continuously publish non-conforming data into the data lake and subsequently:
- Causes unnecessary high storage and data transmission costs 
- Requires extra effort to filter out, especially extra CPU cycles, and maybe even multiple times for each query 
Schema Handling
Bad actors can be blocked by defining and enforcing a schema. With HiveMQ Data Hub, policies can be created to define actions for valid and invalid data. Filtering out bad actors can be effectively done at the broker level. Hence, consumers are only reading valid data, and in the case of the data lake, it stores only valid data making data science tasks significantly more straightforward and cost-effective.
 Representation of various clients publishing valid and invalid MQTT data.
Representation of various clients publishing valid and invalid MQTT data.
Consider the diagram above, where various clients are publishing valid and invalid data to HiveMQ Enterprise. The HiveMQ Enterprise Extension for PostgreSQL forwards all the traffic to a PostgreSQL database which builds up a data lake. As you can see, valid and invalid messages are passed through the database.
Observability
Conceptually, we have filtered out bad actors and established a clean and valuable data lake by defining policies and schemas. As a next step, we want to make bad actors visible in order to get them fixed. A potential root cause for bad actors might be that the sending clients are simply not sending to the correct topic and have an outdated software version.
Even more, vendors update their devices without further coordination with down-streaming services leading to a rather unstable situation.
Consequently, identifying these bad actors – more precisely in the context of MQTT knowing the clientId is required. Once the client is identified, making it visible which data the client is sending reduces the time to fix. In the context of MQTT, redirecting the bad messages isolates these bad data from good data for further inspection.
Capabilities required:
- Identify the source of data with respect to the clientId 
- Redirect bad data to another topic for further inspection and to keep the data pipeline healthy 
Approach
In this section, we describe how we implement the described requirements using our new HiveMQ Data Hub:
- Bad data is identified by validating the expected schema definition which can pass or fail 
- Each message violating the schema definition is extended by an MQTT User Property mentioning the respective clientId 
- After adding the user property, the message is redirected to a configured topic. 
- Moreover, a user property is added with the specific validation error to further reduce the time to fix 
For the sake of simplicity we assume any kind of schema definition. More importantly, consider the following policy:
{
  "id":"flag-bad-clients",
  "matching":{
    "topicFilter":"site/#"
  },
  "validation":{
    "validators":[
      {
        "type":"schema",
        "arguments":{
          "strategy":"ANY_OF",
          "schemas":[
            {
              "schemaId":"site-schema",
              "version":"latest"
            }
          ]
        }
      }
    ]
  },
  "onFailure":{
    "pipeline":[
      {
        "id":"addValidationResults",
        "functionId":"UserProperties.add",
        "arguments":{
          "name":"error",
          "value":"${validationResult}"
        }
      },
      {
        "id":"addClientId",
        "functionId":"UserProperties.add",
        "arguments":{
          "name":"clientId",
          "value":"${clientId}"
        }
      },
      {
        "id":"addOriginalTopic",
        "functionId":"UserProperties.add",
        "arguments":{
          "name":"origin",
          "value":"${topic}"
        }
      },
      {
        "id":"redirectBadData",
        "functionId":"Delivery.redirectTo",
        "arguments":{
          "topic":"invalid/${clientId}/${policyId}",
          "applyPolicies":false
        }
      }
    ]
  }
}In the following, we describe the defined policy:
- The topicFilter is defined for - site/#which means all MQTT messages have this topic filter and are given a prefix that is handled by the policy
- The validation step is using a single schema with the schemaId - site-schemawhich we assume is present as JSONSchema in the broker
- The - onFailuresection has several steps we will describe. For each invalid MQTT message, the following functions are executed:- A user property - erroris added to the MQTT message with the validation error $- {validationResult}
- A user property - clientIdis added to the MQTT message with- clientIdsending this message
- A user property - originis added to the MQTT message with the original topic the message was sent to
- Eventually, the MQTT message is redirected to the topic - invalid/${clientId}/${policyId}. The clientId and policyId are interpolating during runtime.
 
Several steps are executed for each invalid MQTT message and eventually redirected to a topic, that can be used for further inspection. In our example, the topic is written to a dedicated database table using the HiveMQ Enterprise Extension for PostgreSQL.
 Isolation of valid and invalid MQTT messages for data quality
Isolation of valid and invalid MQTT messages for data quality
In the diagram above, the valid and invalid MQTT messages are isolated into the different outgoing topics, visualized as different colored circles - red and green. Consequently, there is a database table containing only MQTT messages and one that is used for further debugging the bad actors, which streams using the extension into an existing PostgreSQL database.
The demo use case described in this article is based on another demonstration that introduces a quality metric and visualizes it in a Grafana dashboard. An addition to the quality metric is the list of bad clients queried from a PostgreSQL database with the following query:
SELECT
    SPLIT_PART(topic, '/', 2) as "clientID",
    COUNT(*) as "number"
FROM bad_clients
GROUP BY SPLIT_PART(topic, '/', 2)
ORDER BY COUNT(*) DESC
LIMIT 10The query lists the 10 clients with the highest number of bad messages in descending order.
A screenshot of the dashboard includes the Data Quality on the left hand side and list of the Top 10 Bad Clients on the right hand side.
 Grafana Dashboard
Grafana Dashboard
Try The Demo
The complete demo is available in our demo repository. The complete setup with data generation is included. A simple docker compose up -d brings the demo to live.
Conclusion
IoT applications often come with a massive amount of sensors and actuators in the form of MQTT clients publishing and subscribing data over MQTT. If no precautions are taken, proper coordination is challenging, for instance, finding clients with unfavorable MQTT protocol usage and disconnecting them. These tasks are often done manually and are very time-consuming. However, in this article, we suggested an approach to make those clients more visible to increase the observability of your entire IoT application and improve your data pipelines’ reliability.
Since the number of devices in an IoT application is continuously growing, the task of managing them effectively increases as well. Bad clients may send intentional or unintentional bad data to the broker, which may cause instability in downstreaming services. We have proposed a couple of building blocks to filter out valid and invalid messages. Moreover, invalid messages will be handled in an isolated way, such as a database table, for further inspection. We’ve also shown how using a Grafana dashboard to query data from the database will yield a good overview of the state of bad clients.
To go deeper into how HiveMQ Data Hub can help your organization ensure data quality and integrity, please download our white paper Measuring the Quality of Your Data Pipeline.
 
 Stefan Frehse
Stefan Frehse is Senior Engineering Manager at HiveMQ. He earned a Ph.D. in Computer Science from the University of Bremen and has worked in software engineering and in c-level management positions for 10 years. He has written many academic papers and spoken on topics including formal verification of fault tolerant systems, debugging and synthesis of reversible logic.
