thingsboard-aplcache

MQTT tests: fix CountDown latch

10/10/2018 2:00:33 PM

Details

diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
index 74ecdb2..7453682 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
@@ -103,10 +103,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
         options.setUserName(accessToken);
         client.connect(options).waitForCompletion();
 
-        TestMqttCallback callback = new TestMqttCallback(client);
-        client.setCallback(callback);
         CountDownLatch latch = new CountDownLatch(1);
-        latch.countDown();
+        TestMqttCallback callback = new TestMqttCallback(client, latch);
+        client.setCallback(callback);
+
         client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value());
 
         String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
@@ -163,7 +163,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
         options.setUserName(accessToken);
         client.connect(options).waitForCompletion();
         client.subscribe("v1/devices/me/rpc/request/+", 1);
-        client.setCallback(new TestMqttCallback(client));
+        client.setCallback(new TestMqttCallback(client, new CountDownLatch(1)));
 
         String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
         String deviceId = savedDevice.getId().getId().toString();
@@ -211,10 +211,12 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
     private static class TestMqttCallback implements MqttCallback {
 
         private final MqttAsyncClient client;
+        private final CountDownLatch latch;
         private Integer qoS;
 
-        TestMqttCallback(MqttAsyncClient client) {
+        TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) {
             this.client = client;
+            this.latch = latch;
         }
 
         int getQoS() {
@@ -233,6 +235,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
             message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8"));
             qoS = mqttMessage.getQos();
             client.publish(responseTopic, message);
+            latch.countDown();
         }
 
         @Override
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
index df9601e..f7a7cea 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
@@ -105,10 +105,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(accessToken);
         client.connect(options).waitForCompletion(3000);
-        TestMqttCallback callback = new TestMqttCallback(client);
-        client.setCallback(callback);
         CountDownLatch latch = new CountDownLatch(1);
-        latch.countDown();
+        TestMqttCallback callback = new TestMqttCallback(client, latch);
+        client.setCallback(callback);
         client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value());
         String payload = "{\"key\":\"value\"}";
         String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
@@ -120,6 +119,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
     private static class TestMqttCallback implements MqttCallback {
 
         private final MqttAsyncClient client;
+        private final CountDownLatch latch;
         private Integer qoS;
         private String payload;
 
@@ -127,8 +127,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
             return payload;
         }
 
-        TestMqttCallback(MqttAsyncClient client) {
+        TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) {
             this.client = client;
+            this.latch = latch;
         }
 
         int getQoS() {
@@ -143,6 +144,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
         public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
             payload = new String(mqttMessage.getPayload());
             qoS = mqttMessage.getQos();
+            latch.countDown();
         }
 
         @Override