thingsboard-aplcache

Improvements to Cluster mode

5/17/2018 3:13:06 AM

Changes

pom.xml 2(+1 -1)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 3345f81..2f5e20d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
-import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.thingsboard.server.actors.ActorSystemContext;
@@ -47,7 +46,7 @@ import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
 import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
 import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
 import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
-import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.core.GetAttributesRequest;
 import org.thingsboard.server.common.msg.core.RuleEngineError;
 import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
@@ -57,13 +56,12 @@ import org.thingsboard.server.common.msg.core.SessionOpenMsg;
 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
-import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
 import org.thingsboard.server.common.msg.session.SessionType;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
@@ -74,7 +72,6 @@ import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotific
 import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
-import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
 
@@ -156,7 +153,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         boolean sent = rpcSubscriptions.size() > 0;
         Set<SessionId> syncSessionSet = new HashSet<>();
         rpcSubscriptions.entrySet().forEach(sub -> {
-            ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sub.getKey());
+            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
             sendMsgToSessionActor(response, sub.getValue().getServer());
             if (SessionType.SYNC == sub.getValue().getType()) {
                 syncSessionSet.add(sub.getKey());
@@ -198,7 +195,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (data != null) {
             logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId());
             ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT);
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
         }
     }
 
@@ -210,7 +207,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
             if (remainingAcks == 0) {
                 ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
-                sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
             }
         }
     }
@@ -248,7 +245,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                     body.getMethod(),
                     body.getParams()
             );
-            ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId);
+            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
             sendMsgToSessionActor(response, server);
         };
     }
@@ -302,14 +299,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
                 BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
                         request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
-                sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
+                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
             }
 
             @Override
             public void onFailure(Throwable t) {
                 if (t instanceof Exception) {
                     ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
-                    sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
+                    sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
                 } else {
                     logger.error("[{}] Failed to process attributes request", deviceId, t);
                 }
@@ -391,14 +388,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (data != null) {
             logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
             ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
         }
     }
 
     void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
         ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
         if (data != null) {
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
         }
     }
 
@@ -409,7 +406,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             pendingMsgs.put(tbMsg.getId(), pendingMsgData);
             scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
         } else {
-            ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
+            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
             sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
         }
         context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
@@ -436,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             if (notification != null) {
                 ToDeviceMsg finalNotification = notification;
                 attributeSubscriptions.entrySet().forEach(sub -> {
-                    ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(finalNotification, sub.getKey());
+                    ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
                     sendMsgToSessionActor(response, sub.getValue().getServer());
                 });
             }
@@ -462,7 +459,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                 BasicCommandAckResponse response = success
                         ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
                         : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
-                sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
+                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
             }
         }
     }
@@ -517,7 +514,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void sendMsgToSessionActor(ToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
+    private void sendMsgToSessionActor(ActorSystemToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
         if (sessionAddress.isPresent()) {
             ServerAddress address = sessionAddress.get();
             logger.debug("{} Forwarding msg: {}", address, response);
@@ -530,7 +527,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     void processCredentialsUpdate() {
         sessions.forEach((k, v) -> {
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
+            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
         });
         attributeSubscriptions.clear();
         rpcSubscriptions.clear();
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index f8bc94f..85d1b54 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -43,6 +43,7 @@ import org.thingsboard.server.dao.user.UserService;
 import org.thingsboard.server.service.script.NashornJsEngine;
 import scala.concurrent.duration.Duration;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -63,15 +64,24 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public void tellNext(TbMsg msg, String relationType) {
-        tellNext(msg, relationType, null);
+        tellNext(msg, Collections.singleton(relationType), null);
+    }
+
+    @Override
+    public void tellNext(TbMsg msg, Set<String> relationTypes) {
+        tellNext(msg, relationTypes, null);
     }
 
     @Override
     public void tellNext(TbMsg msg, String relationType, Throwable th) {
+        tellNext(msg, Collections.singleton(relationType), th);
+    }
+
+    private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
         if (nodeCtx.getSelf().isDebugMode()) {
-            mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th);
+            relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
         }
-        nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
+        nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg), nodeCtx.getSelfActor());
     }
 
     @Override
@@ -118,12 +128,6 @@ class DefaultTbContext implements TbContext {
     }
 
     @Override
-    public void tellNext(TbMsg msg, Set<String> relationTypes) {
-        //TODO: fix this to send set of relations instead of loop.
-        relationTypes.forEach(type -> tellNext(msg, type));
-    }
-
-    @Override
     public ListeningExecutor getJsExecutor() {
         return mainCtx.getJsExecutor();
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 4812002..3ba646a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -54,6 +54,8 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
             case RULE_CHAIN_TO_RULE_CHAIN_MSG:
                 processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
                 break;
+            case CLUSTER_EVENT_MSG:
+                break;
             default:
                 return false;
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 38a2ab7..ac902a7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -206,9 +206,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
         checkActive();
         RuleNodeId originator = envelope.getOriginator();
-        String targetRelationType = envelope.getRelationType();
         List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
-                .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
+                .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
                 .collect(Collectors.toList());
 
         TbMsg msg = envelope.getMsg();
@@ -237,6 +236,18 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         }
     }
 
+    private boolean contains(Set<String> relationTypes, String type) {
+        if (relationTypes == null) {
+            return true;
+        }
+        for (String relationType : relationTypes) {
+            if (relationType.equalsIgnoreCase(type)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target, String fromRelationType) {
         RuleChainId targetRCId = new RuleChainId(target.getId());
         TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
index 054284d..c0a475c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.TbMsg;
 
+import java.util.Set;
+
 /**
  * Created by ashvayka on 19.03.18.
  */
@@ -28,7 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg;
 final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
 
     private final RuleNodeId originator;
-    private final String relationType;
+    private final Set<String> relationTypes;
     private final TbMsg msg;
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 1d9c671..3624127 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -39,8 +39,12 @@ public abstract class ContextAwareActor extends UntypedActor {
             logger.debug("Processing msg: {}", msg);
         }
         if (msg instanceof TbActorMsg) {
-            if(!process((TbActorMsg) msg)){
-                logger.warning("Unknown message: {}!", msg);
+            try {
+                if (!process((TbActorMsg) msg)) {
+                    logger.warning("Unknown message: {}!", msg);
+                }
+            } catch (Exception e) {
+                throw e;
             }
         } else {
             logger.warning("Unknown message: {}!", msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 56f3228..c80e913 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -211,6 +211,10 @@ public class DefaultActorService implements ActorService {
     @Override
     public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
         ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
+        log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
+        if(log.isDebugEnabled()){
+            log.info("MSG: ", msg);
+        }
         switch (msg.getMessageType()) {
             case CLUSTER_ACTOR_MESSAGE:
                 java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
index fab51a5..42e127e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
@@ -45,7 +45,7 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
         this.sessionId = sessionId;
     }
 
-    protected abstract void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg);
+    protected abstract void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg);
 
     protected abstract void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg);
 
@@ -63,12 +63,12 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
     protected void cleanupSession(ActorContext ctx) {
     }
 
-    protected void updateSessionCtx(ToDeviceActorSessionMsg msg, SessionType type) {
+    protected void updateSessionCtx(TransportToDeviceSessionActorMsg msg, SessionType type) {
         sessionCtx = msg.getSessionMsg().getSessionContext();
         deviceToDeviceActorMsgPrototype = new BasicDeviceToDeviceActorMsg(msg, type);
     }
 
-    protected DeviceToDeviceActorMsg toDeviceMsg(ToDeviceActorSessionMsg msg) {
+    protected DeviceToDeviceActorMsg toDeviceMsg(TransportToDeviceSessionActorMsg msg) {
         AdaptorToSessionActorMsg adaptorMsg = msg.getSessionMsg();
         return new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, adaptorMsg.getMsg());
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
index fa5287f..83d56e0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
@@ -46,7 +46,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     }
 
     @Override
-    protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
+    protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
         updateSessionCtx(msg, SessionType.ASYNC);
         if (firstMsg) {
             toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
index 9d324c5..f67d46b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
 
 import akka.actor.OneForOneStrategy;
 import akka.actor.SupervisorStrategy;
-import akka.japi.Function;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ContextAwareActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -25,8 +24,8 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
 import org.thingsboard.server.common.msg.session.SessionMsg;
 import org.thingsboard.server.common.msg.session.SessionType;
@@ -63,38 +62,37 @@ public class SessionActor extends ContextAwareActor {
 
     @Override
     protected boolean process(TbActorMsg msg) {
-        //TODO Move everything here, to work with TbActorMsg
-        return false;
-    }
-
-    @Override
-    public void onReceive(Object msg) throws Exception {
-        logger.debug("[{}] Processing: {}.", sessionId, msg);
-        if (msg instanceof ToDeviceActorSessionMsg) {
-            processDeviceMsg((ToDeviceActorSessionMsg) msg);
-        } else if (msg instanceof ToDeviceSessionActorMsg) {
-            processToDeviceMsg((ToDeviceSessionActorMsg) msg);
-        } else if (msg instanceof SessionTimeoutMsg) {
-            processTimeoutMsg((SessionTimeoutMsg) msg);
-        } else if (msg instanceof SessionCtrlMsg) {
-            processSessionCtrlMsg((SessionCtrlMsg) msg);
-        } else if (msg instanceof ClusterEventMsg) {
-            processClusterEvent((ClusterEventMsg) msg);
-        } else {
-            logger.warning("[{}] Unknown msg: {}", sessionId, msg);
+        switch (msg.getMsgType()) {
+            case TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG:
+                processTransportToSessionMsg((TransportToDeviceSessionActorMsg) msg);
+                break;
+            case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
+                processActorsToSessionMsg((ActorSystemToDeviceSessionActorMsg) msg);
+                break;
+            case SESSION_TIMEOUT_MSG:
+                processTimeoutMsg((SessionTimeoutMsg) msg);
+                break;
+            case SESSION_CTRL_MSG:
+                processSessionCloseMsg((SessionCtrlMsg) msg);
+                break;
+            case CLUSTER_EVENT_MSG:
+                processClusterEvent((ClusterEventMsg) msg);
+                break;
+            default: return false;
         }
+        return true;
     }
 
     private void processClusterEvent(ClusterEventMsg msg) {
         processor.processClusterEvent(context(), msg);
     }
 
-    private void processDeviceMsg(ToDeviceActorSessionMsg msg) {
+    private void processTransportToSessionMsg(TransportToDeviceSessionActorMsg msg) {
         initProcessor(msg);
         processor.processToDeviceActorMsg(context(), msg);
     }
 
-    private void processToDeviceMsg(ToDeviceSessionActorMsg msg) {
+    private void processActorsToSessionMsg(ActorSystemToDeviceSessionActorMsg msg) {
         processor.processToDeviceMsg(context(), msg.getMsg());
     }
 
@@ -106,7 +104,7 @@ public class SessionActor extends ContextAwareActor {
         }
     }
 
-    private void processSessionCtrlMsg(SessionCtrlMsg msg) {
+    private void processSessionCloseMsg(SessionCtrlMsg msg) {
         if (processor != null) {
             processor.processSessionCtrlMsg(context(), msg);
         } else if (msg instanceof SessionCloseMsg) {
@@ -116,7 +114,7 @@ public class SessionActor extends ContextAwareActor {
         }
     }
 
-    private void initProcessor(ToDeviceActorSessionMsg msg) {
+    private void initProcessor(TransportToDeviceSessionActorMsg msg) {
         if (processor == null) {
             SessionMsg sessionMsg = (SessionMsg) msg.getSessionMsg();
             if (sessionMsg.getSessionContext().getSessionType() == SessionType.SYNC) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
index b5b1791..1f8bb6d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 
 import akka.actor.*;
 import org.thingsboard.server.actors.ActorSystemContext;
@@ -33,8 +32,9 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.core.SessionCloseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
 
 public class SessionManagerActor extends ContextAwareActor {
 
@@ -104,7 +104,7 @@ public class SessionManagerActor extends ContextAwareActor {
     }
 
     private void forwardToSessionActor(SessionAwareMsg msg) {
-        if (msg instanceof ToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
+        if (msg instanceof ActorSystemToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
             String sessionIdStr = msg.getSessionId().toUidStr();
             ActorRef sessionActor = sessionActors.get(sessionIdStr);
             if (sessionActor != null) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
index 6f71d79..a0f0a88 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
@@ -22,7 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.session.*;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.msg.session.ex.SessionException;
 
@@ -41,7 +41,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     }
 
     @Override
-    protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
+    protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
         updateSessionCtx(msg, SessionType.SYNC);
         pendingMsg = toDeviceMsg(msg);
         pendingResponse = true;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
index 7d6dbca..d015fe0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
@@ -17,13 +17,20 @@ package org.thingsboard.server.actors.shared;
 
 import lombok.Data;
 import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
 
 import java.io.Serializable;
 
 @Data
-public class SessionTimeoutMsg implements Serializable {
+public class SessionTimeoutMsg implements Serializable, TbActorMsg {
 
     private static final long serialVersionUID = 1L;
 
     private final SessionId sessionId;
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SESSION_TIMEOUT_MSG;
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
index 4067797..8ab6f99 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
@@ -107,7 +107,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 
     @Override
     public void onServerAdded(ServerInstance server) {
-        log.debug("On server added event: {}", server);
+        log.info("On server added event: {}", server);
         addNode(server);
         logCircle();
     }
@@ -119,7 +119,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 
     @Override
     public void onServerRemoved(ServerInstance server) {
-        log.debug("On server removed event: {}", server);
+        log.info("On server removed event: {}", server);
         removeNode(server);
         logCircle();
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index b73a6f5..9236219 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -22,16 +22,11 @@ import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-import org.springframework.util.SerializationUtils;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
 
-import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 
 import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 0dc324b..de29b89 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -19,15 +19,7 @@ import io.grpc.stub.StreamObserver;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 
 import java.util.UUID;
 
diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml
index 1779912..978a570 100644
--- a/application/src/main/resources/logback.xml
+++ b/application/src/main/resources/logback.xml
@@ -25,7 +25,7 @@
         </encoder>
     </appender>
 
-    <logger name="org.thingsboard.server" level="TRACE" />
+    <logger name="org.thingsboard.server" level="INFO" />
     <logger name="akka" level="INFO" />
 
     <root level="INFO">
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
index 1da19cb..92c5105 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
@@ -24,7 +24,7 @@ import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 import org.thingsboard.server.common.msg.session.SessionType;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
 
 import java.util.Optional;
 
@@ -45,7 +45,7 @@ public class BasicDeviceToDeviceActorMsg implements DeviceToDeviceActorMsg {
         this(null, other.getTenantId(), other.getCustomerId(), other.getDeviceId(), other.getSessionId(), other.getSessionType(), msg);
     }
 
-    public BasicDeviceToDeviceActorMsg(ToDeviceActorSessionMsg msg, SessionType sessionType) {
+    public BasicDeviceToDeviceActorMsg(TransportToDeviceSessionActorMsg msg, SessionType sessionType) {
         this(null, msg.getTenantId(), msg.getCustomerId(), msg.getDeviceId(), msg.getSessionId(), sessionType, msg.getSessionMsg().getMsg());
     }
 
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index c7a96db..7702788 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -101,6 +101,6 @@ public enum MsgType {
     /**
      * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
      */
-    RULE_ENGINE_QUEUE_PUT_ACK_MSG;
+    RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG;
 
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
index a24bdcd..49fad13 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.common.msg.session.ctrl;
 
 import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
 
 public class SessionCloseMsg implements SessionCtrlMsg {
@@ -60,4 +61,8 @@ public class SessionCloseMsg implements SessionCtrlMsg {
         return timeout;
     }
 
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SESSION_CTRL_MSG;
+    }
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
index 19ca219..8082f72 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
@@ -15,8 +15,9 @@
  */
 package org.thingsboard.server.common.msg.session;
 
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 
-public interface SessionCtrlMsg extends SessionAwareMsg {
+public interface SessionCtrlMsg extends SessionAwareMsg, TbActorMsg {
 
 }

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index d6a0bef..dfccb7d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -59,7 +59,7 @@
         <velocity.version>1.7</velocity.version>
         <velocity-tools.version>2.0</velocity-tools.version>
         <mail.version>1.4.3</mail.version>
-        <curator.version>2.11.0</curator.version>
+        <curator.version>4.0.1</curator.version>
         <protobuf.version>3.0.2</protobuf.version>
         <grpc.version>1.12.0</grpc.version>
         <lombok.version>1.16.18</lombok.version>
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 7674549..bac68ff 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -38,8 +38,6 @@ import org.thingsboard.server.common.transport.quota.QuotaService;
 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
 import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
 import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.util.ReflectionUtils;
 
 @Slf4j
@@ -186,7 +184,7 @@ public class CoapTransportResource extends CoapResource {
                     throw new IllegalArgumentException("Unsupported msg type: " + type);
             }
             log.trace("Processing msg: {}", msg);
-            processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+            processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
         } catch (AdaptorException e) {
             log.debug("Failed to decode payload {}", e);
             exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage());
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index 60b2220..320f06e 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -108,8 +108,8 @@ public class CoapServerTest {
 
                 @Override
                 public void process(SessionAwareMsg toActorMsg) {
-                    if (toActorMsg instanceof ToDeviceActorSessionMsg) {
-                        AdaptorToSessionActorMsg sessionMsg = ((ToDeviceActorSessionMsg) toActorMsg).getSessionMsg();
+                    if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
+                        AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
                         try {
                             FromDeviceMsg deviceMsg = sessionMsg.getMsg();
                             ToDeviceMsg toDeviceMsg = null;
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index 4d90b5f..930b442 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
 import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@@ -219,7 +219,7 @@ public class DeviceApiController {
 
     private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
         AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
-        processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+        processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
     }
 
     private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 5ccce35..0b38817 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
 import org.thingsboard.server.common.data.security.DeviceX509Credentials;
 import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -207,7 +207,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
         }
         if (msg != null) {
-            processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
         } else {
             log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
             ctx.close();
@@ -227,11 +227,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             try {
                 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
                 } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
                 } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
@@ -261,10 +261,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             try {
                 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
                 } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
                 } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
                     deviceSessionCtx.setDisallowAttributeResponses();
                 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 2056452..f666bb8 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
 import org.thingsboard.server.common.data.relation.EntityRelation;
 import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -96,8 +96,8 @@ public class GatewaySessionCtx {
             GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
             devices.put(deviceName, ctx);
             log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
-            processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
-            processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
+            processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
+            processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
         }
     }
 
@@ -136,7 +136,7 @@ public class GatewaySessionCtx {
                     JsonConverter.parseWithTs(request, element.getAsJsonObject());
                 }
                 GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
-                processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
                         new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
             }
         } else {
@@ -152,7 +152,7 @@ public class GatewaySessionCtx {
             Integer requestId = jsonObj.get("id").getAsInt();
             String data = jsonObj.get("data").toString();
             GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
-            processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
                     new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
         } else {
             throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@@ -174,7 +174,7 @@ public class GatewaySessionCtx {
                 JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
                 request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
                 GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
-                processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
                         new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
             }
         } else {
@@ -207,7 +207,7 @@ public class GatewaySessionCtx {
                 request = new BasicGetAttributesRequest(requestId, null, keys);
             }
             GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
-            processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
                     new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
             ack(msg);
         } else {