Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 5dd738b..6fe4f6d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -136,6 +136,9 @@ public class ActorSystemContext {
@Value("${actors.statistics.persist_frequency}")
@Getter private long statisticsPersistFrequency;
+ @Value("${actors.tenant.create_components_on_init}")
+ @Getter private boolean tenantComponentsInitEnabled;
+
@Getter @Setter private ActorSystem actorSystem;
@Getter @Setter private ActorRef appActor;
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 2ef2ca7..bb2107b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -174,7 +174,7 @@ public class AppActor extends ContextAwareActor {
TenantId tenantId = toDeviceActorMsg.getTenantId();
ActorRef tenantActor = getOrCreateTenantActor(tenantId);
if (toDeviceActorMsg.getPayload().getMsgType().requiresRulesProcessing()) {
- tenantActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, ruleManager.getRuleChain()), context().self());
+ tenantActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self());
} else {
tenantActor.tell(toDeviceActorMsg, context().self());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
index 89f9efe..dde1af6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.shared.plugin;
+import akka.actor.ActorContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.TenantId;
@@ -30,6 +31,12 @@ public class TenantPluginManager extends PluginManager {
this.tenantId = tenantId;
}
+ public void init(ActorContext context) {
+ if (systemContext.isTenantComponentsInitEnabled()) {
+ super.init(context);
+ }
+ }
+
@Override
FetchFunction<PluginMetaData> getFetchPluginsFunction() {
return link -> pluginService.findTenantPlugins(tenantId, link);
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
index 1e00a6d..c2fc24a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
@@ -25,7 +25,6 @@ import org.thingsboard.server.actors.rule.RuleActorChain;
import org.thingsboard.server.actors.rule.RuleActorMetaData;
import org.thingsboard.server.actors.rule.SimpleRuleActorChain;
import org.thingsboard.server.actors.service.ContextAwareActor;
-import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
@@ -72,6 +71,9 @@ public abstract class RuleManager {
}
public Optional<ActorRef> update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) {
+ if (ruleMap == null) {
+ init(context);
+ }
RuleMetaData rule;
if (event != ComponentLifecycleEvent.DELETED) {
rule = systemContext.getRuleService().findRuleById(ruleId);
@@ -111,11 +113,13 @@ public abstract class RuleManager {
.withDispatcher(getDispatcherName()), rId.toString()));
}
- public RuleActorChain getRuleChain() {
+ public RuleActorChain getRuleChain(ActorContext context) {
+ if (ruleMap == null) {
+ init(context);
+ }
return ruleChain;
}
-
private void refreshRuleChain() {
Set<RuleActorMetaData> activeRuleSet = new HashSet<>();
for (Map.Entry<RuleMetaData, RuleActorMetaData> rule : ruleMap.entrySet()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java
index 05a1c74..12278bb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.shared.rule;
+import akka.actor.ActorContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.TenantId;
@@ -27,6 +28,12 @@ public class TenantRuleManager extends RuleManager {
super(systemContext, tenantId);
}
+ public void init(ActorContext context) {
+ if (systemContext.isTenantComponentsInitEnabled()) {
+ super.init(context);
+ }
+ }
+
@Override
FetchFunction<RuleMetaData> getFetchRulesFunction() {
return link -> ruleService.findTenantRules(tenantId, link);
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 9cc3252..2fbe1ac 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -151,18 +151,13 @@ public class TenantActor extends ContextAwareActor {
private void process(RuleChainDeviceMsg msg) {
ToDeviceActorMsg toDeviceActorMsg = msg.getToDeviceActorMsg();
ActorRef deviceActor = getOrCreateDeviceActor(toDeviceActorMsg.getDeviceId());
- RuleActorChain chain = new ComplexRuleActorChain(msg.getRuleChain(), ruleManager.getRuleChain());
+ RuleActorChain chain = new ComplexRuleActorChain(msg.getRuleChain(), ruleManager.getRuleChain(this.context()));
deviceActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, chain), context().self());
}
private ActorRef getOrCreateDeviceActor(DeviceId deviceId) {
- ActorRef deviceActor = deviceActors.get(deviceId);
- if (deviceActor == null) {
- deviceActor = context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
- .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString());
- deviceActors.put(deviceId, deviceActor);
- }
- return deviceActor;
+ return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
+ .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString()));
}
public static class ActorCreator extends ContextBasedCreator<TenantActor> {
diff --git a/application/src/main/java/org/thingsboard/server/controller/AlarmController.java b/application/src/main/java/org/thingsboard/server/controller/AlarmController.java
index e301138..3b21611 100644
--- a/application/src/main/java/org/thingsboard/server/controller/AlarmController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/AlarmController.java
@@ -15,30 +15,16 @@
*/
package org.thingsboard.server.controller;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
-import org.thingsboard.server.common.data.Customer;
-import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.alarm.*;
-import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.*;
-import org.thingsboard.server.common.data.page.TextPageData;
-import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink;
-import org.thingsboard.server.dao.asset.AssetSearchQuery;
-import org.thingsboard.server.dao.exception.IncorrectParameterException;
-import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.exception.ThingsboardErrorCode;
import org.thingsboard.server.exception.ThingsboardException;
-import org.thingsboard.server.service.security.model.SecurityUser;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
@RestController
@RequestMapping("/api")
diff --git a/application/src/main/java/org/thingsboard/server/controller/AssetController.java b/application/src/main/java/org/thingsboard/server/controller/AssetController.java
index 2857874..5809a7f 100644
--- a/application/src/main/java/org/thingsboard/server/controller/AssetController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/AssetController.java
@@ -27,7 +27,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
-import org.thingsboard.server.dao.asset.AssetSearchQuery;
+import org.thingsboard.server.common.data.asset.AssetSearchQuery;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.exception.ThingsboardException;
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index 68b29b8..637a760 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.DeviceCredentials;
-import org.thingsboard.server.dao.device.DeviceSearchQuery;
+import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.exception.ThingsboardException;
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 6831cc6..e1bad01 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -159,6 +159,8 @@ cassandra:
# Actor system parameters
actors:
+ tenant:
+ create_components_on_init: true
session:
sync:
# Default timeout for processing request using synchronous session (HTTP, CoAP) in milliseconds
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
index 94cef57..0fb12a2 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
@@ -97,28 +97,28 @@ public abstract class AbstractControllerTest {
protected static final String SYS_ADMIN_EMAIL = "sysadmin@thingsboard.org";
private static final String SYS_ADMIN_PASSWORD = "sysadmin";
-
+
protected static final String TENANT_ADMIN_EMAIL = "testtenant@thingsboard.org";
private static final String TENANT_ADMIN_PASSWORD = "tenant";
protected static final String CUSTOMER_USER_EMAIL = "testcustomer@thingsboard.org";
private static final String CUSTOMER_USER_PASSWORD = "customer";
-
+
protected MediaType contentType = new MediaType(MediaType.APPLICATION_JSON.getType(),
MediaType.APPLICATION_JSON.getSubtype(),
Charset.forName("utf8"));
protected MockMvc mockMvc;
-
+
protected String token;
protected String refreshToken;
protected String username;
private TenantId tenantId;
-
+
@SuppressWarnings("rawtypes")
private HttpMessageConverter mappingJackson2HttpMessageConverter;
-
+
@Autowired
private WebApplicationContext webApplicationContext;
@@ -132,7 +132,7 @@ public abstract class AbstractControllerTest {
log.info("Finished test: {}", description.getMethodName());
}
};
-
+
@Autowired
void setConverters(HttpMessageConverter<?>[] converters) {
@@ -144,7 +144,7 @@ public abstract class AbstractControllerTest {
Assert.assertNotNull("the JSON message converter must not be null",
this.mappingJackson2HttpMessageConverter);
}
-
+
@Before
public void setup() throws Exception {
log.info("Executing setup");
@@ -188,7 +188,7 @@ public abstract class AbstractControllerTest {
public void teardown() throws Exception {
log.info("Executing teardown");
loginSysAdmin();
- doDelete("/api/tenant/"+tenantId.getId().toString())
+ doDelete("/api/tenant/" + tenantId.getId().toString())
.andExpect(status().isOk());
log.info("Executed teardown");
}
@@ -196,7 +196,7 @@ public abstract class AbstractControllerTest {
protected void loginSysAdmin() throws Exception {
login(SYS_ADMIN_EMAIL, SYS_ADMIN_PASSWORD);
}
-
+
protected void loginTenantAdmin() throws Exception {
login(TENANT_ADMIN_EMAIL, TENANT_ADMIN_PASSWORD);
}
@@ -204,13 +204,13 @@ public abstract class AbstractControllerTest {
protected void loginCustomerUser() throws Exception {
login(CUSTOMER_USER_EMAIL, CUSTOMER_USER_PASSWORD);
}
-
+
protected User createUserAndLogin(User user, String password) throws Exception {
User savedUser = doPost("/api/user", user, User.class);
logout();
doGet("/api/noauth/activate?activateToken={activateToken}", TestMailService.currentActivateToken)
- .andExpect(status().isSeeOther())
- .andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken));
+ .andExpect(status().isSeeOther())
+ .andExpect(header().string(HttpHeaders.LOCATION, "/login/createPassword?activateToken=" + TestMailService.currentActivateToken));
JsonNode tokenInfo = readResponse(doPost("/api/noauth/activate", "activateToken", TestMailService.currentActivateToken, "password", password).andExpect(status().isOk()), JsonNode.class);
validateAndSetJwtToken(tokenInfo, user.getEmail());
return savedUser;
@@ -247,14 +247,14 @@ public abstract class AbstractControllerTest {
Assert.assertNotNull(token);
Assert.assertFalse(token.isEmpty());
int i = token.lastIndexOf('.');
- Assert.assertTrue(i>0);
- String withoutSignature = token.substring(0, i+1);
- Jwt<Header,Claims> jwsClaims = Jwts.parser().parseClaimsJwt(withoutSignature);
+ Assert.assertTrue(i > 0);
+ String withoutSignature = token.substring(0, i + 1);
+ Jwt<Header, Claims> jwsClaims = Jwts.parser().parseClaimsJwt(withoutSignature);
Claims claims = jwsClaims.getBody();
String subject = claims.getSubject();
Assert.assertEquals(username, subject);
}
-
+
protected void logout() throws Exception {
this.token = null;
this.refreshToken = null;
@@ -266,24 +266,24 @@ public abstract class AbstractControllerTest {
request.header(ThingsboardSecurityConfiguration.JWT_TOKEN_HEADER_PARAM, "Bearer " + this.token);
}
}
-
+
protected ResultActions doGet(String urlTemplate, Object... urlVariables) throws Exception {
MockHttpServletRequestBuilder getRequest = get(urlTemplate, urlVariables);
setJwtToken(getRequest);
return mockMvc.perform(getRequest);
}
-
+
protected <T> T doGet(String urlTemplate, Class<T> responseClass, Object... urlVariables) throws Exception {
return readResponse(doGet(urlTemplate, urlVariables).andExpect(status().isOk()), responseClass);
}
-
+
protected <T> T doGetTyped(String urlTemplate, TypeReference<T> responseType, Object... urlVariables) throws Exception {
return readResponse(doGet(urlTemplate, urlVariables).andExpect(status().isOk()), responseType);
}
-
+
protected <T> T doGetTypedWithPageLink(String urlTemplate, TypeReference<T> responseType,
- TextPageLink pageLink,
- Object... urlVariables) throws Exception {
+ TextPageLink pageLink,
+ Object... urlVariables) throws Exception {
List<Object> pageLinkVariables = new ArrayList<>();
urlTemplate += "limit={limit}";
pageLinkVariables.add(pageLink.getLimit());
@@ -299,18 +299,18 @@ public abstract class AbstractControllerTest {
urlTemplate += "&textOffset={textOffset}";
pageLinkVariables.add(pageLink.getTextOffset());
}
-
- Object[] vars = new Object[urlVariables.length + pageLinkVariables.size()];
+
+ Object[] vars = new Object[urlVariables.length + pageLinkVariables.size()];
System.arraycopy(urlVariables, 0, vars, 0, urlVariables.length);
System.arraycopy(pageLinkVariables.toArray(), 0, vars, urlVariables.length, pageLinkVariables.size());
-
+
return readResponse(doGet(urlTemplate, vars).andExpect(status().isOk()), responseType);
}
-
+
protected <T> T doPost(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
return readResponse(doPost(urlTemplate, params).andExpect(status().isOk()), responseClass);
}
-
+
protected <T> T doPost(String urlTemplate, T content, Class<T> responseClass, String... params) throws Exception {
return readResponse(doPost(urlTemplate, content, params).andExpect(status().isOk()), responseClass);
}
@@ -318,15 +318,15 @@ public abstract class AbstractControllerTest {
protected <T> T doDelete(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
return readResponse(doDelete(urlTemplate, params).andExpect(status().isOk()), responseClass);
}
-
+
protected ResultActions doPost(String urlTemplate, String... params) throws Exception {
MockHttpServletRequestBuilder postRequest = post(urlTemplate);
setJwtToken(postRequest);
populateParams(postRequest, params);
return mockMvc.perform(postRequest);
}
-
- protected <T> ResultActions doPost(String urlTemplate, T content, String... params) throws Exception {
+
+ protected <T> ResultActions doPost(String urlTemplate, T content, String... params) throws Exception {
MockHttpServletRequestBuilder postRequest = post(urlTemplate);
setJwtToken(postRequest);
String json = json(content);
@@ -334,25 +334,25 @@ public abstract class AbstractControllerTest {
populateParams(postRequest, params);
return mockMvc.perform(postRequest);
}
-
+
protected ResultActions doDelete(String urlTemplate, String... params) throws Exception {
MockHttpServletRequestBuilder deleteRequest = delete(urlTemplate);
setJwtToken(deleteRequest);
populateParams(deleteRequest, params);
return mockMvc.perform(deleteRequest);
}
-
+
protected void populateParams(MockHttpServletRequestBuilder request, String... params) {
if (params != null && params.length > 0) {
Assert.assertEquals(params.length % 2, 0);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<String, String>();
- for (int i=0;i<params.length;i+=2) {
- paramsMap.add(params[i], params[i+1]);
+ for (int i = 0; i < params.length; i += 2) {
+ paramsMap.add(params[i], params[i + 1]);
}
request.params(paramsMap);
}
}
-
+
@SuppressWarnings("unchecked")
protected String json(Object o) throws IOException {
MockHttpOutputMessage mockHttpOutputMessage = new MockHttpOutputMessage();
@@ -360,14 +360,14 @@ public abstract class AbstractControllerTest {
o, MediaType.APPLICATION_JSON, mockHttpOutputMessage);
return mockHttpOutputMessage.getBodyAsString();
}
-
+
@SuppressWarnings("unchecked")
protected <T> T readResponse(ResultActions result, Class<T> responseClass) throws Exception {
byte[] content = result.andReturn().getResponse().getContentAsByteArray();
MockHttpInputMessage mockHttpInputMessage = new MockHttpInputMessage(content);
return (T) this.mappingJackson2HttpMessageConverter.read(responseClass, mockHttpInputMessage);
}
-
+
protected <T> T readResponse(ResultActions result, TypeReference<T> type) throws Exception {
byte[] content = result.andReturn().getResponse().getContentAsByteArray();
ObjectMapper mapper = new ObjectMapper();
diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java
index 214ef15..7b64b20 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/asset/AssetService.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.dao.asset;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.asset.Asset;
+import org.thingsboard.server.common.data.asset.AssetSearchQuery;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
index 9ea6448..14a9f21 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
@@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.asset.Asset;
+import org.thingsboard.server.common.data.asset.AssetSearchQuery;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
index cd56fef..3b0c5ec 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
+import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
index 717aea0..76be996 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
@@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.*;
+import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 9c4dd40..3fd1696 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
@@ -45,6 +46,8 @@ import org.thingsboard.server.transport.mqtt.util.SslUtil;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -71,6 +74,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final RelationService relationService;
private final SslHandler sslHandler;
private volatile boolean connected;
+ private volatile InetSocketAddress address;
private volatile GatewaySessionCtx gatewaySessionCtx;
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
@@ -94,30 +98,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
- deviceSessionCtx.setChannel(ctx);
- switch (msg.fixedHeader().messageType()) {
- case CONNECT:
- processConnect(ctx, (MqttConnectMessage) msg);
- break;
- case PUBLISH:
- processPublish(ctx, (MqttPublishMessage) msg);
- break;
- case SUBSCRIBE:
- processSubscribe(ctx, (MqttSubscribeMessage) msg);
- break;
- case UNSUBSCRIBE:
- processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
- break;
- case PINGREQ:
- if (checkConnected(ctx)) {
- ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
- }
- break;
- case DISCONNECT:
- if (checkConnected(ctx)) {
- processDisconnect(ctx);
- }
- break;
+ address = (InetSocketAddress) ctx.channel().remoteAddress();
+ if (msg.fixedHeader() == null) {
+ log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
+ processDisconnect(ctx);
+ } else {
+ deviceSessionCtx.setChannel(ctx);
+ switch (msg.fixedHeader().messageType()) {
+ case CONNECT:
+ processConnect(ctx, (MqttConnectMessage) msg);
+ break;
+ case PUBLISH:
+ processPublish(ctx, (MqttPublishMessage) msg);
+ break;
+ case SUBSCRIBE:
+ processSubscribe(ctx, (MqttSubscribeMessage) msg);
+ break;
+ case UNSUBSCRIBE:
+ processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
+ break;
+ case PINGREQ:
+ if (checkConnected(ctx)) {
+ ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+ }
+ break;
+ case DISCONNECT:
+ if (checkConnected(ctx)) {
+ processDisconnect(ctx);
+ }
+ break;
+ }
}
}
@@ -313,9 +323,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDisconnect(ChannelHandlerContext ctx) {
ctx.close();
- processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
- if (gatewaySessionCtx != null) {
- gatewaySessionCtx.onGatewayDisconnect();
+ if (connected) {
+ processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
+ if (gatewaySessionCtx != null) {
+ gatewaySessionCtx.onGatewayDisconnect();
+ }
}
}