Getting MQTT Data to InfluxDB Native Collector using HiveMQ Broker

Getting MQTT Data to InfluxDB Native Collector using HiveMQ Broker

author Kudzai Manditereza

Written by Kudzai Manditereza

Category: Real-World MQTT MQTT

Published: September 6, 2022


Industrial facilities consist of physical assets and processes that evolve through time. Therefore, each data point generated by such systems is essentially a snapshot of events at that particular point in time.

By extension, this data wants to be stored in a way that reflects the sequential order of events so that it can be rapidly queried and analysed, hence the importance of time-series databases like InfluxDB.

In this article, I’m going to give you a step by step guide for integrating MQTT data from industrial assets into InfluxDB cloud via HiveMQ MQTT Broker. However, instead of using theTelegraf InfluxDB server agent for forwarding MQTT messages from the broker to the database, I’m going to use InfuxDB’s recently launched Native Collectors feature that effectively turns InfluxDB into an MQTT client. Benefit: You remove the need for third party apps.

Table of Contents

System Architecture to Demonstrate Bringing MQTT Data to InfluxDB

To demonstrate, I’ll generate and publish MQTT data from two devices to a HiveMQ MQTT Broker Cluster deployed on AWS Cloud. Then I’ll configure my InfluxDB 2.0 Cloud instance to subscribe to the same topics, pull the MQTT data and persist onto the time-series database using its inbuilt Native Collectors feature.

System Architecture Demonstrating how to bring MQTT Data to InfluxDB

System Architecture Demonstrating how to bring MQTT Data to InfluxDB

Step 1: Deploying HiveMQ MQTT Broker on AWS Cloud

I’ll begin by deploying my HiveMQ MQTT Broker Cluster on AWS Cloud. And to do that, I’ll visit the HiveMQ website and click on the Get HiveMQ button.

How to get HiveMQ Cloud MQTT Broker
How to get HiveMQ Cloud MQTT Broker

This brings up the page that allows you to launch a virtual machine on an AWS EC2 instance. I’ll click on HiveMQ AMI for AWS.

How to get HiveMQ Cloud MQTT Broker for AWS
How to get HiveMQ Cloud MQTT Broker for AWS

After that, you’ll see a series of steps that allows you to specify your AWS region, size of your virtual machine in terms of memory and physical storage, and also configure your security settings. You can follow this link for a detailed guide on how to configure your AWS EC2 instance for purposes of deploying a HiveMQ MQTT Broker cluster.

Once my HiveMQ MQTT Broker is successfully deployed, I can access its dashboard at port 8080 of my AWS EC2 instance’s assigned public IP address.

Configuring the HiveMQ Cloud MQTT Broker
Configuring the HiveMQ Cloud MQTT Broker

Now that my HiveMQ MQTT broker is up and running, I can start publishing messages to it from my devices. Let’s take a look at how I’ve configured the devices to publish MQTT telemetry to my broker.

Step 2: Configuring Raspberry Pi as an MQTT Client

My first device is a Raspberry Pi 3B+ that has, connected to its GPIO pins, a DHT 11 Temperature and Humdity sensor generating telemetry temperature and humidity data points.

Configuring Raspberry Pi as the MQTT Client
Configuring Raspberry Pi as the MQTT Client

On the Raspberry Pi, I’m running a Node-Red flow that is collecting temperature and humidity data points, and then joining them with other static properties to form a JSON object.

Node-Red flow collecting temperature and humidity data points
Node-Red flow collecting temperature and humidity data points

For reading the temperature and humidity data, I’m using a DHt22 Node-Red package which I’ve configured as below to read data from my Raspberry Pi GPIO pin 4.

Next, I’m using a function block on Node-Red to extract the temperature as below.

Followed by humidity extraction as below.

Once I have my temperature and humidity data, I then use a JOIN node to join the two data points with other static properties to form a JSON object.

{
"temperature":27,
"device_id":1002,
"time":1660242495828,
"device_type":"raspberry_pi",
"Error_state":"normal_operation",
"model_id":"RASPBERRY-PI-3B",
"humidity":64
}

Finally, I use an MQTT node to connect to my HiveMQ MQTT Broker cluster on AWS Cloud and start publishing MQTT messages with the JSON object as payload, under the topic: influxdbdemo/jsontelemetry

Step 3: Configuring Opto22 Groov Rio as an MQTT Client

Once my Raspberry Pi is successfully connected and publishing MQTT messages to my HiveMQ broker, I move on to configuring my Groov Rio device to do the same.

Configuring Opto22 Groov Rio as an MQTT Client
Configuring Opto22 Groov Rio as an MQTT Client

Only this time, I’m using Opto22’s Groov Node-Red package to read the temperature from the device’s analog input channel, combine the data with other static properties to form a JSON payload and publish it to the same HiveMQ Broker under the same topic.

Opto22 Groov Node-Red package
Opto22 Groov Node-Red package

Since I’m now successfully publishing MQTT messages from both of my devices, I can now configure my InfluxDB Cloud instance to start receiving the messages and persist them onto the time-series database.

Step 4: Configuring InfluxDB Cloud to Receive MQTT Messages

The first thing that I’ll do is to visit the InfluxDB website and sign in to my InfluxDB 2.0 Cloud instance. If you don’t already have an account you can create one, it’s free.

InfluxDB Cloud
InfluxDB Cloud

Once I’m signed in, I’ll select the upward facing arrow icon and proceed by clicking on Native Subscriptions.

InfluxDB Cloud Native Collectors user interface
InfluxDB Cloud Native Collectors’ user interface

This brings up a page where you can choose to Create a Subscription. When you select that, it will bring up a page where you can enter details that are required to successfully subscribe to the HiveMQ MQTT broker and start receiving the MQTT messages.

The first section includes your MQTT broker details such as the name for your MQTT subscription, its description, the hostname of your broker and the port number. I’ve entered my details as shown below.

InfluxDB Cloud Native Collectors user interface to add Broker details
InfluxDB Cloud Native Collectors’ user interface to add Broker details

Once that is done, I can move onto the next section by scrolling down. In this section I need to specify the MQTT topic on which I wish to subscribe to receive the messages, and the InfluxDb bucket into which I want to store the telemetry messages as time-series data.

 InfluxDB Cloud Native Collectors user interface to add MQTT topics
InfluxDB Cloud Native Collectors’ user interface to add MQTT topics

After that, I can move on to the next section, Define Data Parsing Rules.

In order to directly ingest MQTT messages into an InfluxDB bucket, I need to define parsing rules that help map each property on my JSON payload to the appropriate InfluxDB element (measurement, timestamp, field or tag)

InfluxDB Cloud Native Collectors user interface
InfluxDB Cloud Native Collectors’ user interface to add MQTT topics

As you can see from the image above, there are three possible data formats to use with this nartive MQTT subscription feature, and I currently have the JSON option selected.

To specify the parsing rules, you use the JSON path to the key you want to use for each element. JSON paths start with a “$.”. So I start by specifying my time property as the timestamp, and my device_type property as the measurement element in InfluxDB.

Next, I specify the Tag elements, starting with my device_id property and giving it a user friendly name. Followed by my temperature property.

InfluxDB Cloud Native Collectors user interface to define data parsing rules
InfluxDB Cloud Native Collectors’ user interface to define data parsing rules

Next, I specify my error_state and humidity properties under Tags.

InfluxDB Cloud Native Collectors user interface to define data parsing rules
InfluxDB Cloud Native Collectors’ user interface to define data parsing rules

When I’m done I can then save the configuration and make sure to start the subscription.

InfluxDB Cloud Native Collectors user interface to load MQTT data
InfluxDB Cloud Native Collectors’ user interface to load MQTT data

Step 5: Visualising MQTT Telemetry on InfluxDB

Finally, once I have successfully connected to my MQTT Broker, subscribed to a topic and defined the parsing rules I can go on to create dashboards on InfluxDB and start to visualise the data that is being persisted to my InfluxDB bucket.

InfluxDB Cloud Native Collectors’ user interface showing temperature data

InfluxDB Cloud Native Collectors’ user interface showing temperature data

Conclusion: Easily Get MQTT Data to InfluxDB Cloud

In conclusion, I used a flat JSON object for my payload but InfluxDB’s Native Subscription allows you to persist complex JSON objects received from an MQTT Broker. Also, you can use MQTT wildcards. You can get started with MQTT and InfluxDB by setting up a free MQTT broker on HiveMQ Cloud.

author Kudzai Manditereza

About Kudzai Manditereza

Kudzai is an experienced Technology Communicator and Electronic Engineer based in Germany. As a Developer Advocate at HiveMQ, his goals include creating compelling content to help developers and architects adopt MQTT and HiveMQ for their IIoT projects. In addition to his primary job functions, Kudzai runs a popular YouTube channel and Podcast where he teaches and talks about IIoT and Smart Manufacturing technologies. He has since been recognized as one of the Top 100 global influential personas talking about Industry 4.0 online.

Follow Kudzai on LinkedIn

mail icon Contact Kudzai
newer posts Latest Research Shows MQTT is Seeing Increased Adoption in IoT
IoT Tutorial on how to establish bidirectional MQTT communication on ESP32 older posts