thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index a2dd53b..d2d9b7a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -62,6 +62,7 @@ public final class PluginProcessingContext implements PluginContext {
     private static final Executor executor = Executors.newSingleThreadExecutor();
     public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!";
     public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!";
+    public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!";
 
     private final SharedPluginProcessingContext pluginCtx;
     private final Optional<PluginApiCallSecurityContext> securityCtx;
@@ -309,7 +310,7 @@ public final class PluginProcessingContext implements PluginContext {
             ListenableFuture<Device> deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId()));
             Futures.addCallback(deviceFuture, getCallback(callback, device -> {
                 if (device == null) {
-                    return ValidationResult.entityNotFound("Device with requested id wasn't found!");
+                    return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
                 } else {
                     if (!device.getTenantId().equals(ctx.getTenantId())) {
                         return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!");
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
index 19e4329..8d68bf8 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
@@ -106,6 +106,11 @@ public abstract class AbstractControllerTest {
     protected static final String CUSTOMER_USER_EMAIL = "testcustomer@thingsboard.org";
     private static final String CUSTOMER_USER_PASSWORD = "customer";
 
+    /** See {@link org.springframework.test.web.servlet.DefaultMvcResult#getAsyncResult(long)}
+     *  and {@link org.springframework.mock.web.MockAsyncContext#getTimeout()}
+     */
+    private static final long DEFAULT_TIMEOUT = -1L;
+
     protected MediaType contentType = new MediaType(MediaType.APPLICATION_JSON.getType(),
             MediaType.APPLICATION_JSON.getSubtype(),
             Charset.forName("utf8"));
@@ -336,7 +341,7 @@ public abstract class AbstractControllerTest {
     }
 
     protected <T> T doPost(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, String... params) throws Exception {
-        return readResponse(doPost(urlTemplate, params).andExpect(resultMatcher), responseClass);
+        return readResponse(doPost(urlTemplate, content, params).andExpect(resultMatcher), responseClass);
     }
 
     protected <T> T doPost(String urlTemplate, T content, Class<T> responseClass, String... params) throws Exception {
@@ -344,7 +349,11 @@ public abstract class AbstractControllerTest {
     }
 
     protected <T> T doPostAsync(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, String... params) throws Exception {
-        return readResponse(doPostAsync(urlTemplate, content, params).andExpect(resultMatcher), responseClass);
+        return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseClass);
+    }
+
+    protected <T> T doPostAsync(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, Long timeout, String... params) throws Exception {
+        return readResponse(doPostAsync(urlTemplate, content, timeout, params).andExpect(resultMatcher), responseClass);
     }
 
     protected <T> T doDelete(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
@@ -366,12 +375,13 @@ public abstract class AbstractControllerTest {
         return mockMvc.perform(postRequest);
     }
 
-    protected <T> ResultActions doPostAsync(String urlTemplate, T content, String... params)  throws Exception {
+    protected <T> ResultActions doPostAsync(String urlTemplate, T content, Long timeout, String... params)  throws Exception {
         MockHttpServletRequestBuilder postRequest = post(urlTemplate);
         setJwtToken(postRequest);
         String json = json(content);
         postRequest.contentType(contentType).content(json);
         MvcResult result = mockMvc.perform(postRequest).andReturn();
+        result.getAsyncResult(timeout);
         return mockMvc.perform(asyncDispatch(result));
     }
 
@@ -384,8 +394,8 @@ public abstract class AbstractControllerTest {
 
     protected void populateParams(MockHttpServletRequestBuilder request, String... params) {
         if (params != null && params.length > 0) {
-            Assert.assertEquals(params.length % 2, 0);
-            MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<String, String>();
+            Assert.assertEquals(0, params.length % 2);
+            MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
             for (int i = 0; i < params.length; i += 2) {
                 paramsMap.add(params[i], params[i + 1]);
             }
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
index ac474b8..8b4332c 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
@@ -15,21 +15,23 @@
  */
 package org.thingsboard.server.mqtt.rpc;
 
+import java.util.Arrays;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.eclipse.paho.client.mqttv3.*;
 import org.junit.*;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.client.HttpClientErrorException;
+import org.thingsboard.server.actors.plugin.PluginProcessingContext;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.Tenant;
 import org.thingsboard.server.common.data.User;
+import org.thingsboard.server.common.data.page.TextPageData;
+import org.thingsboard.server.common.data.plugin.PluginMetaData;
 import org.thingsboard.server.common.data.security.Authority;
 import org.thingsboard.server.common.data.security.DeviceCredentials;
 import org.thingsboard.server.controller.AbstractControllerTest;
-import org.thingsboard.server.dao.service.DaoNoSqlTest;
-
-import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -42,15 +44,19 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractControllerTest {
 
     private static final String MQTT_URL = "tcp://localhost:1883";
-    private static final String FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED = "HttpClientErrorException expected, but not encountered";
+    private static final Long TIME_TO_HANDLE_REQUEST = 500L;
 
     private Tenant savedTenant;
     private User tenantAdmin;
+    private Long asyncContextTimeoutToUseRpcPlugin;
+
 
     @Before
     public void beforeTest() throws Exception {
         loginSysAdmin();
 
+        asyncContextTimeoutToUseRpcPlugin = getAsyncContextTimeoutToUseRpcPlugin();
+
         Tenant tenant = new Tenant();
         tenant.setTitle("My tenant");
         savedTenant = doPost("/api/tenant", tenant, Tenant.class);
@@ -70,8 +76,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
     public void afterTest() throws Exception {
         loginSysAdmin();
         if (savedTenant != null) {
-            doDelete("/api/tenant/" + savedTenant.getId().getId().toString())
-                    .andExpect(status().isOk());
+            doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk());
         }
     }
 
@@ -102,7 +107,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
     }
 
     @Test
-    @Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 408 but was: 200
     public void testServerMqttOneWayRpcDeviceOffline() throws Exception {
         Device device = new Device();
         device.setName("Test One-Way Server-Side RPC Device Offline");
@@ -115,29 +119,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
 
         String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
         String deviceId = savedDevice.getId().getId().toString();
-        try {
-            doPost("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(408));
-            Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED);
-        } catch (HttpClientErrorException e) {
-            log.error(e.getMessage(), e);
-            Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT, e.getStatusCode());
-            Assert.assertEquals("408 null", e.getMessage());
-        }
+
+        doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
+                asyncContextTimeoutToUseRpcPlugin);
     }
 
     @Test
-    @Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 400 (404?) but was: 401
     public void testServerMqttOneWayRpcDeviceDoesNotExist() throws Exception {
         String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
-        String nonExistentDeviceId = UUID.randomUUID().toString();
-        try {
-            doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, status().is(400));
-            Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED);
-        } catch (HttpClientErrorException e) {
-            log.error(e.getMessage(), e);
-            Assert.assertEquals(HttpStatus.BAD_REQUEST, e.getStatusCode());
-            Assert.assertEquals("400 null", e.getMessage());
-        }
+        String nonExistentDeviceId = UUIDs.timeBased().toString();
+
+        String result = doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class,
+                status().isNotFound());
+        Assert.assertEquals(PluginProcessingContext.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result);
     }
 
     @Test
@@ -168,7 +162,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
     }
 
     @Test
-    @Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 408 but was: 200
     public void testServerMqttTwoWayRpcDeviceOffline() throws Exception {
         Device device = new Device();
         device.setName("Test Two-Way Server-Side RPC Device Offline");
@@ -181,29 +174,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
 
         String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
         String deviceId = savedDevice.getId().getId().toString();
-        try {
-            doPost("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(408));
-            Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED);
-        } catch (HttpClientErrorException e) {
-            log.error(e.getMessage(), e);
-            Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT, e.getStatusCode());
-            Assert.assertEquals("408 null", e.getMessage());
-        }
+
+        doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(),
+                asyncContextTimeoutToUseRpcPlugin);
     }
 
     @Test
-    @Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 400 (404?) but was: 401
     public void testServerMqttTwoWayRpcDeviceDoesNotExist() throws Exception {
         String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
-        String nonExistentDeviceId = UUID.randomUUID().toString();
-        try {
-            doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, status().is(400));
-            Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED);
-        } catch (HttpClientErrorException e) {
-            log.error(e.getMessage(), e);
-            Assert.assertEquals(HttpStatus.BAD_REQUEST, e.getStatusCode());
-            Assert.assertEquals("400 null", e.getMessage());
-        }
+        String nonExistentDeviceId = UUIDs.timeBased().toString();
+
+        String result = doPostAsync("/api/plugins/rpc/twoway/" + nonExistentDeviceId, setGpioRequest, String.class,
+                status().isNotFound());
+        Assert.assertEquals(PluginProcessingContext.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result);
     }
 
     private Device getSavedDevice(Device device) throws Exception {
@@ -214,6 +197,13 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
         return doGet("/api/device/" + savedDevice.getId().getId().toString() + "/credentials", DeviceCredentials.class);
     }
 
+    private Long getAsyncContextTimeoutToUseRpcPlugin() throws Exception {
+        TextPageData<PluginMetaData> plugins = doGetTyped("/api/plugin/system?limit=1&textSearch=system rpc plugin",
+                new TypeReference<TextPageData<PluginMetaData>>(){});
+        Long systemRpcPluginTimeout = plugins.getData().iterator().next().getConfiguration().get("defaultTimeout").asLong();
+        return systemRpcPluginTimeout + TIME_TO_HANDLE_REQUEST;
+    }
+
     private static class TestMqttCallback implements MqttCallback {
 
         private final MqttAsyncClient client;
@@ -228,10 +218,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
 
         @Override
         public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception {
-            log.info("Message Arrived: " + mqttMessage.getPayload().toString());
+            log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload()));
             MqttMessage message = new MqttMessage();
             String responseTopic = requestTopic.replace("request", "response");
-            message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes());
+            message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8"));
             client.publish(responseTopic, message);
         }