thingsboard-memoizeit

Merge pull request #263 from mp-loki/master Fixes to MQTT

8/28/2017 1:51:12 AM

Details

diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
index 94cef57..689f316 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
@@ -41,6 +41,7 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
 import org.springframework.http.converter.HttpMessageConverter;
+import org.springframework.http.converter.StringHttpMessageConverter;
 import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
 import org.springframework.mock.http.MockHttpInputMessage;
 import org.springframework.mock.http.MockHttpOutputMessage;
@@ -51,6 +52,7 @@ import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.context.web.WebAppConfiguration;
 import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.test.web.servlet.ResultActions;
 import org.springframework.test.web.servlet.ResultMatcher;
 import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
@@ -97,28 +99,31 @@ public abstract class AbstractControllerTest {
 
     protected static final String SYS_ADMIN_EMAIL = "sysadmin@thingsboard.org";
     private static final String SYS_ADMIN_PASSWORD = "sysadmin";
-    
+
     protected static final String TENANT_ADMIN_EMAIL = "testtenant@thingsboard.org";
     private static final String TENANT_ADMIN_PASSWORD = "tenant";
 
     protected static final String CUSTOMER_USER_EMAIL = "testcustomer@thingsboard.org";
     private static final String CUSTOMER_USER_PASSWORD = "customer";
-    
+
     protected MediaType contentType = new MediaType(MediaType.APPLICATION_JSON.getType(),
             MediaType.APPLICATION_JSON.getSubtype(),
             Charset.forName("utf8"));
 
     protected MockMvc mockMvc;
-    
+
     protected String token;
     protected String refreshToken;
     protected String username;
 
     private TenantId tenantId;
-    
+
     @SuppressWarnings("rawtypes")
     private HttpMessageConverter mappingJackson2HttpMessageConverter;
-    
+
+    @SuppressWarnings("rawtypes")
+    private HttpMessageConverter stringHttpMessageConverter;
+
     @Autowired
     private WebApplicationContext webApplicationContext;
 
@@ -132,7 +137,7 @@ public abstract class AbstractControllerTest {
             log.info("Finished test: {}", description.getMethodName());
         }
     };
-    
+
     @Autowired
     void setConverters(HttpMessageConverter<?>[] converters) {
 
@@ -141,10 +146,15 @@ public abstract class AbstractControllerTest {
                 .findAny()
                 .get();
 
+        this.stringHttpMessageConverter = Arrays.stream(converters)
+                .filter(hmc -> hmc instanceof StringHttpMessageConverter)
+                .findAny()
+                .get();
+
         Assert.assertNotNull("the JSON message converter must not be null",
                 this.mappingJackson2HttpMessageConverter);
     }
-    
+
     @Before
     public void setup() throws Exception {
         log.info("Executing setup");
@@ -188,7 +198,7 @@ public abstract class AbstractControllerTest {
     public void teardown() throws Exception {
         log.info("Executing teardown");
         loginSysAdmin();
-        doDelete("/api/tenant/"+tenantId.getId().toString())
+        doDelete("/api/tenant/" + tenantId.getId().toString())
                 .andExpect(status().isOk());
         log.info("Executed teardown");
     }
@@ -196,7 +206,7 @@ public abstract class AbstractControllerTest {
     protected void loginSysAdmin() throws Exception {
         login(SYS_ADMIN_EMAIL, SYS_ADMIN_PASSWORD);
     }
-    
+
     protected void loginTenantAdmin() throws Exception {
         login(TENANT_ADMIN_EMAIL, TENANT_ADMIN_PASSWORD);
     }
@@ -204,13 +214,13 @@ public abstract class AbstractControllerTest {
     protected void loginCustomerUser() throws Exception {
         login(CUSTOMER_USER_EMAIL, CUSTOMER_USER_PASSWORD);
     }
-    
+
     protected User createUserAndLogin(User user, String password) throws Exception {
         User savedUser = doPost("/api/user", user, User.class);
         logout();
         doGet("/api/noauth/activate?activateToken={activateToken}", TestMailService.currentActivateToken)
-        .andExpect(status().isSeeOther())
-        .andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken));
+                .andExpect(status().isSeeOther())
+                .andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken));
         JsonNode tokenInfo = readResponse(doPost("/api/noauth/activate", "activateToken", TestMailService.currentActivateToken, "password", password).andExpect(status().isOk()), JsonNode.class);
         validateAndSetJwtToken(tokenInfo, user.getEmail());
         return savedUser;
@@ -247,14 +257,14 @@ public abstract class AbstractControllerTest {
         Assert.assertNotNull(token);
         Assert.assertFalse(token.isEmpty());
         int i = token.lastIndexOf('.');
-        Assert.assertTrue(i>0);
-        String withoutSignature = token.substring(0, i+1);
-        Jwt<Header,Claims> jwsClaims = Jwts.parser().parseClaimsJwt(withoutSignature);
+        Assert.assertTrue(i > 0);
+        String withoutSignature = token.substring(0, i + 1);
+        Jwt<Header, Claims> jwsClaims = Jwts.parser().parseClaimsJwt(withoutSignature);
         Claims claims = jwsClaims.getBody();
         String subject = claims.getSubject();
         Assert.assertEquals(username, subject);
     }
-    
+
     protected void logout() throws Exception {
         this.token = null;
         this.refreshToken = null;
@@ -266,24 +276,35 @@ public abstract class AbstractControllerTest {
             request.header(ThingsboardSecurityConfiguration.JWT_TOKEN_HEADER_PARAM, "Bearer " + this.token);
         }
     }
-     
+
     protected ResultActions doGet(String urlTemplate, Object... urlVariables) throws Exception {
         MockHttpServletRequestBuilder getRequest = get(urlTemplate, urlVariables);
         setJwtToken(getRequest);
         return mockMvc.perform(getRequest);
     }
-    
+
     protected <T> T doGet(String urlTemplate, Class<T> responseClass, Object... urlVariables) throws Exception {
         return readResponse(doGet(urlTemplate, urlVariables).andExpect(status().isOk()), responseClass);
     }
-    
+
+    protected <T> T doGetAsync(String urlTemplate, Class<T> responseClass, Object... urlVariables) throws Exception {
+        return readResponse(doGetAsync(urlTemplate, urlVariables).andExpect(status().isOk()), responseClass);
+    }
+
+    protected ResultActions doGetAsync(String urlTemplate, Object... urlVariables) throws Exception {
+        MockHttpServletRequestBuilder getRequest;
+        getRequest = get(urlTemplate, urlVariables);
+        setJwtToken(getRequest);
+        return mockMvc.perform(asyncDispatch(mockMvc.perform(getRequest).andExpect(request().asyncStarted()).andReturn()));
+    }
+
     protected <T> T doGetTyped(String urlTemplate, TypeReference<T> responseType, Object... urlVariables) throws Exception {
         return readResponse(doGet(urlTemplate, urlVariables).andExpect(status().isOk()), responseType);
     }
-    
+
     protected <T> T doGetTypedWithPageLink(String urlTemplate, TypeReference<T> responseType,
-            TextPageLink pageLink,
-            Object... urlVariables) throws Exception {
+                                           TextPageLink pageLink,
+                                           Object... urlVariables) throws Exception {
         List<Object> pageLinkVariables = new ArrayList<>();
         urlTemplate += "limit={limit}";
         pageLinkVariables.add(pageLink.getLimit());
@@ -299,75 +320,93 @@ public abstract class AbstractControllerTest {
             urlTemplate += "&textOffset={textOffset}";
             pageLinkVariables.add(pageLink.getTextOffset());
         }
-        
-        Object[] vars = new Object[urlVariables.length + pageLinkVariables.size()];        
+
+        Object[] vars = new Object[urlVariables.length + pageLinkVariables.size()];
         System.arraycopy(urlVariables, 0, vars, 0, urlVariables.length);
         System.arraycopy(pageLinkVariables.toArray(), 0, vars, urlVariables.length, pageLinkVariables.size());
-        
+
         return readResponse(doGet(urlTemplate, vars).andExpect(status().isOk()), responseType);
     }
-    
+
     protected <T> T doPost(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
         return readResponse(doPost(urlTemplate, params).andExpect(status().isOk()), responseClass);
     }
-    
+
+    protected <T> T doPost(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, String... params) throws Exception {
+        return readResponse(doPost(urlTemplate, params).andExpect(resultMatcher), responseClass);
+    }
+
     protected <T> T doPost(String urlTemplate, T content, Class<T> responseClass, String... params) throws Exception {
         return readResponse(doPost(urlTemplate, content, params).andExpect(status().isOk()), responseClass);
     }
 
+    protected <T> T doPostAsync(String urlTemplate, T content, Class<T> responseClass, ResultMatcher resultMatcher, String... params) throws Exception {
+        return readResponse(doPostAsync(urlTemplate, content, params).andExpect(resultMatcher), responseClass);
+    }
+
     protected <T> T doDelete(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
         return readResponse(doDelete(urlTemplate, params).andExpect(status().isOk()), responseClass);
     }
-     
+
     protected ResultActions doPost(String urlTemplate, String... params) throws Exception {
         MockHttpServletRequestBuilder postRequest = post(urlTemplate);
         setJwtToken(postRequest);
         populateParams(postRequest, params);
         return mockMvc.perform(postRequest);
     }
-    
-    protected <T> ResultActions doPost(String urlTemplate, T content, String... params)  throws Exception {
+
+    protected <T> ResultActions doPost(String urlTemplate, T content, String... params) throws Exception {
         MockHttpServletRequestBuilder postRequest = post(urlTemplate);
         setJwtToken(postRequest);
         String json = json(content);
         postRequest.contentType(contentType).content(json);
-        populateParams(postRequest, params);
         return mockMvc.perform(postRequest);
     }
-    
+
+    protected <T> ResultActions doPostAsync(String urlTemplate, T content, String... params)  throws Exception {
+        MockHttpServletRequestBuilder postRequest = post(urlTemplate);
+        setJwtToken(postRequest);
+        String json = json(content);
+        postRequest.contentType(contentType).content(json);
+        MvcResult result = mockMvc.perform(postRequest).andReturn();
+        return mockMvc.perform(asyncDispatch(result));
+    }
+
     protected ResultActions doDelete(String urlTemplate, String... params) throws Exception {
         MockHttpServletRequestBuilder deleteRequest = delete(urlTemplate);
         setJwtToken(deleteRequest);
         populateParams(deleteRequest, params);
         return mockMvc.perform(deleteRequest);
     }
-    
+
     protected void populateParams(MockHttpServletRequestBuilder request, String... params) {
         if (params != null && params.length > 0) {
             Assert.assertEquals(params.length % 2, 0);
             MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<String, String>();
-            for (int i=0;i<params.length;i+=2) {
-                paramsMap.add(params[i], params[i+1]);
+            for (int i = 0; i < params.length; i += 2) {
+                paramsMap.add(params[i], params[i + 1]);
             }
             request.params(paramsMap);
         }
     }
-    
+
     @SuppressWarnings("unchecked")
     protected String json(Object o) throws IOException {
         MockHttpOutputMessage mockHttpOutputMessage = new MockHttpOutputMessage();
-        this.mappingJackson2HttpMessageConverter.write(
-                o, MediaType.APPLICATION_JSON, mockHttpOutputMessage);
+
+        HttpMessageConverter converter = o instanceof String ? stringHttpMessageConverter : mappingJackson2HttpMessageConverter;
+        converter.write(o, MediaType.APPLICATION_JSON, mockHttpOutputMessage);
         return mockHttpOutputMessage.getBodyAsString();
     }
-    
+
     @SuppressWarnings("unchecked")
     protected <T> T readResponse(ResultActions result, Class<T> responseClass) throws Exception {
         byte[] content = result.andReturn().getResponse().getContentAsByteArray();
         MockHttpInputMessage mockHttpInputMessage = new MockHttpInputMessage(content);
-        return (T) this.mappingJackson2HttpMessageConverter.read(responseClass, mockHttpInputMessage);
+        HttpMessageConverter converter = responseClass.equals(String.class) ? stringHttpMessageConverter : mappingJackson2HttpMessageConverter;
+        return (T) converter.read(responseClass, mockHttpInputMessage);
     }
-    
+
     protected <T> T readResponse(ResultActions result, TypeReference<T> type) throws Exception {
         byte[] content = result.andReturn().getResponse().getContentAsByteArray();
         ObjectMapper mapper = new ObjectMapper();
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java
new file mode 100644
index 0000000..e92cbe7
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt;
+
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.junit.ClassRule;
+import org.junit.extensions.cpsuite.ClasspathSuite;
+import org.junit.runner.RunWith;
+import org.thingsboard.server.dao.CustomCassandraCQLUnit;
+import org.thingsboard.server.dao.CustomSqlUnit;
+
+import java.util.Arrays;
+
+@RunWith(ClasspathSuite.class)
+@ClasspathSuite.ClassnameFilters({
+        "org.thingsboard.server.mqtt.rpc.sql.*Test", "org.thingsboard.server.mqtt.telemetry.sql.*Test"})
+public class MqttSqlTestSuite {
+
+    @ClassRule
+    public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
+            Arrays.asList("sql/schema.sql", "sql/system-data.sql"),
+            "sql/drop-all-tables.sql",
+            "sql-test.properties");
+}
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/nosql/MqttServerSideRpcNoSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/nosql/MqttServerSideRpcNoSqlIntegrationTest.java
new file mode 100644
index 0000000..7cd9efb
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/nosql/MqttServerSideRpcNoSqlIntegrationTest.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt.rpc.nosql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest;
+
+/**
+ * Created by Valerii Sosliuk on 8/22/2017.
+ */
+@DaoNoSqlTest
+public class MqttServerSideRpcNoSqlIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest {
+}
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
new file mode 100644
index 0000000..b520798
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt.rpc.sql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.dao.service.DaoSqlTest;
+import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest;
+
+/**
+ * Created by Valerii Sosliuk on 8/22/2017.
+ */
+@DaoSqlTest
+public class MqttServerSideRpcSqlIntegrationTest extends AbstractMqttServerSideRpcIntegrationTest {
+}
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/nosql/MqttTelemetryNoSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/nosql/MqttTelemetryNoSqlIntegrationTest.java
new file mode 100644
index 0000000..069397e
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/nosql/MqttTelemetryNoSqlIntegrationTest.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt.telemetry.nosql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.mqtt.telemetry.AbstractMqttTelemetryIntegrationTest;
+
+/**
+ * Created by Valerii Sosliuk on 8/22/2017.
+ */
+@DaoNoSqlTest
+public class MqttTelemetryNoSqlIntegrationTest extends AbstractMqttTelemetryIntegrationTest {
+}
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/sql/MqttTelemetrySqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/sql/MqttTelemetrySqlIntegrationTest.java
new file mode 100644
index 0000000..bd42827
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/sql/MqttTelemetrySqlIntegrationTest.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt.telemetry.sql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.dao.service.DaoSqlTest;
+import org.thingsboard.server.mqtt.telemetry.AbstractMqttTelemetryIntegrationTest;
+
+/**
+ * Created by Valerii Sosliuk on 8/22/2017.
+ */
+@DaoSqlTest
+public class MqttTelemetrySqlIntegrationTest extends AbstractMqttTelemetryIntegrationTest {
+}
diff --git a/tools/src/main/python/one-way-ssl-mqtt-client.py b/tools/src/main/python/one-way-ssl-mqtt-client.py
index 9266fbf..f4e7e1d 100644
--- a/tools/src/main/python/one-way-ssl-mqtt-client.py
+++ b/tools/src/main/python/one-way-ssl-mqtt-client.py
@@ -19,7 +19,7 @@ import paho.mqtt.client as mqtt
 import ssl, socket
 
 # The callback for when the client receives a CONNACK response from the server.
-def on_connect(client, userdata, rc):
+def on_connect(client, userdata, rc, *extra_params):
    print('Connected with result code '+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
diff --git a/tools/src/main/python/simple-mqtt-client.py b/tools/src/main/python/simple-mqtt-client.py
index 5f511f4..8b21df6 100644
--- a/tools/src/main/python/simple-mqtt-client.py
+++ b/tools/src/main/python/simple-mqtt-client.py
@@ -18,8 +18,9 @@
 import paho.mqtt.client as mqtt
 
 # The callback for when the client receives a CONNACK response from the server.
-def on_connect(client, userdata, rc):
+def on_connect(client, userdata, rc, *extra_params):
    print('Connected with result code '+str(rc))
+   #print('***' + str(r))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe('v1/devices/me/attributes')
diff --git a/tools/src/main/python/two-way-ssl-mqtt-client.py b/tools/src/main/python/two-way-ssl-mqtt-client.py
index d3b3242..169c0e5 100644
--- a/tools/src/main/python/two-way-ssl-mqtt-client.py
+++ b/tools/src/main/python/two-way-ssl-mqtt-client.py
@@ -19,7 +19,7 @@ import paho.mqtt.client as mqtt
 import ssl, socket
 
 # The callback for when the client receives a CONNACK response from the server.
-def on_connect(client, userdata, rc):
+def on_connect(client, userdata, rc, *extra_params):
    print('Connected with result code '+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.