thingsboard-aplcache

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index f4373db..15e9e9d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -25,6 +25,9 @@ import akka.actor.Terminated;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -48,18 +51,17 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+@Slf4j
 public class AppActor extends RuleChainManagerActor {
 
-    private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
-
-    public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
+    private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
     private final TenantService tenantService;
-    private final Map<TenantId, ActorRef> tenantActors;
+    private final BiMap<TenantId, ActorRef> tenantActors;
 
     private AppActor(ActorSystemContext systemContext) {
         super(systemContext, new SystemRuleChainManager(systemContext));
         this.tenantService = systemContext.getTenantService();
-        this.tenantActors = new HashMap<>();
+        this.tenantActors = HashBiMap.create();
     }
 
     @Override
@@ -69,22 +71,20 @@ public class AppActor extends RuleChainManagerActor {
 
     @Override
     public void preStart() {
-        logger.info("Starting main system actor.");
+        log.info("Starting main system actor.");
         try {
             initRuleChains();
-
             if (systemContext.isTenantComponentsInitEnabled()) {
                 PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
                 for (Tenant tenant : tenantIterator) {
-                    logger.debug("[{}] Creating tenant actor", tenant.getId());
+                    log.debug("[{}] Creating tenant actor", tenant.getId());
                     getOrCreateTenantActor(tenant.getId());
-                    logger.debug("Tenant actor created.");
+                    log.debug("Tenant actor created.");
                 }
             }
-
-            logger.info("Main system actor started.");
+            log.info("Main system actor started.");
         } catch (Exception e) {
-            logger.error(e, "Unknown failure");
+            log.warn("Unknown failure", e);
         }
     }
 
@@ -130,7 +130,7 @@ public class AppActor extends RuleChainManagerActor {
 
     private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
         if (SYSTEM_TENANT.equals(msg.getTenantId())) {
-            //TODO: ashvayka handle this.
+            log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet", SYSTEM_TENANT);
         } else {
             getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
         }
@@ -152,7 +152,7 @@ public class AppActor extends RuleChainManagerActor {
         if (target != null) {
             target.tell(msg, ActorRef.noSender());
         } else {
-            logger.debug("Invalid component lifecycle msg: {}", msg);
+            log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
         }
     }
 
@@ -161,14 +161,26 @@ public class AppActor extends RuleChainManagerActor {
     }
 
     private ActorRef getOrCreateTenantActor(TenantId tenantId) {
-        return tenantActors.computeIfAbsent(tenantId, k -> context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
-                .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()));
+        return tenantActors.computeIfAbsent(tenantId, k -> {
+            log.debug("[{}] Creating tenant actor.", tenantId);
+            ActorRef tenantActor = context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
+                    .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString());
+            context().watch(tenantActor);
+            log.debug("[{}] Created tenant actor: {}.", tenantId, tenantActor);
+            return tenantActor;
+        });
     }
 
-    private void processTermination(Terminated message) {
+    @Override
+    protected void processTermination(Terminated message) {
         ActorRef terminated = message.actor();
         if (terminated instanceof LocalActorRef) {
-            logger.debug("Removed actor: {}", terminated);
+            boolean removed = tenantActors.inverse().remove(terminated) != null;
+            if (removed) {
+                log.debug("[{}] Removed actor:", terminated);
+            } else {
+                log.warn("[{}] Removed actor was not found in the tenant map!");
+            }
         } else {
             throw new IllegalStateException("Remote actors are not supported!");
         }
@@ -182,20 +194,17 @@ public class AppActor extends RuleChainManagerActor {
         }
 
         @Override
-        public AppActor create() throws Exception {
+        public AppActor create() {
             return new AppActor(context);
         }
     }
 
-    private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, Directive>() {
-        @Override
-        public Directive apply(Throwable t) {
-            logger.error(t, "Unknown failure");
-            if (t instanceof RuntimeException) {
-                return SupervisorStrategy.restart();
-            } else {
-                return SupervisorStrategy.stop();
-            }
+    private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
+        log.warn("Unknown failure", t);
+        if (t instanceof RuntimeException) {
+            return SupervisorStrategy.restart();
+        } else {
+            return SupervisorStrategy.stop();
         }
     });
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 7b412b1..f450027 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -15,42 +15,38 @@
  */
 package org.thingsboard.server.actors.device;
 
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ContextAwareActor;
-import org.thingsboard.server.actors.service.ContextBasedCreator;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbActorMsg;
-import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
 
+@Slf4j
 public class DeviceActor extends ContextAwareActor {
 
-    private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
-
     private final DeviceActorMessageProcessor processor;
 
-    private DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
+    DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
         super(systemContext);
-        this.processor = new DeviceActorMessageProcessor(systemContext, logger, tenantId, deviceId);
+        this.processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId);
     }
 
     @Override
     public void preStart() {
-        logger.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
+        log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
         try {
             processor.initSessionTimeout(context());
-            logger.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
+            log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
         } catch (Exception e) {
-            logger.error(e, "[{}][{}] Unknown failure", processor.tenantId, processor.deviceId);
+            log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);
         }
     }
 
@@ -90,22 +86,4 @@ public class DeviceActor extends ContextAwareActor {
         return true;
     }
 
-    public static class ActorCreator extends ContextBasedCreator<DeviceActor> {
-        private static final long serialVersionUID = 1L;
-
-        private final TenantId tenantId;
-        private final DeviceId deviceId;
-
-        public ActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) {
-            super(context);
-            this.tenantId = tenantId;
-            this.deviceId = deviceId;
-        }
-
-        @Override
-        public DeviceActor create() throws Exception {
-            return new DeviceActor(context, tenantId, deviceId);
-        }
-    }
-
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java
new file mode 100644
index 0000000..18aa926
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.device;
+
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+
+public class DeviceActorCreator extends ContextBasedCreator<DeviceActor> {
+    private static final long serialVersionUID = 1L;
+
+    private final TenantId tenantId;
+    private final DeviceId deviceId;
+
+    public DeviceActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) {
+        super(context);
+        this.tenantId = tenantId;
+        this.deviceId = deviceId;
+    }
+
+    @Override
+    public DeviceActor create() {
+        return new DeviceActor(context, tenantId, deviceId);
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index a16bd78..47417d1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.rule.engine.api.RpcError;
 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
 import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
@@ -88,6 +89,7 @@ import java.util.stream.Collectors;
 /**
  * @author Andrew Shvayka
  */
+@Slf4j
 class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 
     final TenantId tenantId;
@@ -106,8 +108,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
     private String deviceType;
     private TbMsgMetaData defaultMetaData;
 
-    DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, DeviceId deviceId) {
-        super(systemContext, logger);
+    DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
+        super(systemContext);
         this.tenantId = tenantId;
         this.deviceId = deviceId;
         this.sessions = new LinkedHashMap<>();
@@ -136,30 +138,30 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 
         long timeout = request.getExpirationTime() - System.currentTimeMillis();
         if (timeout <= 0) {
-            logger.debug("[{}][{}] Ignoring message due to exp time reached", deviceId, request.getId(), request.getExpirationTime());
+            log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
             return;
         }
 
         boolean sent = rpcSubscriptions.size() > 0;
         Set<UUID> syncSessionSet = new HashSet<>();
-        rpcSubscriptions.entrySet().forEach(sub -> {
-            sendToTransport(rpcRequest, sub.getKey(), sub.getValue().getNodeId());
-            if (TransportProtos.SessionType.SYNC == sub.getValue().getType()) {
-                syncSessionSet.add(sub.getKey());
+        rpcSubscriptions.forEach((key, value) -> {
+            sendToTransport(rpcRequest, key, value.getNodeId());
+            if (TransportProtos.SessionType.SYNC == value.getType()) {
+                syncSessionSet.add(key);
             }
         });
         syncSessionSet.forEach(rpcSubscriptions::remove);
 
         if (request.isOneway() && sent) {
-            logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
+            log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
             systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
         } else {
             registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
         }
         if (sent) {
-            logger.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
+            log.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
         } else {
-            logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
+            log.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
         }
     }
 
@@ -172,7 +174,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
     void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) {
         ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
         if (requestMd != null) {
-            logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
+            log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
             systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
                     null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
         }
@@ -181,13 +183,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
     private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) {
         TransportProtos.SessionType sessionType = getSessionType(sessionId);
         if (!toDeviceRpcPendingMap.isEmpty()) {
-            logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
+            log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
             if (sessionType == TransportProtos.SessionType.SYNC) {
-                logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
+                log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
                 rpcSubscriptions.remove(sessionId);
             }
         } else {
-            logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
+            log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
         }
         Set<Integer> sentOneWayIds = new HashSet<>();
         if (sessionType == TransportProtos.SessionType.ASYNC) {
@@ -335,7 +337,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
     void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
         ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
         if (data != null) {
-            logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
+            log.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
             sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
                             .setRequestId(msg.getId()).setError("timeout").build()
                     , data.getSessionId(), data.getNodeId());
@@ -380,7 +382,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
                             hasNotificationData = true;
                         }
                     } else {
-                        logger.debug("[{}] No public server side attributes changed!", deviceId);
+                        log.debug("[{}] No public server side attributes changed!", deviceId);
                     }
                 }
             }
@@ -391,27 +393,27 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
                 });
             }
         } else {
-            logger.debug("[{}] No registered attributes subscriptions to process!", deviceId);
+            log.debug("[{}] No registered attributes subscriptions to process!", deviceId);
         }
     }
 
     private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
         UUID sessionId = getSessionId(sessionInfo);
-        logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
+        log.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
         ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
         boolean success = requestMd != null;
         if (success) {
             systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
                     responseMsg.getPayload(), null));
         } else {
-            logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
+            log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
         }
     }
 
     private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
         UUID sessionId = getSessionId(sessionInfo);
         if (subscribeCmd.getUnsubscribe()) {
-            logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
+            log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
             attributeSubscriptions.remove(sessionId);
         } else {
             SessionInfoMetaData sessionMD = sessions.get(sessionId);
@@ -419,7 +421,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
                 sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
             }
             sessionMD.setSubscribedToAttributes(true);
-            logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
+            log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
             attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
             dumpSessions();
         }
@@ -432,7 +434,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
     private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
         UUID sessionId = getSessionId(sessionInfo);
         if (subscribeCmd.getUnsubscribe()) {
-            logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
+            log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
             rpcSubscriptions.remove(sessionId);
         } else {
             SessionInfoMetaData sessionMD = sessions.get(sessionId);
@@ -440,7 +442,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
                 sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
             }
             sessionMD.setSubscribedToRPC(true);
-            logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
+            log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
             rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
             sendPendingRequests(context, sessionId, sessionInfo);
             dumpSessions();
@@ -451,10 +453,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
         UUID sessionId = getSessionId(sessionInfo);
         if (msg.getEvent() == SessionEvent.OPEN) {
             if (sessions.containsKey(sessionId)) {
-                logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
+                log.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
                 return;
             }
-            logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
+            log.debug("[{}] Processing new session [{}]", deviceId, sessionId);
             if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
                 UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
                 if (sessionIdToRemove != null) {
@@ -467,7 +469,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
             }
             dumpSessions();
         } else if (msg.getEvent() == SessionEvent.CLOSED) {
-            logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
+            log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
             sessions.remove(sessionId);
             attributeSubscriptions.remove(sessionId);
             rpcSubscriptions.remove(sessionId);
@@ -623,10 +625,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
     }
 
     private void restoreSessions() {
-        logger.debug("[{}] Restoring sessions from cache", deviceId);
+        log.debug("[{}] Restoring sessions from cache", deviceId);
         TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
         if (sessionsDump.getSerializedSize() == 0) {
-            logger.debug("[{}] No session information found", deviceId);
+            log.debug("[{}] No session information found", deviceId);
             return;
         }
         for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
@@ -644,13 +646,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
                 rpcSubscriptions.put(sessionId, sessionInfo);
                 sessionMD.setSubscribedToRPC(true);
             }
-            logger.debug("[{}] Restored session: {}", deviceId, sessionMD);
+            log.debug("[{}] Restored session: {}", deviceId, sessionMD);
         }
-        logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
+        log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
     }
 
     private void dumpSessions() {
-        logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
+        log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
         List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
         sessions.forEach((uuid, sessionMD) -> {
             if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
@@ -668,7 +670,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
             sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
                     .setSessionInfo(sessionInfoProto)
                     .setSubscriptionInfo(subscriptionInfoProto).build());
-            logger.debug("[{}] Dumping session: {}", deviceId, sessionMD);
+            log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
         });
         systemContext.getDeviceSessionCacheService()
                 .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 31320ce..922d2ba 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -19,6 +19,7 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ContextAwareActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -35,17 +36,16 @@ import java.util.*;
 /**
  * @author Andrew Shvayka
  */
+@Slf4j
 public class RpcManagerActor extends ContextAwareActor {
 
-    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
     private final Map<ServerAddress, SessionActorInfo> sessionActors;
 
     private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
 
     private final ServerAddress instance;
 
-    RpcManagerActor(ActorSystemContext systemContext) {
+    private RpcManagerActor(ActorSystemContext systemContext) {
         super(systemContext);
         this.sessionActors = new HashMap<>();
         this.pendingMsgs = new HashMap<>();
@@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor {
                 queue.add(msg);
             }
         } else {
-            logger.warning("Cluster msg doesn't have server address [{}]", msg);
+            log.warn("Cluster msg doesn't have server address [{}]", msg);
         }
     }
 
@@ -207,7 +207,7 @@ public class RpcManagerActor extends ContextAwareActor {
         }
 
         @Override
-        public RpcManagerActor create() throws Exception {
+        public RpcManagerActor create() {
             return new RpcManagerActor(context);
         }
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index dbad7c0..ed765e0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -33,7 +33,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
     private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId) {
         super(systemContext, tenantId, ruleChainId);
         setProcessor(new RuleChainActorMessageProcessor(tenantId, ruleChainId, systemContext,
-                logger, context().parent(), context().self()));
+                context().parent(), context().self()));
     }
 
     @Override
@@ -79,7 +79,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
         }
 
         @Override
-        public RuleChainActor create() throws Exception {
+        public RuleChainActor create() {
             return new RuleChainActor(context, tenantId, ruleChainId);
         }
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 3da90d1..e0f7e4a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -23,6 +23,7 @@ import com.datastax.driver.core.utils.UUIDs;
 
 import java.util.Optional;
 
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
 import org.thingsboard.server.actors.service.DefaultActorService;
@@ -55,6 +56,7 @@ import java.util.stream.Collectors;
 /**
  * @author Andrew Shvayka
  */
+@Slf4j
 public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
 
     private static final long DEFAULT_CLUSTER_PARTITION = 0L;
@@ -69,8 +71,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     private boolean started;
 
     RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
-            , LoggingAdapter logger, ActorRef parent, ActorRef self) {
-        super(systemContext, logger, tenantId, ruleChainId);
+            , ActorRef parent, ActorRef self) {
+        super(systemContext, tenantId, ruleChainId);
         this.parent = parent;
         this.self = self;
         this.nodeActors = new HashMap<>();
@@ -216,7 +218,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     private void onRemoteTellNext(ServerAddress serverAddress, RuleNodeToRuleChainTellNextMsg envelope) {
         TbMsg msg = envelope.getMsg();
-        logger.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
+        log.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
         envelope = new RemoteToRuleChainTellNextMsg(envelope, tenantId, entityId);
         systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(serverAddress, envelope));
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
index 273a569..807c729 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.actors.ruleChain;
 
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ComponentActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 
+@Slf4j
 public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
 
     private final RuleChainId ruleChainId;
@@ -32,7 +34,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
         super(systemContext, tenantId, ruleNodeId);
         this.ruleChainId = ruleChainId;
         setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext,
-                logger, context().parent(), context().self()));
+                context().parent(), context().self()));
     }
 
     @Override
@@ -60,7 +62,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
     }
 
     private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) {
-        logger.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+        log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
         try {
             processor.onRuleToSelfMsg(msg);
             increaseMessagesProcessedCount();
@@ -70,7 +72,7 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
     }
 
     private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
-        logger.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+        log.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
         try {
             processor.onRuleChainToRuleNodeMsg(msg);
             increaseMessagesProcessedCount();
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index acb171d..5d226ef 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -44,8 +44,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
     private TbContext defaultCtx;
 
     RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
-            , LoggingAdapter logger, ActorRef parent, ActorRef self) {
-        super(systemContext, logger, tenantId, ruleNodeId);
+            , ActorRef parent, ActorRef self) {
+        super(systemContext, tenantId, ruleNodeId);
         this.parent = parent;
         this.self = self;
         this.service = systemContext.getRuleChainService();
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index ed59051..ff54fc8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.service;
 import akka.actor.ActorRef;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
 import org.thingsboard.server.actors.stats.StatsPersistMsg;
@@ -30,10 +31,9 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 /**
  * @author Andrew Shvayka
  */
+@Slf4j
 public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
 
-    protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
-
     private long lastPersistedErrorTs = 0L;
     protected final TenantId tenantId;
     protected final T id;
@@ -60,7 +60,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
                 scheduleStatsPersistTick();
             }
         } catch (Exception e) {
-            logger.warning("[{}][{}] Failed to start {} processor: {}", tenantId, id, id.getEntityType(), e);
+            log.warn("[{}][{}] Failed to start {} processor: {}", tenantId, id, id.getEntityType(), e);
             logAndPersist("OnStart", e, true);
             logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);
         }
@@ -70,7 +70,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
         try {
             processor.scheduleStatsPersistTick(context(), systemContext.getStatisticsPersistFrequency());
         } catch (Exception e) {
-            logger.error("[{}][{}] Failed to schedule statistics store message. No statistics is going to be stored: {}", tenantId, id, e.getMessage());
+            log.error("[{}][{}] Failed to schedule statistics store message. No statistics is going to be stored: {}", tenantId, id, e.getMessage());
             logAndPersist("onScheduleStatsPersistMsg", e);
         }
     }
@@ -81,7 +81,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
             processor.stop(context());
             logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
         } catch (Exception e) {
-            logger.warning("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
+            log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
             logAndPersist("OnStop", e, true);
             logLifecycleEvent(ComponentLifecycleEvent.STOPPED, e);
         }
@@ -148,9 +148,9 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
     private void logAndPersist(String method, Exception e, boolean critical) {
         errorsOccurred++;
         if (critical) {
-            logger.warning("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
+            log.warn("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
         } else {
-            logger.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
+            log.debug("[{}][{}] Failed to process {} msg: {}", id, tenantId, method, e);
         }
         long ts = System.currentTimeMillis();
         if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 3624127..c9f1dcd 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -15,14 +15,16 @@
  */
 package org.thingsboard.server.actors.service;
 
+import akka.actor.Terminated;
 import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.common.msg.TbActorMsg;
 
+@Slf4j
 public abstract class ContextAwareActor extends UntypedActor {
-    protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
 
     public static final int ENTITY_PACK_LIMIT = 1024;
 
@@ -35,21 +37,26 @@ public abstract class ContextAwareActor extends UntypedActor {
 
     @Override
     public void onReceive(Object msg) throws Exception {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Processing msg: {}", msg);
+        if (log.isDebugEnabled()) {
+            log.debug("Processing msg: {}", msg);
         }
         if (msg instanceof TbActorMsg) {
             try {
                 if (!process((TbActorMsg) msg)) {
-                    logger.warning("Unknown message: {}!", msg);
+                    log.warn("Unknown message: {}!", msg);
                 }
             } catch (Exception e) {
                 throw e;
             }
+        } else if (msg instanceof Terminated) {
+            processTermination((Terminated) msg);
         } else {
-            logger.warning("Unknown message: {}!", msg);
+            log.warn("Unknown message: {}!", msg);
         }
     }
 
+    protected void processTermination(Terminated msg) {
+    }
+
     protected abstract boolean process(TbActorMsg msg);
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
index c809782..b707baf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -22,22 +22,22 @@ import akka.event.LoggingAdapter;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import scala.concurrent.ExecutionContextExecutor;
 import scala.concurrent.duration.Duration;
 
 import java.util.concurrent.TimeUnit;
 
+@Slf4j
 public abstract class AbstractContextAwareMsgProcessor {
 
     protected final ActorSystemContext systemContext;
-    protected final LoggingAdapter logger;
     protected final ObjectMapper mapper = new ObjectMapper();
 
-    protected AbstractContextAwareMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger) {
+    protected AbstractContextAwareMsgProcessor(ActorSystemContext systemContext) {
         super();
         this.systemContext = systemContext;
-        this.logger = logger;
     }
 
     private Scheduler getScheduler() {
@@ -53,7 +53,7 @@ public abstract class AbstractContextAwareMsgProcessor {
     }
 
     private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) {
-        logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
+        log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
         getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
     }
 
@@ -62,7 +62,7 @@ public abstract class AbstractContextAwareMsgProcessor {
     }
 
     private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) {
-        logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
+        log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
         getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index c9dc307..9e5d063 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -19,6 +19,7 @@ import akka.actor.ActorContext;
 import akka.event.LoggingAdapter;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.stats.StatsPersistTick;
 import org.thingsboard.server.common.data.id.EntityId;
@@ -30,14 +31,15 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import javax.annotation.Nullable;
 import java.util.function.Consumer;
 
+@Slf4j
 public abstract class ComponentMsgProcessor<T extends EntityId> extends AbstractContextAwareMsgProcessor {
 
     protected final TenantId tenantId;
     protected final T entityId;
     protected ComponentLifecycleState state;
 
-    protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
-        super(systemContext, logger);
+    protected ComponentMsgProcessor(ActorSystemContext systemContext, TenantId tenantId, T id) {
+        super(systemContext);
         this.tenantId = tenantId;
         this.entityId = id;
     }
@@ -79,7 +81,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
 
     protected void checkActive() {
         if (state != ComponentLifecycleState.ACTIVE) {
-            logger.warning("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId);
+            log.warn("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId);
             throw new IllegalStateException("Rule chain is not active! " + entityId + " - " + tenantId);
         }
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
index 1b9e6a8..dd03d69 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
@@ -20,6 +20,8 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ContextAwareActor;
@@ -39,11 +41,11 @@ import java.util.Map;
 public abstract class EntityActorsManager<T extends EntityId, A extends UntypedActor, M extends SearchTextBased<? extends UUIDBased>> {
 
     protected final ActorSystemContext systemContext;
-    protected final Map<T, ActorRef> actors;
+    protected final BiMap<T, ActorRef> actors;
 
     public EntityActorsManager(ActorSystemContext systemContext) {
         this.systemContext = systemContext;
-        this.actors = new HashMap<>();
+        this.actors = HashBiMap.create();
     }
 
     protected abstract TenantId getTenantId();
@@ -65,7 +67,8 @@ public abstract class EntityActorsManager<T extends EntityId, A extends UntypedA
         }
     }
 
-    public void visit(M entity, ActorRef actorRef) {}
+    public void visit(M entity, ActorRef actorRef) {
+    }
 
     public ActorRef getOrCreateActor(ActorContext context, T entityId) {
         return actors.computeIfAbsent(entityId, eId ->
diff --git a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
index 8623370..79aa6da 100644
--- a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
@@ -15,10 +15,9 @@
  */
 package org.thingsboard.server.actors.stats;
 
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ContextAwareActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -27,9 +26,9 @@ import org.thingsboard.server.common.data.Event;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 
+@Slf4j
 public class StatsActor extends ContextAwareActor {
 
-    private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
     private final ObjectMapper mapper = new ObjectMapper();
 
     public StatsActor(ActorSystemContext context) {
@@ -43,13 +42,13 @@ public class StatsActor extends ContextAwareActor {
     }
 
     @Override
-    public void onReceive(Object msg) throws Exception {
-        logger.debug("Received message: {}", msg);
+    public void onReceive(Object msg) {
+        log.debug("Received message: {}", msg);
         if (msg instanceof StatsPersistMsg) {
             try {
                 onStatsPersistMsg((StatsPersistMsg) msg);
             } catch (Exception e) {
-                logger.warning("Failed to persist statistics: {}", msg, e);
+                log.warn("Failed to persist statistics: {}", msg, e);
             }
         }
     }
@@ -75,7 +74,7 @@ public class StatsActor extends ContextAwareActor {
         }
 
         @Override
-        public StatsActor create() throws Exception {
+        public StatsActor create() {
             return new StatsActor(context);
         }
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 721d828..b76e298 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -17,15 +17,19 @@ package org.thingsboard.server.actors.tenant;
 
 import akka.actor.ActorInitializationException;
 import akka.actor.ActorRef;
+import akka.actor.LocalActorRef;
 import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
+import akka.actor.Terminated;
 import akka.japi.Function;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.device.DeviceActor;
+import org.thingsboard.server.actors.device.DeviceActorCreator;
 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
 import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
-import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
 import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
@@ -44,18 +48,18 @@ import scala.concurrent.duration.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
+@Slf4j
 public class TenantActor extends RuleChainManagerActor {
 
     private final TenantId tenantId;
-    private final Map<DeviceId, ActorRef> deviceActors;
+    private final BiMap<DeviceId, ActorRef> deviceActors;
 
     private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
         super(systemContext, new TenantRuleChainManager(systemContext, tenantId));
         this.tenantId = tenantId;
-        this.deviceActors = new HashMap<>();
+        this.deviceActors = HashBiMap.create();
     }
 
-
     @Override
     public SupervisorStrategy supervisorStrategy() {
         return strategy;
@@ -63,12 +67,12 @@ public class TenantActor extends RuleChainManagerActor {
 
     @Override
     public void preStart() {
-        logger.info("[{}] Starting tenant actor.", tenantId);
+        log.info("[{}] Starting tenant actor.", tenantId);
         try {
             initRuleChains();
-            logger.info("[{}] Tenant actor started.", tenantId);
+            log.info("[{}] Tenant actor started.", tenantId);
         } catch (Exception e) {
-            logger.error(e, "[{}] Unknown failure", tenantId);
+            log.warn("[{}] Unknown failure", tenantId, e);
         }
     }
 
@@ -105,22 +109,20 @@ public class TenantActor extends RuleChainManagerActor {
         return true;
     }
 
-    @Override
-    protected void broadcast(Object msg) {
-        super.broadcast(msg);
-//        deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
-    }
-
     private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
-    	if (ruleChainManager.getRootChainActor()!=null)
-        ruleChainManager.getRootChainActor().tell(msg, self());
-    	else logger.info("[{}] No Root Chain", msg);
+        if (ruleChainManager.getRootChainActor() != null) {
+            ruleChainManager.getRootChainActor().tell(msg, self());
+        } else {
+            log.info("[{}] No Root Chain: {}", tenantId, msg);
+        }
     }
 
     private void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg msg) {
-    	if (ruleChainManager.getRootChainActor()!=null)
-        ruleChainManager.getRootChainActor().tell(msg, self());
-    	else logger.info("[{}] No Root Chain", msg);
+        if (ruleChainManager.getRootChainActor() != null) {
+            ruleChainManager.getRootChainActor().tell(msg, self());
+        } else {
+            log.info("[{}] No Root Chain: {}", tenantId, msg);
+        }
     }
 
     private void onRuleChainMsg(RuleChainAwareMsg msg) {
@@ -141,13 +143,35 @@ public class TenantActor extends RuleChainManagerActor {
             }
             target.tell(msg, ActorRef.noSender());
         } else {
-            logger.debug("Invalid component lifecycle msg: {}", msg);
+            log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg);
         }
     }
 
     private ActorRef getOrCreateDeviceActor(DeviceId deviceId) {
-        return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
-                .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString()));
+        return deviceActors.computeIfAbsent(deviceId, k -> {
+            log.debug("[{}][{}] Creating device actor.", tenantId, deviceId);
+            ActorRef deviceActor = context().actorOf(Props.create(new DeviceActorCreator(systemContext, tenantId, deviceId))
+                            .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME)
+                    , deviceId.toString());
+            context().watch(deviceActor);
+            log.debug("[{}][{}] Created device actor: {}.", tenantId, deviceId, deviceActor);
+            return deviceActor;
+        });
+    }
+
+    @Override
+    protected void processTermination(Terminated message) {
+        ActorRef terminated = message.actor();
+        if (terminated instanceof LocalActorRef) {
+            boolean removed = deviceActors.inverse().remove(terminated) != null;
+            if (removed) {
+                log.debug("[{}] Removed actor:", terminated);
+            } else {
+                log.warn("[{}] Removed actor was not found in the device map!");
+            }
+        } else {
+            throw new IllegalStateException("Remote actors are not supported!");
+        }
     }
 
     public static class ActorCreator extends ContextBasedCreator<TenantActor> {
@@ -161,7 +185,7 @@ public class TenantActor extends RuleChainManagerActor {
         }
 
         @Override
-        public TenantActor create() throws Exception {
+        public TenantActor create() {
             return new TenantActor(context, tenantId);
         }
     }
@@ -169,8 +193,8 @@ public class TenantActor extends RuleChainManagerActor {
     private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() {
         @Override
         public SupervisorStrategy.Directive apply(Throwable t) {
-            logger.error(t, "Unknown failure");
-            if(t instanceof ActorInitializationException){
+            log.warn("[{}] Unknown failure", tenantId, t);
+            if (t instanceof ActorInitializationException) {
                 return SupervisorStrategy.stop();
             } else {
                 return SupervisorStrategy.resume();
diff --git a/application/src/main/java/org/thingsboard/server/controller/DashboardController.java b/application/src/main/java/org/thingsboard/server/controller/DashboardController.java
index 450b976..0bb6a66 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DashboardController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DashboardController.java
@@ -52,7 +52,6 @@ public class DashboardController extends BaseController {
     public static final String DASHBOARD_ID = "dashboardId";
 
     @Value("${dashboard.max_datapoints_limit}")
-    @Getter
     private long maxDatapointsLimit;
 
 
diff --git a/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java b/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
index 544b6fd..68d9ada 100644
--- a/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
+++ b/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java
@@ -22,6 +22,7 @@ import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.MalformedJwtException;
 import io.jsonwebtoken.SignatureException;
 import io.jsonwebtoken.UnsupportedJwtException;
+import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.authentication.BadCredentialsException;
@@ -29,12 +30,11 @@ import org.thingsboard.server.service.security.exception.JwtExpiredTokenExceptio
 
 import java.io.Serializable;
 
+@Slf4j
 public class RawAccessJwtToken implements JwtToken, Serializable {
 
     private static final long serialVersionUID = -797397445703066079L;
 
-    private static Logger logger = LoggerFactory.getLogger(RawAccessJwtToken.class);
-
     private String token;
 
     public RawAccessJwtToken(String token) {
@@ -52,10 +52,10 @@ public class RawAccessJwtToken implements JwtToken, Serializable {
         try {
             return Jwts.parser().setSigningKey(signingKey).parseClaimsJws(this.token);
         } catch (UnsupportedJwtException | MalformedJwtException | IllegalArgumentException | SignatureException ex) {
-            logger.error("Invalid JWT Token", ex);
+            log.error("Invalid JWT Token", ex);
             throw new BadCredentialsException("Invalid JWT token: ", ex);
         } catch (ExpiredJwtException expiredEx) {
-            logger.info("JWT Token is expired", expiredEx);
+            log.info("JWT Token is expired", expiredEx);
             throw new JwtExpiredTokenException(this, "JWT Token expired", expiredEx);
         }
     }
diff --git a/application/src/main/resources/actor-system.conf b/application/src/main/resources/actor-system.conf
index 763319e..28673f6 100644
--- a/application/src/main/resources/actor-system.conf
+++ b/application/src/main/resources/actor-system.conf
@@ -19,7 +19,7 @@ akka {
   # JVM shutdown, System.exit(-1), in case of a fatal error,
   # such as OutOfMemoryError
   jvm-exit-on-fatal-error = off
-  loglevel = "DEBUG"
+  loglevel = "INFO"
   loggers = ["akka.event.slf4j.Slf4jLogger"]
 }
 
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 51ddaa9..51a59c9 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -83,6 +83,7 @@ dashboard:
   max_datapoints_limit: "${DASHBOARD_MAX_DATAPOINTS_LIMIT:50000}"
 
 database:
+  ts_max_intervals: "${DATABASE_TS_MAX_INTERVALS:700}" # mas number of DB queries generated by single API call to fetch telemetry records
   entities:
     type: "${DATABASE_ENTITIES_TYPE:sql}" # cassandra OR sql
   ts:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 4658cbe..1006499 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -20,11 +20,13 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.EntityView;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityViewId;
+import org.thingsboard.server.common.data.kv.Aggregation;
 import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
@@ -47,8 +49,11 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 @Slf4j
 public class BaseTimeseriesService implements TimeseriesService {
 
-    public static final int INSERTS_PER_ENTRY = 3;
-    public static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
+    private static final int INSERTS_PER_ENTRY = 3;
+    private static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
+
+    @Value("${database.ts_max_intervals}")
+    private long maxTsIntervals;
 
     @Autowired
     private TimeseriesDao timeseriesDao;
@@ -59,7 +64,7 @@ public class BaseTimeseriesService implements TimeseriesService {
     @Override
     public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<ReadTsKvQuery> queries) {
         validate(entityId);
-        queries.forEach(BaseTimeseriesService::validate);
+        queries.forEach(this::validate);
         if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
             EntityView entityView = entityViewService.findEntityViewById((EntityViewId) entityId);
             List<ReadTsKvQuery> filteredQueries =
@@ -189,7 +194,7 @@ public class BaseTimeseriesService implements TimeseriesService {
         Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
     }
 
-    private static void validate(ReadTsKvQuery query) {
+    private void validate(ReadTsKvQuery query) {
         if (query == null) {
             throw new IncorrectParameterException("ReadTsKvQuery can't be null");
         } else if (isBlank(query.getKey())) {
@@ -197,6 +202,14 @@ public class BaseTimeseriesService implements TimeseriesService {
         } else if (query.getAggregation() == null) {
             throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty");
         }
+        if(!Aggregation.NONE.equals(query.getAggregation())) {
+            long step = Math.max(query.getInterval(), 1000);
+            long intervalCounts = (query.getEndTs() - query.getStartTs()) / step;
+            if (intervalCounts > maxTsIntervals || intervalCounts < 0) {
+                throw new IncorrectParameterException("Incorrect TsKvQuery. Number of intervals is to high - " + intervalCounts + ". " +
+                        "Please increase 'interval' parameter for your query or reduce the time range of the query.");
+            }
+        }
     }
 
     private static void validate(DeleteTsKvQuery query) {
diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties
index a61c285..3bd0f7d 100644
--- a/dao/src/test/resources/application-test.properties
+++ b/dao/src/test/resources/application-test.properties
@@ -35,4 +35,4 @@ redis.connection.port=6379
 redis.connection.db=0
 redis.connection.password=
 
-rule.queue.type=memory
+database.ts_max_intervals=700
\ No newline at end of file