The HiveMQ MQTT Client Library for Java and Its Blocking API Flavor

Written by Yannick Weber

Category: HiveMQ Java Reactive Blocking Async MQTT Client

Published: April 4, 2023


As a software engineer, you need a robust MQTT client library to build your IoT applications in Java. The HiveMQ MQTT library will help you handle the communication between your device and the MQTT broker. This library is a popular choice among Java developers due to its reliability and ease of use.

This three-part blog post series will examine the three API flavors of the HiveMQ MQTT Client:

  • Blocking
  • Async
  • Reactive

In this first post, we introduce the blocking API, a synchronous programming model that is easy to understand and use. The second post will dive deep into the Async API, and the third and final post will introduce the Reactive API. All three articles explore the unique features and benefits of each approach and provide code examples.

With the blocking API, you can connect to a broker, subscribe to topics, publish messages, and disconnect when finished. The API’s design is simple and intuitive, and its straightforward method calls make it easy to start programming an MQTT client application. Let’s take a closer look at the blocking API and understand when it is best used.

How to get started with the Blocking API of the HiveMQ MQTT Client in Java

To start, you will need the HiveMQ MQTT Client library.

Assuming your project uses Gradle, add this to your build.gradle.kts to obtain the dependency:

dependencies {
    implementation("com.hivemq:hivemq-mqtt-client:+")
}

Examples

With the blocking API, program execution is stopped, or “blocked”, until an operation completes. For example, after the client sends a CONNECT packet to the MQTT broker, the next instruction executes only once the CONNACK from the MQTT broker has been received. This programming style is the most intuitive of the three because it follows the imperative, step-by-step execution familiar to programmers. Error handling follows the traditional try/catch pattern. To illustrate this API flavor, the sections below show a client that publishes messages to an MQTT topic and a client that subscribes to an MQTT topic.
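
As a library-free illustration of what “blocking” means here, the following minimal sketch simulates a broker that answers after a short delay. It uses only the JDK; the class BlockingCallSketch and its methods are hypothetical names invented for this illustration and are not part of the HiveMQ MQTT Client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy; none of these names belong to the HiveMQ MQTT Client.
final class BlockingCallSketch {

    // Hypothetical stand-in for a broker that answers a CONNECT after a short delay.
    static CompletableFuture<String> sendConnect(final List<String> log) {
        log.add("CONNECT sent");
        return CompletableFuture.supplyAsync(
                () -> "CONNACK",
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    // Blocking style: the caller waits for the acknowledgement before moving on.
    static List<String> connectBlocking() {
        final List<String> log = new ArrayList<>();
        try {
            final String ack = sendConnect(log).join(); // blocks the calling thread
            log.add(ack + " received");
        } catch (final RuntimeException e) {
            log.add("connect failed"); // traditional try/catch error handling
            throw e;
        }
        log.add("next instruction");
        return log;
    }

    public static void main(final String[] args) {
        connectBlocking().forEach(System.out::println);
    }
}
```

The log always lists the acknowledgement before the next instruction, which is the ordering guarantee the blocking flavor gives you.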

Blocking Publisher

Implement a blocking publisher as follows:

  1. Use the static method Mqtt5Client.builder() to obtain a builder. Provide the client identifier and the MQTT broker host and port. Finish the building process with buildBlocking().
  2. Use connect() to connect to the MQTT broker. Note that this call blocks until the MQTT broker sends a CONNACK message. Therefore, the client application prints Successfully connected! only after a successful connection attempt. If any error occurs while connecting, the catch clause is executed.
  3. The goal is to publish 10 messages. Because the blocking API favors imperative programming, use a traditional for-loop.
  4. Use the static method Mqtt5Publish.builder() to build the PUBLISH message. Provide the topic, payload, and QoS, and complete the instantiation with build().
  5. Publish the message. Note that this call blocks until the client library receives a PUBACK; only then does the loop continue with building and sending the next PUBLISH. This also implies that only one QoS 1 message flow is in progress at any time. If any error occurs during publishing, it is handled in the catch clause.
  6. After all 10 PUBLISH messages have been sent and all 10 PUBACK messages have been received, the program prints Successfully published!.
  7. To disconnect, use disconnect(). The catch clause handles any error that occurs while disconnecting. After successfully disconnecting from the MQTT broker, the program writes Successfully disconnected! to the console.
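import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import org.jetbrains.annotations.NotNull; // assumption: @NotNull comes from the JetBrains annotations library

import static java.nio.charset.StandardCharsets.UTF_8;

public class BlockingPublisher {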

    public static void main(final String @NotNull [] args) {
        final var blockingClient = Mqtt5Client.builder() // 1
                .identifier("blocking-publisher") // 1
                .serverHost("broker.hivemq.com") // 1
                .serverPort(1883) // 1
                .buildBlocking(); // 1

        try {
            blockingClient.connect(); // 2
        } catch (final Exception e) {
            System.out.println("Error while connecting!"); // 2
            throw e; // 2
        }
        System.out.println("Successfully connected!"); // 2

        for (int i = 0; i < 10; i++) { // 3
            try {
                final Mqtt5Publish publish = Mqtt5Publish.builder() // 4
                        .topic("example/topic/blocking") // 4
                        .payload(("example #" + i).getBytes(UTF_8)) // 4
                        .qos(MqttQos.AT_LEAST_ONCE) // 4
                        .build(); // 4
                blockingClient.publish(publish); // 5
            } catch (final Exception e) {
                System.out.println("Error while publishing!");  // 5
                throw e; // 5
            }
        }
        System.out.println("Successfully published!"); // 6
        try {
            blockingClient.disconnect(); // 7
        } catch (final Exception e) {
            System.out.println("Error while disconnecting!"); // 7
            throw e; // 7
        }
        System.out.println("Successfully disconnected!"); // 7
    }
}
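
To make the remark about a single QoS 1 flow in step 5 more tangible, here is a small simulation that counts how many PUBLISH messages are unacknowledged at the same time. It is a sketch under stated assumptions: FakeBroker and both publish methods are hypothetical, use only the JDK, and do not talk to a real broker or to the HiveMQ MQTT Client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java simulation; FakeBroker is hypothetical and not part of the HiveMQ MQTT Client.
final class InFlightSketch {

    static final class FakeBroker {
        private final Deque<CompletableFuture<Void>> unacknowledged = new ArrayDeque<>();
        private int maxInFlight = 0;

        // Accepts a PUBLISH and returns a future that completes on the matching PUBACK.
        CompletableFuture<Void> publish() {
            final CompletableFuture<Void> puback = new CompletableFuture<>();
            unacknowledged.add(puback);
            maxInFlight = Math.max(maxInFlight, unacknowledged.size());
            return puback;
        }

        // Acknowledges the oldest outstanding PUBLISH.
        void ackOldest() {
            unacknowledged.remove().complete(null);
        }

        int maxInFlight() {
            return maxInFlight;
        }
    }

    // Blocking style: wait for each PUBACK before building the next PUBLISH.
    static int publishOneByOne(final int count) {
        final FakeBroker broker = new FakeBroker();
        for (int i = 0; i < count; i++) {
            final CompletableFuture<Void> puback = broker.publish();
            broker.ackOldest(); // the simulated broker answers
            puback.join();      // returns at once here; a real call would wait for the network
        }
        return broker.maxInFlight();
    }

    // Non-blocking style: send everything first, collect the PUBACKs afterwards.
    static int publishPipelined(final int count) {
        final FakeBroker broker = new FakeBroker();
        final List<CompletableFuture<Void>> pubacks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            pubacks.add(broker.publish());
        }
        for (int i = 0; i < count; i++) {
            broker.ackOldest();
        }
        CompletableFuture.allOf(pubacks.toArray(new CompletableFuture[0])).join();
        return broker.maxInFlight();
    }

    public static void main(final String[] args) {
        System.out.println("one by one: " + publishOneByOne(10));
        System.out.println("pipelined:  " + publishPipelined(10));
    }
}
```

In the one-by-one variant the counter never exceeds 1, so the total publishing time grows with the number of messages times the round-trip time to the broker. The pipelined variant shows what a non-blocking style makes possible.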

Blocking Subscriber

To implement a blocking subscriber, do the following:

  1. Use the static method Mqtt5Client.builder() to obtain a builder. Provide the client identifier and the MQTT broker host and port. Finish the building process with buildBlocking().
  2. Before connecting, use publishes(MqttGlobalPublishFilter.ALL) to register an Mqtt5Publishes object that holds all incoming PUBLISH messages. Registering the Mqtt5Publishes object before connecting to the MQTT broker is good practice because this object is what collects incoming messages. If it is not registered before connecting, messages can arrive before it is ready to handle them, and those messages are lost.
  3. Use connect() to connect to the MQTT broker. Note that this call blocks until the MQTT broker sends a CONNACK message. Therefore, the application prints Successfully connected! only after a successful connection attempt. If any error occurs while connecting, the catch clause is executed.
  4. After a successful connection attempt, use subscribeWith() to start building a SUBSCRIBE message. Provide the topic filter and use send() to send the SUBSCRIBE message to the MQTT broker. Note that this call blocks until the application receives the SUBACK. Only after that is Successfully subscribed! printed. If any error occurs, the catch clause handles it.
  5. Now use the Mqtt5Publishes object from step 2 to work with incoming publishes. The publishes.receive() call halts the execution of the program until the application receives an incoming PUBLISH message. Once an Mqtt5Publish is obtained, its payload can be logged to the console. To receive further publishes, use a while-loop. Using while(true) is for demonstration purposes only; the application stops only when it is terminated externally.
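
The ordering argument in step 2 can be pictured with a plain-Java analogy before looking at the real subscriber. This is only a sketch: FakeClient is a hypothetical stand-in built on a JDK queue, and its behavior of dropping messages that nobody has registered for is an assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java analogy; FakeClient is hypothetical and not part of the HiveMQ MQTT Client.
final class RegisterBeforeConnectSketch {

    static final class FakeClient {
        private BlockingQueue<String> incoming; // null until a consumer registers

        // Analogous to registering a holder for incoming messages.
        BlockingQueue<String> register() {
            incoming = new LinkedBlockingQueue<>();
            return incoming;
        }

        // Simulates a broker that delivers a waiting message as soon as the connection is up.
        void connect() {
            deliver("queued while offline");
        }

        void deliver(final String payload) {
            if (incoming != null) {
                incoming.add(payload);
            } // otherwise nobody is listening and the message is dropped
        }
    }

    // Recommended order: register first, then connect.
    static List<String> registerThenConnect() {
        final FakeClient client = new FakeClient();
        final BlockingQueue<String> queue = client.register();
        client.connect();
        final List<String> received = new ArrayList<>();
        queue.drainTo(received);
        return received;
    }

    // Risky order: the message arrives before anything can hold it.
    static List<String> connectThenRegister() {
        final FakeClient client = new FakeClient();
        client.connect();
        final BlockingQueue<String> queue = client.register();
        final List<String> received = new ArrayList<>();
        queue.drainTo(received);
        return received;
    }

    public static void main(final String[] args) {
        System.out.println("register, then connect: " + registerThenConnect());
        System.out.println("connect, then register: " + connectThenRegister());
    }
}
```

Only the first ordering retains the message that is delivered during the connection process.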
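import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient.Mqtt5Publishes;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import org.jetbrains.annotations.NotNull; // assumption: @NotNull comes from the JetBrains annotations library

import static java.nio.charset.StandardCharsets.UTF_8;

public class BlockingSubscriber {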

    public static void main(final String @NotNull [] args) throws Exception {
        final var blockingClient = Mqtt5Client.builder() // 1
                .identifier("blocking-subscriber") // 1
                .serverHost("broker.hivemq.com") // 1
                .serverPort(1883) // 1
                .buildBlocking(); // 1

        try (final Mqtt5Publishes publishes = blockingClient.publishes(MqttGlobalPublishFilter.ALL)) { // 2
            try {
                blockingClient.connect(); // 3
            } catch (final Exception e) {
                System.out.println("Error while connecting!"); // 3
                throw e; // 3
            }
            System.out.println("Successfully connected!"); // 3

            try {
                blockingClient.subscribeWith().topicFilter("example/topic/#").send(); // 4
            } catch (final Exception e) {
                System.out.println("Error while subscribing!"); // 4
                throw e; // 4
            }
            System.out.println("Successfully subscribed!"); // 4

            while (true) { // 5
                final Mqtt5Publish publish = publishes.receive(); // 5
                System.out.println("Received publish with payload: " + new String(publish.getPayloadAsBytes(), UTF_8)); // 5
            }
        }
    }
}
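
The subscriber above uses the topic filter example/topic/#, while the publisher sends to example/topic/blocking. The following sketch shows how MQTT wildcard matching decides that these two belong together. In practice the broker performs this matching; TopicFilterSketch is a hypothetical helper written for illustration, and it is simplified: it does not validate the filter and ignores the special handling of topics that start with $.

```java
// Simplified illustration of MQTT topic filter matching; not part of the HiveMQ MQTT Client.
final class TopicFilterSketch {

    // Returns true if the topic name is covered by the topic filter.
    // "+" matches exactly one level, "#" matches the parent and any number of sub-levels.
    static boolean matches(final String filter, final String topic) {
        final String[] filterLevels = filter.split("/", -1);
        final String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                return i == filterLevels.length - 1; // multi-level wildcard must be the last level
            }
            if (i >= topicLevels.length) {
                return false; // filter is longer than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // literal level differs
            }
        }
        return filterLevels.length == topicLevels.length;
    }

    public static void main(final String[] args) {
        System.out.println(matches("example/topic/#", "example/topic/blocking"));
        System.out.println(matches("example/topic/#", "example/other"));
    }
}
```

Because the filter ends in #, the same subscriber would also receive messages published to other topics below example/topic, not only those from the blocking publisher.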

Switching API Flavor

The HiveMQ MQTT Client lets you switch to the async or reactive API flavor by calling toAsync() or toRx(). This enables you to choose the API style best suited to each part of your application. The following example switches from the blocking API flavor to the async API flavor, which is explained in the second post of this series, and to the reactive API flavor, which is covered in the third and final post.

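import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.jetbrains.annotations.NotNull; // assumption: @NotNull comes from the JetBrains annotations library

public class MixedClient {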
    public static void main(final String @NotNull [] args) {
        final var blockingClient = Mqtt5Client.builder()
                .identifier("mixed-client")
                .serverPort(1883)
                .serverHost("broker.hivemq.com")
                .buildBlocking();

        final var asyncClient = blockingClient.toAsync();
        final var reactiveClient = blockingClient.toRx();
    }
}
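
One way to picture flavor switching is as several API views over a single underlying client. The sketch below models that idea with the JDK only. It assumes that the views share one state, so that connecting through one view is visible through the other; all class names are hypothetical, and this is not a description of how the HiveMQ MQTT Client is implemented internally.

```java
import java.util.concurrent.CompletableFuture;

// Plain-Java sketch of "several API views over one client"; all names are hypothetical.
final class FlavorViewsSketch {

    // Shared state that both views operate on.
    static final class Core {
        volatile boolean connected;
    }

    static final class AsyncView {
        private final Core core;

        AsyncView(final Core core) {
            this.core = core;
        }

        // Returns immediately; the future completes once the connection is established.
        CompletableFuture<Void> connect() {
            return CompletableFuture.runAsync(() -> core.connected = true);
        }

        boolean isConnected() {
            return core.connected;
        }

        BlockingView toBlocking() {
            return new BlockingView(core);
        }
    }

    static final class BlockingView {
        private final Core core;

        BlockingView(final Core core) {
            this.core = core;
        }

        // Blocking is modeled as "start the async operation, then wait for it".
        void connect() {
            toAsync().connect().join();
        }

        boolean isConnected() {
            return core.connected;
        }

        AsyncView toAsync() {
            return new AsyncView(core);
        }
    }

    // Connects through the blocking view and checks the result through the async view.
    static boolean sharesState() {
        final BlockingView blocking = new BlockingView(new Core());
        final AsyncView async = blocking.toAsync();
        blocking.connect();
        return async.isConnected();
    }

    public static void main(final String[] args) {
        System.out.println("async view sees connection: " + sharesState());
    }
}
```

Under this model, a test can set up a connection with simple blocking calls and hand the async view to the code that needs non-blocking behavior.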

Conclusion

In conclusion, the blocking API of the HiveMQ MQTT Client is a powerful tool for developing MQTT clients in Java. It allows you to write code in a linear, step-by-step manner, which makes the flow of control easier to understand. You simply call a method and wait for the response before continuing with the next step in your program.

Although it is easy to understand, the blocking API does not use threads efficiently, because the calling thread waits during every operation. Therefore, we recommend using the blocking flavor only when performance and latency are not a concern, for example in automated tests. If you’re interested in writing an application with a more efficient threading model, consider the Async flavor or the Reactive flavor of the HiveMQ MQTT Client. In the next posts in this series, we will introduce these flavors and explain how they can help you build highly performant and scalable MQTT-based applications. Stay tuned to learn more about these powerful tools!

About Yannick Weber

Yannick is a Senior Software Engineer and one of the core members of HiveMQ’s product development team. He is focusing on quality development of HiveMQ’s many tools and extensions. In addition, he is the maintainer of the HiveMQ module in the testcontainers-java project.
