thingsboard-aplcache

Cover MQTT API with black box tests

10/23/2018 10:41:43 AM

Details

diff --git a/msa/integration-tests/pom.xml b/msa/integration-tests/pom.xml
index a1c24f3..a9c67cc 100644
--- a/msa/integration-tests/pom.xml
+++ b/msa/integration-tests/pom.xml
@@ -34,9 +34,10 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <main.dir>${basedir}/../..</main.dir>
-        <integrationtests.skip>true</integrationtests.skip>
+        <integrationTests.skip>true</integrationTests.skip>
         <testcontainers.version>1.9.1</testcontainers.version>
         <java-websocket.version>1.3.9</java-websocket.version>
+        <httpclient.version>4.5.6</httpclient.version>
     </properties>
 
     <dependencies>
@@ -51,6 +52,11 @@
             <version>${java-websocket.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+        <dependency>
             <groupId>io.takari.junit</groupId>
             <artifactId>takari-cpsuite</artifactId>
         </dependency>
@@ -89,7 +95,7 @@
                     <includes>
                         <include>**/*TestSuite.java</include>
                     </includes>
-                    <skipTests>${integrationtests.skip}</skipTests>
+                    <skipTests>${integrationTests.skip}</skipTests>
                 </configuration>
             </plugin>
         </plugins>
diff --git a/msa/integration-tests/README.md b/msa/integration-tests/README.md
index 5ae6354..93df2bf 100644
--- a/msa/integration-tests/README.md
+++ b/msa/integration-tests/README.md
@@ -15,4 +15,4 @@ As result, in REPOSITORY column, next images should be present:
 
 - Run the integration tests in the [msa/integration-tests](../integration-tests) directory:
 
-        mvn clean install -Dintegrationtests.skip=false
\ No newline at end of file
+        mvn clean install -DintegrationTests.skip=false
\ No newline at end of file
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
index ab7f101..5ebab78 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java
@@ -21,55 +21,68 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustStrategy;
+import org.apache.http.conn.ssl.X509HostnameVerifier;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
 import org.junit.*;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
 import org.thingsboard.client.tools.RestClient;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
 
+import javax.net.ssl.*;
 import java.net.URI;
-import java.net.URISyntaxException;
+import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 @Slf4j
 public abstract class AbstractContainerTest {
-    protected static String httpUrl;
-    protected static String wsUrl;
+    protected static final String HTTPS_URL = "https://localhost";
+    protected static final String WSS_URL = "wss://localhost";
     protected static RestClient restClient;
     protected ObjectMapper mapper = new ObjectMapper();
 
     @BeforeClass
-    public static void before() {
-        httpUrl = "http://localhost:" + ContainerTestSuite.composeContainer.getServicePort("tb-web-ui1", ContainerTestSuite.EXPOSED_PORT);
-        wsUrl = "ws://localhost:" + ContainerTestSuite.composeContainer.getServicePort("tb-web-ui1", ContainerTestSuite.EXPOSED_PORT);
-        restClient = new RestClient(httpUrl);
+    public static void before() throws Exception {
+        restClient = new RestClient(HTTPS_URL);
+        restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert());
     }
 
     protected Device createDevice(String name) {
         return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT");
     }
 
-    protected WsClient subscribeToTelemetryWebSocket(DeviceId deviceId) throws URISyntaxException, InterruptedException {
-        WsClient mWs = new WsClient(new URI(wsUrl + "/api/ws/plugins/telemetry?token=" + restClient.getToken()));
-        mWs.connectBlocking(1, TimeUnit.SECONDS);
-
-        JsonObject tsSubCmd = new JsonObject();
-        tsSubCmd.addProperty("entityType", EntityType.DEVICE.name());
-        tsSubCmd.addProperty("entityId", deviceId.toString());
-        tsSubCmd.addProperty("scope", "LATEST_TELEMETRY");
-        tsSubCmd.addProperty("cmdId", new Random().nextInt(100));
-        tsSubCmd.addProperty("unsubscribe", false);
-        JsonArray wsTsSubCmds = new JsonArray();
-        wsTsSubCmds.add(tsSubCmd);
+    protected WsClient subscribeToWebSocket(DeviceId deviceId, String scope, CmdsType property) throws Exception {
+        WsClient wsClient = new WsClient(new URI(WSS_URL + "/api/ws/plugins/telemetry?token=" + restClient.getToken()));
+        SSLContextBuilder builder = SSLContexts.custom();
+        builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
+        wsClient.setSocket(builder.build().getSocketFactory().createSocket());
+        wsClient.connectBlocking();
+
+        JsonObject cmdsObject = new JsonObject();
+        cmdsObject.addProperty("entityType", EntityType.DEVICE.name());
+        cmdsObject.addProperty("entityId", deviceId.toString());
+        cmdsObject.addProperty("scope", scope);
+        cmdsObject.addProperty("cmdId", new Random().nextInt(100));
+
+        JsonArray cmd = new JsonArray();
+        cmd.add(cmdsObject);
         JsonObject wsRequest = new JsonObject();
-        wsRequest.add("tsSubCmds", wsTsSubCmds);
-        wsRequest.add("historyCmds", new JsonArray());
-        wsRequest.add("attrSubCmds", new JsonArray());
-        mWs.send(wsRequest.toString());
-        return mWs;
+        wsRequest.add(property.toString(), cmd);
+        wsClient.send(wsRequest.toString());
+        return wsClient;
     }
 
     protected Map<String, Long> getExpectedLatestValues(long ts) {
@@ -109,4 +122,54 @@ public abstract class AbstractContainerTest {
         return values;
     }
 
+    protected enum CmdsType {
+        TS_SUB_CMDS("tsSubCmds"),
+        HISTORY_CMDS("historyCmds"),
+        ATTR_SUB_CMDS("attrSubCmds");
+
+        private final String text;
+
+        CmdsType(final String text) {
+            this.text = text;
+        }
+
+        @Override
+        public String toString() {
+            return text;
+        }
+    }
+
+    private static HttpComponentsClientHttpRequestFactory getRequestFactoryForSelfSignedCert() throws Exception {
+        SSLContextBuilder builder = SSLContexts.custom();
+        builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
+        SSLContext sslContext = builder.build();
+        SSLConnectionSocketFactory sslSelfSigned = new SSLConnectionSocketFactory(sslContext, new X509HostnameVerifier() {
+            @Override
+            public void verify(String host, SSLSocket ssl) {
+            }
+
+            @Override
+            public void verify(String host, X509Certificate cert) {
+            }
+
+            @Override
+            public void verify(String host, String[] cns, String[] subjectAlts) {
+            }
+
+            @Override
+            public boolean verify(String s, SSLSession sslSession) {
+                return true;
+            }
+        });
+
+        Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder
+                .<ConnectionSocketFactory>create()
+                .register("https", sslSelfSigned)
+                .build();
+
+        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+        CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(cm).build();
+        return new HttpComponentsClientHttpRequestFactory(httpClient);
+    }
+
 }
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java
index 7cc0a0f..f8e1041 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.msa.connectivity;
 
+import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.http.ResponseEntity;
@@ -22,7 +23,7 @@ import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.security.DeviceCredentials;
 import org.thingsboard.server.msa.AbstractContainerTest;
 import org.thingsboard.server.msa.WsClient;
-import org.thingsboard.server.msa.WsTelemetryResponse;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
 
 import java.util.concurrent.TimeUnit;
 
@@ -35,23 +36,24 @@ public class HttpClientTest extends AbstractContainerTest {
         Device device = createDevice("http_");
         DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
 
-        WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
+        WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
         ResponseEntity deviceTelemetryResponse = restClient.getRestTemplate()
-                .postForEntity(httpUrl + "/api/v1/{credentialsId}/telemetry",
+                .postForEntity(HTTPS_URL + "/api/v1/{credentialsId}/telemetry",
                         mapper.readTree(createPayload().toString()),
                         ResponseEntity.class,
                         deviceCredentials.getCredentialsId());
         Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful());
         TimeUnit.SECONDS.sleep(1);
-        WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
+        WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
 
-        Assert.assertEquals(getExpectedLatestValues(123456789L).keySet(), actualLatestTelemetry.getLatestValues().keySet());
+        Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"),
+                actualLatestTelemetry.getLatestValues().keySet());
 
         Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString()));
         Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1"));
         Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0)));
         Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73)));
 
-        restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
     }
 }
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
index 4ad638e..eae98b9 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java
@@ -15,20 +15,39 @@
  */
 package org.thingsboard.server.msa.connectivity;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gson.JsonObject;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import lombok.Data;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.*;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
 import org.thingsboard.mqtt.MqttClient;
 import org.thingsboard.mqtt.MqttClientConfig;
 import org.thingsboard.mqtt.MqttHandler;
 import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.page.TextPageData;
+import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.common.data.security.DeviceCredentials;
 import org.thingsboard.server.msa.AbstractContainerTest;
 import org.thingsboard.server.msa.WsClient;
-import org.thingsboard.server.msa.WsTelemetryResponse;
+import org.thingsboard.server.msa.mapper.AttributesResponse;
+import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.*;
 import java.util.concurrent.*;
 
 public class MqttClientTest extends AbstractContainerTest {
@@ -39,20 +58,22 @@ public class MqttClientTest extends AbstractContainerTest {
         Device device = createDevice("mqtt_");
         DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
 
-        WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
-        MqttClient mqttClient = getMqttClient(deviceCredentials);
+        WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
+        MqttClient mqttClient = getMqttClient(deviceCredentials, null);
         mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
         TimeUnit.SECONDS.sleep(1);
-        WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
+        WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
 
-        Assert.assertEquals(getExpectedLatestValues(123456789L).keySet(), actualLatestTelemetry.getLatestValues().keySet());
+        Assert.assertEquals(4, actualLatestTelemetry.getData().size());
+        Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"),
+                actualLatestTelemetry.getLatestValues().keySet());
 
         Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString()));
         Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1"));
         Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0)));
         Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73)));
 
-        restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
     }
 
     @Test
@@ -63,12 +84,13 @@ public class MqttClientTest extends AbstractContainerTest {
         Device device = createDevice("mqtt_");
         DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
 
-        WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
-        MqttClient mqttClient = getMqttClient(deviceCredentials);
+        WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
+        MqttClient mqttClient = getMqttClient(deviceCredentials, null);
         mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
         TimeUnit.SECONDS.sleep(1);
-        WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
+        WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
 
+        Assert.assertEquals(4, actualLatestTelemetry.getData().size());
         Assert.assertEquals(getExpectedLatestValues(ts), actualLatestTelemetry.getLatestValues());
 
         Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", ts, Boolean.TRUE.toString()));
@@ -76,15 +98,271 @@ public class MqttClientTest extends AbstractContainerTest {
         Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", ts, Double.toString(42.0)));
         Assert.assertTrue(verify(actualLatestTelemetry, "longKey", ts, Long.toString(73)));
 
-        restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
     }
 
-    private MqttClient getMqttClient(DeviceCredentials deviceCredentials) throws InterruptedException {
-        MqttMessageListener queue = new MqttMessageListener();
+    @Test
+    public void publishAttributeUpdateToServer() throws Exception {
+        restClient.login("tenant@thingsboard.org", "tenant");
+        Device device = createDevice("mqtt_");
+        DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+        WsClient wsClient = subscribeToWebSocket(device.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS);
+        MqttMessageListener listener = new MqttMessageListener();
+        MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+        JsonObject clientAttributes = new JsonObject();
+        clientAttributes.addProperty("attr1", "value1");
+        clientAttributes.addProperty("attr2", true);
+        clientAttributes.addProperty("attr3", 42.0);
+        clientAttributes.addProperty("attr4", 73);
+        mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
+        TimeUnit.SECONDS.sleep(1);
+        WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
+
+        Assert.assertEquals(4, actualLatestTelemetry.getData().size());
+        Assert.assertEquals(Sets.newHashSet("attr1", "attr2", "attr3", "attr4"),
+                actualLatestTelemetry.getLatestValues().keySet());
+
+        Assert.assertTrue(verify(actualLatestTelemetry, "attr1", "value1"));
+        Assert.assertTrue(verify(actualLatestTelemetry, "attr2", Boolean.TRUE.toString()));
+        Assert.assertTrue(verify(actualLatestTelemetry, "attr3", Double.toString(42.0)));
+        Assert.assertTrue(verify(actualLatestTelemetry, "attr4", Long.toString(73)));
+
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+    }
+
+    @Test
+    public void requestAttributeValuesFromServer() throws Exception {
+        restClient.login("tenant@thingsboard.org", "tenant");
+        Device device = createDevice("mqtt_");
+        DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+        MqttMessageListener listener = new MqttMessageListener();
+        MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+
+        // Add a new client attribute
+        JsonObject clientAttributes = new JsonObject();
+        String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+        clientAttributes.addProperty("clientAttr", clientAttributeValue);
+        mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
+
+        // Add a new shared attribute
+        JsonObject sharedAttributes = new JsonObject();
+        String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+        sharedAttributes.addProperty("sharedAttr", sharedAttributeValue);
+        ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
+                .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+                        mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
+                        device.getId());
+        Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+        // Subscribe to attributes response
+        mqttClient.on("v1/devices/me/attributes/response/+", listener);
+        // Request attributes
+        JsonObject request = new JsonObject();
+        request.addProperty("clientKeys", "clientAttr");
+        request.addProperty("sharedKeys", "sharedAttr");
+        mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
+        MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+        AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
+
+        Assert.assertEquals(1, attributes.getClient().size());
+        Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr"));
+
+        Assert.assertEquals(1, attributes.getShared().size());
+        Assert.assertEquals(sharedAttributeValue, attributes.getShared().get("sharedAttr"));
+
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+    }
+
+    @Test
+    public void subscribeToAttributeUpdatesFromServer() throws Exception {
+        restClient.login("tenant@thingsboard.org", "tenant");
+        Device device = createDevice("mqtt_");
+        DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+        MqttMessageListener listener = new MqttMessageListener();
+        MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+        mqttClient.on("v1/devices/me/attributes", listener);
+
+        String sharedAttributeName = "sharedAttr";
+
+        // Add a new shared attribute
+        JsonObject sharedAttributes = new JsonObject();
+        String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+        sharedAttributes.addProperty(sharedAttributeName, sharedAttributeValue);
+        ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
+                .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+                        mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
+                        device.getId());
+        Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+        MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+        Assert.assertEquals(sharedAttributeValue,
+                mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
+
+        // Update the shared attribute value
+        JsonObject updatedSharedAttributes = new JsonObject();
+        String updatedSharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
+        updatedSharedAttributes.addProperty(sharedAttributeName, updatedSharedAttributeValue);
+        ResponseEntity updatedSharedAttributesResponse = restClient.getRestTemplate()
+                .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
+                        mapper.readTree(updatedSharedAttributes.toString()), ResponseEntity.class,
+                        device.getId());
+        Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful());
+
+        event = listener.getEvents().poll(10, TimeUnit.SECONDS);
+        Assert.assertEquals(updatedSharedAttributeValue,
+                mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
+
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+    }
+
+    @Test
+    public void serverSideRpc() throws Exception {
+        restClient.login("tenant@thingsboard.org", "tenant");
+        Device device = createDevice("mqtt_");
+        DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+        MqttMessageListener listener = new MqttMessageListener();
+        MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+        mqttClient.on("v1/devices/me/rpc/request/+", listener);
+
+        // Send an RPC from the server
+        JsonObject serverRpcPayload = new JsonObject();
+        serverRpcPayload.addProperty("method", "getValue");
+        serverRpcPayload.addProperty("params", true);
+        serverRpcPayload.addProperty("timeout", 1000);
+        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+        ListenableFuture<ResponseEntity> future = service.submit(() -> {
+            try {
+                return restClient.getRestTemplate()
+                        .postForEntity(HTTPS_URL + "/api/plugins/rpc/twoway/{deviceId}",
+                                mapper.readTree(serverRpcPayload.toString()), String.class,
+                                device.getId());
+            } catch (IOException e) {
+                return ResponseEntity.badRequest().build();
+            }
+        });
+
+        // Wait for RPC call from the server and send the response
+        MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
+
+        Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage());
+
+        Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length()));
+        JsonObject clientResponse = new JsonObject();
+        clientResponse.addProperty("response", "someResponse");
+        // Send a response to the server's RPC request
+        mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes()));
+
+        ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
+        Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
+        Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());
+
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+    }
+
+    @Test
+    public void clientSideRpc() throws Exception {
+        restClient.login("tenant@thingsboard.org", "tenant");
+        Device device = createDevice("mqtt_");
+        DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
+
+        MqttMessageListener listener = new MqttMessageListener();
+        MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
+        mqttClient.on("v1/devices/me/rpc/request/+", listener);
+
+        // Get the default rule chain id to make it root again after test finished
+        RuleChainId defaultRuleChainId = getDefaultRuleChainId();
+
+        // Create a new root rule chain
+        RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
+
+        // Send the request to the server
+        JsonObject clientRequest = new JsonObject();
+        clientRequest.addProperty("method", "getResponse");
+        clientRequest.addProperty("params", true);
+        Integer requestId = 42;
+        mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes()));
+
+        // Check the response from the server
+        TimeUnit.SECONDS.sleep(1);
+        MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS);
+        Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length()));
+        Assert.assertEquals(requestId, responseId);
+        Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText());
+
+        // Make the default rule chain a root again
+        ResponseEntity<RuleChain> rootRuleChainResponse = restClient.getRestTemplate()
+                .postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root",
+                        null,
+                        RuleChain.class,
+                        defaultRuleChainId);
+        Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful());
+
+        // Delete the created rule chain
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/ruleChain/{ruleChainId}", ruleChainId);
+        restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
+    }
+
+    private RuleChainId createRootRuleChainForRpcResponse() throws Exception {
+        RuleChain newRuleChain = new RuleChain();
+        newRuleChain.setName("testRuleChain");
+        ResponseEntity<RuleChain> ruleChainResponse = restClient.getRestTemplate()
+                .postForEntity(HTTPS_URL + "/api/ruleChain",
+                        newRuleChain,
+                        RuleChain.class);
+        Assert.assertTrue(ruleChainResponse.getStatusCode().is2xxSuccessful());
+        RuleChain ruleChain = ruleChainResponse.getBody();
+
+        JsonNode configuration = mapper.readTree(this.getClass().getClassLoader().getResourceAsStream("RpcResponseRuleChainMetadata.json"));
+        RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
+        ruleChainMetaData.setRuleChainId(ruleChain.getId());
+        ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt());
+        ruleChainMetaData.setNodes(Arrays.asList(mapper.treeToValue(configuration.get("nodes"), RuleNode[].class)));
+        ruleChainMetaData.setConnections(Arrays.asList(mapper.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class)));
+
+        ResponseEntity<RuleChainMetaData> ruleChainMetadataResponse = restClient.getRestTemplate()
+                .postForEntity(HTTPS_URL + "/api/ruleChain/metadata",
+                        ruleChainMetaData,
+                        RuleChainMetaData.class);
+        Assert.assertTrue(ruleChainMetadataResponse.getStatusCode().is2xxSuccessful());
+
+        // Set a new rule chain as root
+        ResponseEntity<RuleChain> rootRuleChainResponse = restClient.getRestTemplate()
+                .postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root",
+                        null,
+                        RuleChain.class,
+                        ruleChain.getId());
+        Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful());
+
+        return ruleChain.getId();
+    }
+
+    private RuleChainId getDefaultRuleChainId() {
+        ResponseEntity<TextPageData<RuleChain>> ruleChains = restClient.getRestTemplate().exchange(
+                HTTPS_URL + "/api/ruleChains?limit=40&textSearch=",
+                HttpMethod.GET,
+                null,
+                new ParameterizedTypeReference<TextPageData<RuleChain>>() {
+                });
+
+        Optional<RuleChain> defaultRuleChain = ruleChains.getBody().getData()
+                .stream()
+                .filter(RuleChain::isRoot)
+                .findFirst();
+        if (!defaultRuleChain.isPresent()) {
+            Assert.fail("Root rule chain wasn't found");
+        }
+        return defaultRuleChain.get().getId();
+    }
+
+    private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException {
         MqttClientConfig clientConfig = new MqttClientConfig();
         clientConfig.setClientId("MQTT client from test");
         clientConfig.setUsername(deviceCredentials.getCredentialsId());
-        MqttClient mqttClient = MqttClient.create(clientConfig, queue);
+        MqttClient mqttClient = MqttClient.create(clientConfig, listener);
         mqttClient.connect("localhost", 1883).sync();
         return mqttClient;
     }
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
index fd2de22..0629960 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java
@@ -26,12 +26,11 @@ import java.io.File;
 @RunWith(ClasspathSuite.class)
 @ClasspathSuite.ClassnameFilters({"org.thingsboard.server.msa.*"})
 public class ContainerTestSuite {
-    static final int EXPOSED_PORT = 8080;
 
     @ClassRule
     public static DockerComposeContainer composeContainer = new DockerComposeContainer(new File("./../docker/docker-compose.yml"))
             .withPull(false)
             .withLocalCompose(true)
             .withTailChildContainers(true)
-            .withExposedService("tb-web-ui1", EXPOSED_PORT, Wait.forHttp("/login"));
+            .withExposedService("tb-web-ui1", 8080, Wait.forHttp("/login"));
 }
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java
new file mode 100644
index 0000000..f9774ee
--- /dev/null
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/mapper/AttributesResponse.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.msa.mapper;
+
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+public class AttributesResponse {
+    private Map<String, Object> client;
+    private Map<String, Object> shared;
+}
diff --git a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
index 2f05eee..5ef238f 100644
--- a/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
+++ b/msa/integration-tests/src/test/java/org/thingsboard/server/msa/WsClient.java
@@ -19,16 +19,12 @@ import org.java_websocket.client.WebSocketClient;
 import org.java_websocket.handshake.ServerHandshake;
 
 import java.net.URI;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 
 public class WsClient extends WebSocketClient {
-    private final BlockingQueue<String> events;
     private String message;
 
     public WsClient(URI serverUri) {
         super(serverUri);
-        events = new ArrayBlockingQueue<>(100);
     }
 
     @Override
@@ -37,13 +33,11 @@ public class WsClient extends WebSocketClient {
 
     @Override
     public void onMessage(String message) {
-        events.add(message);
         this.message = message;
     }
 
     @Override
     public void onClose(int code, String reason, boolean remote) {
-        events.clear();
     }
 
     @Override
@@ -54,4 +48,4 @@ public class WsClient extends WebSocketClient {
     public String getLastMessage() {
         return this.message;
     }
-}
\ No newline at end of file
+}
diff --git a/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json b/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json
new file mode 100644
index 0000000..09178ef
--- /dev/null
+++ b/msa/integration-tests/src/test/resources/RpcResponseRuleChainMetadata.json
@@ -0,0 +1,59 @@
+{
+  "firstNodeIndex": 0,
+  "nodes": [
+    {
+      "additionalInfo": {
+        "layoutX": 325,
+        "layoutY": 150
+      },
+      "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
+      "name": "msgTypeSwitch",
+      "debugMode": true,
+      "configuration": {
+        "version": 0
+      }
+    },
+    {
+      "additionalInfo": {
+        "layoutX": 60,
+        "layoutY": 300
+      },
+      "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode",
+      "name": "formResponse",
+      "debugMode": true,
+      "configuration": {
+        "jsScript": "if (msg.method == \"getResponse\") {\n    return {msg: {\"response\": \"requestReceived\"}, metadata: metadata, msgType: msgType};\n}\n\nreturn {msg: msg, metadata: metadata, msgType: msgType};"
+      }
+    },
+    {
+      "additionalInfo": {
+        "layoutX": 450,
+        "layoutY": 300
+      },
+      "type": "org.thingsboard.rule.engine.rpc.TbSendRPCReplyNode",
+      "name": "rpcReply",
+      "debugMode": true,
+      "configuration": {
+        "requestIdMetaDataAttribute": "requestId"
+      }
+    }
+  ],
+  "connections": [
+    {
+      "fromIndex": 0,
+      "toIndex": 1,
+      "type": "RPC Request from Device"
+    },
+    {
+      "fromIndex": 1,
+      "toIndex": 2,
+      "type": "Success"
+    },
+    {
+      "fromIndex": 1,
+      "toIndex": 2,
+      "type": "Failure"
+    }
+  ],
+  "ruleChainConnections": null
+}
\ No newline at end of file