
HiveMQ to PostgreSQL: Dual Insert with Custom SQL

by Anthony Olazabal
8 min read

When integrating MQTT data streams with relational databases, one size rarely fits all. The default persistence behavior might work for simple use cases, but real-world industrial IoT scenarios often demand more sophisticated data handling: parsing JSON payloads, populating multiple tables, or implementing upsert logic to avoid duplicates.

In this tutorial, we'll walk through how to configure the HiveMQ PostgreSQL Extension to use a custom SQL statement, giving you complete control over how your MQTT messages are persisted and unlocking advanced data transformation and persistence patterns in your MQTT-to-PostgreSQL workflows. We'll use a real-world example: transforming sensor data for integration with your favorite reporting tool for industrial analytics.

Why Custom SQL Statements for Industrial IoT Workflows?

The HiveMQ PostgreSQL Extension provides a straightforward way to persist MQTT messages to a PostgreSQL database. Out of the box, it handles the basics well. But consider these scenarios:

  • JSON payload parsing: Your devices publish structured JSON, and you need to extract specific fields into dedicated columns.

  • Multi-table inserts: A single MQTT message should populate both a reference table and a time-series table.

  • Conflict handling: You want to insert new records but gracefully handle duplicates with ON CONFLICT clauses.

  • Data transformation: Timestamps need parsing, values need casting, or units need normalization.

Custom SQL statements unlock all of these capabilities.
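To give a flavor of what this looks like in practice, here is a minimal sketch that combines JSON parsing with conflict handling. The device_state table and its status field are hypothetical and exist only for illustration; the placeholders are explained in Step 1.

-- Hypothetical table holding the latest reported status per topic.
-- The ${...} placeholders are replaced by the extension with data
-- from each incoming MQTT message (see Step 1).
INSERT INTO device_state (topic, status, updated_at)
VALUES (
    ${mqtt-topic},
    ${mqtt-payload-utf8}::jsonb ->> 'status',
    ${timestamp-iso-8601}
)
ON CONFLICT (topic) DO UPDATE
SET status     = EXCLUDED.status,
    updated_at = EXCLUDED.updated_at;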

Prerequisites for Custom SQL Configuration

Before diving in, ensure you have:

  • A running HiveMQ broker with the HiveMQ PostgreSQL Extension installed and enabled.

  • A PostgreSQL instance the broker can reach, with credentials that allow creating tables and inserting data.

  • An MQTT client for publishing test messages (we use the MQTT CLI later in this tutorial).

  • Basic familiarity with SQL and JSON.

Step 1: Understanding the Placeholder System

The PostgreSQL Extension injects MQTT message data into your SQL using named placeholders. Here are the most commonly used ones:

| Placeholder | Description | Typical Data Type |
| --- | --- | --- |
| ${mqtt-topic} | The MQTT topic of the message | TEXT |
| ${mqtt-qos} | Quality of Service level | TEXT |
| ${mqtt-payload-utf8} | The message payload as UTF-8 text | TEXT |
| ${client-id} | The publishing client's identifier | TEXT |
| ${timestamp-iso-8601} | Message reception timestamp | TIMESTAMP |

The ${mqtt-payload-utf8} placeholder is particularly powerful when combined with PostgreSQL's JSON functions, as we'll see in our example.
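Before moving on to the JSON example, here is what a plain statement built only from placeholders might look like. The mqtt_messages archive table is hypothetical and shown only to illustrate placeholder usage:

-- Hypothetical archive table: one row per received MQTT message.
INSERT INTO mqtt_messages (topic, client_id, qos, payload, received_at)
VALUES (
    ${mqtt-topic},
    ${client-id},
    ${mqtt-qos},
    ${mqtt-payload-utf8},
    ${timestamp-iso-8601}
);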

Step 2: Setting Up Our Tables

To store our data, we need two tables: one holds the list of tags (the sensor metadata), and the second holds the tags' values as time-series records.

Tags:

-- Reference table for sensor/tag metadata
CREATE TABLE Tags (
    tagname VARCHAR(255) PRIMARY KEY,
    description VARCHAR(255),
    unit VARCHAR(50),
    datatype VARCHAR(50)
);

TagsValues:

-- Time-series table for sensor values
CREATE TABLE TagsValues (
    id SERIAL PRIMARY KEY,
    tagname VARCHAR(255) NOT NULL,
    timestampUtc TIMESTAMP NOT NULL,
    value NUMERIC,
    FOREIGN KEY (tagname) REFERENCES Tags(tagname)
);
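Depending on your query patterns, you may also want a composite index on the time-series table. This is optional and not required for the rest of the tutorial:

-- Optional: speeds up per-tag, time-ordered lookups on the values table.
CREATE INDEX idx_tagsvalues_tag_time ON TagsValues (tagname, timestampUtc DESC);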

Step 3: Crafting Your Custom SQL Statement

Create a file named statement.sql in your extension's configuration directory (typically <HIVEMQ_HOME>/extensions/hivemq-postgresql-extension/conf/).

Here's a real-world example that handles sensor data for TrendMiner integration. This statement does something clever: it parses a JSON array of sensor readings, ensures each sensor exists in a reference table, and then inserts the actual values into a time-series table.

WITH sensor_data AS (
    SELECT
        sensor->>'id' AS tagname,
        (sensor->>'timestamp')::timestamp AS timestampUtc,
        (sensor->>'value')::numeric AS value,
        sensor->>'type' AS description,
        sensor->>'unit' AS unit
    FROM jsonb_array_elements(${mqtt-payload-utf8}::jsonb -> 'payload') AS sensor
),
tag_insert AS (
    INSERT INTO Tags (tagname, description, unit, datatype)
    SELECT tagname, description, unit, 'STRING'
    FROM sensor_data
    ON CONFLICT (tagname) DO NOTHING
    RETURNING tagname
)
INSERT INTO TagsValues (tagname, timestampUtc, value)
SELECT tagname, timestampUtc, value
FROM sensor_data;

Let's break down what's happening here:

  1. The sensor_data CTE extracts individual sensor readings from a JSON array in the MQTT payload, casting each field to its appropriate PostgreSQL type.

  2. The tag_insert CTE ensures every sensor tag exists in the reference table. The ON CONFLICT DO NOTHING clause means we won't fail on duplicates; we simply skip them.

  3. The final INSERT persists the actual sensor values with their timestamps into the time-series table.

This pattern is incredibly powerful for IoT data pipelines where you need both metadata management and high-frequency value ingestion.

Step 4: Configuring the HiveMQ PostgreSQL Extension

Now, point the extension to your custom statement. Open your extension configuration file (usually postgresql-extension.xml) and configure a route that uses the statement template processor:

<hivemq-postgresql-extension xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                             xsi:noNamespaceSchemaLocation="config.xsd">
    <postgresqls>
        <postgresql>
            <id>my-postgresql-id</id>
            <host>localhost</host>
            <port>5432</port>
            <database>postgres</database>
            <username>user</username>
            <password>password</password>
        </postgresql>
    </postgresqls>

    <mqtt-to-postgresql-routes>
        <mqtt-to-postgresql-route>
            <id>trendminer-sensor-route</id>
            <postgresql-id>my-postgresql-id</postgresql-id>
            <mqtt-topic-filters>
                <mqtt-topic-filter>sensors/#</mqtt-topic-filter>
            </mqtt-topic-filters>
            <processor>
                <statement-template>conf/statement.sql</statement-template>
            </processor>
        </mqtt-to-postgresql-route>
    </mqtt-to-postgresql-routes>
</hivemq-postgresql-extension>

The key element here is the <statement-template> tag, which tells the extension to load your custom SQL from the specified file path.

Step 5: Validating and Deploying the Custom SQL Statement

Before restarting HiveMQ, run through this checklist:

  1. Verify file paths: Ensure the path in <statement-template> correctly resolves to your statement.sql file.

  2. Test your SQL: Run your statement manually against PostgreSQL (with sample data substituted for placeholders) to catch syntax errors early; see the dry run below this list.

  3. Check table schemas: Confirm that table and column names in your SQL match your actual database schema.
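For example, you can dry-run the Step 3 statement by replacing ${mqtt-payload-utf8} with a literal JSON payload (sample values only):

-- Dry run of the Step 3 statement with a literal payload in place of the placeholder.
WITH sensor_data AS (
    SELECT
        sensor->>'id' AS tagname,
        (sensor->>'timestamp')::timestamp AS timestampUtc,
        (sensor->>'value')::numeric AS value,
        sensor->>'type' AS description,
        sensor->>'unit' AS unit
    FROM jsonb_array_elements(
        '{"payload": [{"id": "temp-001", "timestamp": "2025-12-15T10:30:00Z", "value": 23.5, "type": "temperature", "unit": "celsius"}]}'::jsonb -> 'payload'
    ) AS sensor
),
tag_insert AS (
    INSERT INTO Tags (tagname, description, unit, datatype)
    SELECT tagname, description, unit, 'STRING'
    FROM sensor_data
    ON CONFLICT (tagname) DO NOTHING
    RETURNING tagname
)
INSERT INTO TagsValues (tagname, timestampUtc, value)
SELECT tagname, timestampUtc, value
FROM sensor_data;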

Once validated, start HiveMQ:

./bin/run.sh

Step 6: Testing the Integration

With HiveMQ running, publish a test message to verify everything works:

mqtt pub -t "sensors/factory-1" -m '{
  "payload": [
    {"id": "temp-001", "timestamp": "2025-12-15T10:30:00Z", "value": 23.5, "type": "temperature", "unit": "celsius"},
    {"id": "humidity-001", "timestamp": "2025-12-15T10:31:00Z", "value": 45.2, "type": "humidity", "unit": "percent"}
  ]
}'

Then query your database to confirm the data landed correctly:

SELECT * FROM Tags;
SELECT * FROM TagsValues ORDER BY timestampUtc DESC LIMIT 10;
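If both tables populated correctly, a quick join confirms that each value lines up with its metadata:

-- Latest readings together with their unit and description from the reference table.
SELECT tv.tagname, tv.timestampUtc, tv.value, t.unit, t.description
FROM TagsValues tv
JOIN Tags t ON t.tagname = tv.tagname
ORDER BY tv.timestampUtc DESC
LIMIT 10;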

Troubleshooting Tips

If messages aren't appearing in your database:

  • Check the HiveMQ logs: Look in <HIVEMQ_HOME>/log/ for SQL parsing errors or connection issues.

  • Verify placeholder syntax: Ensure you're using ${placeholder-name} format exactly as documented.

  • Test database connectivity: Confirm the extension can reach your PostgreSQL instance with the configured credentials.

  • Validate JSON structure: If you're parsing JSON payloads, ensure incoming messages match the expected structure.
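A quick way to check the structure is to run the JSON extraction from Step 3 on a captured payload directly in psql; if this query errors out or returns no rows, the custom statement won't insert anything either (the payload below is only a sample):

-- Paste a captured payload in place of the sample string below.
SELECT jsonb_array_elements(
    '{"payload": [{"id": "temp-001", "timestamp": "2025-12-15T10:30:00Z", "value": 23.5}]}'::jsonb -> 'payload'
) AS sensor;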

Wrap Up

Custom SQL statements transform the HiveMQ PostgreSQL Extension from a simple persistence layer into a flexible data integration tool. Whether you're parsing complex JSON payloads, implementing idempotent inserts, or feeding analytics platforms like TrendMiner, this approach gives you the control you need.

The pattern we demonstrated using CTEs for multi-table operations with conflict handling is just one example. You could extend this further with triggers, stored procedures, or even PostgreSQL's powerful NOTIFY/LISTEN for real-time downstream processing.
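As a sketch of that last idea, the following trigger emits a NOTIFY for every new value row so a downstream consumer can LISTEN and react in real time. The channel name new_tag_value is an arbitrary choice for illustration:

-- Sketch: notify listeners on the 'new_tag_value' channel whenever a value row is inserted.
CREATE OR REPLACE FUNCTION notify_new_tag_value() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('new_tag_value',
                      json_build_object('tagname', NEW.tagname,
                                        'timestampUtc', NEW.timestampUtc,
                                        'value', NEW.value)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tagsvalues_notify
AFTER INSERT ON TagsValues
FOR EACH ROW EXECUTE FUNCTION notify_new_tag_value();

A consumer connected to the same database can then issue LISTEN new_tag_value to receive each reading as it arrives.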

Want to learn more about HiveMQ's enterprise extensions? Check out our PostgreSQL Extension documentation or contact our team to discuss your integration requirements.

Anthony Olazabal

Anthony is part of the Solutions Engineering team at HiveMQ. He is a technology enthusiast with many years of experience working on infrastructure and development around Azure cloud architectures. His expertise spans development, cloud technologies, and IaaS, PaaS, and SaaS services, with a keen interest in writing about MQTT and IoT.

  • Contact Anthony Olazabal via e-mail