thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 08a4454..a16bd78 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -201,7 +201,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 
     private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
         return entry -> {
-            ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
             ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
             ToDeviceRpcRequestBody body = request.getBody();
             if (request.isOneway()) {
@@ -486,6 +485,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
             sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
             sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
             sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
+            if (subscriptionInfo.getAttributeSubscription()) {
+                attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
+            }
+            if (subscriptionInfo.getRpcSubscription()) {
+                rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
+            }
         }
         dumpSessions();
     }
@@ -618,8 +623,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
     }
 
     private void restoreSessions() {
+        logger.debug("[{}] Restoring sessions from cache", deviceId);
         TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
         if (sessionsDump.getSerializedSize() == 0) {
+            logger.debug("[{}] No session information found", deviceId);
             return;
         }
         for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
@@ -627,18 +634,23 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
             UUID sessionId = getSessionId(sessionInfoProto);
             SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
             TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
-            SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
-            sessions.put(sessionId, sessionInfoMetaData);
-            if (subInfo.getAttributeSubscription()) {
-                rpcSubscriptions.put(sessionId, sessionInfo);
-            }
+            SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
+            sessions.put(sessionId, sessionMD);
             if (subInfo.getAttributeSubscription()) {
                 attributeSubscriptions.put(sessionId, sessionInfo);
+                sessionMD.setSubscribedToAttributes(true);
+            }
+            if (subInfo.getRpcSubscription()) {
+                rpcSubscriptions.put(sessionId, sessionInfo);
+                sessionMD.setSubscribedToRPC(true);
             }
+            logger.debug("[{}] Restored session: {}", deviceId, sessionMD);
         }
+        logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
     }
 
     private void dumpSessions() {
+        logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
         List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
         sessions.forEach((uuid, sessionMD) -> {
             if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
@@ -656,6 +668,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
             sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
                     .setSessionInfo(sessionInfoProto)
                     .setSubscriptionInfo(subscriptionInfoProto).build());
+            logger.debug("[{}] Dumping session: {}", deviceId, sessionMD);
         });
         systemContext.getDeviceSessionCacheService()
                 .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index 14bb636..1eba066 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -32,7 +32,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
     private final ActorRef manager;
     private final ActorRef self;
 
-    public BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
+    BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
         this.service = service;
         this.manager = manager;
         this.self = self;
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 66174e1..31320ce 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -103,10 +103,10 @@ public class RpcManagerActor extends ContextAwareActor {
             ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE);
             SessionActorInfo session = sessionActors.get(address);
             if (session != null) {
-                log.debug("{} Forwarding msg to session actor", address);
+                log.debug("{} Forwarding msg to session actor: {}", address, msg);
                 session.getActor().tell(msg, ActorRef.noSender());
             } else {
-                log.debug("{} Storing msg to pending queue", address);
+                log.debug("{} Storing msg to pending queue: {}", address, msg);
                 Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
                 if (queue == null) {
                     queue = new LinkedList<>();
@@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor {
                 queue.add(msg);
             }
         } else {
-            logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
+            logger.warning("Cluster msg doesn't have server address [{}]", msg);
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index c9cf869..86509ca 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rpc;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import io.grpc.Channel;
+import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.stub.StreamObserver;
 import org.thingsboard.server.actors.ActorSystemContext;
@@ -88,8 +89,8 @@ public class RpcSessionActor extends ContextAwareActor {
             systemContext.getRpcService().onSessionCreated(msg.getMsgUid(), session.getInputStream());
         } else {
             // Client session
-            Channel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext(true).build();
-            session = new GrpcSession(remoteServer, listener);
+            ManagedChannel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext().build();
+            session = new GrpcSession(remoteServer, listener, channel);
             session.initInputStream();
 
             ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index f822e4e..721d828 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -108,7 +108,7 @@ public class TenantActor extends RuleChainManagerActor {
     @Override
     protected void broadcast(Object msg) {
         super.broadcast(msg);
-        deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
+//        deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
     }
 
     private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
index 55cf52c..dd0a995 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
@@ -15,6 +15,8 @@
  */
 package org.thingsboard.server.service.cluster.rpc;
 
+import io.grpc.Channel;
+import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -34,6 +36,7 @@ public final class GrpcSession implements Closeable {
     private final UUID sessionId;
     private final boolean client;
     private final GrpcSessionListener listener;
+    private final ManagedChannel channel;
     private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
     private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
 
@@ -41,10 +44,10 @@ public final class GrpcSession implements Closeable {
     private ServerAddress remoteServer;
 
     public GrpcSession(GrpcSessionListener listener) {
-        this(null, listener);
+        this(null, listener, null);
     }
 
-    public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener) {
+    public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener, ManagedChannel channel) {
         this.sessionId = UUID.randomUUID();
         this.listener = listener;
         if (remoteServer != null) {
@@ -54,6 +57,7 @@ public final class GrpcSession implements Closeable {
         } else {
             this.client = false;
         }
+        this.channel = channel;
     }
 
     public void initInputStream() {
@@ -105,5 +109,8 @@ public final class GrpcSession implements Closeable {
         } catch (IllegalStateException e) {
             log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage());
         }
+        if (channel != null) {
+            channel.shutdownNow();
+        }
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
index f3ceed2..5a1e559 100644
--- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
@@ -30,7 +30,6 @@ import org.thingsboard.rule.engine.api.NodeConfiguration;
 import org.thingsboard.rule.engine.api.NodeDefinition;
 import org.thingsboard.rule.engine.api.RuleNode;
 import org.thingsboard.rule.engine.api.TbRelationTypes;
-import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
 import org.thingsboard.server.common.data.plugin.ComponentType;
 import org.thingsboard.server.dao.component.ComponentDescriptorService;
@@ -52,6 +51,7 @@ import java.util.Set;
 @Slf4j
 public class AnnotationComponentDiscoveryService implements ComponentDiscoveryService {
 
+    public static final int MAX_OPTIMISITC_RETRIES = 3;
     @Value("${plugins.scan_packages}")
     private String[] scanPackages;
 
@@ -81,17 +81,27 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
     private void registerRuleNodeComponents() {
         Set<BeanDefinition> ruleNodeBeanDefinitions = getBeanDefinitions(RuleNode.class);
         for (BeanDefinition def : ruleNodeBeanDefinitions) {
-            try {
-                String clazzName = def.getBeanClassName();
-                Class<?> clazz = Class.forName(clazzName);
-                RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class);
-                ComponentType type = ruleNodeAnnotation.type();
-                ComponentDescriptor component = scanAndPersistComponent(def, type);
-                components.put(component.getClazz(), component);
-                componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component);
-            } catch (Exception e) {
-                log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
-                throw new RuntimeException(e);
+            int retryCount = 0;
+            Exception cause = null;
+            while (retryCount < MAX_OPTIMISITC_RETRIES) {
+                try {
+                    String clazzName = def.getBeanClassName();
+                    Class<?> clazz = Class.forName(clazzName);
+                    RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class);
+                    ComponentType type = ruleNodeAnnotation.type();
+                    ComponentDescriptor component = scanAndPersistComponent(def, type);
+                    components.put(component.getClazz(), component);
+                    componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component);
+                    break;
+                } catch (Exception e) {
+                    log.trace("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
+                    cause = e;
+                    retryCount++;
+                }
+            }
+            if (cause != null && retryCount == MAX_OPTIMISITC_RETRIES) {
+                log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), cause.getMessage(), cause);
+                throw new RuntimeException(cause);
             }
         }
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index d3782e8..3d12938 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -133,66 +133,48 @@ public class LocalTransportService extends AbstractTransportService implements R
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
     }
 
     @Override
     public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
-        }
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
     }
 
     @Override
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 610a490..bd42f31 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Header;
 
 import java.util.List;
@@ -75,7 +76,11 @@ public class TBKafkaProducerTemplate<T> {
             CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1));
             result.all().get();
         } catch (Exception e) {
-            log.trace("Failed to create topic: {}", e.getMessage(), e);
+            if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
+                log.trace("[{}] Topic already exists: ", defaultTopic);
+            } else {
+                log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
+            }
         }
         //Maybe this should not be cached, but we don't plan to change size of partitions
         this.partitionInfoMap = new ConcurrentHashMap<>();
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index 3bb84ec..fad1954 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -68,44 +68,68 @@ public abstract class AbstractTransportService implements TransportService {
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
-        reportActivityInternal(sessionInfo);
+        if (checkLimits(sessionInfo, callback)) {
+            reportActivityInternal(sessionInfo);
+            doProcess(sessionInfo, msg, callback);
+        }
     }
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
-        reportActivityInternal(sessionInfo);
+        if (checkLimits(sessionInfo, callback)) {
+            reportActivityInternal(sessionInfo);
+            doProcess(sessionInfo, msg, callback);
+        }
     }
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
-        reportActivityInternal(sessionInfo);
+        if (checkLimits(sessionInfo, callback)) {
+            reportActivityInternal(sessionInfo);
+            doProcess(sessionInfo, msg, callback);
+        }
     }
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
-        reportActivityInternal(sessionInfo);
+        if (checkLimits(sessionInfo, callback)) {
+            reportActivityInternal(sessionInfo);
+            doProcess(sessionInfo, msg, callback);
+        }
     }
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
-        SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
-        sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
+        if (checkLimits(sessionInfo, callback)) {
+            SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
+            sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
+            doProcess(sessionInfo, msg, callback);
+        }
     }
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
-        SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
-        sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
+        if (checkLimits(sessionInfo, callback)) {
+            SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
+            sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
+            doProcess(sessionInfo, msg, callback);
+        }
     }
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
-        reportActivityInternal(sessionInfo);
+        if (checkLimits(sessionInfo, callback)) {
+            reportActivityInternal(sessionInfo);
+            doProcess(sessionInfo, msg, callback);
+        }
     }
 
     @Override
     public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
-        reportActivityInternal(sessionInfo);
+        if (checkLimits(sessionInfo, callback)) {
+            reportActivityInternal(sessionInfo);
+            doProcess(sessionInfo, msg, callback);
+        }
     }
 
     @Override
@@ -113,6 +137,22 @@ public abstract class AbstractTransportService implements TransportService {
         reportActivityInternal(sessionInfo);
     }
 
+    protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
+
+    protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
+
+    protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
+
+    protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
+
+    protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);
+
+    protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
+
+    protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback);
+
+    protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
+
     private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
         UUID sessionId = toId(sessionInfo);
         SessionMetaData sessionMetaData = sessions.get(sessionId);
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index 0910961..4b11bf5 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -217,102 +217,84 @@ public class RemoteTransportService extends AbstractTransportService {
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setSessionEvent(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setSubscriptionInfo(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setSubscriptionInfo(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setSessionEvent(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setPostTelemetry(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setPostTelemetry(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setPostAttributes(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setPostAttributes(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setGetAttributes(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setGetAttributes(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setSubscribeToAttributes(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setSubscribeToAttributes(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setSubscribeToRPC(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setSubscribeToRPC(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setToDeviceRPCCallResponse(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setToDeviceRPCCallResponse(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     @Override
-    public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
-        if (checkLimits(sessionInfo, callback)) {
-            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
-                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
-                            .setToServerRPCCallRequest(msg).build()
-            ).build();
-            send(sessionInfo, toRuleEngineMsg, callback);
-        }
+    protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+        ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                        .setToServerRPCCallRequest(msg).build()
+        ).build();
+        send(sessionInfo, toRuleEngineMsg, callback);
     }
 
     private static class TransportCallbackAdaptor implements Callback {
diff --git a/docker/tb-node/conf/logback.xml b/docker/tb-node/conf/logback.xml
index 1c69f53..8bf38eb 100644
--- a/docker/tb-node/conf/logback.xml
+++ b/docker/tb-node/conf/logback.xml
@@ -40,7 +40,7 @@
         </encoder>
     </appender>
 
-    <logger name="org.thingsboard.server" level="INFO" />
+    <logger name="org.thingsboard.server" level="TRACE" />
     <logger name="akka" level="INFO" />
 
     <root level="INFO">