The HiveMQ MQTT Client Library for Java and Its reactive API Flavor

The HiveMQ MQTT Client Library for Java and Its reactive API Flavor

author Yannick Weber

Written by Yannick Weber

Category: HiveMQ Java Reactive Blocking Async MQTT Client

Published: April 6, 2023


In this third and final part of the series on the API flavors of the HiveMQ MQTT Client we will look at the reactive API flavor. In reactive programming, we think about events and how to react to these events. For example, an event can be an incoming PUBLISH message, and we define how to handle it.

As opposed to the blocking API, which provides a synchronous programming model, the reactive and async API allow for async programming, which enables us to implement a more efficient threading model.

What sets the reactive API apart from the async API is the ability to harness all the advantages of your reactive library of choice. For example, RxJava offers advanced backpressure handling with the Flowable interface. The backpressure support allows you to manage situations where one component produces more events in a set timeframe than another component can process.

How to get started with the Reactive API of the HiveMQ MQTT Client in Java

To start, you will need the HiveMQ MQTT Client library.

Assuming your project uses Gradle add this to your build.gradle.kts to obtain the dependency:

1
2
3
dependencies {
    implementation("com.hivemq:hivemq-mqtt-client:+")
}

Examples

In the following code examples we use RxJava but the HiveMQ MQTT Client also offers a Reactor based API. If you are unfamiliar with reactive programming, we advise you to familiarize yourself with the core concepts of your preferred reactive framework and the respective datatypes.

One key concept we would like to highlight is laziness. Laziness refers to the behavior where the Observable does not emit any items until a reactive subscriber subscribes to it. This means the Observable does not start producing values until a reactive subscriber needs it. We use this concept to independently declare how we react to each event which removes the need for overly nested callbacks.

Reactive Publisher

To implement an MQTT publisher with RxJava do the following:

  1. Use the static method Mqtt5Client.builder() to obtain a builder. Provide the client identifier, the MQTT broker host and the MQTT broker port. Finish the building process with buildRx().
  2. Use disconnect() to define the disconnecting behavior. Attach a callback with doOnComplete to write Successfully disconnected! on the console once the client has successfully disconnected.
  3. Use Flowable.range(0, 10) to create a Flowable that can emit the numbers 0-9 once subscribed to.
    1. Use map to convert these numbers into PUBLISH messages, we use the static method Mqtt5Publish.builder() to build the PUBLISH message. Provide the topic, payload, QoS and complete the instantiation with build()
    2. Define how to behave when all PUBLISHES are successfully processed. The program logs Successfully published! and subscribe to the disconnect-Completable. The behavior only starts when subscribing bBecause of laziness.
    3. doOnError allows us to define the behavior when the sending of a PUBLISH message is unsuccessful.
  4. Last, we define the behavior of connecting to the MQTT broker.
    1. After a successful connection attempt, the application logs Successfully connected!.
    2. Hand the publish-Flowable to the client library and subscribe to the result to trigger the publishing.
    3. doOnError allows us to define the behavior when connecting fails.
    4. Instantly subscribe to start connecting.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ReactivePublisher {

    public static void main(final String @NotNull [] args) {
        final var reactiveClient = Mqtt5Client.builder() // 1
                .identifier("reactive-publisher") // 1
                .serverHost("broker.hivemq.com") // 1
                .serverPort(1883) // 1
                .buildRx(); // 1

        final Completable disconnect = reactiveClient.disconnect().doOnComplete(() -> { // 2
            System.out.println("Successfully disconnected!"); // 2
        });

        final Flowable<Mqtt5Publish> publish = Flowable.range(0, 10) // 3
                .map(integer -> Mqtt5Publish.builder() // 3.1
                        .topic("example/topic/reactive") // 3.1
                        .payload(("example #" + integer).getBytes(UTF_8)) // 3.1
                        .qos(MqttQos.AT_LEAST_ONCE) // 3.1
                        .build()) // 3.1
                .doOnComplete(() -> { // 3.2
                    System.out.println("Successfully published!"); // 3.2
                    disconnect.subscribe(); // 3.2
                }).doOnError(throwable -> {
                    System.out.println("Error while publishing!"); // 3.3
                    throwable.printStackTrace(); // 3.3
                });

        reactiveClient.connect() // 4
                .doOnSuccess(connAck -> {
                    System.out.println("Successfully connected!"); // 4.1
                    reactiveClient.publish(publish).subscribe(); // 4.2
                }).doOnError(throwable -> { // 4.3
                    System.out.println("Error while connecting!"); // 4.3
                    throwable.printStackTrace(); // 4.3
                }).subscribe(); // 4.4
    }
}

Reactive Subscriber

To implement an MQTT subscriber with RxJava do the following:

  1. Use the static method Mqtt5Client.builder() to obtain a builder. We provide the client identifier, the MQTT broker host and port. Finish the building process with buildRx().
  2. Before connecting, use publishes() to obtain a Flowable that contains incoming publishes. Subscribe with a callback; the client invokes that callback everytime it receives an incoming PUBLISH message.
  3. Use subscribeWith() to define the behavior of sending the SUBSCRIBE package to the MQTT broker.
    1. With doOnSuccess() ensure that Successfully subscribed! is written to the console after the client library receives the SUBACK.
    2. doOnError allows us to define the behavior when MQTT subscribing fails.
  4. Last, define the behavior of connecting to the MQTT broker.
    1. After a successful connection attempt, the application logs Successfully connected!.
    2. Subscribe to the subAckSingle to trigger the MQTT subscribe behavior. The behavior only starts when subscribing bBecause of laziness.
    3. doOnError allows us to define the behavior when connecting fails.
    4. Instantly subscribe to start connecting.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ReactiveSubscriber {

    public static void main(final String @NotNull [] args) {
        final var reactiveClient = Mqtt5Client.builder() // 1
                .identifier("reactive-subscriber") // 1
                .serverHost("broker.hivemq.com") // 1
                .serverPort(1883) // 1
                .buildRx(); // 1

        final Flowable<Mqtt5Publish> publishes = reactiveClient.publishes(MqttGlobalPublishFilter.ALL); // 2
        publishes.subscribe(publish -> { // 2
            System.out.println("Received publish with payload: " + new String(publish.getPayloadAsBytes(), UTF_8)); // 2
        }); // 2

        final Single<Mqtt5SubAck> subAckSingle =
                reactiveClient.subscribeWith().topicFilter("example/topic/#").applySubscribe() // 3
                        .doOnSuccess(mqtt5SubAck -> { // 3.1
                            System.out.println("Successfully subscribed!"); // 3.1
                        }).doOnError(throwable -> { // 3.2
                            System.out.println("Error while subscribing!"); // 3.2
                            throwable.printStackTrace(); // 3.2
                        });

        reactiveClient.connect() // 4
                .doOnSuccess(connAck -> {
                    System.out.println("Successfully connected!"); // 4.1
                    subAckSingle.subscribe(); // 4.2
                }).doOnError(throwable -> { // 4.3
                    System.out.println("Error while connecting!"); // 4.3
                    throwable.printStackTrace(); // 4.3
                }).subscribe(); // 4.4
    }
}

Switching API Flavor

The HiveMQ MQTT Client allows you to simply switch to the blocking or async API flavor using toBlocking() or toAsync(). This enables you to choose the API style that is best fitted for each part of your application. This example shows you how to switch from the reactive API flavor to the async API flavor which is explained in the second post of this series. It also illustrates how to switch from the reactive API flavor to the blocking API flavor from the first post in the series.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class MixedClient {
    public static void main(final String @NotNull [] args) {
        final var reactiveClient = Mqtt5Client.builder()
                .identifier("mixed-client")
                .serverPort(1883)
                .serverHost("broker.hivemq.com")
                .buildRx();

        final var asyncClient = reactiveClient.toAsync();
        final var blockingClient = reactiveClient.toBlocking();
    }
}

Conclusion

This concludes the series about the three API flavors of the HiveMQ MQTT Client. Developers who are looking to build MQTT applications that are both performant and scalable should consider utilizing the reactive flavor of the HiveMQ MQTT Client API. Although it may be less intuitive than the blocking API, the reactive API offers a more efficient threading model by enabling reactive programming.

To fully take advantage of the reactive API, developers need to be familiar with reactive libraries in Java, like RxJava or Reactor. However, even if they are not, the reactive API is still a great way to start with reactive programming in Java.

With reactive programming, developers can construct high-performing and scalable applications that can handle vast amounts of data.

author Yannick Weber

About Yannick Weber

Yannick is a Senior Software Engineer and one of the core members of HiveMQ’s product development team. He is focusing on quality development of HiveMQ’s many tools and extensions. In addition, he is the maintainer of the HiveMQ module in the testcontainers-java project.

mail icon Contact Yannick
newer posts Evolution of Software Testing at HiveMQ
The HiveMQ MQTT Client Library for Java and Its Async API Flavor older posts