Resilience Engineering of an MQTT Client Application

Written by Yannick Weber

Category: HiveMQ Test Container, Testing, MQTT Client, Resilience Engineering, Toxiproxy, Docker

Published: March 21, 2023


Resilience engineering deals with designing and developing systems that can withstand and recover from unexpected events and failures. In today’s world of connected devices and IoT, resilience engineering has become an essential aspect of software development, especially for applications that require real-time communication, such as autonomous and connected vehicles, manufacturing and industrial process control systems, and quality assurance in the food and beverage industry.

MQTT was designed with unreliable and unstable networks in mind, which makes it an ideal choice for communication between IoT devices that often have limited connectivity options. Because MQTT client applications are expected to run over such networks, it is important to test how they behave when the connection is lost or other unexpected events occur.

In this blog post, we demonstrate how to test the robustness of your MQTT client application by simulating network failures with Toxiproxy, HiveMQ, and Testcontainers.

What you need

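To start resilience engineering of your MQTT client, you need a running Docker environment (Testcontainers starts HiveMQ and Toxiproxy as Docker containers) and the following dependencies.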

Assuming your client project uses Gradle, add the following to your build.gradle.kts to obtain the third-party dependencies:

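dependencies {
    implementation("com.hivemq:hivemq-mqtt-client:1.3.0")
    // Provides @NotNull used in the examples (assumption: may already be on your classpath; version for illustration)
    implementation("org.jetbrains:annotations:24.0.1")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.2")
    testImplementation("org.testcontainers:junit-jupiter:1.17.6")
    testImplementation("org.testcontainers:hivemq:1.17.6")
    testImplementation("org.testcontainers:toxiproxy:1.17.6")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.9.2")
}

// Gradle only runs JUnit 5 tests when the JUnit Platform is enabled
tasks.test {
    useJUnitPlatform()
}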

Example

For demonstration, we have created a simple MQTT client application that publishes the numbers 0 to 99 (100 messages) at 100 ms intervals. The serial numbers let us verify that the messages arrive in the correct order, without duplicates or missing messages, even when the connection between the client and the server is impaired.
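The serial-number check can also be expressed as a small, library-free helper. The sketch below is a hypothetical SequenceChecker (not part of the original example) that, instead of failing on the first mismatch, reports every gap, duplicate, or reordering it finds in a list of received payloads. It assumes that each payload is the decimal string of its sequence number, as in the application below.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original example: checks that the received
// payloads are exactly "0", "1", ..., String.valueOf(expectedCount - 1), in order.
final class SequenceChecker {

    private SequenceChecker() {
    }

    // Returns human-readable problems; an empty list means the sequence is intact.
    static List<String> check(final List<String> payloads, final int expectedCount) {
        final List<String> problems = new ArrayList<>();
        long expected = 0; // next sequence number we expect to see
        for (final String payload : payloads) {
            final long actual = Long.parseLong(payload);
            if (actual >= expectedCount) {
                problems.add("unexpected message " + actual);
            } else if (actual < expected) {
                // A number we have already passed: a duplicate or an out-of-order message.
                problems.add("duplicate or out-of-order message " + actual + " (expected " + expected + ")");
            } else {
                if (actual > expected) {
                    // Numbers were skipped: they were either lost or will arrive out of order.
                    problems.add("gap: expected " + expected + " but received " + actual);
                }
                expected = actual + 1;
            }
        }
        if (expected < expectedCount) {
            problems.add("missing messages " + expected + " to " + (expectedCount - 1));
        }
        return problems;
    }
}
```

For example, `SequenceChecker.check(List.of("0", "2", "1"), 3)` reports two problems: a gap (1 was expected, 2 arrived) and an out-of-order 1. In a test, you could collect the received payloads into a list and assert that the result is empty, which reports all problems at once rather than stopping at the first mismatch.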

Note the following implementation details (the numbers match the comments in the code):

  1. The client library is configured to reconnect automatically: once the client detects a broken connection, it tries to reconnect to the server, starting with a delay of about one second and waiting at most ten seconds between attempts.
  2. The client connects with a keepAlive of one second. If no MQTT control packets are successfully exchanged within this period, the client considers the connection to the server broken. For more information, read The MQTT Essentials Series.
  3. The client connects with a persistent session by setting cleanStart=false and noSessionExpiry. With these settings, the server keeps the session after a disconnect, so all message flows between the server and the client resume at the correct stage upon reconnecting. For more information, read The MQTT Essentials Series.
  4. The client publishes the messages with Quality of Service EXACTLY_ONCE (QoS 2), which ensures that each message is delivered once, without duplicates or message loss. For more information, read The MQTT Essentials Series.
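import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.reactivex.Flowable;
import org.jetbrains.annotations.NotNull;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class MqttApplication {

    private final @NotNull Mqtt5Client mqttClient;

    public MqttApplication(final @NotNull String serverHost, final int serverPort) {
        mqttClient = Mqtt5Client.builder()
                // Fixed client identifier, so the persistent session can be resumed after reconnecting
                .identifier("mqtt-application")
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect() // 1
                .initialDelay(1, TimeUnit.SECONDS) // 1
                .maxDelay(10, TimeUnit.SECONDS) // 1
                .applyAutomaticReconnect() // 1
                .build();
    }

    public void start() {
        // Blocks until the server has acknowledged the connection
        mqttClient.toBlocking().connectWith()
                .keepAlive(1) // 2
                .cleanStart(false) // 3
                .noSessionExpiry() // 3
                .send();
        // Emits 0, 1, ..., 99 (count = 100), one value every 100 ms, starting immediately
        final Flowable<Mqtt5Publish> publishFlowable = Flowable.intervalRange(0, 100, 0, 100, TimeUnit.MILLISECONDS)
                .map(aLong -> Mqtt5Publish.builder()
                        .topic("test/topic")
                        .payload(String.valueOf(aLong).getBytes(StandardCharsets.UTF_8))
                        .qos(MqttQos.EXACTLY_ONCE) // 4
                        .build())
                // Buffer values if the client requests them more slowly than they are produced
                .onBackpressureBuffer();
        // Publishes asynchronously; the publish results are not inspected in this example
        mqttClient.toRx().publish(publishFlowable).onBackpressureBuffer().subscribe();
    }
}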
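The reconnect settings (// 1) and the keep alive (// 2) together determine how quickly a broken connection is noticed and how long the client waits between reconnect attempts. The following stdlib-only sketch models that timing. The server-side rule (the connection is closed after one and a half times the keep alive without a control packet) comes from the MQTT 5 specification; the backoff schedule is an assumption, a plain doubling capped at maxDelay, and the HiveMQ client’s actual delays may differ. ReconnectTiming is a hypothetical name.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative timing model, not the HiveMQ client's internal code.
final class ReconnectTiming {

    private ReconnectTiming() {
    }

    // MQTT 5, Keep Alive (section 3.1.2.10): if the server receives no control packet within
    // one and a half times the keep alive, it must close the network connection.
    static Duration serverKeepAliveTimeout(final Duration keepAlive) {
        return keepAlive.multipliedBy(3).dividedBy(2);
    }

    // ASSUMPTION: plain doubling backoff capped at maxDelay. The real client may
    // compute its delays differently (for example, with randomization).
    static List<Duration> backoffSchedule(final Duration initialDelay, final Duration maxDelay, final int attempts) {
        final List<Duration> delays = new ArrayList<>();
        Duration delay = min(initialDelay, maxDelay);
        for (int attempt = 0; attempt < attempts; attempt++) {
            delays.add(delay);
            // Double the delay for the next attempt, but never exceed maxDelay.
            delay = min(delay.multipliedBy(2), maxDelay);
        }
        return delays;
    }

    private static Duration min(final Duration a, final Duration b) {
        return a.compareTo(b) <= 0 ? a : b;
    }
}
```

With the settings used in MqttApplication (keepAlive(1), an initial delay of 1 s, and a maximum delay of 10 s), this model gives a server-side keep-alive timeout of 1.5 s and reconnect delays of 1, 2, 4, 8, 10, 10, ... seconds.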

To test the application, we create an integration test with JUnit 5.

  1. We use the @Testcontainers annotation so that the container lifecycle methods of all fields annotated with @Container are invoked.
  2. We create a Docker network to connect Toxiproxy and HiveMQ.
  3. We instantiate a HiveMQ container and add it to the Docker network. We use withNetworkAliases("hivemq") to make HiveMQ reachable under the hostname hivemq within the Docker network.
  4. To simulate connection failures, we create a Toxiproxy container and add it to the Docker network.
  5. We use a Toxiproxy client to create a proxy between the MqttApplication and HiveMQ, which allows us to simulate network failures. Here, we provide the network alias of HiveMQ and the standard MQTT port, 1883, as the upstream address.
  6. To verify that all messages are published correctly, we create a second MQTT client that receives them and connects directly to the HiveMQ container, bypassing the proxy. The client subscribes to # to receive all published messages.
  7. Now we instantiate and start the MQTT application under test, which publishes the numbers 0 to 99 at 100 ms intervals. Instead of connecting directly to the HiveMQ container, the application connects to the proxy, which forwards the connection to HiveMQ.
  8. After one second, we cut the connection between HiveMQ and the MQTT application with a timeout toxic in both directions. With a timeout of 0, Toxiproxy keeps the connection open, but no data gets through while the toxic is active.
  9. After one more second, we re-enable the connection between HiveMQ and the client by removing both timeout toxics.
  10. After another second, we cut the connection between HiveMQ and the MQTT application with a bandwidth toxic in both directions. This limits the connection to a rate of 0 kilobytes per second and leads to a failed MQTT connection.
  11. We wait one second and then remove the bandwidth limit.
  12. Finally, we iterate over the 100 messages that the test client from step 6 receives and assert that they arrive in order, with no duplicates and no lost messages.
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.hivemq.HiveMQContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;

@Testcontainers // 1
public class MqttApplicationIT {

    private static final @NotNull DockerImageName TOXIPROXY_IMAGE =
            DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest");
    private static final @NotNull DockerImageName HIVEMQ_IMAGE = DockerImageName.parse("hivemq/hivemq-ce:latest");

    private final @NotNull Network network = Network.newNetwork(); // 2

    @Container // 1
    public final @NotNull HiveMQContainer hivemq = new HiveMQContainer(HIVEMQ_IMAGE) // 3
            .withNetwork(network) // 3
            .withNetworkAliases("hivemq"); // 3

    @Container // 1
    private final @NotNull ToxiproxyContainer toxiproxy = new ToxiproxyContainer(TOXIPROXY_IMAGE) // 4
            .withNetwork(network); // 4

    @AfterEach
    void tearDown() {
        network.close();
    }

    @Test
    void testMqttApplication() throws Exception {
        // The proxy listens on port 8666 inside the Toxiproxy container and forwards to HiveMQ
        final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort()); // 5
        final Proxy proxy = toxiproxyClient.createProxy("hivemqProxy", "0.0.0.0:8666", "hivemq:1883"); // 5

        // The test client connects to HiveMQ directly, bypassing the proxy
        final Mqtt5BlockingClient testClient = Mqtt5Client.builder() // 6
                .serverHost(hivemq.getHost()) // 6
                .serverPort(hivemq.getMqttPort()) // 6
                .buildBlocking(); // 6
        testClient.connect(); // 6
        final Mqtt5BlockingClient.Mqtt5Publishes publishes = testClient.publishes(MqttGlobalPublishFilter.ALL); // 6
        testClient.subscribeWith().topicFilter("#").send(); // 6

        // The application under test connects through the proxy's mapped port
        final MqttApplication mqttApplication = new MqttApplication(toxiproxy.getHost(), toxiproxy.getMappedPort(8666)); // 7
        mqttApplication.start(); // 7

        Thread.sleep(1000); // 8
        proxy.toxics().timeout("timeout-down", ToxicDirection.DOWNSTREAM, 0); // 8
        proxy.toxics().timeout("timeout-up", ToxicDirection.UPSTREAM, 0); // 8
        System.out.println("Cut connection with timeout"); // 8

        Thread.sleep(1000); // 9
        proxy.toxics().get("timeout-down").remove(); // 9
        proxy.toxics().get("timeout-up").remove(); // 9
        System.out.println("Re-opened connection"); // 9

        Thread.sleep(1000); // 10
        proxy.toxics().bandwidth("bandwidth-down", ToxicDirection.DOWNSTREAM, 0); // 10
        proxy.toxics().bandwidth("bandwidth-up", ToxicDirection.UPSTREAM, 0); // 10
        System.out.println("Cut connection with bandwidth"); // 10

        Thread.sleep(1000); // 11
        proxy.toxics().get("bandwidth-down").remove(); // 11
        proxy.toxics().get("bandwidth-up").remove(); // 11
        System.out.println("Re-opened connection"); // 11

        // receive() blocks until the next message arrives; payloads must be exactly 0..99 in order
        for (int i = 0; i < 100; i++) { // 12
            final Mqtt5Publish receive = publishes.receive(); // 12
            final String payloadAsString = new String(receive.getPayloadAsBytes(), StandardCharsets.UTF_8); // 12
            System.out.println("Received message with payload: " + payloadAsString); // 12
            assertEquals(String.valueOf(i), payloadAsString); // 12
        }
    }
}
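The no-duplicates assertion in step 12 rests on QoS 2 together with the persistent session. The sketch below is a simplified model of the receiver side of the QoS 2 exchange (PUBLISH answered by PUBREC, PUBREL answered by PUBCOMP) as described in the MQTT 5 specification; for the application’s publishes, HiveMQ plays this role. Qos2Receiver is a hypothetical class: network I/O, session persistence, and the sender side are omitted, and it does not reflect how HiveMQ implements the flow internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of a QoS 2 receiver; not HiveMQ's implementation.
final class Qos2Receiver {

    // Packet identifiers for which PUBREC was sent but PUBREL has not yet arrived.
    private final Set<Integer> awaitingRelease = new HashSet<>();
    // Payloads handed to the application, in delivery order.
    private final List<String> delivered = new ArrayList<>();

    // Handles a QoS 2 PUBLISH (the first transmission or a resend) and returns the reply packet.
    String onPublish(final int packetId, final String payload) {
        // Deliver only if this identifier is not already awaiting release. A resent PUBLISH
        // (e.g. after a reconnect, before the sender received our PUBREC) is acknowledged
        // again but not delivered a second time.
        if (awaitingRelease.add(packetId)) {
            delivered.add(payload);
        }
        return "PUBREC " + packetId;
    }

    // Handles PUBREL: afterwards, the identifier may be reused for a new message.
    String onPubRel(final int packetId) {
        awaitingRelease.remove(packetId);
        return "PUBCOMP " + packetId;
    }

    List<String> delivered() {
        return Collections.unmodifiableList(delivered);
    }
}
```

If the connection breaks after the receiver has sent PUBREC but before the sender has received it, a sender with a resumed session resends the PUBLISH with its original packet identifier after reconnecting. The receiver acknowledges it again without delivering it twice, which is one reason the test client sees each number only once.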

Conclusion

In conclusion, resilience testing is an essential step in ensuring the reliability and robustness of your MQTT client application. By simulating various failure scenarios and testing the application’s ability to handle them, you can identify weaknesses and vulnerabilities and make the necessary improvements.

We have demonstrated how to apply resilience engineering techniques to an MQTT client application with a simple example that continuously publishes the numbers 0 to 99 at 100 ms intervals. Using Toxiproxy and HiveMQ together, we simulated network failures in a controlled environment.

Overall, by following the techniques outlined in this guide, you can increase the fault tolerance of your MQTT client application and minimize the risk of downtime, data loss, and other negative consequences. So, don’t overlook the importance of resilience engineering, and make it an integral part of your software development process.

About Yannick Weber

Yannick is a Senior Software Engineer and one of the core members of HiveMQ’s product development team. He focuses on the quality-driven development of HiveMQ’s many tools and extensions. In addition, he is the maintainer of the HiveMQ module in the testcontainers-java project.
