diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
index 74ecdb2..7453682 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
@@ -103,10 +103,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
options.setUserName(accessToken);
client.connect(options).waitForCompletion();
- TestMqttCallback callback = new TestMqttCallback(client);
- client.setCallback(callback);
CountDownLatch latch = new CountDownLatch(1);
- latch.countDown();
+ TestMqttCallback callback = new TestMqttCallback(client, latch);
+ client.setCallback(callback);
+
client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value());
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
@@ -163,7 +163,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
options.setUserName(accessToken);
client.connect(options).waitForCompletion();
client.subscribe("v1/devices/me/rpc/request/+", 1);
- client.setCallback(new TestMqttCallback(client));
+ client.setCallback(new TestMqttCallback(client, new CountDownLatch(1)));
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
String deviceId = savedDevice.getId().getId().toString();
@@ -211,10 +211,12 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
private static class TestMqttCallback implements MqttCallback {
private final MqttAsyncClient client;
+ private final CountDownLatch latch;
private Integer qoS;
- TestMqttCallback(MqttAsyncClient client) {
+ TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) {
this.client = client;
+ this.latch = latch;
}
int getQoS() {
@@ -233,6 +235,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8"));
qoS = mqttMessage.getQos();
client.publish(responseTopic, message);
+ latch.countDown();
}
@Override
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
index df9601e..f7a7cea 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
@@ -105,10 +105,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(accessToken);
client.connect(options).waitForCompletion(3000);
- TestMqttCallback callback = new TestMqttCallback(client);
- client.setCallback(callback);
CountDownLatch latch = new CountDownLatch(1);
- latch.countDown();
+ TestMqttCallback callback = new TestMqttCallback(client, latch);
+ client.setCallback(callback);
client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value());
String payload = "{\"key\":\"value\"}";
String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
@@ -120,6 +119,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
private static class TestMqttCallback implements MqttCallback {
private final MqttAsyncClient client;
+ private final CountDownLatch latch;
private Integer qoS;
private String payload;
@@ -127,8 +127,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
return payload;
}
- TestMqttCallback(MqttAsyncClient client) {
+ TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) {
this.client = client;
+ this.latch = latch;
}
int getQoS() {
@@ -143,6 +144,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
payload = new String(mqttMessage.getPayload());
qoS = mqttMessage.getQos();
+ latch.countDown();
}
@Override