thingsboard-aplcache
Changes
msa/integration-tests/pom.xml 10(+8 -2)
msa/integration-tests/README.md 2(+1 -1)
msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java 113(+88 -25)
msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java 14(+8 -6)
msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java 304(+291 -13)
msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java 26(+26 -0)
Details
msa/integration-tests/pom.xml 10(+8 -2)
diff --git a/msa/integration-tests/pom.xml b/msa/integration-tests/pom.xml
index a1c24f3..a9c67cc 100644
--- a/msa/integration-tests/pom.xml
+++ b/msa/integration-tests/pom.xml
@@ -34,9 +34,10 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
- <integrationtests.skip>true</integrationtests.skip>
+ <integrationTests.skip>true</integrationTests.skip>
<testcontainers.version>1.9.1</testcontainers.version>
<java-websocket.version>1.3.9</java-websocket.version>
+ <httpclient.version>4.5.6</httpclient.version>
</properties>
<dependencies>
@@ -51,6 +52,11 @@
<version>${java-websocket.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.takari.junit</groupId>
<artifactId>takari-cpsuite</artifactId>
</dependency>
@@ -89,7 +95,7 @@
<includes>
<include>**/*TestSuite.java</include>
</includes>
- <skipTests>${integrationtests.skip}</skipTests>
+ <skipTests>${integrationTests.skip}</skipTests>
</configuration>
</plugin>
</plugins>
msa/integration-tests/README.md 2(+1 -1)
diff --git a/msa/integration-tests/README.md b/msa/integration-tests/README.md
index 5ae6354..93df2bf 100644
--- a/msa/integration-tests/README.md
+++ b/msa/integration-tests/README.md
@@ -15,4 +15,4 @@ As result, in REPOSITORY column, next images should be present:
- Run the integration tests in the [msa/integration-tests](../integration-tests) directory:
- mvn clean install -Dintegrationtests.skip=false
\ No newline at end of file
+ mvn clean install -DintegrationTests.skip=false
\ No newline at end of file
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
index ab7f101..5ebab78 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
@@ -21,55 +21,68 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustStrategy;
+import org.apache.http.conn.ssl.X509HostnameVerifier;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
import org.junit.*;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.thingsboard.client.tools.RestClient;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
+import javax.net.ssl.*;
import java.net.URI;
-import java.net.URISyntaxException;
+import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class AbstractContainerTest {
- protected static String httpUrl;
- protected static String wsUrl;
+ protected static final String HTTPS_URL = "https://localhost";
+ protected static final String WSS_URL = "wss://localhost";
protected static RestClient restClient;
protected ObjectMapper mapper = new ObjectMapper();
@BeforeClass
- public static void before() {
- httpUrl = "http://localhost:" + ContainerTestSuite.composeContainer.getServicePort("tb-web-ui1", ContainerTestSuite.EXPOSED_PORT);
- wsUrl = "ws://localhost:" + ContainerTestSuite.composeContainer.getServicePort("tb-web-ui1", ContainerTestSuite.EXPOSED_PORT);
- restClient = new RestClient(httpUrl);
+ public static void before() throws Exception {
+ restClient = new RestClient(HTTPS_URL);
+ restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert());
}
protected Device createDevice(String name) {
return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT");
}
- protected WsClient subscribeToTelemetryWebSocket(DeviceId deviceId) throws URISyntaxException, InterruptedException {
- WsClient mWs = new WsClient(new URI(wsUrl + "/api/ws/plugins/telemetry?token=" + restClient.getToken()));
- mWs.connectBlocking(1, TimeUnit.SECONDS);
-
- JsonObject tsSubCmd = new JsonObject();
- tsSubCmd.addProperty("entityType", EntityType.DEVICE.name());
- tsSubCmd.addProperty("entityId", deviceId.toString());
- tsSubCmd.addProperty("scope", "LATEST_TELEMETRY");
- tsSubCmd.addProperty("cmdId", new Random().nextInt(100));
- tsSubCmd.addProperty("unsubscribe", false);
- JsonArray wsTsSubCmds = new JsonArray();
- wsTsSubCmds.add(tsSubCmd);
+ protected WsClient subscribeToWebSocket(DeviceId deviceId, String scope, CmdsType property) throws Exception {
+ WsClient wsClient = new WsClient(new URI(WSS_URL + "/api/ws/plugins/telemetry?token=" + restClient.getToken()));
+ SSLContextBuilder builder = SSLContexts.custom();
+ builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
+ wsClient.setSocket(builder.build().getSocketFactory().createSocket());
+ wsClient.connectBlocking();
+
+ JsonObject cmdsObject = new JsonObject();
+ cmdsObject.addProperty("entityType", EntityType.DEVICE.name());
+ cmdsObject.addProperty("entityId", deviceId.toString());
+ cmdsObject.addProperty("scope", scope);
+ cmdsObject.addProperty("cmdId", new Random().nextInt(100));
+
+ JsonArray cmd = new JsonArray();
+ cmd.add(cmdsObject);
JsonObject wsRequest = new JsonObject();
- wsRequest.add("tsSubCmds", wsTsSubCmds);
- wsRequest.add("historyCmds", new JsonArray());
- wsRequest.add("attrSubCmds", new JsonArray());
- mWs.send(wsRequest.toString());
- return mWs;
+ wsRequest.add(property.toString(), cmd);
+ wsClient.send(wsRequest.toString());
+ return wsClient;
}
protected Map<String, Long> getExpectedLatestValues(long ts) {
@@ -109,4 +122,54 @@ public abstract class AbstractContainerTest {
return values;
}
+ protected enum CmdsType {
+ TS_SUB_CMDS("tsSubCmds"),
+ HISTORY_CMDS("historyCmds"),
+ ATTR_SUB_CMDS("attrSubCmds");
+
+ private final String text;
+
+ CmdsType(final String text) {
+ this.text = text;
+ }
+
+ @Override
+ public String toString() {
+ return text;
+ }
+ }
+
+ private static HttpComponentsClientHttpRequestFactory getRequestFactoryForSelfSignedCert() throws Exception {
+ SSLContextBuilder builder = SSLContexts.custom();
+ builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
+ SSLContext sslContext = builder.build();
+ SSLConnectionSocketFactory sslSelfSigned = new SSLConnectionSocketFactory(sslContext, new X509HostnameVerifier() {
+ @Override
+ public void verify(String host, SSLSocket ssl) {
+ }
+
+ @Override
+ public void verify(String host, X509Certificate cert) {
+ }
+
+ @Override
+ public void verify(String host, String[] cns, String[] subjectAlts) {
+ }
+
+ @Override
+ public boolean verify(String s, SSLSession sslSession) {
+ return true;
+ }
+ });
+
+ Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder
+ .<ConnectionSocketFactory>create()
+ .register("https", sslSelfSigned)
+ .build();
+
+ PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+ CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(cm).build();
+ return new HttpComponentsClientHttpRequestFactory(httpClient);
+ }
+
}
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java
index 7cc0a0f..f8e1041 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.msa.connectivity;
+import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.http.ResponseEntity;
@@ -22,7 +23,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.msa.AbstractContainerTest;
import org.thingsboard.server.msa.WsClient;
-import org.thingsboard.server.msa.WsTelemetryResponse;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
import java.util.concurrent.TimeUnit;
@@ -35,23 +36,24 @@ public class HttpClientTest extends AbstractContainerTest {
Device device = createDevice("http_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
- WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
+ WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
ResponseEntity deviceTelemetryResponse = restClient.getRestTemplate()
- .postForEntity(httpUrl + "/api/v1/{credentialsId}/telemetry",
+ .postForEntity(HTTPS_URL + "/api/v1/{credentialsId}/telemetry",
mapper.readTree(createPayload().toString()),
ResponseEntity.class,
deviceCredentials.getCredentialsId());
Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful());
TimeUnit.SECONDS.sleep(1);
- WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
+ WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
- Assert.assertEquals(getExpectedLatestValues(123456789L).keySet(), actualLatestTelemetry.getLatestValues().keySet());
+ Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"),
+ actualLatestTelemetry.getLatestValues().keySet());
Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString()));
Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1"));
Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0)));
Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73)));
- restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
}
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
index 4ad638e..eae98b9 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
@@ -15,20 +15,39 @@
*/
package org.thingsboard.server.msa.connectivity;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Data;
+import org.apache.commons.lang3.RandomStringUtils;
import org.junit.*;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttHandler;
import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.page.TextPageData;
+import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.msa.AbstractContainerTest;
import org.thingsboard.server.msa.WsClient;
-import org.thingsboard.server.msa.WsTelemetryResponse;
+import org.thingsboard.server.msa.mapper.AttributesResponse;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.*;
import java.util.concurrent.*;
public class MqttClientTest extends AbstractContainerTest {
@@ -39,20 +58,22 @@ public class MqttClientTest extends AbstractContainerTest {
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
- WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
- MqttClient mqttClient = getMqttClient(deviceCredentials);
+ WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
+ MqttClient mqttClient = getMqttClient(deviceCredentials, null);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
TimeUnit.SECONDS.sleep(1);
- WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
+ WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
- Assert.assertEquals(getExpectedLatestValues(123456789L).keySet(), actualLatestTelemetry.getLatestValues().keySet());
+ Assert.assertEquals(4, actualLatestTelemetry.getData().size());
+ Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"),
+ actualLatestTelemetry.getLatestValues().keySet());
Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString()));
Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1"));
Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0)));
Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73)));
- restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
@Test
@@ -63,12 +84,13 @@ public class MqttClientTest extends AbstractContainerTest {
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
- WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
- MqttClient mqttClient = getMqttClient(deviceCredentials);
+ WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
+ MqttClient mqttClient = getMqttClient(deviceCredentials, null);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
TimeUnit.SECONDS.sleep(1);
- WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
+ WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
+ Assert.assertEquals(4, actualLatestTelemetry.getData().size());
Assert.assertEquals(getExpectedLatestValues(ts), actualLatestTelemetry.getLatestValues());
Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", ts, Boolean.TRUE.toString()));
@@ -76,15 +98,271 @@ public class MqttClientTest extends AbstractContainerTest {
Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", ts, Double.toString(42.0)));
Assert.assertTrue(verify(actualLatestTelemetry, "longKey", ts, Long.toString(73)));
- restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
- private MqttClient getMqttClient(DeviceCredentials deviceCredentials) throws InterruptedException {
- MqttMessageListener queue = new MqttMessageListener();
+ @Test
+ public void publishAttributeUpdateToServer() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ WsClient wsClient = subscribeToWebSocket(device.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS);
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+ JsonObject clientAttributes = new JsonObject();
+ clientAttributes.addProperty("attr1", "value1");
+ clientAttributes.addProperty("attr2", true);
+ clientAttributes.addProperty("attr3", 42.0);
+ clientAttributes.addProperty("attr4", 73);
+ mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
+ TimeUnit.SECONDS.sleep(1);
+ WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
+
+ Assert.assertEquals(4, actualLatestTelemetry.getData().size());
+ Assert.assertEquals(Sets.newHashSet("attr1", "attr2", "attr3", "attr4"),
+ actualLatestTelemetry.getLatestValues().keySet());
+
+ Assert.assertTrue(verify(actualLatestTelemetry, "attr1", "value1"));
+ Assert.assertTrue(verify(actualLatestTelemetry, "attr2", Boolean.TRUE.toString()));
+ Assert.assertTrue(verify(actualLatestTelemetry, "attr3", Double.toString(42.0)));
+ Assert.assertTrue(verify(actualLatestTelemetry, "attr4", Long.toString(73)));
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void requestAttributeValuesFromServer() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+
+ // Add a new client attribute
+ JsonObject clientAttributes = new JsonObject();
+ String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+ clientAttributes.addProperty("clientAttr", clientAttributeValue);
+ mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
+
+ // Add a new shared attribute
+ JsonObject sharedAttributes = new JsonObject();
+ String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+ sharedAttributes.addProperty("sharedAttr", sharedAttributeValue);
+ ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+ mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
+ device.getId());
+ Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+ // Subscribe to attributes response
+ mqttClient.on("v1/devices/me/attributes/response/+", listener);
+ // Request attributes
+ JsonObject request = new JsonObject();
+ request.addProperty("clientKeys", "clientAttr");
+ request.addProperty("sharedKeys", "sharedAttr");
+ mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
+ MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+ AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
+
+ Assert.assertEquals(1, attributes.getClient().size());
+ Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr"));
+
+ Assert.assertEquals(1, attributes.getShared().size());
+ Assert.assertEquals(sharedAttributeValue, attributes.getShared().get("sharedAttr"));
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void subscribeToAttributeUpdatesFromServer() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+ mqttClient.on("v1/devices/me/attributes", listener);
+
+ String sharedAttributeName = "sharedAttr";
+
+ // Add a new shared attribute
+ JsonObject sharedAttributes = new JsonObject();
+ String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+ sharedAttributes.addProperty(sharedAttributeName, sharedAttributeValue);
+ ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+ mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
+ device.getId());
+ Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+ MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+ Assert.assertEquals(sharedAttributeValue,
+ mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
+
+ // Update the shared attribute value
+ JsonObject updatedSharedAttributes = new JsonObject();
+ String updatedSharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+ updatedSharedAttributes.addProperty(sharedAttributeName, updatedSharedAttributeValue);
+ ResponseEntity updatedSharedAttributesResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+ mapper.readTree(updatedSharedAttributes.toString()), ResponseEntity.class,
+ device.getId());
+ Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+ event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+ Assert.assertEquals(updatedSharedAttributeValue,
+ mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void serverSideRpc() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+ mqttClient.on("v1/devices/me/rpc/request/+", listener);
+
+ // Send an RPC from the server
+ JsonObject serverRpcPayload = new JsonObject();
+ serverRpcPayload.addProperty("method", "getValue");
+ serverRpcPayload.addProperty("params", true);
+ serverRpcPayload.addProperty("timeout", 1000);
+ ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ ListenableFuture<ResponseEntity> future = service.submit(() -> {
+ try {
+ return restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/plugins/rpc/twoway/{deviceId}",
+ mapper.readTree(serverRpcPayload.toString()), String.class,
+ device.getId());
+ } catch (IOException e) {
+ return ResponseEntity.badRequest().build();
+ }
+ });
+
+ // Wait for RPC call from the server and send the response
+ MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
+
+ Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage());
+
+ Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length()));
+ JsonObject clientResponse = new JsonObject();
+ clientResponse.addProperty("response", "someResponse");
+ // Send a response to the server's RPC request
+ mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes()));
+
+ ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
+ Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
+ Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());
+
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ @Test
+ public void clientSideRpc() throws Exception {
+ restClient.login("tenant@thingsboard.org", "tenant");
+ Device device = createDevice("mqtt_");
+ DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+ MqttMessageListener listener = new MqttMessageListener();
+ MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+ mqttClient.on("v1/devices/me/rpc/request/+", listener);
+
+ // Get the default rule chain id to make it root again after test finished
+ RuleChainId defaultRuleChainId = getDefaultRuleChainId();
+
+ // Create a new root rule chain
+ RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
+
+ // Send the request to the server
+ JsonObject clientRequest = new JsonObject();
+ clientRequest.addProperty("method", "getResponse");
+ clientRequest.addProperty("params", true);
+ Integer requestId = 42;
+ mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes()));
+
+ // Check the response from the server
+ TimeUnit.SECONDS.sleep(1);
+ MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS);
+ Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length()));
+ Assert.assertEquals(requestId, responseId);
+ Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText());
+
+ // Make the default rule chain a root again
+ ResponseEntity<RuleChain> rootRuleChainResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root",
+ null,
+ RuleChain.class,
+ defaultRuleChainId);
+ Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful());
+
+ // Delete the created rule chain
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/ruleChain/{ruleChainId}", ruleChainId);
+ restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+ }
+
+ private RuleChainId createRootRuleChainForRpcResponse() throws Exception {
+ RuleChain newRuleChain = new RuleChain();
+ newRuleChain.setName("testRuleChain");
+ ResponseEntity<RuleChain> ruleChainResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/ruleChain",
+ newRuleChain,
+ RuleChain.class);
+ Assert.assertTrue(ruleChainResponse.getStatusCode().is2xxSuccessful());
+ RuleChain ruleChain = ruleChainResponse.getBody();
+
+ JsonNode configuration = mapper.readTree(this.getClass().getClassLoader().getResourceAsStream("RpcResponseRuleChainMetadata.json"));
+ RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
+ ruleChainMetaData.setRuleChainId(ruleChain.getId());
+ ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt());
+ ruleChainMetaData.setNodes(Arrays.asList(mapper.treeToValue(configuration.get("nodes"), RuleNode[].class)));
+ ruleChainMetaData.setConnections(Arrays.asList(mapper.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class)));
+
+ ResponseEntity<RuleChainMetaData> ruleChainMetadataResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/ruleChain/metadata",
+ ruleChainMetaData,
+ RuleChainMetaData.class);
+ Assert.assertTrue(ruleChainMetadataResponse.getStatusCode().is2xxSuccessful());
+
+ // Set a new rule chain as root
+ ResponseEntity<RuleChain> rootRuleChainResponse = restClient.getRestTemplate()
+ .postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root",
+ null,
+ RuleChain.class,
+ ruleChain.getId());
+ Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful());
+
+ return ruleChain.getId();
+ }
+
+ private RuleChainId getDefaultRuleChainId() {
+ ResponseEntity<TextPageData<RuleChain>> ruleChains = restClient.getRestTemplate().exchange(
+ HTTPS_URL + "/api/ruleChains?limit=40&textSearch=",
+ HttpMethod.GET,
+ null,
+ new ParameterizedTypeReference<TextPageData<RuleChain>>() {
+ });
+
+ Optional<RuleChain> defaultRuleChain = ruleChains.getBody().getData()
+ .stream()
+ .filter(RuleChain::isRoot)
+ .findFirst();
+ if (!defaultRuleChain.isPresent()) {
+ Assert.fail("Root rule chain wasn't found");
+ }
+ return defaultRuleChain.get().getId();
+ }
+
+ private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException {
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(deviceCredentials.getCredentialsId());
- MqttClient mqttClient = MqttClient.create(clientConfig, queue);
+ MqttClient mqttClient = MqttClient.create(clientConfig, listener);
mqttClient.connect("localhost", 1883).sync();
return mqttClient;
}
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
index fd2de22..0629960 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
@@ -26,12 +26,11 @@ import java.io.File;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.msa.*"})
public class ContainerTestSuite {
- static final int EXPOSED_PORT = 8080;
@ClassRule
public static DockerComposeContainer composeContainer = new DockerComposeContainer(new File("./../docker/docker-compose.yml"))
.withPull(false)
.withLocalCompose(true)
.withTailChildContainers(true)
- .withExposedService("tb-web-ui1", EXPOSED_PORT, Wait.forHttp("/login"));
+ .withExposedService("tb-web-ui1", 8080, Wait.forHttp("/login"));
}
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java
new file mode 100644
index 0000000..f9774ee
--- /dev/null
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa.mapper;
+
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+public class AttributesResponse {
+ private Map<String, Object> client;
+ private Map<String, Object> shared;
+}
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
index 2f05eee..5ef238f 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
@@ -19,16 +19,12 @@ import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
public class WsClient extends WebSocketClient {
- private final BlockingQueue<String> events;
private String message;
public WsClient(URI serverUri) {
super(serverUri);
- events = new ArrayBlockingQueue<>(100);
}
@Override
@@ -37,13 +33,11 @@ public class WsClient extends WebSocketClient {
@Override
public void onMessage(String message) {
- events.add(message);
this.message = message;
}
@Override
public void onClose(int code, String reason, boolean remote) {
- events.clear();
}
@Override
@@ -54,4 +48,4 @@ public class WsClient extends WebSocketClient {
public String getLastMessage() {
return this.message;
}
-}
\ No newline at end of file
+}
diff --git a/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json b/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json
new file mode 100644
index 0000000..09178ef
--- /dev/null
+++ b/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json
@@ -0,0 +1,59 @@
+{
+ "firstNodeIndex": 0,
+ "nodes": [
+ {
+ "additionalInfo": {
+ "layoutX": 325,
+ "layoutY": 150
+ },
+ "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
+ "name": "msgTypeSwitch",
+ "debugMode": true,
+ "configuration": {
+ "version": 0
+ }
+ },
+ {
+ "additionalInfo": {
+ "layoutX": 60,
+ "layoutY": 300
+ },
+ "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode",
+ "name": "formResponse",
+ "debugMode": true,
+ "configuration": {
+ "jsScript": "if (msg.method == \"getResponse\") {\n return {msg: {\"response\": \"requestReceived\"}, metadata: metadata, msgType: msgType};\n}\n\nreturn {msg: msg, metadata: metadata, msgType: msgType};"
+ }
+ },
+ {
+ "additionalInfo": {
+ "layoutX": 450,
+ "layoutY": 300
+ },
+ "type": "org.thingsboard.rule.engine.rpc.TbSendRPCReplyNode",
+ "name": "rpcReply",
+ "debugMode": true,
+ "configuration": {
+ "requestIdMetaDataAttribute": "requestId"
+ }
+ }
+ ],
+ "connections": [
+ {
+ "fromIndex": 0,
+ "toIndex": 1,
+ "type": "RPC Request from Device"
+ },
+ {
+ "fromIndex": 1,
+ "toIndex": 2,
+ "type": "Success"
+ },
+ {
+ "fromIndex": 1,
+ "toIndex": 2,
+ "type": "Failure"
+ }
+ ],
+ "ruleChainConnections": null
+}
\ No newline at end of file