How to test MQTT client applications
Automatic testing of MQTT client applications is a challenging task since an MQTT Broker deployment of some kind is required.
If you use a locally installed MQTT Broker for testing, you can not be sure that the tests run with the same results in a different environment.
For example, when the tests are run on the machine of a coworker or a continuous integration environment.
When you use a public MQTT broker, it is always possible for other clients to interfere with your test.
On top of that, you are fully dependent on the uptime of the public MQTT broker: if the broker is unavailable, your tests fail.
The solution to this problem is to automatically start an exclusive MQTT broker for each integration test and destroy it afterwards.
This solution can be realized with the help of the official HiveMQ Testcontainer.
Requirements
- MQTT application to test
- Docker for running HiveMQ containers
- Java libraries
- JUnit 4 or JUnit 5 as a testing framework
- HiveMQ Testcontainer for starting and stopping HiveMQ containers
- Mockito as a mocking framework
- HiveMQ MQTT Client to create test data
Example MQTT application
As an example, we can test the implementation of the MQTT client that is embedded inside a SmartAquarium.
The implementation has the following functionality that needs to be tested:
- Light: can be turned on and off by MQTT messages with the topic ‘equipment/light’ and the payload ‘ON’ or ‘OFF’.
- CO2 injection: can be turned on and off by MQTT messages with the topic ‘equipment/co2’ and the payload ‘ON’ or ‘OFF’.
- Pump: can be turned on and off by MQTT messages with the topic ‘equipment/pump’ and the payload ‘ON’ or ‘OFF’.
- Temperature Sensor: measures the water temperature and publishes it with the topic ‘status/temperature’.
The java implementation of the SmartAquariumClient looks something like this:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
public class SmartAquariumClient implements MqttCallback {
public static final @NotNull String LIGHT_TOPIC = "equipment/light";
public static final @NotNull String CO2_TOPIC = "equipment/co2";
public static final @NotNull String PUMP_TOPIC = "equipment/pump";
public static final @NotNull String TEMPERATURE_TOPIC = "status/temperature";
private final @NotNull Light light;
private final @NotNull Co2 co2;
private final @NotNull Pump pump;
private final @NotNull MqttClient client;
private final @NotNull TemperatureSensor temperatureSensor;
public SmartAquariumClient(
final @NotNull String brokerUri,
final @NotNull Light light,
final @NotNull Co2 co2,
final @NotNull Pump pump,
final @NotNull TemperatureSensor temperatureSensor) throws MqttException {
this.light = light;
this.co2 = co2;
this.pump = pump;
this.temperatureSensor = temperatureSensor;
client = new MqttClient(brokerUri, "smartaquarium"); // 1
client.setCallback(this); // 2
client.connect(); // 3
client.subscribe(LIGHT_TOPIC, 2); // 4
client.subscribe(CO2_TOPIC, 2); // 4
client.subscribe(PUMP_TOPIC, 2); // 4
}
public void publishTemperature() throws MqttException {
final String temperatureString = String.format(Locale.US, "%.1f°C", temperatureSensor.getCelsius());
final MqttMessage temperatureMessage = new MqttMessage(temperatureString.getBytes(StandardCharsets.UTF_8));
client.publish(TEMPERATURE_TOPIC, temperatureMessage); // 6
}
@Override
public void messageArrived(final @NotNull String topic, final @NotNull MqttMessage mqttMessage) {
final String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8); // 7
switch (topic) {
case LIGHT_TOPIC: // 8
if ("ON".equals(payload)) { // 9
light.turnOn();
} else if ("OFF".equals(payload)) {
light.turnOff();
}
break;
case CO2_TOPIC: // 8
if ("ON".equals(payload)) { // 9
co2.turnOn();
} else if ("OFF".equals(payload)) {
co2.turnOff();
}
break;
case PUMP_TOPIC: // 8
if ("ON".equals(payload)) { // 9
pump.turnOn();
} else if ("OFF".equals(payload)) {
pump.turnOff();
}
break;
}
}
}
|
The SmartAquariumClient
uses the Eclipse Paho client internally and shows the following behavior:
- creates an eclipse paho client with the given broker Uri and client identifier ‘smartaquarium’ (1)
- registers itself as Mqtt callback for processing incoming publishes (2)
- connects to the broker (3)
- subscribes to the topics for Light, CO2, and Pump control (4)
When publishTemperature()
is called the SmartAquariumClient
- obtains the temperature from the
TemperatureSensor
(5)
- publishes the temperature to the topic ‘status/temperature’ (6)
When a publish message is received the SmartAquariumClient
- retrieves the payload and converts it into an UTF-8 string (7)
- assigns the message to the respective topics of the equipment pieces (8)
- decides whether the specific device should be switched off or on (9)
The Light
, CO2
, Pump
and TemperatureSensor
interfaces are passed as constructor parameters, so the
SmartAquariumClient
is not concerned with their implementation and object-lifecycle.
The client only knows about their interface methods turnOn()
, turnOff()
and getCelsius()
.
This neat decoupling induces great testability and allows us to write robust and elegant tests.
Testing the MQTT application
For testing your MQTT client application you add the following dependencies to your pom.xml
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-testcontainer-junit5</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.1.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.6.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>19.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
|
Test MQTT message processing
In our test we want to ensure that the SmartAquariumClient
interacts with the Pump, CO2 and Light
correctly when the respective MQTT messages are received.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
class SmartAquariumClientIT {
@RegisterExtension
final @NotNull HiveMQTestContainerExtension container = new HiveMQTestContainerExtension(); // 1
private @NotNull Mqtt3BlockingClient testClient;
@BeforeEach
void setUp() {
testClient = Mqtt3Client.builder()
.serverPort(container.getMqttPort()).buildBlocking();
testClient.connect(); // 2
}
@Test
@Timeout(value = 2, unit = TimeUnit.MINUTES) // 3
void test_lightTurnedOn() throws MqttException {
final Light light = mock(Light.class);
final Co2 co2 = mock(Co2.class);
final Pump pump = mock(Pump.class);
final TemperatureSensor temperatureSensor = mock(TemperatureSensor.class); // 4
final SmartAquariumClient smartAquariumClient = new SmartAquariumClient(
"tcp://localhost:" + container.getMqttPort(),
light,
co2,
pump,
temperatureSensor); // 4
testClient.publishWith()
.topic(SmartAquariumClient.LIGHT_TOPIC)
.payload("ON".getBytes(StandardCharsets.UTF_8))
.send(); // 5
verify(light, timeout(30_000).times(1)).turnOn(); // 6
}
}
|
Steps for testing:
- Register the
HiveMQTestcontainerExtension
. (1)
- Build and connect the
testClient
using the port obtained from the container. (2)
- Register a timeout of 2 minutes for the test. (3)
- Since
Light
, Co2
, Pump
, and TemperatureSensor
are passed as constructor parameters, they can be mocked and handed to SmartAquariumClient
. (4)
- Publish the signal ‘ON’ with the topic ‘equipment/light’ with the
testClient
. (5)
- Verify that the
turnedOn()
method of the Light
is called exactly once.
To avoid a race condition, a timeout must be set that waits for the interaction for a specific amount of time. (6)
Test MQTT message publishing
In the next test, we want to ensure that the publishing of the temperature is working as expected.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
class SmartAquariumClientIT {
@RegisterExtension
final @NotNull HiveMQTestContainerExtension container = new HiveMQTestContainerExtension(); // 1
private @NotNull Mqtt3BlockingClient testClient;
@BeforeEach
void setUp() {
testClient = Mqtt3Client.builder()
.serverPort(container.getMqttPort()).buildBlocking();
testClient.connect(); // 2
}
@Test
@Timeout(value = 2, unit = TimeUnit.MINUTES) // 3
void test_publishTemperatureBlocking() throws MqttException, InterruptedException {
final Light light = mock(Light.class);
final Co2 co2 = mock(Co2.class);
final Pump pump = mock(Pump.class);
final TemperatureSensor temperatureSensor = mock(TemperatureSensor.class); // 4
final SmartAquariumClient smartAquariumClient = new SmartAquariumClient(
"tcp://localhost:" + container.getMqttPort(),
light,
co2,
pump,
temperatureSensor); // 4
testClient.subscribeWith()
.topicFilter(SmartAquariumClient.TEMPERATURE_TOPIC)
.qos(MqttQos.EXACTLY_ONCE)
.send(); // 5
final Mqtt3BlockingClient.Mqtt3Publishes publishes = testClient.publishes(MqttGlobalPublishFilter.ALL); // 6
when(temperatureSensor.getCelsius()).thenReturn(13.0f); // 7
smartAquariumClient.publishTemperature(); // 8
final Mqtt3Publish mqtt3Publish = publishes.receive(); // 9
assertNotNull(mqtt3Publish.getPayloadAsBytes()); // 10
final String payload = new String(mqtt3Publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals("13.0°C", payload); // 10
}
}
|
Steps for testing:
- Register the
HiveMQTestcontainerExtension
. (1)
- Build and connect the test client using the port obtained from the container. (2)
- Register a timeout of 2 minutes for the test. (3)
- Since
Light
, Co2
, Pump
, and TemperatureSensor
are passed as constructor parameters, they can be mocked and handed to SmartAquariumClient
. (4)
- Subscribe the
testClient
to the topic where the temperature should be published (5)
- Listen for publishes with the
testClient
(6)
- Use Mockito to make the
TemperatureSensor.getCelsius()
return ‘13.0f’ (7)
- Make the
SmartAquariumClient
publish the temperature (8)
- Receive the message with the
testClient
using a blocking call to avoid a race condition here (9)
- Assert expected results (10)
Conclusion
The following best practices where identified:
- Design your MQTT client application with testing in mind.
- Use an explicit timeout in every test so an unresponsive test does not block the entire pipeline.
- Think about concurrency to avoid race conditions in your tests.
You can find the code for the example over at
GitHub.
About Yannick Weber
Yannick is a software developer at HiveMQ and maintainer of the HiveMQ Testcontainer.
Contact Yannick