How to Test MQTT Client Applications

Written by Yannick Weber

Category: HiveMQ Testing HiveMQ MQTT Client MQTT Client

Published: April 29, 2020



Automated testing of MQTT client applications is challenging because it requires an MQTT broker deployment of some kind. If you use a locally installed MQTT broker, you cannot be sure that the tests produce the same results in a different environment, for example on a coworker's machine or in a continuous integration environment. If you use a public MQTT broker, other clients can interfere with your tests at any time. On top of that, you depend entirely on the uptime of the public broker: if it is unavailable, your tests fail. The solution is to automatically start an exclusive MQTT broker for each integration test and destroy it afterwards. The official HiveMQ Testcontainer makes this possible.

Requirements

  • MQTT application to test
  • Docker for running HiveMQ containers
  • Java libraries
    • JUnit 4 or JUnit 5 as a testing framework
    • HiveMQ Testcontainer for starting and stopping HiveMQ containers
    • Mockito as a mocking framework
    • HiveMQ MQTT Client to create test data

Example MQTT application

As an example, we can test the implementation of the MQTT client that is embedded inside a SmartAquarium. The implementation has the following functionality that needs to be tested:

  • Light: can be turned on and off by MQTT messages with the topic ‘equipment/light’ and the payload ‘ON’ or ‘OFF’.
  • CO2 injection: can be turned on and off by MQTT messages with the topic ‘equipment/co2’ and the payload ‘ON’ or ‘OFF’.
  • Pump: can be turned on and off by MQTT messages with the topic ‘equipment/pump’ and the payload ‘ON’ or ‘OFF’.
  • Temperature Sensor: measures the water temperature and publishes it with the topic ‘status/temperature’.

The Java implementation of the SmartAquariumClient looks something like this:

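import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jetbrains.annotations.NotNull;

import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class SmartAquariumClient implements MqttCallback {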

    public static final @NotNull String LIGHT_TOPIC = "equipment/light";
    public static final @NotNull String CO2_TOPIC = "equipment/co2";
    public static final @NotNull String PUMP_TOPIC = "equipment/pump";
    public static final @NotNull String TEMPERATURE_TOPIC = "status/temperature";

    private final @NotNull Light light;
    private final @NotNull Co2 co2;
    private final @NotNull Pump pump;

    private final @NotNull MqttClient client;
    private final @NotNull TemperatureSensor temperatureSensor;

    public SmartAquariumClient(
            final @NotNull String brokerUri,
            final @NotNull Light light,
            final @NotNull Co2 co2,
            final @NotNull Pump pump,
            final @NotNull TemperatureSensor temperatureSensor) throws MqttException {

        this.light = light;
        this.co2 = co2;
        this.pump = pump;
        this.temperatureSensor = temperatureSensor;

        client = new MqttClient(brokerUri, "smartaquarium"); // 1
        client.setCallback(this); // 2
        client.connect(); // 3

        client.subscribe(LIGHT_TOPIC, 2); // 4
        client.subscribe(CO2_TOPIC, 2); // 4
        client.subscribe(PUMP_TOPIC, 2); // 4
    }

    public void publishTemperature() throws MqttException {
        final String temperatureString = String.format(Locale.US, "%.1f°C", temperatureSensor.getCelsius()); // 5
        final MqttMessage temperatureMessage = new MqttMessage(temperatureString.getBytes(StandardCharsets.UTF_8));
        client.publish(TEMPERATURE_TOPIC, temperatureMessage); // 6
    }

    @Override
    public void messageArrived(final @NotNull String topic, final @NotNull MqttMessage mqttMessage) { 
        final String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8); // 7 
        switch (topic) {
            case LIGHT_TOPIC: // 8
                if ("ON".equals(payload)) { // 9
                    light.turnOn();
                } else if ("OFF".equals(payload)) {
                    light.turnOff();
                }
                break;
            case CO2_TOPIC: // 8
                if ("ON".equals(payload)) { // 9
                    co2.turnOn();
                } else if ("OFF".equals(payload)) {
                    co2.turnOff();
                }
                break;
            case PUMP_TOPIC: // 8
                if ("ON".equals(payload)) { // 9
                    pump.turnOn();
                } else if ("OFF".equals(payload)) {
                    pump.turnOff();
                }
                break;
        }
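    }

    @Override
    public void connectionLost(final @NotNull Throwable cause) {
        // Required by MqttCallback; reconnect handling is omitted in this example.
    }

    @Override
    public void deliveryComplete(final @NotNull IMqttDeliveryToken token) {
        // Required by MqttCallback; nothing to do once a publish has been delivered.
    }
}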

The SmartAquariumClient uses the Eclipse Paho client internally and shows the following behavior:

  • creates an Eclipse Paho client with the given broker URI and the client identifier ‘smartaquarium’ (1)
  • registers itself as the MQTT callback for processing incoming publishes (2)
  • connects to the broker (3)
  • subscribes to the topics for Light, CO2, and Pump control (4)

When publishTemperature() is called, the SmartAquariumClient

  • obtains the temperature from the TemperatureSensor (5)
  • publishes the temperature to the topic ‘status/temperature’ (6)

When a publish message is received, the SmartAquariumClient

  • retrieves the payload and converts it into a UTF-8 string (7)
  • matches the topic of the message to the corresponding piece of equipment (8)
  • decides whether that piece of equipment should be switched on or off (9)

The Light, Co2, Pump, and TemperatureSensor interfaces are passed as constructor parameters, so the SmartAquariumClient is not concerned with their implementation or object lifecycle. The client only knows about their interface methods turnOn(), turnOff(), and getCelsius(). This decoupling makes the client easy to test, because the real equipment can be replaced with mocks.
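The equipment interfaces themselves are not shown in this article. The following is a minimal sketch of what they might look like, assuming only the three method names mentioned above and a float return type for getCelsius() (suggested by the 13.0f used in the test further down). EquipmentSketch and RecordingLight are hypothetical names introduced purely for illustration; RecordingLight is a hand-written test double of the kind Mockito generates for you.

```java
// Hypothetical sketch: only turnOn(), turnOff() and getCelsius() are named in the article.
class EquipmentSketch {
    public static void main(String[] args) {
        RecordingLight light = new RecordingLight();
        TemperatureSensor sensor = () -> 13.0f; // fixed reading; the float type is an assumption

        light.turnOn();
        System.out.println("light on: " + light.isOn());
        System.out.println("temperature: " + sensor.getCelsius());
    }
}

interface Light {
    void turnOn();
    void turnOff();
}

interface Co2 {
    void turnOn();
    void turnOff();
}

interface Pump {
    void turnOn();
    void turnOff();
}

interface TemperatureSensor {
    float getCelsius();
}

// Hand-written test double (hypothetical): records the switch state and the number of turnOn() calls.
final class RecordingLight implements Light {
    private boolean on;
    private int turnOnCalls;

    @Override
    public void turnOn() {
        on = true;
        turnOnCalls++;
    }

    @Override
    public void turnOff() {
        on = false;
    }

    boolean isOn() {
        return on;
    }

    int turnOnCalls() {
        return turnOnCalls;
    }
}
```

Because the client depends only on these narrow interfaces, a test can hand it either a mock or a small recording implementation like this one without touching real hardware.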

Testing the MQTT application

To test your MQTT client application, add the following dependencies to your pom.xml:

<dependency>
    <groupId>com.hivemq</groupId>
    <artifactId>hivemq-testcontainer-junit5</artifactId>
    <version>1.0.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.hivemq</groupId>
    <artifactId>hivemq-mqtt-client</artifactId>
    <version>1.1.4</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-engine</artifactId>
    <version>5.6.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-api</artifactId>
    <version>5.6.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.jetbrains</groupId>
    <artifactId>annotations</artifactId>
    <version>19.0.0</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-all</artifactId>
    <version>1.10.19</version>
    <scope>test</scope>
</dependency>

Test MQTT message processing

In our test we want to ensure that the SmartAquariumClient interacts with the Pump, CO2 and Light correctly when the respective MQTT messages are received.

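import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

class SmartAquariumClientIT {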

    @RegisterExtension
    final @NotNull HiveMQTestContainerExtension container = new HiveMQTestContainerExtension(); // 1

    private @NotNull Mqtt3BlockingClient testClient;

    @BeforeEach
    void setUp() {
        testClient = Mqtt3Client.builder()
                .serverPort(container.getMqttPort()).buildBlocking(); 
        testClient.connect(); // 2
    }

    @Test
    @Timeout(value = 2, unit = TimeUnit.MINUTES) // 3
    void test_lightTurnedOn() throws MqttException {

        final Light light = mock(Light.class);
        final Co2 co2 = mock(Co2.class);
        final Pump pump = mock(Pump.class);
        final TemperatureSensor temperatureSensor = mock(TemperatureSensor.class); // 4

        final SmartAquariumClient smartAquariumClient = new SmartAquariumClient(
                "tcp://localhost:" + container.getMqttPort(),
                light,
                co2,
                pump,
                temperatureSensor); // 4

        testClient.publishWith()
                .topic(SmartAquariumClient.LIGHT_TOPIC)
                .payload("ON".getBytes(StandardCharsets.UTF_8))
                .send(); // 5

        verify(light, timeout(30_000).times(1)).turnOn(); // 6
    }
}
    

Steps for testing:

  • Register the HiveMQTestContainerExtension. (1)
  • Build and connect the testClient using the port obtained from the container. (2)
  • Register a timeout of 2 minutes for the test. (3)
  • Since Light, Co2, Pump, and TemperatureSensor are passed as constructor parameters, they can be mocked and handed to SmartAquariumClient. (4)
  • Publish the payload ‘ON’ to the topic ‘equipment/light’ with the testClient. (5)
  • Verify that the turnOn() method of the Light is called exactly once. Because the message is delivered asynchronously, the verification needs a timeout that waits for the interaction; without it, the check could run before the message arrives and fail because of a race condition. (6)
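The race condition in step (6) exists because the message is handled on a thread owned by the MQTT client, not on the test thread. The following stdlib-only sketch illustrates the same idea under simplified assumptions: a hypothetical single-thread executor stands in for the client's callback thread, and a CountDownLatch stands in for Mockito's timeout-based verification. AsyncVerificationSketch and deliveredWithin are names invented for this illustration and are not part of any library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncVerificationSketch {

    /**
     * Simulates a message that is handled on a background thread after delayMillis
     * and reports whether the handler ran within timeoutMillis.
     */
    static boolean deliveredWithin(long delayMillis, long timeoutMillis) {
        CountDownLatch turnedOn = new CountDownLatch(1);
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        try {
            callbackThread.execute(() -> {
                try {
                    Thread.sleep(delayMillis); // simulated broker round trip
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                turnedOn.countDown(); // stands in for light.turnOn()
            });
            // Wait for the interaction instead of checking immediately.
            return turnedOn.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            callbackThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Generous timeout: the waiting thread sees the interaction.
        System.out.println(deliveredWithin(50, 5_000));
        // Timeout shorter than the delivery delay: the check gives up too early.
        System.out.println(deliveredWithin(2_000, 10));
    }
}
```

A generous timeout costs nothing when the test passes, because the wait ends as soon as the interaction happens. It only affects how long a failing test takes to report.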

Test MQTT message publishing

In the next test, we want to ensure that the publishing of the temperature is working as expected.

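import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class SmartAquariumClientIT {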

    @RegisterExtension
    final @NotNull HiveMQTestContainerExtension container = new HiveMQTestContainerExtension(); // 1

    private @NotNull Mqtt3BlockingClient testClient;

    @BeforeEach
    void setUp() {
        testClient = Mqtt3Client.builder()
                .serverPort(container.getMqttPort()).buildBlocking(); 
        testClient.connect(); // 2
    }

    @Test
    @Timeout(value = 2, unit = TimeUnit.MINUTES) // 3
    void test_publishTemperatureBlocking() throws MqttException, InterruptedException {

        final Light light = mock(Light.class);
        final Co2 co2 = mock(Co2.class);
        final Pump pump = mock(Pump.class);
        final TemperatureSensor temperatureSensor = mock(TemperatureSensor.class); // 4

        final SmartAquariumClient smartAquariumClient = new SmartAquariumClient(
                "tcp://localhost:" + container.getMqttPort(),
                light,
                co2,
                pump,
                temperatureSensor); // 4

        testClient.subscribeWith()
                .topicFilter(SmartAquariumClient.TEMPERATURE_TOPIC)
                .qos(MqttQos.EXACTLY_ONCE)
                .send(); // 5
                
        final Mqtt3BlockingClient.Mqtt3Publishes publishes = testClient.publishes(MqttGlobalPublishFilter.ALL); // 6

        when(temperatureSensor.getCelsius()).thenReturn(13.0f); // 7
        smartAquariumClient.publishTemperature(); // 8

        final Mqtt3Publish mqtt3Publish = publishes.receive(); // 9

        assertNotNull(mqtt3Publish.getPayloadAsBytes()); // 10
        final String payload = new String(mqtt3Publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
        assertEquals("13.0°C", payload); // 10

    }
}

Steps for testing:

  • Register the HiveMQTestContainerExtension. (1)
  • Build and connect the testClient using the port obtained from the container. (2)
  • Register a timeout of 2 minutes for the test. (3)
  • Since Light, Co2, Pump, and TemperatureSensor are passed as constructor parameters, they can be mocked and handed to SmartAquariumClient. (4)
  • Subscribe the testClient to the topic where the temperature should be published. (5)
  • Listen for publishes with the testClient. (6)
  • Use Mockito to make TemperatureSensor.getCelsius() return ‘13.0f’. (7)
  • Make the SmartAquariumClient publish the temperature. (8)
  • Receive the message with the testClient using a blocking call, which avoids a race condition between publishing and asserting. (9)
  • Assert that the payload is present and matches the expected string. (10)
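The expected string in step (10) follows directly from the format call in publishTemperature(). The stdlib-only sketch below reproduces that encoding and decoding round trip without a broker; TemperaturePayloadSketch and its encode and decode helpers are hypothetical names used only for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

class TemperaturePayloadSketch {

    /** Formats the reading the same way publishTemperature() does and encodes it as UTF-8. */
    static byte[] encode(float celsius) {
        return String.format(Locale.US, "%.1f°C", celsius).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received payload the same way the test does. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Round trip of the mocked sensor value used in the test.
        System.out.println(decode(encode(13.0f)));
        // Without a fixed locale the decimal separator would depend on the machine,
        // for example a comma under a German locale.
        System.out.println(String.format(Locale.GERMANY, "%.1f°C", 13.0f));
    }
}
```

Pinning the locale to Locale.US is what keeps the assertion stable across developer machines and CI environments, which is the portability concern this article starts with.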

Conclusion

The following best practices were identified:

  • Design your MQTT client application with testing in mind.
  • Use an explicit timeout in every test so an unresponsive test does not block the entire pipeline.
  • Think about concurrency to avoid race conditions in your tests.

You can find the code for the example over at GitHub.

About Yannick Weber

Yannick is a software developer at HiveMQ and maintainer of the HiveMQ Testcontainer.