How to Test MQTT Client Applications

How to Test MQTT Client Applications

author Yannick Weber

Written by Yannick Weber

Category: HiveMQ Testing Test Container MQTT Client

Published: April 11, 2022


Automatic testing of MQTT client applications is a challenging task since an MQTT Broker deployment of some kind is required. If you use a locally installed MQTT Broker for testing, you can not be sure that the tests run with the same results in a different environment. For example, when the tests are run on the machine of a coworker or a continuous integration environment. When you use a public MQTT broker, it is always possible for other clients to interfere with your test. On top of that, you are fully dependent on the uptime of the public MQTT broker: if the broker is unavailable, your tests fail. The solution to this problem is to automatically start an exclusive MQTT broker for each integration test and destroy it afterwards. This solution can be realized with the help of the official HiveMQ Testcontainers Module.

Requirements

  • MQTT application to test
  • Docker for running HiveMQ containers
  • Java libraries
    • JUnit 4 or JUnit 5 as a testing framework
    • HiveMQ Testcontainer for starting and stopping HiveMQ containers
    • Mockito as a mocking framework
    • HiveMQ MQTT Client to create test data

Example MQTT application

As an example, we can test the implementation of the MQTT client that is embedded inside a SmartAquarium. The implementation has the following functionality that needs to be tested:

  • Light: can be turned on and off by MQTT messages with the topic ’equipment/light’ and the payload ‘ON’ or ‘OFF’.
  • CO2 injection: can be turned on and off by MQTT messages with the topic ’equipment/co2’ and the payload ‘ON’ or ‘OFF’.
  • Pump: can be turned on and off by MQTT messages with the topic ’equipment/pump’ and the payload ‘ON’ or ‘OFF’.
  • Temperature Sensor: measures the water temperature and publishes it with the topic ‘status/temperature’.

The java implementation of the SmartAquariumClient looks something like this:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class SmartAquariumClient implements MqttCallback {

    public static final String LIGHT_TOPIC = "equipment/light";
    public static final String CO2_TOPIC = "equipment/co2";
    public static final String PUMP_TOPIC = "equipment/pump";
    public static final String TEMPERATURE_TOPIC = "status/temperature";

    private final Light light;
    private final Co2 co2;
    private final Pump pump;

    private final Mqtt5BlockingClient client;
    private final TemperatureSensor temperatureSensor;

    public SmartAquariumClient(
            final String brokerHost,
            final int brokerPort,
            final Light light,
            final Co2 co2,
            final Pump pump,
            final TemperatureSensor temperatureSensor) {

        this.light = light;
        this.co2 = co2;
        this.pump = pump;
        this.temperatureSensor = temperatureSensor;

        client = Mqtt5Client.builder().serverHost(brokerHost).serverPort(brokerPort).buildBlocking(); // 1
        client.toAsync().publishes(MqttGlobalPublishFilter.ALL, this::accept); // 2
        client.connect(); // 3

        client.subscribeWith().topicFilter(LIGHT_TOPIC).send(); // 4
        client.subscribeWith().topicFilter(CO2_TOPIC).send(); // 4
        client.subscribeWith().topicFilter(PUMP_TOPIC).send(); // 4
    }

    private void accept(final Mqtt5Publish publish) {
        final String payload = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8); // 7
        switch (publish.getTopic().toString()) { // 8
            case LIGHT_TOPIC:
                if ("ON".equals(payload)) { // 9
                    light.turnOn();
                } else if ("OFF".equals(payload)) {
                    light.turnOff();
                }
                break;
            case CO2_TOPIC:
                if ("ON".equals(payload)) { // 9
                    co2.turnOn();
                } else if ("OFF".equals(payload)) {
                    co2.turnOff();
                }
                break;
            case PUMP_TOPIC:
                if ("ON".equals(payload)) { // 9
                    pump.turnOn();
                } else if ("OFF".equals(payload)) {
                    pump.turnOff();
                }
                break;
        }
    }

    public void publishTemperature() {
        final String temperatureString = String.format(Locale.US, "%.1f°C", temperatureSensor.getCelsius()); // 5
        client.publishWith().topic(TEMPERATURE_TOPIC).payload(temperatureString.getBytes(StandardCharsets.UTF_8)).send(); // 6
    }
}

The SmartAquariumClient uses the Eclipse Paho client internally and shows the following behavior:

  • creates an eclipse paho client with the given broker Uri and client identifier ‘smartaquarium’ (1)
  • registers itself as Mqtt callback for processing incoming publishes (2)
  • connects to the broker (3)
  • subscribes to the topics for Light, CO2, and Pump control (4)

When publishTemperature() is called the SmartAquariumClient

  • obtains the temperature from the TemperatureSensor (5)
  • publishes the temperature to the topic ‘status/temperature’ (6)

When a publish message is received the SmartAquariumClient

  • retrieves the payload and converts it into an UTF-8 string (7)
  • assigns the message to the respective topics of the equipment pieces (8)
  • decides whether the specific device should be switched off or on (9)

The Light, CO2, Pump and TemperatureSensor interfaces are passed as constructor parameters, so the SmartAquariumClient is not concerned with their implementation and object-lifecycle. The client only knows about their interface methods turnOn(), turnOff() and getCelsius(). This neat decoupling induces great testability and allows us to write robust and elegant tests.

Testing the MQTT application

For testing your MQTT client application you add the following dependencies to your build.gradle:

1
2
3
4
implementation("org.testcontainers:testcontainers-bom:1.17.0")
testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:hivemq")
testImplementation("org.testcontainers:testcontainers")

Test MQTT message processing

In our test we want to ensure that the SmartAquariumClient interacts with the Pump, CO2 and Light correctly when the respective MQTT messages are received.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Testcontainers
class SmartAquariumClientIT {

    @Container
    HiveMQContainer container = new HiveMQContainer(DockerImageName.parse("hivemq/hivemq-ce:latest")); // 1

    private Mqtt3BlockingClient testClient;

    @BeforeEach
    void setUp() {
        testClient = Mqtt3Client.builder()
                .serverPort(container.getMqttPort()).buildBlocking();
        testClient.connect(); // 2
    }

    @Test
    @Timeout(value = 2, unit = TimeUnit.MINUTES)
        // 3
    void test_lightTurnedOn() {

        final Light light = mock(Light.class); // 4
        final Co2 co2 = mock(Co2.class); // 4
        final Pump pump = mock(Pump.class); // 4
        final TemperatureSensor temperatureSensor = mock(TemperatureSensor.class);

        final SmartAquariumClient smartAquariumClient = new SmartAquariumClient(
                container.getHost(),
                container.getMqttPort(),
                light,
                co2,
                pump,
                temperatureSensor);

        testClient.publishWith()
                .topic(SmartAquariumClient.LIGHT_TOPIC)
                .payload("ON".getBytes(StandardCharsets.UTF_8))
                .send(); // 5

        verify(light, timeout(30_000).times(1)).turnOn(); // 6
    }
}

Steps for testing:

  • Register the HiveMQContainer. (1)
  • Build and connect the testClient using the port obtained from the container. (2)
  • Register a timeout of 2 minutes for the test. (3)
  • Since Light, Co2, Pump, and TemperatureSensor are passed as constructor parameters, they can be mocked and handed to SmartAquariumClient. (4)
  • Publish the signal ‘ON’ with the topic ’equipment/light’ with the testClient. (5)
  • Verify that the turnedOn() method of the Light is called exactly once. To avoid a race condition, a timeout must be set that waits for the interaction for a specific amount of time. (6)

Test MQTT message publishing

In the next test, we want to ensure that the publishing of the temperature is working as expected.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Testcontainers
class SmartAquariumClientIT {

    @Container
    HiveMQContainer container = new HiveMQContainer(DockerImageName.parse("hivemq/hivemq-ce:latest")); // 1

    private Mqtt3BlockingClient testClient;

    @BeforeEach
    void setUp() {
        testClient = Mqtt3Client.builder()
                .serverPort(container.getMqttPort()).buildBlocking();
        testClient.connect(); // 2
    }

    @Test
    @Timeout(value = 2, unit = TimeUnit.MINUTES) // 3
    void test_publishTemperatureBlocking() throws Exception {

        final Light light = mock(Light.class); // 4
        final Co2 co2 = mock(Co2.class); // 4
        final Pump pump = mock(Pump.class); // 4
        final TemperatureSensor temperatureSensor = mock(TemperatureSensor.class);

        final SmartAquariumClient smartAquariumClient = new SmartAquariumClient(
                container.getHost(),
                container.getMqttPort(),
                light,
                co2,
                pump,
                temperatureSensor);

        testClient.subscribeWith()
                .topicFilter(SmartAquariumClient.TEMPERATURE_TOPIC)
                .qos(MqttQos.EXACTLY_ONCE)
                .send(); // 5

        final Mqtt3BlockingClient.Mqtt3Publishes publishes = testClient.publishes(MqttGlobalPublishFilter.ALL); // 6

        when(temperatureSensor.getCelsius()).thenReturn(13.0f); // 7
        smartAquariumClient.publishTemperature(); // 8

        final Mqtt3Publish mqtt3Publish = publishes.receive(); // 9

        assertNotNull(mqtt3Publish.getPayloadAsBytes());
        final String payload = new String(mqtt3Publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
        assertEquals("13.0°C", payload); // 10
    }
}

Steps for testing:

  • Register the HiveMQContainer. (1)
  • Build and connect the test client using the port obtained from the container. (2)
  • Register a timeout of 2 minutes for the test. (3)
  • Since Light, Co2, Pump, and TemperatureSensor are passed as constructor parameters, they can be mocked and handed to SmartAquariumClient. (4)
  • Subscribe the testClient to the topic where the temperature should be published (5)
  • Listen for publishes with the testClient (6)
  • Use Mockito to make the TemperatureSensor.getCelsius() return ‘13.0f’ (7)
  • Make the SmartAquariumClient publish the temperature (8)
  • Receive the message with the testClient using a blocking call to avoid a race condition here (9)
  • Assert expected results (10)

Conclusion

The following best practices where identified:

  • Design your MQTT client application with testing in mind.
  • Use an explicit timeout in every test so an unresponsive test does not block the entire pipeline.
  • Think about concurrency to avoid race conditions in your tests.

You can find the code for the example over at GitHub.

author Yannick Weber

About Yannick Weber

Yannick is a Senior Software Engineer and one of the core members of HiveMQ’s product development team. He is focusing on quality development of HiveMQ’s many tools and extensions. In addition, he is the maintainer of the HiveMQ module in the testcontainers-java project.

mail icon Contact Yannick
newer posts HiveMQ is now available in Testcontainers
What is the best way to ingest IoT data to Microsoft Azure? older posts