Real-time Analytics of MQTT Messages Using Elasticsearch, Kibana & HiveMQ

by Anthony Olazabal
10 min read

Integrating Elasticsearch, Filebeat, Kibana, and HiveMQ can be a powerful way to manage and analyze data generated by MQTT traffic. Elasticsearch is a search and analytics engine that can handle large amounts of data. Filebeat is a lightweight shipper for forwarding and centralizing log data. Kibana is an open-source data visualization dashboard for Elasticsearch that provides visualization capabilities on top of the content indexed in an Elasticsearch cluster. HiveMQ is an MQTT (Message Queuing Telemetry Transport) platform that provides a scalable and reliable infrastructure for connecting and managing IoT (Internet of Things) devices. It implements the MQTT protocol to enable efficient, real-time data exchange between devices in a distributed and scalable manner.

In this blog post, we explore how to use Elasticsearch, Filebeat, Kibana, and HiveMQ Broker to do real-time analytics on MQTT traffic. We cover the installation process for each technology, how to configure them to work together, and how to use them to manage and analyze data in real-time.

Our Lab Environment

For this article, we will deploy all the services via Docker Compose. You can run it on Docker Desktop or on a dedicated server running Docker Engine. We recommend having at least:

  • 4 CPU cores

  • 8 GB of RAM

  • 40 GB of available disk space

Our Custom Docker Compose

To simplify the deployment, we’ve prepared an all-in-one Docker Compose file that starts all required services. Since the Compose file uses environment variables to configure the deployment, we need to create a file called .env with the following content:

ELASTIC_PASSWORD=elastic
KIBANA_PASSWORD=kibana
STACK_VERSION=8.11.3
CLUSTER_NAME=docker-cluster
LICENSE=basic
ES_PORT=9200
KIBANA_PORT=5601
MEM_LIMIT=1073741824
COMPOSE_PROJECT_NAME=lab-elastic
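For reference, here is a shortened sketch of what that Docker Compose file can look like. The complete file (including TLS certificate generation and healthchecks) is in the GitHub repository linked at the end of this article; the image tags and volume details below are illustrative, while the service names match those referenced in filebeat.yml.

```yaml
# Abridged sketch of docker-compose.yml -- the full version also generates
# TLS certificates and wires them into Elasticsearch, Kibana, and Filebeat.
services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    environment:
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - cluster.name=${CLUSTER_NAME}
      - xpack.license.self_generated.type=${LICENSE}
    ports:
      - ${ES_PORT}:9200
    mem_limit: ${MEM_LIMIT}
  kibana:
    image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
    environment:
      - ELASTICSEARCH_HOSTS=https://es01:9200
    ports:
      - ${KIBANA_PORT}:5601
    depends_on:
      - es01
  filebeat:
    image: docker.elastic.co/beats/filebeat:${STACK_VERSION}
    volumes:
      - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
    depends_on:
      - es01
      - hivemq001
  hivemq001:
    image: hivemq/hivemq4       # MQTT broker listening on 1883
    ports:
      - "1883:1883"
  hivemq-edge:
    image: hivemq/hivemq-edge   # control panel exposed on 8181
    ports:
      - "8181:8181"
```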

Filebeat uses a configuration file for its input and output settings. We will create a new file called filebeat.yml with the following content:

filebeat.inputs:
- type: mqtt
  hosts: 
    - tcp://hivemq001:1883
  topics:
    - '#' # the wildcard must be quoted, otherwise YAML treats it as a comment
  tags: ["mqtt_traffic"]
  fields:
    app_id: my_mqtt_usecase

output.elasticsearch:
  hosts: 'https://es01:9200'
  username: 'elastic'
  password: 'elastic'
  ssl:
    certificate_authorities: "/usr/share/elasticsearch/config/certs/ca/ca.crt"
    certificate: "/usr/share/elasticsearch/config/certs/filebeat/filebeat.crt"
    key: "/usr/share/elasticsearch/config/certs/filebeat/filebeat.key"

Note: Extra configuration for MQTT input can be found in the official documentation.

Start all services with the following command:

sudo docker-compose up

Validate the Elasticsearch indices by opening the Kibana portal and navigating to Management > Dev Tools:

In Dev Tools, clear any existing text in the left section of the console, enter “GET _cat/indices”, and execute it to get the result of the API call on the right:

If all lines start with “green open”, then everything is fine and you can move forward.
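For reference, the request and a typical line of its response look like the following (the index name, UUID, document count, and sizes here are illustrative):

```
GET _cat/indices

green open .ds-filebeat-8.11.3-2024.01.10-000001 aBcDeFgHiJkLmNoP 1 1 1204 0 1.2mb 620kb
```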

MQTT Traffic

In our Docker Compose file, we’ve integrated HiveMQ Edge to facilitate traffic simulation. Open a browser and access the HiveMQ Edge control panel at http://localhost:8181 (the default username and password are admin / admin).

If you go to the Workspace page, you will see the emulated devices and the bridge connection to HiveMQ Enterprise Broker:

The result of the emulation is a topic tree similar to the following one:

You are now ready to play with data in Kibana.
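If you would rather generate traffic yourself, in addition to the Edge emulation, a small publisher script works too. The following is a sketch assuming the paho-mqtt client library and a broker reachable on localhost:1883; the topic matches the one we filter on later in Kibana, and the payload shape (a “value”/“unit” JSON object) is illustrative.

```python
import json
import random
import time

TOPIC = "wind/farms/saintnazaire/environment/telemetry/temperature"

def make_payload(temperature: float) -> str:
    """Build the JSON payload for one telemetry sample."""
    return json.dumps({"value": round(temperature, 2), "unit": "C"})

def publish_samples(host: str = "localhost", port: int = 1883,
                    count: int = 10) -> None:
    """Publish `count` random temperature readings, one per second."""
    import paho.mqtt.client as mqtt  # pip install paho-mqtt
    client = mqtt.Client()
    client.connect(host, port)
    for _ in range(count):
        client.publish(TOPIC, make_payload(random.uniform(5.0, 15.0)))
        time.sleep(1)
    client.disconnect()

# publish_samples()  # uncomment with a broker running on localhost:1883
```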

Verify Data in Kibana

Check the logged MQTT messages in Kibana by navigating to http://localhost:5601/. Use elastic as both username and password (unless you changed the values in the .env file).

Go to the Discover page and create a new data view by clicking “Create data view”, specifying a name and an index pattern to use.

Click on “Save data view to Kibana” to visualize the data.

As you can see, we have all the information from our MQTT traffic, but the message payload is still a JSON string, which is not convenient for reporting in Kibana. So we add a small processor block to the Filebeat configuration to instruct it to extract the fields from the payload and turn them into columns.

filebeat.inputs:
- type: mqtt
  hosts: 
    - tcp://hivemq001:1883
  username: realtime
  password: realtimepassword
  topics: 
    - '#'
  fields_under_root: true
  processors:
    - decode_json_fields:
        fields:
          - message
        process_array: false
        max_depth: 1
        target: ''
        overwrite_keys: false
        add_error_key: true
  tags: ["mqtt_traffic"]
  fields:
    app_id: my_mqtt_usecase

output.elasticsearch:
  hosts: 'https://es01:9200'
  username: 'elastic'
  password: 'elastic'
  ssl:
    certificate_authorities: "/usr/share/elasticsearch/config/certs/ca/ca.crt"
    certificate: "/usr/share/elasticsearch/config/certs/filebeat/filebeat.crt"
    key: "/usr/share/elasticsearch/config/certs/filebeat/filebeat.key"
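To make the effect of decode_json_fields concrete, here is a plain-Python illustration of the transformation (a sketch of the behavior, not Filebeat’s actual implementation): with target set to the empty string the decoded keys land at the top level of the event, and with overwrite_keys: false existing event fields are left untouched.

```python
import json

def decode_json_field(event: dict, field: str = "message",
                      overwrite_keys: bool = False) -> dict:
    """Mimic decode_json_fields with target: '' (merge into top level)."""
    try:
        decoded = json.loads(event.get(field, ""))
    except json.JSONDecodeError:
        # rough analogue of add_error_key: true
        return {**event, "error": "decode failure"}
    if not isinstance(decoded, dict):
        return event
    merged = dict(event)
    for key, value in decoded.items():
        if overwrite_keys or key not in merged:
            merged[key] = value
    return merged

event = {
    "mqtt.topic": "wind/farms/saintnazaire/environment/telemetry/temperature",
    "message": '{"value": 11.8, "unit": "C"}',
}
flat = decode_json_field(event)
# flat now carries top-level "value" and "unit" fields, usable as columns
# in Kibana, alongside the original event fields.
```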

Create a Simple Report

Once we have the data flowing into our Elasticsearch instance, we can use it to create live dashboards. To do that, go to “Analytics > Dashboard” and click “Create visualization”.

In the visualization builder, do the following:

  • Search for the “@timestamp” field and drag it to “Horizontal Axis”

  • Search for the “value” field and drag it to “Vertical Axis”

  • Change the type of visualization from Bar to Area Stacked

  • Apply a filter on the data with the following: mqtt.topic : "wind/farms/saintnazaire/environment/telemetry/temperature"

Refresh the data and you should get a nice visualization as shown below. Click on the “Save and return” button to add it to the dashboard.

Do the same exercise with other metrics from other topics to build a nice dashboard for real-time analytics as shown below.

Benefits of Integrating These Technologies

Let’s summarize the benefits of using these four technologies together:

  1. Scalability: Elasticsearch is a distributed search engine that can handle large amounts of data. It can be scaled horizontally by adding more nodes to the cluster. This makes it easy to handle large amounts of data generated by MQTT traffic.

  2. Real-time analysis: Filebeat can be used to collect log data from various sources, including MQTT. This data can be sent to Elasticsearch in real-time, where it can be analyzed using Kibana. This allows you to monitor your MQTT traffic in real-time and take action as needed.

  3. Data visualization: Kibana provides a powerful data visualization platform that can be used to create custom dashboards and visualizations. This can help you gain insights into your MQTT traffic and identify patterns and trends.

  4. Ease of use: The integration of Elasticsearch, Filebeat, Kibana, and HiveMQ Platform is seamless and easy to set up. This makes it easy to get started with analyzing your MQTT traffic.

Wrap Up

In this blog post, we explored how to use Elasticsearch, Filebeat, Kibana, and HiveMQ MQTT platform to do real-time analytics on MQTT traffic. We covered the installation process for each technology, how to configure them to work together, and how to use them to manage and analyze data in real-time. This can help you gain insights into your MQTT traffic, identify patterns and trends, and take action as needed.

Resources

You can find the different files from this article in the following GitHub repository.

Anthony Olazabal

Anthony is part of the Solutions Engineering team at HiveMQ. He is a technology enthusiast with many years of experience working on infrastructure and development around Azure cloud architectures. His expertise spans development and cloud technologies, including IaaS, PaaS, and SaaS services, and he has a keen interest in writing about MQTT and IoT.
