Getting MQTT Data to InfluxDB Native Collector using HiveMQ Cloud
Written by Kudzai Manditereza
Published: September 6, 2022
Industrial facilities consist of physical assets and processes that evolve through time. Therefore, each data point generated by such systems is essentially a snapshot of events at that particular point in time.
By extension, this data should be stored in a way that reflects the sequential order of events so that it can be rapidly queried and analysed, hence the importance of time-series databases like InfluxDB.
In this article, I’m going to give you a step-by-step guide for integrating MQTT data from industrial assets into InfluxDB Cloud via HiveMQ MQTT Broker. However, instead of using the Telegraf InfluxDB server agent for forwarding MQTT messages from the broker to the database, I’m going to use InfluxDB’s recently launched Native Collectors feature, which effectively turns InfluxDB into an MQTT client. The benefit is that you remove the need for third-party apps.
Table of Contents
- System Architecture to Demonstrate Bringing MQTT Data to InfluxDB
- Step 1: Deploying HiveMQ MQTT Broker on AWS Cloud
- Step 2: Configuring Raspberry Pi as an MQTT Client
- Step 3: Configuring Opto22 Groov Rio as an MQTT Client
- Step 4: Configuring InfluxDB Cloud to Receive MQTT Messages
- Step 5: Visualising MQTT Telemetry on InfluxDB
- Conclusion: Easily Get MQTT Data to InfluxDB Cloud
System Architecture to Demonstrate Bringing MQTT Data to InfluxDB
To demonstrate, I’ll generate and publish MQTT data from two devices to a HiveMQ MQTT Broker Cluster deployed on AWS Cloud. Then I’ll configure my InfluxDB 2.0 Cloud instance to subscribe to the same topics, pull the MQTT data and persist it to the time-series database using its built-in Native Collectors feature.
Step 1: Deploying HiveMQ MQTT Broker on AWS Cloud
I’ll begin by deploying my HiveMQ MQTT Broker Cluster on AWS Cloud. To do that, I’ll visit the HiveMQ website and click on the Get HiveMQ button.
This brings up the page that allows you to launch a virtual machine on an AWS EC2 instance. I’ll click on HiveMQ AMI for AWS.
After that, you’ll see a series of steps that allow you to specify your AWS region, the size of your virtual machine in terms of memory and physical storage, and your security settings. You can follow this link for a detailed guide on how to configure your AWS EC2 instance for deploying a HiveMQ MQTT Broker cluster.
Once my HiveMQ MQTT Broker is successfully deployed, I can access its dashboard at port 8080 of my AWS EC2 instance’s assigned public IP address.
Now that my HiveMQ MQTT broker is up and running, I can start publishing messages to it from my devices. Let’s take a look at how I’ve configured the devices to publish MQTT telemetry to my broker.
Step 2: Configuring Raspberry Pi as an MQTT Client
My first device is a Raspberry Pi 3B+ with a DHT11 temperature and humidity sensor connected to its GPIO pins, generating temperature and humidity data points.
On the Raspberry Pi, I’m running a Node-Red flow that is collecting temperature and humidity data points, and then joining them with other static properties to form a JSON object.
For reading the temperature and humidity data, I’m using a DHT22 Node-Red package, which I’ve configured as below to read data from my Raspberry Pi GPIO pin 4.
Next, I’m using a function block on Node-Red to extract the temperature as below.
Followed by humidity extraction as below.
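Node-Red function nodes are written in JavaScript, so the following Python is only a rough illustration of the extraction logic, not the code running in my flow. It assumes, hypothetically, that the sensor node delivers the temperature in the message's payload property and the humidity in a separate humidity property; check the output of your own sensor node before relying on that shape.

```python
def extract_temperature(msg: dict) -> dict:
    """Return a new message carrying only the temperature reading."""
    # Assumption: the sensor node puts the temperature in msg["payload"].
    return {"topic": "temperature", "payload": float(msg["payload"])}


def extract_humidity(msg: dict) -> dict:
    """Return a new message carrying only the humidity reading."""
    # Assumption: the sensor node puts the humidity in msg["humidity"].
    return {"topic": "humidity", "payload": float(msg["humidity"])}


# Hypothetical sensor message with made-up readings.
sensor_msg = {"payload": "24.0", "humidity": "51.0"}
temperature_msg = extract_temperature(sensor_msg)
humidity_msg = extract_humidity(sensor_msg)
```

Giving each extracted message its own topic makes it easy to tell the two readings apart when they are combined later.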
Once I have my temperature and humidity data, I then use a JOIN node to join the two data points with other static properties to form a JSON object.
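To make the shape of the resulting object concrete, here is a minimal Python sketch of the same idea. The property names match the ones I map in InfluxDB later in this article, but the values, the STATIC_PROPS name and the timestamp format (Unix time in whole seconds) are assumptions made for illustration.

```python
import json
import time


def build_payload(temperature: float, humidity: float, static_props: dict) -> str:
    """Merge readings with static properties and serialise as a flat JSON object."""
    payload = dict(static_props)  # copy so the caller's dict is not modified
    payload["temperature"] = temperature
    payload["humidity"] = humidity
    # Assumption: the timestamp is sent as Unix time in whole seconds.
    payload["time"] = int(time.time())
    return json.dumps(payload)


# Hypothetical static properties; the values are placeholders.
STATIC_PROPS = {
    "device_type": "raspberry_pi",
    "device_id": "rpi-01",
    "error_state": False,
}

message = build_payload(24.0, 51.0, STATIC_PROPS)
```

Keeping the object flat, with one key per value, keeps the parsing rules on the InfluxDB side simple.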
Finally, I use an MQTT node to connect to my HiveMQ MQTT Broker cluster on AWS Cloud and start publishing MQTT messages with the JSON object as payload, under the topic: influxdbdemo/jsontelemetry
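If you would rather publish from a script than from Node-Red, the publishing step could look roughly like the sketch below. The publish_telemetry helper is hypothetical, and the publisher is passed in as a plain callable so that the example runs without a broker; with the paho-mqtt package, a connected client's publish method could plausibly be passed in its place.

```python
import json
from typing import Callable

TOPIC = "influxdbdemo/jsontelemetry"  # topic used in this article


def publish_telemetry(publish: Callable[[str, str], object], payload: dict) -> str:
    """Serialise the payload as JSON and hand it to the given publish callable."""
    body = json.dumps(payload)
    publish(TOPIC, body)
    return body


# Stand-in publisher that records calls instead of talking to a broker.
sent = []
publish_telemetry(
    lambda topic, body: sent.append((topic, body)),
    {"device_id": "rpi-01", "temperature": 24.0},  # placeholder values
)

# Assumption: with paho-mqtt installed and a connected client, the call
# would become publish_telemetry(client.publish, payload).
```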
Step 3: Configuring Opto22 Groov Rio as an MQTT Client
Once my Raspberry Pi is successfully connected and publishing MQTT messages to my HiveMQ broker, I move on to configuring my Groov Rio device to do the same.
Only this time, I’m using Opto22’s Groov Node-Red package to read the temperature from the device’s analog input channel, combine the data with other static properties to form a JSON payload and publish it to the same HiveMQ Broker under the same topic.
Now that both of my devices are successfully publishing MQTT messages, I can configure my InfluxDB Cloud instance to start receiving the messages and persist them to the time-series database.
Step 4: Configuring InfluxDB Cloud to Receive MQTT Messages
The first thing that I’ll do is visit the InfluxDB website and sign in to my InfluxDB 2.0 Cloud instance. If you don’t already have an account, you can create one for free.
Once I’m signed in, I’ll select the upward-facing arrow icon and proceed by clicking on Native Subscriptions.
This brings up a page where you can choose to Create a Subscription. When you select that, it will bring up a page where you can enter details that are required to successfully subscribe to the HiveMQ MQTT broker and start receiving the MQTT messages.
The first section includes your MQTT broker details such as the name for your MQTT subscription, its description, the hostname of your broker and the port number. I’ve entered my details as shown below.
Once that is done, I can move on to the next section by scrolling down. In this section I need to specify the MQTT topic to which I want to subscribe, and the InfluxDB bucket in which I want to store the telemetry messages as time-series data.
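The topic entered here acts as an MQTT topic filter. In the MQTT specification, a filter can contain the single-level wildcard + and the multi-level wildcard #, which must be the last level. The following Python sketch shows how such a filter is matched against a topic; the topic_matches function is my own illustration, not part of InfluxDB or HiveMQ, and it ignores the special handling of topics that begin with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level, and it
            # matches the parent level plus everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # No "#" seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


exact = topic_matches("influxdbdemo/jsontelemetry", "influxdbdemo/jsontelemetry")
single = topic_matches("influxdbdemo/+", "influxdbdemo/jsontelemetry")
multi = topic_matches("influxdbdemo/#", "influxdbdemo/line1/jsontelemetry")
```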
After that, I can move on to the next section, Define Data Parsing Rules.
In order to directly ingest MQTT messages into an InfluxDB bucket, I need to define parsing rules that help map each property in my JSON payload to the appropriate InfluxDB element (measurement, timestamp, field or tag).
As you can see from the image above, there are three possible data formats to use with this native MQTT subscription feature, and I currently have the JSON option selected.
To specify the parsing rules, you use the JSON path to the key you want to use for each element. JSON paths start with “$.”, so for a flat payload the path to the time property is $.time. I start by specifying my time property as the timestamp, and my device_type property as the measurement element in InfluxDB.
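As a rough illustration of what such a path does, the Python sketch below resolves simple dotted paths against a decoded payload. It is a deliberately minimal stand-in for a real JSONPath implementation (no array indexing or filters), and the resolve_path function and sample values are hypothetical.

```python
import json


def resolve_path(document: dict, path: str):
    """Resolve a simple dotted JSON path such as "$.device.id"."""
    if not path.startswith("$."):
        raise ValueError("path must start with '$.'")
    value = document
    for key in path[2:].split("."):
        value = value[key]  # raises KeyError if the key is missing
    return value


# Hypothetical payload; the values are placeholders.
payload = json.loads('{"time": 1662465600, "device_type": "raspberry_pi"}')
timestamp = resolve_path(payload, "$.time")
measurement = resolve_path(payload, "$.device_type")
```

For a nested payload, the same function would follow each key in turn, so "$.device.id" reads the id key inside the device object.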
Next, I specify the Tag elements, starting with my device_id property, which I give a user-friendly name, followed by my temperature property.
Next, I specify my error_state and humidity properties under Tags.
When I’m done, I save the configuration and make sure to start the subscription.
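To show what parsing rules like these produce, the Python sketch below converts a flat payload into an InfluxDB line protocol record. Line protocol requires at least one field per point, so purely for illustration this sketch assumes that device_id is stored as a tag and that temperature, humidity and error_state are stored as fields; adjust the mapping to match your own rules. The to_line_protocol function is hypothetical, it does not escape spaces or commas in the measurement and tag values, and it writes the timestamp as-is, so the write precision would have to match the payload's time unit.

```python
def format_field(value) -> str:
    """Format a field value using line protocol conventions."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integers carry an "i" suffix
    if isinstance(value, float):
        return repr(value)
    # Strings are double-quoted, with backslashes and quotes escaped.
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_line_protocol(payload: dict, measurement_key: str, time_key: str,
                     tag_keys: list, field_keys: list) -> str:
    """Build one line protocol record from a flat payload."""
    tags = "".join(f",{key}={payload[key]}" for key in tag_keys)
    fields = ",".join(f"{key}={format_field(payload[key])}" for key in field_keys)
    return f"{payload[measurement_key]}{tags} {fields} {payload[time_key]}"


# Hypothetical payload; the values are placeholders.
sample = {
    "time": 1662465600,
    "device_type": "raspberry_pi",
    "device_id": "rpi-01",
    "temperature": 24.0,
    "humidity": 51.0,
    "error_state": False,
}
line = to_line_protocol(sample, "device_type", "time",
                        ["device_id"], ["temperature", "humidity", "error_state"])
```

Tags are indexed and suit values you filter or group by, such as a device identifier, while fields hold the measured values themselves.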
Step 5: Visualising MQTT Telemetry on InfluxDB
Finally, once I have successfully connected to my MQTT Broker, subscribed to a topic and defined the parsing rules, I can go on to create dashboards on InfluxDB and start to visualise the data that is being persisted to my InfluxDB bucket.
Conclusion: Easily Get MQTT Data to InfluxDB Cloud
In conclusion, I used a flat JSON object for my payload, but InfluxDB’s Native Subscriptions feature also allows you to persist complex JSON objects received from an MQTT Broker. You can also use MQTT wildcards. You can get started with MQTT and InfluxDB by setting up a free MQTT broker on HiveMQ Cloud.