thingsboard-aplcache
Changes
tools/pom.xml 39(+38 -1)
tools/src/main/resources/logback.xml 34(+34 -0)
tools/test.properties 3(+3 -0)
Details
tools/pom.xml 39(+38 -1)
diff --git a/tools/pom.xml b/tools/pom.xml
index d78c4ca..d96eb53 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -78,6 +78,43 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
-
</dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <manifestEntries>
+ <Main-Class>org.thingsboard.client.tools.MqttStressTestTool</Main-Class>
+ </manifestEntries>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java
index b0ddf73..5805cad 100644
--- a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java
+++ b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestClient.java
@@ -40,10 +40,10 @@ public class MqttStressTestClient {
this.client = new MqttAsyncClient(brokerUri, clientId, persistence);
}
- public void connect() throws MqttException {
+ public IMqttToken connect() throws MqttException {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(deviceToken);
- client.connect(options, null, new IMqttActionListener() {
+ return client.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
log.info("OnSuccess");
@@ -60,6 +60,22 @@ public class MqttStressTestClient {
client.disconnect();
}
+
+
+ public void warmUp(byte[] data) throws MqttException {
+ MqttMessage msg = new MqttMessage(data);
+ client.publish("v1/devices/me/telemetry", msg, null, new IMqttActionListener() {
+ @Override
+ public void onSuccess(IMqttToken asyncActionToken) {
+ }
+
+ @Override
+ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+ }
+ }).waitForCompletion();
+ }
+
+
public void publishTelemetry(byte[] data) throws MqttException {
long sendTime = System.currentTimeMillis();
MqttMessage msg = new MqttMessage(data);
@@ -67,14 +83,12 @@ public class MqttStressTestClient {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
long ackTime = System.currentTimeMillis();
-// log.info("Delivery time: {}", ackTime - sendTime);
results.onResult(true, ackTime - sendTime);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
long failTime = System.currentTimeMillis();
-// log.info("Failure time: {}", failTime - sendTime);
results.onResult(false, failTime - sendTime);
}
});
diff --git a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java
index 900d81f..ed6f42b 100644
--- a/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java
+++ b/tools/src/main/java/org/thingsboard/client/tools/MqttStressTestTool.java
@@ -1,4 +1,4 @@
-package org.thingsboard.client.tools; /**
+/**
* Copyright © 2016 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,14 +13,32 @@ package org.thingsboard.client.tools; /**
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.thingsboard.client.tools; /**
+ * Copyright © 2016 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -29,60 +47,83 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public class MqttStressTestTool {
- private static final long TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
- private static final long TEST_ITERATION = TimeUnit.MILLISECONDS.toMillis(100);
- private static final long TEST_SUB_ITERATION = TimeUnit.MILLISECONDS.toMillis(2);
- private static final int DEVICE_COUNT = 100;
- private static final String BASE_URL = "http://localhost:8080";
- private static final String[] MQTT_URLS = {"tcp://localhost:1883"};
-// private static final String[] MQTT_URLS = {"tcp://localhost:1883", "tcp://localhost:1884", "tcp://localhost:1885"};
- private static final String USERNAME = "tenant@thingsboard.org";
- private static final String PASSWORD = "tenant";
+ public static void main(String[] args) throws Exception {
+ TestParams params = new TestParams();
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
- public static void main(String[] args) throws Exception {
+ if (params.getDuration() % params.getIterationInterval() != 0) {
+ throw new IllegalArgumentException("Test Duration % Iteration Interval != 0");
+ }
+
+ if ((params.getIterationInterval() * 1000) % params.getDeviceCount() != 0) {
+ throw new IllegalArgumentException("Iteration Interval % Device Count != 0");
+ }
+
ResultAccumulator results = new ResultAccumulator();
AtomicLong value = new AtomicLong(Long.MAX_VALUE);
log.info("value: {} ", value.incrementAndGet());
- RestClient restClient = new RestClient(BASE_URL);
- restClient.login(USERNAME, PASSWORD);
+ RestClient restClient = new RestClient(params.getRestApiUrl());
+ restClient.login(params.getUsername(), params.getPassword());
List<MqttStressTestClient> clients = new ArrayList<>();
- for (int i = 0; i < DEVICE_COUNT; i++) {
- Device device = restClient.createDevice("Device " + i);
+ List<IMqttToken> connectTokens = new ArrayList<>();
+ for (int i = 0; i < params.getDeviceCount(); i++) {
+ Device device = restClient.createDevice("Device " + UUID.randomUUID());
DeviceCredentials credentials = restClient.getCredentials(device.getId());
- String mqttURL = MQTT_URLS[i % MQTT_URLS.length];
+ String[] mqttUrls = params.getMqttUrls();
+ String mqttURL = mqttUrls[i % mqttUrls.length];
MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId());
- client.connect();
+ connectTokens.add(client.connect());
clients.add(client);
}
- Thread.sleep(1000);
+ for (IMqttToken tokens : connectTokens) {
+ tokens.waitForCompletion();
+ }
byte[] data = "{\"longKey\":73}".getBytes(StandardCharsets.UTF_8);
- long startTime = System.currentTimeMillis();
- int iterationsCount = (int) (TEST_DURATION / TEST_ITERATION);
- int subIterationsCount = (int) (TEST_ITERATION / TEST_SUB_ITERATION);
- if (clients.size() % subIterationsCount != 0) {
- throw new IllegalArgumentException("Invalid parameter exception!");
+
+ for (MqttStressTestClient client : clients) {
+ client.warmUp(data);
}
+
+ Thread.sleep(1000);
+
+ long startTime = System.currentTimeMillis();
+ int iterationsCount = (int) (params.getDuration() / params.getIterationInterval());
+ int subIterationMicroSeconds = (int) ((params.getIterationInterval() * 1000) / params.getDeviceCount());
+
+ List<ScheduledFuture<Void>> iterationFutures = new ArrayList<>();
for (int i = 0; i < iterationsCount; i++) {
- for (int j = 0; j < subIterationsCount; j++) {
- int packSize = clients.size() / subIterationsCount;
- for (int k = 0; k < packSize; k++) {
- int clientIndex = packSize * j + k;
- clients.get(clientIndex).publishTelemetry(data);
+ long delay = i * params.getIterationInterval();
+ iterationFutures.add(scheduler.schedule((Callable<Void>) () -> {
+ long sleepMicroSeconds = 0L;
+ for (MqttStressTestClient client : clients) {
+ client.publishTelemetry(data);
+ sleepMicroSeconds += subIterationMicroSeconds;
+ if (sleepMicroSeconds > 1000) {
+ Thread.sleep(sleepMicroSeconds / 1000);
+ sleepMicroSeconds = sleepMicroSeconds % 1000;
+ }
}
- Thread.sleep(TEST_SUB_ITERATION);
- }
+ return null;
+ }, delay, TimeUnit.MILLISECONDS));
}
+
+ for (ScheduledFuture<Void> future : iterationFutures) {
+ future.get();
+ }
+
Thread.sleep(1000);
+
for (MqttStressTestClient client : clients) {
client.disconnect();
}
log.info("Results: {} took {}ms", results, System.currentTimeMillis() - startTime);
+ scheduler.shutdownNow();
}
}
diff --git a/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java b/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java
index 5bba82a..1364fc1 100644
--- a/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java
+++ b/tools/src/main/java/org/thingsboard/client/tools/ResultAccumulator.java
@@ -73,7 +73,7 @@ public class ResultAccumulator {
@Override
public String toString() {
- return "org.thingsboard.client.tools.ResultAccumulator{" +
+ return "Result {" +
"successCount=" + getSuccessCount() +
", errorCount=" + getErrorCount() +
", totalTime=" + getTimeSpent() +
diff --git a/tools/src/main/java/org/thingsboard/client/tools/TestParams.java b/tools/src/main/java/org/thingsboard/client/tools/TestParams.java
new file mode 100644
index 0000000..eb1328b
--- /dev/null
+++ b/tools/src/main/java/org/thingsboard/client/tools/TestParams.java
@@ -0,0 +1,73 @@
+/**
+ * Copyright © 2016 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.client.tools;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class TestParams {
+ static final String TEST_PROPERTIES = "test.properties";
+ static final long DEFAULT_TEST_DURATION = TimeUnit.MINUTES.toMillis(1);
+ static final long DEFAULT_TEST_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100);
+ static final int DEFAULT_DEVICE_COUNT = 100;
+ static final String DEFAULT_REST_URL = "http://localhost:8080";
+ static final String DEFAULT_MQTT_URLS = "tcp://localhost:1883";
+ static final String DEFAULT_USERNAME = "tenant@thingsboard.org";
+ static final String DEFAULT_PASSWORD = "tenant";
+
+ private Properties params = new Properties();
+
+ public TestParams() throws IOException {
+ try {
+ params.load(new FileInputStream(TEST_PROPERTIES));
+ } catch (Exception e) {
+ log.warn("Failed to read " + TEST_PROPERTIES);
+ }
+ }
+
+ public long getDuration() {
+ return Long.valueOf(params.getProperty("durationMs", Long.toString(DEFAULT_TEST_DURATION)));
+ }
+
+ public long getIterationInterval() {
+ return Long.valueOf(params.getProperty("iterationIntervalMs", Long.toString(DEFAULT_TEST_INTERVAL)));
+ }
+
+ public int getDeviceCount() {
+ return Integer.valueOf(params.getProperty("deviceCount", Integer.toString(DEFAULT_DEVICE_COUNT)));
+ }
+
+ public String getRestApiUrl() {
+ return params.getProperty("restUrl", DEFAULT_REST_URL);
+ }
+
+ public String[] getMqttUrls() {
+ return params.getProperty("mqttUrls", DEFAULT_MQTT_URLS).split(",");
+ }
+
+ public String getUsername() {
+ return params.getProperty("username", DEFAULT_USERNAME);
+ }
+
+ public String getPassword() {
+ return params.getProperty("password", DEFAULT_PASSWORD);
+ }
+}
tools/src/main/resources/logback.xml 34(+34 -0)
diff --git a/tools/src/main/resources/logback.xml b/tools/src/main/resources/logback.xml
new file mode 100644
index 0000000..11973fa
--- /dev/null
+++ b/tools/src/main/resources/logback.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Copyright © 2016 The Thingsboard Authors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<!DOCTYPE configuration>
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.thingsboard" level="INFO" />
+
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
\ No newline at end of file
tools/test.properties 3(+3 -0)
diff --git a/tools/test.properties b/tools/test.properties
new file mode 100644
index 0000000..93efc60
--- /dev/null
+++ b/tools/test.properties
@@ -0,0 +1,3 @@
+deviceCount=1000
+durationMs=5000
+iterationIntervalMs=250