thingsboard-aplcache

Mqtt stress test tool

12/15/2016 9:52:44 AM

Details

tools/pom.xml 39(+38 -1)

diff --git a/tools/pom.xml b/tools/pom.xml
index d78c4ca..d96eb53 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -78,6 +78,43 @@
             <artifactId>mockito-all</artifactId>
             <scope>test</scope>
         </dependency>
-
     </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <manifestEntries>
+                                        <Main-Class>org.thingsboard.client.tools.MqttStressTestTool</Main-Class>
+                                    </manifestEntries>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java
index b0ddf73..5805cad 100644
--- a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java
+++ b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java
@@ -40,10 +40,10 @@ public class MqttStressTestClient {
         this.client = new MqttAsyncClient(brokerUri, clientId, persistence);
     }
 
-    public void connect() throws MqttException {
+    public IMqttToken connect() throws MqttException {
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(deviceToken);
-        client.connect(options, null, new IMqttActionListener() {
+        return client.connect(options, null, new IMqttActionListener() {
             @Override
             public void onSuccess(IMqttToken iMqttToken) {
                 log.info("OnSuccess");
@@ -60,6 +60,22 @@ public class MqttStressTestClient {
         client.disconnect();
     }
 
+
+
+    public void warmUp(byte[] data) throws MqttException {
+        MqttMessage msg = new MqttMessage(data);
+        client.publish("v1/devices/me/telemetry", msg, null, new IMqttActionListener() {
+            @Override
+            public void onSuccess(IMqttToken asyncActionToken) {
+            }
+
+            @Override
+            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+            }
+        }).waitForCompletion();
+    }
+
+
     public void publishTelemetry(byte[] data) throws MqttException {
         long sendTime = System.currentTimeMillis();
         MqttMessage msg = new MqttMessage(data);
@@ -67,14 +83,12 @@ public class MqttStressTestClient {
             @Override
             public void onSuccess(IMqttToken asyncActionToken) {
                 long ackTime = System.currentTimeMillis();
-//                log.info("Delivery time: {}", ackTime - sendTime);
                 results.onResult(true, ackTime - sendTime);
             }
 
             @Override
             public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                 long failTime = System.currentTimeMillis();
-//                log.info("Failure time: {}", failTime - sendTime);
                 results.onResult(false, failTime - sendTime);
             }
         });
diff --git a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java
index 900d81f..ed6f42b 100644
--- a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java
+++ b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java
@@ -1,4 +1,4 @@
-package org.thingsboard.client.tools; /**
+/**
  * Copyright © 2016 The Thingsboard Authors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,14 +13,32 @@ package org.thingsboard.client.tools; /**
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.thingsboard.client.tools; /**
+ * Copyright © 2016 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.security.DeviceCredentials;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -29,60 +47,83 @@ import java.util.concurrent.atomic.AtomicLong;
 @Slf4j
 public class MqttStressTestTool {
 
-    private static final long TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
-    private static final long TEST_ITERATION = TimeUnit.MILLISECONDS.toMillis(100);
-    private static final long TEST_SUB_ITERATION = TimeUnit.MILLISECONDS.toMillis(2);
-    private static final int DEVICE_COUNT = 100;
-    private static final String BASE_URL = "http://localhost:8080";
-    private static final String[] MQTT_URLS = {"tcp://localhost:1883"};
-//    private static final String[] MQTT_URLS = {"tcp://localhost:1883", "tcp://localhost:1884", "tcp://localhost:1885"};
-    private static final String USERNAME = "tenant@thingsboard.org";
-    private static final String PASSWORD = "tenant";
+    public static void main(String[] args) throws Exception {
+        TestParams params = new TestParams();
+        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
 
 
-    public static void main(String[] args) throws Exception {
+        if (params.getDuration() % params.getIterationInterval() != 0) {
+            throw new IllegalArgumentException("Test Duration % Iteration Interval != 0");
+        }
+
+        if ((params.getIterationInterval() * 1000) % params.getDeviceCount() != 0) {
+            throw new IllegalArgumentException("Iteration Interval % Device Count != 0");
+        }
+
         ResultAccumulator results = new ResultAccumulator();
 
         AtomicLong value = new AtomicLong(Long.MAX_VALUE);
         log.info("value: {} ", value.incrementAndGet());
 
-        RestClient restClient = new RestClient(BASE_URL);
-        restClient.login(USERNAME, PASSWORD);
+        RestClient restClient = new RestClient(params.getRestApiUrl());
+        restClient.login(params.getUsername(), params.getPassword());
 
         List<MqttStressTestClient> clients = new ArrayList<>();
-        for (int i = 0; i < DEVICE_COUNT; i++) {
-            Device device = restClient.createDevice("Device " + i);
+        List<IMqttToken> connectTokens = new ArrayList<>();
+        for (int i = 0; i < params.getDeviceCount(); i++) {
+            Device device = restClient.createDevice("Device " + UUID.randomUUID());
             DeviceCredentials credentials = restClient.getCredentials(device.getId());
-            String mqttURL = MQTT_URLS[i % MQTT_URLS.length];
+            String[] mqttUrls = params.getMqttUrls();
+            String mqttURL = mqttUrls[i % mqttUrls.length];
             MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId());
-            client.connect();
+            connectTokens.add(client.connect());
             clients.add(client);
         }
-        Thread.sleep(1000);
 
+        for (IMqttToken tokens : connectTokens) {
+            tokens.waitForCompletion();
+        }
 
         byte[] data = "{\"longKey\":73}".getBytes(StandardCharsets.UTF_8);
-        long startTime = System.currentTimeMillis();
-        int iterationsCount = (int) (TEST_DURATION / TEST_ITERATION);
-        int subIterationsCount = (int) (TEST_ITERATION / TEST_SUB_ITERATION);
-        if (clients.size() % subIterationsCount != 0) {
-            throw new IllegalArgumentException("Invalid parameter exception!");
+
+        for (MqttStressTestClient client : clients) {
+            client.warmUp(data);
         }
+
+        Thread.sleep(1000);
+
+        long startTime = System.currentTimeMillis();
+        int iterationsCount = (int) (params.getDuration() / params.getIterationInterval());
+        int subIterationMicroSeconds = (int) ((params.getIterationInterval() * 1000) / params.getDeviceCount());
+
+        List<ScheduledFuture<Void>> iterationFutures = new ArrayList<>();
         for (int i = 0; i < iterationsCount; i++) {
-            for (int j = 0; j < subIterationsCount; j++) {
-                int packSize = clients.size() / subIterationsCount;
-                for (int k = 0; k < packSize; k++) {
-                    int clientIndex = packSize * j + k;
-                    clients.get(clientIndex).publishTelemetry(data);
+            long delay = i * params.getIterationInterval();
+            iterationFutures.add(scheduler.schedule((Callable<Void>) () -> {
+                long sleepMicroSeconds = 0L;
+                for (MqttStressTestClient client : clients) {
+                    client.publishTelemetry(data);
+                    sleepMicroSeconds += subIterationMicroSeconds;
+                    if (sleepMicroSeconds > 1000) {
+                        Thread.sleep(sleepMicroSeconds / 1000);
+                        sleepMicroSeconds = sleepMicroSeconds % 1000;
+                    }
                 }
-                Thread.sleep(TEST_SUB_ITERATION);
-            }
+                return null;
+            }, delay, TimeUnit.MILLISECONDS));
         }
+
+        for (ScheduledFuture<Void> future : iterationFutures) {
+            future.get();
+        }
+
         Thread.sleep(1000);
+
         for (MqttStressTestClient client : clients) {
             client.disconnect();
         }
         log.info("Results: {} took {}ms", results, System.currentTimeMillis() - startTime);
+        scheduler.shutdownNow();
     }
 
 }
diff --git a/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java b/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java
index 5bba82a..1364fc1 100644
--- a/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java
+++ b/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java
@@ -73,7 +73,7 @@ public class ResultAccumulator {
 
     @Override
     public String toString() {
-        return "org.thingsboard.client.tools.ResultAccumulator{" +
+        return "Result {" +
                 "successCount=" + getSuccessCount() +
                 ", errorCount=" + getErrorCount() +
                 ", totalTime=" + getTimeSpent() +
diff --git a/tools/src/main/java/org/thingsboard/client/tools/TestParams.java b/tools/src/main/java/org/thingsboard/client/tools/TestParams.java
new file mode 100644
index 0000000..eb1328b
--- /dev/null
+++ b/tools/src/main/java/org/thingsboard/client/tools/TestParams.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.client.tools;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class TestParams {
+    static final String TEST_PROPERTIES = "test.properties";
+    static final long DEFAULT_TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
+    static final long DEFAULT_TEST_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
+    static final int DEFAULT_DEVICE_COUNT = 100;
+    static final String DEFAULT_REST_URL = "http://localhost:8080";
+    static final String DEFAULT_MQTT_URLS = "tcp://localhost:1883";
+    static final String DEFAULT_USERNAME = "tenant@thingsboard.org";
+    static final String DEFAULT_PASSWORD = "tenant";
+
+    private Properties params = new Properties();
+
+    public TestParams() throws IOException {
+        try {
+            params.load(new FileInputStream(TEST_PROPERTIES));
+        } catch (Exception e) {
+            log.warn("Failed to read " + TEST_PROPERTIES);
+        }
+    }
+
+    public long getDuration() {
+        return Long.valueOf(params.getProperty("durationMs", Long.toString(DEFAULT_TEST_DURATION)));
+    }
+
+    public long getIterationInterval() {
+        return Long.valueOf(params.getProperty("iterationIntervalMs", Long.toString(DEFAULT_TEST_INTERVAL)));
+    }
+
+    public int getDeviceCount() {
+        return Integer.valueOf(params.getProperty("deviceCount", Integer.toString(DEFAULT_DEVICE_COUNT)));
+    }
+
+    public String getRestApiUrl() {
+        return params.getProperty("restUrl", DEFAULT_REST_URL);
+    }
+
+    public String[] getMqttUrls() {
+        return params.getProperty("mqttUrls", DEFAULT_MQTT_URLS).split(",");
+    }
+
+    public String getUsername() {
+        return params.getProperty("username", DEFAULT_USERNAME);
+    }
+
+    public String getPassword() {
+        return params.getProperty("password", DEFAULT_PASSWORD);
+    }
+}
diff --git a/tools/src/main/resources/logback.xml b/tools/src/main/resources/logback.xml
new file mode 100644
index 0000000..11973fa
--- /dev/null
+++ b/tools/src/main/resources/logback.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+    Copyright © 2016 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<!DOCTYPE configuration>
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.thingsboard" level="INFO" />
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
\ No newline at end of file
diff --git a/tools/test.properties b/tools/test.properties
new file mode 100644
index 0000000..93efc60
--- /dev/null
+++ b/tools/test.properties
@@ -0,0 +1,3 @@
+deviceCount=1000
+durationMs=5000
+iterationIntervalMs=250