The HiveMQ MQTT Client Library for Java and Its Async API Flavor

by Yannick Weber

In this second part of the series on the three API flavors of the HiveMQ MQTT Client, we will take a closer look at the async flavor of the HiveMQ MQTT Client API. As opposed to the blocking API, which provides a synchronous programming model, the async API allows for asynchronous programming, which enables us to implement a more efficient threading model.

Here is an example illustrating the difference between the two flavors: when sending a CONNECT message to the MQTT broker with the blocking API, the current thread is blocked until the client library receives a CONNACK. During this time, the thread cannot execute any other tasks.

In the async API flavor, the current thread is not blocked between sending the CONNECT and receiving the CONNACK. Instead, we specify what should happen once the CONNACK arrives. The current thread can therefore execute other tasks while the client library waits for the CONNACK.
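
The difference can be sketched with the JDK alone. The following is a minimal illustration, not code from the client library: simulateConnect() is a hypothetical stand-in for the broker round trip, and the 100 ms delay is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class BlockingVsAsyncSketch {

    // Hypothetical stand-in for a broker round trip: completes with "CONNACK" after a short delay.
    public static CompletableFuture<String> simulateConnect() {
        return CompletableFuture.supplyAsync(
                () -> "CONNACK",
                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));
    }

    // Blocking style: the calling thread waits inside join() until the result is available.
    public static String connectBlocking() {
        return simulateConnect().join();
    }

    // Async style: returns immediately; the attached function runs once the result arrives.
    public static CompletableFuture<String> connectAsync() {
        return simulateConnect().thenApply(ack -> "handled " + ack);
    }

    public static void main(final String[] args) {
        System.out.println("blocking result: " + connectBlocking());

        final CompletableFuture<String> pending = connectAsync();
        System.out.println("main thread is free while the simulated CONNACK is pending");
        // join() is used here only to keep the demo JVM alive until the callback has run.
        System.out.println("async result: " + pending.join());
    }
}
```

In the blocking variant the caller pays for the full round trip; in the async variant the caller gets a future back at once and decides later whether and where to wait.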

How to get started with the Async API of the HiveMQ MQTT Client in Java

To start, you will need the HiveMQ MQTT Client library.

Assuming your project uses Gradle, add this to your build.gradle.kts to obtain the dependency:

dependencies {
    implementation("com.hivemq:hivemq-mqtt-client:+")
}

Examples

When using the async API flavor, operations such as connect, publish, subscribe, and disconnect return a CompletableFuture, a class that represents the result of a computation that may complete asynchronously in the future, either successfully or exceptionally. CompletableFutures allow for chaining and combining multiple asynchronous operations. We use the CompletableFuture to attach handling that executes once the operation is completed.
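
As a small illustration of chaining and combining, here is a sketch that uses only the JDK. The helper doubleAsync is hypothetical and merely stands in for any asynchronous operation; it is not part of the client library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureChainingSketch {

    // Hypothetical asynchronous step: doubles a number on another thread.
    public static CompletableFuture<Integer> doubleAsync(final int value) {
        return CompletableFuture.supplyAsync(() -> value * 2);
    }

    // Chaining: thenCompose starts the second step only after the first has completed.
    public static CompletableFuture<Integer> quadrupleAsync(final int value) {
        return doubleAsync(value).thenCompose(FutureChainingSketch::doubleAsync);
    }

    // Combining: allOf completes once every future in the list has completed.
    public static CompletableFuture<Integer> sumOfDoubles(final int count) {
        final List<CompletableFuture<Integer>> futures = new ArrayList<>(count);
        for (int i = 1; i <= count; i++) {
            futures.add(doubleAsync(i)); // all steps are started without waiting for each other
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                // join() does not block here because allOf guarantees every future is done.
                .thenApply(unused -> futures.stream().mapToInt(CompletableFuture::join).sum());
    }

    public static void main(final String[] args) {
        System.out.println("quadruple of 5: " + quadrupleAsync(5).join());
        System.out.println("sum of doubles of 1..3: " + sumOfDoubles(3).join());
    }
}
```

thenCompose is the right tool when the next step itself returns a future, while allOf is useful when several independent operations should run in parallel and a single completion signal is needed.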

Async Publisher

You can implement an async publisher as follows:

  1. Use the static method Mqtt5Client.builder() to obtain a builder. Provide the client identifier, the MQTT broker host and port. Finish the building process with buildAsync().

  2. Use connect() to connect to the MQTT broker. Note this call instantly returns a CompletableFuture. That future completes once the client library receives a CONNACK.

  3. Use thenCompose to attach handling to the CompletableFuture that was returned by connect().

    1. First, state that the connection was successful.

    2. Inside the callback, create a list that collects the publish futures and use a for-loop to publish 10 messages.

    3. Next, use the static method Mqtt5Publish.builder() to build the PUBLISH message. Provide the topic, payload, and QoS, and complete the instantiation with build().

    4. Instruct the client library to publish the message. Obtain the returned future and add it to the list. Because this call does not wait for the PUBACK from the MQTT broker before sending the next PUBLISH, the message flows run in parallel.

    5. Combine all publish-futures into a single future that is completed once all publish futures are completed.

  4. Attach a callback that reports Successfully published! and disconnects the MQTT client.

  5. Attach another callback to the disconnect-future that reports Successfully disconnected!.

  6. The exceptionally method lets us attach error handling in case an error occurs during the steps above.

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import org.jetbrains.annotations.NotNull; // assumes the JetBrains annotations are on the classpath

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;

import static java.nio.charset.StandardCharsets.UTF_8;

public class AsyncPublisher {

    public static void main(final String @NotNull [] args) {
        final var asyncClient = Mqtt5Client.builder() // 1
                .identifier("async-publisher") // 1
                .serverHost("broker.hivemq.com") // 1
                .serverPort(1883) // 1
                .buildAsync(); // 1

        asyncClient.connect() // 2
                .thenCompose(connAck -> { // 3
                    System.out.println("Successfully connected!"); // 3.1
                    // The list is created outside the loop so that it collects all ten futures.
                    final var futures = new ArrayList<CompletableFuture<?>>(10); // 3.2
                    for (int i = 0; i < 10; i++) { // 3.2
                        final Mqtt5Publish publish = Mqtt5Publish.builder() // 3.3
                                .topic("example/topic/async") // 3.3
                                .payload(("example #" + i).getBytes(UTF_8)) // 3.3
                                .qos(MqttQos.AT_LEAST_ONCE) // 3.3
                                .build(); // 3.3
                        final var future = asyncClient.publish(publish); // 3.4
                        futures.add(future); // 3.4
                    }
                    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])); // 3.5
                }).thenCompose(unused -> { // 4
                    System.out.println("Successfully published!"); // 4
                    return asyncClient.disconnect(); // 4
                }).thenRun(() -> { // 5
                    System.out.println("Successfully disconnected!"); // 5
                }).exceptionally(throwable -> { // 6
                    System.out.println("Something went wrong!"); // 6
                    return null; // 6
                });
    }
}
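
To see why a single exceptionally at the end of the chain is enough, consider the following JDK-only sketch. It does not use the client library: the stage names and the failAtConnect switch are hypothetical and only imitate the shape of the chain above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

public class ErrorPropagationSketch {

    // Runs a small chain and records which stages executed.
    // failAtConnect is a hypothetical switch that makes the first stage fail.
    public static List<String> runChain(final boolean failAtConnect) {
        final List<String> log = new CopyOnWriteArrayList<>();

        final CompletableFuture<String> connect = failAtConnect
                ? CompletableFuture.failedFuture(new IllegalStateException("simulated connect failure"))
                : CompletableFuture.completedFuture("CONNACK");

        connect.thenCompose(ack -> {
                    log.add("connected");
                    return CompletableFuture.completedFuture("published");
                })
                .thenAccept(log::add)
                .exceptionally(throwable -> {
                    // Failures passed through dependent stages arrive wrapped in a CompletionException.
                    final Throwable cause =
                            throwable instanceof CompletionException && throwable.getCause() != null
                                    ? throwable.getCause()
                                    : throwable;
                    log.add("failed: " + cause.getMessage());
                    return null;
                })
                .join(); // wait so that the log is complete before it is returned

        return List.copyOf(log);
    }

    public static void main(final String[] args) {
        System.out.println("success path: " + runChain(false));
        System.out.println("failure path: " + runChain(true));
    }
}
```

When an early stage fails, the dependent stages are skipped and the failure travels down the chain until it reaches exceptionally. Unwrapping the cause, as done here, also makes it possible to report what went wrong instead of only that something went wrong.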

Async Subscriber

To implement an async subscriber, do the following:

  1. Use the static method Mqtt5Client.builder() to obtain a builder. Provide the client identifier, the MQTT broker host and port. Finish the building process with buildAsync().

  2. Before connecting, use publishes() to register a callback. The client library invokes that callback every time it receives an incoming PUBLISH message. Registering the callback before connecting to the MQTT broker is good practice: if it is registered only after connecting, messages can arrive before the callback is ready to handle them and would be lost.

  3. Use connect() to connect to the MQTT broker. Note this call instantly returns a CompletableFuture. That future completes once the client library receives a CONNACK.

  4. Use thenCompose to attach handling to the CompletableFuture that was returned by connect().

    1. First, state that the connection was successful.

    2. Second, instruct the client library to send a SUBSCRIBE message to the MQTT broker. This call also returns instantly with a CompletableFuture.

  5. Attach a callback to the subscribe-future that prints Successfully subscribed! once the subscribe future is complete.

  6. The exceptionally method helps us to attach error handling, if an error occurs during the steps above.

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.jetbrains.annotations.NotNull; // assumes the JetBrains annotations are on the classpath

import static java.nio.charset.StandardCharsets.UTF_8;

public class AsyncSubscriber {

    public static void main(final String @NotNull [] args) {
        final var asyncClient = Mqtt5Client.builder() // 1
                .identifier("async-subscriber") // 1
                .serverHost("broker.hivemq.com") // 1
                .serverPort(1883) // 1
                .buildAsync(); // 1

        asyncClient.publishes(MqttGlobalPublishFilter.ALL, publish -> { // 2
            System.out.println("Received publish with payload: " + new String(publish.getPayloadAsBytes(), UTF_8)); // 2
        }); // 2

        asyncClient.connect() // 3
                .thenCompose(connAck -> { // 4
                    System.out.println("Successfully connected!"); // 4.1
                    return asyncClient.subscribeWith().topicFilter("example/topic/#").send(); // 4.2
                }).thenRun(() -> { // 5
                    System.out.println("Successfully subscribed!"); // 5
                }).exceptionally(throwable -> { // 6
                    System.out.println("Something went wrong!"); // 6
                    return null; // 6
                });
    }
}
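
The ordering argument from step 2 can be made concrete with a toy model. The sketch below is deliberately not the client library: ToyClient is a hypothetical class, and its rule that a message without a registered callback counts as lost is an assumption of the model that mirrors the reasoning above, not a statement about the library's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CallbackOrderSketch {

    // Toy stand-in for a client: hands each incoming payload to the callback,
    // or counts it as lost when no callback has been registered yet (model assumption).
    static final class ToyClient {
        private Consumer<String> callback;
        private int lost;

        void publishes(final Consumer<String> callback) {
            this.callback = callback;
        }

        void deliver(final String payload) {
            if (callback == null) {
                lost++;
            } else {
                callback.accept(payload);
            }
        }
    }

    // Simulates one message arriving right after connecting and one arriving later.
    // Returns an array of {received, lost}.
    public static int[] run(final boolean registerBeforeConnect) {
        final ToyClient client = new ToyClient();
        final List<String> received = new ArrayList<>();

        if (registerBeforeConnect) {
            client.publishes(received::add);
        }
        client.deliver("early message"); // arrives as soon as the simulated connection is up
        if (!registerBeforeConnect) {
            client.publishes(received::add); // registered too late for the early message
        }
        client.deliver("later message");

        return new int[]{received.size(), client.lost};
    }

    public static void main(final String[] args) {
        final int[] early = run(true);
        final int[] late = run(false);
        System.out.println("registered first: received=" + early[0] + ", lost=" + early[1]);
        System.out.println("registered late:  received=" + late[0] + ", lost=" + late[1]);
    }
}
```

In this model, the only way to guarantee that the early message is handled is to have the callback in place before anything can be delivered, which is the same order the subscriber example uses.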

Switching API Flavor

The HiveMQ MQTT Client lets you switch to the blocking or reactive API flavor simply by calling toBlocking() or toRx(). This enables you to choose the API style best suited to each part of your application. The following example shows how to switch from the async API flavor to the blocking API flavor covered in the first post of this series. It also illustrates how to switch from the async API flavor to the reactive API flavor that we discuss in the third and last post of the series.

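import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.jetbrains.annotations.NotNull; // assumes the JetBrains annotations are on the classpath

public class MixedClient {

    public static void main(final String @NotNull [] args) {
        final var asyncClient = Mqtt5Client.builder()
                .identifier("mixed-client")
                .serverHost("broker.hivemq.com")
                .serverPort(1883)
                .buildAsync();

        final var blockingClient = asyncClient.toBlocking(); // blocking flavor
        final var reactiveClient = asyncClient.toRx(); // reactive flavor
    }
}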

Conclusion

To conclude, the async flavor of the HiveMQ MQTT Client API provides a powerful tool for developers who want to build highly performant and scalable MQTT applications. While it may be less intuitive than the blocking API, the async API allows for asynchronous programming, which enables a more efficient threading model and can lead to significant performance gains.

To take full advantage of the async API, developers should be familiar with asynchronous programming concepts in Java, such as CompletableFuture. For those who are not, the async API is still a good way to get started with asynchronous programming in Java.

Overall, the async flavor of the HiveMQ MQTT Client API is a valuable addition to any developer’s toolkit. By leveraging the power of asynchronous programming, developers can build applications that handle large volumes of data while still maintaining high performance and scalability.

Yannick Weber

Yannick is a Senior Software Engineer and one of the core members of HiveMQ's product development team. He has a strong interest in messaging technologies and is focusing on quality development of HiveMQ's many tools and extensions. In addition, he is the maintainer of the HiveMQ module in the testcontainers-java project.
