HiveMQ MQTT Client Features: Reconnect Handling

Written by Silvio Giebl

Category: HiveMQ MQTT Client MQTT Client

Published: June 17, 2019


Welcome to the HiveMQ MQTT Client Features blog post series.

We plan to publish a blog post about one feature of the HiveMQ MQTT Client library every week.

Today we will take a closer look at the Reconnect Handling functionality as it was recently released in version 1.1. See the release blog post for the changelog.

Automatic reconnect strategy

Instructing the client to automatically reconnect only requires you to add one line:

1
2
3
4
Mqtt5Client.builder()
        ...
        .automaticReconnectWithDefaultConfig()
        ...

The automatic reconnect strategy uses an exponential backoff algorithm to prevent overload when many clients are reconnecting. By default the client tries to reconnect after 1 second and then doubles the delay for every unsuccessful reconnect attempt. The reconnect delay is capped at a maximum of 2 minutes, so the delay does not increase endlessly. Additionally a random delay of +-25% will be added to better distribute synchronous reconnects of many clients.

The initial delay (1 second) and the maximum delay (2 minutes) can also easily be customized with the following code:

1
2
3
4
5
6
7
Mqtt5Client.builder()
        ...
        .automaticReconnect()
            .initialDelay(500, TimeUnit.MILLISECONDS)
            .maxDelay(3, TimeUnit.MINUTES)
            .applyAutomaticReconnect()
        ...

Custom reconnect strategy

If you want to use your own reconnect strategy, you can implement it by adding a MqttClientDisonnectedListener and using the provided MqttClientReconnector.

In the following example the client will try to reconnect in all cases (even if the user itself called disconnect) and always after 2 seconds:

1
2
3
4
5
6
Mqtt5Client.builder()
        ...
        .addDisconnectedListener(context -> {
            context.getReconnector().reconnect(true).delay(2, TimeUnit.SECONDS);
        })
        ...

If you want to scale the delay linearly for every unsuccessful reconnect attempt, you could do something like the following:

1
2
3
4
5
6
7
8
Mqtt5Client.builder()
        ...
        .addDisconnectedListener(context -> {
            context.getReconnector()
                    .reconnect(context.getSource() != MqttDisconnectSource.USER)
                    .delay(2 * context.getReconnector().getAttempts(), TimeUnit.SECONDS);
        })
        ...

Instead of delaying the reconnect by a fixed time interval, it can also be triggered when a Future completes. Imagine an interface CompletableFuture<Boolean> callService() which decides whether the client should reconnect automatically:

1
2
3
4
5
6
7
8
Mqtt5Client.builder()
        ...
        .addDisconnectedListener(context -> {
            context.getReconnector().reconnectWhen(callService(), (result, throwable) -> {
                context.getReconnector().reconnect(result);
            });
        })
        ...

It is also possible to simultaneously specify a delay and use the reconnectWhen method. The client will then try to reconnect when both are finished.

Customizing the Connect message

Furthermore you are able to customize the Connect message that is used to reconnect. By default the Connect message is either reconstructed from the previous connection or the one of the previous connect attempt is used.

The following example will use the default exponential backoff strategy for reconnecting, but will additionally query an OAuth token and set it as the password in the Connect message.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Mqtt5Client.builder()
        ...
        .automaticReconnectWithDefaultConfig()
        .addDisconnectedListener(context -> {
            TypeSwitch.when(context).is(Mqtt5ClientDisconnectedContext.class, context5 -> {
                context5.getReconnector().reconnectWhen(callOAuthService(), (result, throwable) -> {
                    context5.getReconnector().connectWith()
                            .simpleAuth().password(result.getToken()).applySimpleAuth()
                            .applyConnect();
                });
            });
        })
        ...

As a MqttClientDisonnectedListener is independent of the MQTT version, but the connect properties differ between MQTT 3 and MQTT 5 you have to switch over the context which can be a Mqtt5ClientDisconnectedContext or Mqtt3ClientDisconnectedContext.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Mqtt5Client.builder()
        ...
        .addDisconnectedListener(context -> {
            TypeSwitch.when(context)
                    .is(Mqtt5ClientDisconnectedContext.class, context5 -> {
                        context5.getReconnector().connectWith().userProperties()...
                    }).is(Mqtt3ClientDisconnectedContext.class, context3 -> {
                        ... // no user properties available for MQTT 3
                    });
        })
        ...

Customizing the Transport configuration

For high availability the client can also decide to reconnect to another MQTT broker:

1
2
3
4
5
6
7
8
9
Mqtt5Client.builder()
        ...
        .addDisconnectedListener(context -> {
            if (context.getReconnector().getAttempts() > 3) {
                context.getReconnector()
                        .transportConfig().serverHost("another.server.com").applyTransportConfig();
            }
        })
        ...



To summarize, it was demonstrated that the Reconnect Handling of the HiveMQ MQTT Client library is really flexible, allowing rather specific and complex custom implementations, whereas most use cases can rely on automaticReconnectWithDefaultConfig or automaticReconnect with custom parameters.

If you have not already done so, check out the project on GitHub.

Have a great day, Silvio from the HiveMQ Team

About Silvio Giebl

Silvio Giebl is a software developer at HiveMQ and maintainer of the open source library “HiveMQ MQTT Client”. He is interested in high-performance applications on the JVM and reactive programming.
<  Using MQTT and HiveMQ to Build the Connected Car   |   HiveMQ MQTT Client Features: Fluent API   >