MQTT Client Library Encyclopedia - HiveMQ MQTT Client

Written by Clive Jevons

Category: MQTT, MQTT Client Library

Published: February 25, 2019




Short info

HiveMQ MQTT Client
Language: Java
License: Apache License 2.0
Website: https://github.com/hivemq/hivemq-mqtt-client
API Style: Synchronous, Asynchronous and Reactive

Description

The HiveMQ MQTT Client library was created to provide a fast, low-overhead, high-throughput MQTT library for Java. It builds on modern frameworks: Netty for networking and RxJava for the asynchronous streaming of messages. It was created with backend applications in mind, but can be used in any Java-based project.

The library provides three distinct flavours of API: blocking, asynchronous and reactive. This means you can choose the programming style which best meets your use case. For example, for a very simple application which just starts up, sends some messages in order and shuts down again, the blocking API fits very well. For backend applications the asynchronous API fits most scenarios. If you want full access to all of the features the library provides, then the reactive API is the way to go.

Another very interesting feature of the HiveMQ MQTT Client is its backpressure handling. Used in conjunction with MQTT 5 brokers, it can, for example, communicate that the incoming load is too high and ask the producers to throttle their output.
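To make the throttling idea concrete: MQTT 5 lets a receiver announce a Receive Maximum, and a sender must not have more unacknowledged QoS 1 or QoS 2 publishes in flight than that value. The following is a minimal sketch of such a send window. The class ReceiveMaximumWindow and its method names are hypothetical and are not part of the HiveMQ MQTT Client API; the library handles this internally.

```java
/**
 * Illustrative model of an MQTT 5 "Receive Maximum" send window.
 * Hypothetical helper, NOT part of the HiveMQ MQTT Client API. Not thread-safe.
 */
final class ReceiveMaximumWindow {
    private final int receiveMaximum;
    private int inFlight;

    ReceiveMaximumWindow(int receiveMaximum) {
        if (receiveMaximum < 1) {
            throw new IllegalArgumentException("receiveMaximum must be at least 1");
        }
        this.receiveMaximum = receiveMaximum;
    }

    /** Returns true if a QoS 1/2 publish may be sent now; false means the sender has to hold back. */
    boolean tryBeginPublish() {
        if (inFlight >= receiveMaximum) {
            return false;
        }
        inFlight++;
        return true;
    }

    /** Frees one slot once the receiver has acknowledged a publish (for example with PUBACK or PUBCOMP). */
    void onAcknowledged() {
        if (inFlight == 0) {
            throw new IllegalStateException("no publish in flight");
        }
        inFlight--;
    }

    /** Number of publishes sent but not yet acknowledged. */
    int inFlight() {
        return inFlight;
    }
}
```

With a window of 2, a third tryBeginPublish() returns false until onAcknowledged() has been called once. That refusal is the signal a producer would use to slow down.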

Features

Feature: Supported
MQTT 3.1: no
MQTT 3.1.1: yes
MQTT 5.0: yes
SSL/TLS: yes
WebSocket Transport: yes
QoS 0: yes
QoS 1: yes
QoS 2: yes
Authentication: yes
Automatic Reconnect: soon
Disk Persistence: no
Offline Message Buffering: yes
Backpressure Handling: yes


Usage

The following is a quick summary of the basic usage of the asynchronous API with an MQTT 3 broker. The full documentation can be found here: https://hivemq.github.io/hivemq-mqtt-client

Installation

The HiveMQ MQTT Client library is available as a Maven artifact, so the easiest way to include it in your project is by referencing it in your dependencies. In Gradle this would be:

dependencies {
    implementation group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.0.0'
}

Creating the Client

The HiveMQ MQTT Client API relies heavily on the builder pattern and a fluent style. Here’s how that looks when building the client:

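import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import java.util.UUID;

Mqtt3AsyncClient client = MqttClient.builder()
        .useMqttVersion3()
        .identifier(UUID.randomUUID().toString())
        .serverHost("broker.hivemq.com")
        .serverPort(1883)
        .buildAsync();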

As the method names suggest, you specify both the MQTT version and the API style while building the client. However, you can later switch the client to any of the other API styles by calling toRx, toAsync or toBlocking as needed.

Connecting

Once we’ve created the client, we can connect to the broker and define callbacks that handle success and failure once the connection attempt completes.

client.connect()
        .whenComplete((connAck, throwable) -> {
            if (throwable != null) {
                // Handle connection failure
            } else {
                // Setup subscribes or start publishing
            }
        });

As the inline comments in the code example suggest, the completion callback can be used to handle a connection failure, for example by logging the error, and to react to a successful connection, for example by subscribing to some topics or starting to publish your own messages.
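The two-argument callback follows the usual contract of java.util.concurrent.CompletableFuture.whenComplete: on success the throwable is null, on failure it carries the error. The sketch below shows that contract using only the JDK. It assumes that connect() returns a standard CompletableFuture, and it uses a plain String as a stand-in for the connect acknowledgement; the class name WhenCompleteDemo is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

/** JDK-only illustration of the (result, throwable) callback contract; no broker involved. */
final class WhenCompleteDemo {

    /** Describes the outcome of an already completed future the way a connect callback would see it. */
    static String describe(CompletableFuture<String> future) {
        StringBuilder outcome = new StringBuilder();
        // For an already completed future the callback runs immediately on the calling thread.
        future.whenComplete((result, throwable) -> {
            if (throwable != null) {
                outcome.append("failed: ").append(throwable.getMessage());
            } else {
                outcome.append("succeeded: ").append(result);
            }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("CONNACK");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("connection refused"));

        System.out.println(describe(ok));     // prints "succeeded: CONNACK"
        System.out.println(describe(failed)); // prints "failed: connection refused"
    }
}
```

With a real client the future completes later, typically on another thread, so the callback should not assume it runs on the thread that called connect().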

Connecting with Authentication

This example shows how to provide simple authentication credentials:

client.connectWith()
        .simpleAuth()
            .username("my-user")
            // Encode the password explicitly (needs java.nio.charset.StandardCharsets)
            .password("my-password".getBytes(StandardCharsets.UTF_8))
            .applySimpleAuth()
        .send()
        .whenComplete((connAck, throwable) -> {
            // Handle connection complete
        });

Connecting with Will

Similar to the way you connect with authentication, the fluent API lets you specify the Will message:

client.connectWith()
        .willPublish()
            .topic("my/will")
            // Encode the payload explicitly (needs java.nio.charset.StandardCharsets)
            .payload("payload".getBytes(StandardCharsets.UTF_8))
            .qos(MqttQos.AT_MOST_ONCE)
            .retain(true)
            .applyWillPublish()
        .send()
        .whenComplete((connAck, throwable) -> {
            // Handle connection complete
        });

Publish

In order to build and execute a publish you would use code similar to this:

client.publishWith()
        .topic("my/topic")
        .payload(myPayload)
        .qos(MqttQos.EXACTLY_ONCE)
        .send()
        .whenComplete((mqtt3Publish, throwable) -> {
            if (throwable != null) {
                // Handle failure to publish
            } else {
                // Handle successful publish, e.g. logging or incrementing a metric
            }
        });

Adding a callback to handle the completion of the publish is entirely optional and will depend on your use-case.
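The publish example leaves myPayload undefined. Since the Will example passes a byte array to payload(...), a byte[] is a reasonable assumption here as well. A minimal, JDK-only sketch for converting text to and from such a payload could look like this; the class name PayloadCodec is hypothetical, and UTF-8 is an assumed convention between publisher and subscriber, not something MQTT mandates for payloads.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for turning text into an MQTT payload and back, using UTF-8 on both sides. */
final class PayloadCodec {

    /** Encodes text with an explicit charset so the result does not depend on the platform default. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received payload, assuming the publisher also used UTF-8. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] myPayload = encode("23.5 \u00B0C");
        // The degree sign takes two bytes in UTF-8, so 7 characters become 8 bytes.
        System.out.println(myPayload.length);
        System.out.println(decode(myPayload).equals("23.5 \u00B0C"));
    }
}
```

The resulting array can be passed to payload(...) when building the publish, and decode(...) mirrors it on the subscribing side.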

Publish retained message

To have a message retained, add .retain(true) when building the publish.

Subscribe

When creating a subscription, you pass a callback for incoming messages to the .callback(...) builder method. Optionally, you can also attach a callback to the returned completable future (using whenComplete) to handle the completion of the subscription itself:

13
client.subscribeWith()
        .topicFilter("my/topic")
        .callback(publish -> {
            // Process the received message
        })
        .send()
        .whenComplete((subAck, throwable) -> {
            if (throwable != null) {
                // Handle failure to subscribe
            } else {
                // Handle successful subscription, e.g. logging or incrementing a metric
            }
        });
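The topic filter in the example is a literal topic, but MQTT filters may also contain the wildcards + (exactly one level) and # (all remaining levels, only allowed as the last level). The broker and the client do this matching for you; the following sketch only illustrates the rules so you can predict which topics a filter will receive. The class TopicFilterMatcher is hypothetical and not part of the library, and it assumes the filter itself is well-formed.

```java
/** Illustrative MQTT topic filter matching; hypothetical helper, not part of the HiveMQ client. */
final class TopicFilterMatcher {

    /** Returns true if the given topic name is matched by the given topic filter. */
    static boolean matches(String filter, String topic) {
        // Filters starting with a wildcard must not match topics starting with '$' (e.g. $SYS/...).
        if (topic.startsWith("$") && (filter.startsWith("+") || filter.startsWith("#"))) {
            return false;
        }
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // '#' matches the parent level and everything below it, but must be the last level.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // literal level differs
            }
        }
        // Without a trailing '#', filter and topic must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }
}
```

For example, the filter my/+ matches my/topic but not my/topic/sub, while my/# matches both, as well as my itself.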

Unsubscribe

To unsubscribe from a topic, simply call:

client.unsubscribeWith()
        .topicFilter("my/topic")
        .send();

Optionally, you can call whenComplete (or a similar method) on the returned completable future to handle the completion of the unsubscribe operation.

Disconnect

Just call disconnect() on the client object created previously.

SSL / TLS

In order to use SSL / TLS for connecting to the broker, you simply specify its usage while building the client. The simplest way is to use the default config:

Mqtt3AsyncClient client = MqttClient.builder()
        .useMqttVersion3()
        .serverHost("localhost")
        .serverPort(8883)
        .useSslWithDefaultConfig()
        .buildAsync();

But you can also customise the configuration. The following shows providing custom key manager and trust manager factories:

Mqtt3AsyncClient client = MqttClient.builder()
        .useMqttVersion3()
        .serverHost("localhost")
        .serverPort(8883)
        .useSsl()
            .keyManagerFactory(myKeyManagerFactory)
            .trustManagerFactory(myTrustManagerFactory)
            .applySslConfig()
        .buildAsync();
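The example above does not show where myKeyManagerFactory and myTrustManagerFactory come from. One common way, sketched here with the standard JDK classes only, is to load key stores from disk and initialise the factories from them. The class name TlsFactories is hypothetical, and any file locations and passwords you pass in are your own; nothing here is specific to the HiveMQ MQTT Client.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper that builds the two factories from JDK key stores. */
final class TlsFactories {

    /** Loads a key store of the JDK's default type from the given file. */
    static KeyStore loadKeyStore(Path file, char[] password) {
        try (InputStream in = Files.newInputStream(file)) {
            KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
            keyStore.load(in, password);
            return keyStore;
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Could not load key store " + file, e);
        }
    }

    /** Creates a factory for the client's own certificate and private key (used for client authentication). */
    static KeyManagerFactory keyManagerFactory(KeyStore keyStore, char[] keyPassword) {
        try {
            KeyManagerFactory factory =
                    KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            factory.init(keyStore, keyPassword);
            return factory;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not initialise key manager factory", e);
        }
    }

    /** Creates a factory for the certificates the client trusts when verifying the broker. */
    static TrustManagerFactory trustManagerFactory(KeyStore trustStore) {
        try {
            TrustManagerFactory factory =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            factory.init(trustStore);
            return factory;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not initialise trust manager factory", e);
        }
    }
}
```

The returned objects can then be handed to keyManagerFactory(...) and trustManagerFactory(...) on the client builder as shown above.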

About Clive Jevons

Clive is a freelance programmer who has specialised in backend Java applications and has recently been doing a lot of work in the field of automotive communication between vehicles and backend applications using MQTT. He is also involved in the development of the HiveMQ MQTT Client library.
