thingsboard-aplcache

Changes

application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcSessionCreationFuture.java 63(+0 -63)

dao/pom.xml 12(+0 -12)

pom.xml 27(+5 -22)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 8640598..b4f9102 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -60,6 +60,7 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.component.ComponentDiscoveryService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 import org.thingsboard.server.service.executors.ExternalCallExecutorService;
 import org.thingsboard.server.service.mail.MailExecutorService;
@@ -104,6 +105,10 @@ public class ActorSystemContext {
 
     @Autowired
     @Getter
+    private DataDecodingEncodingService encodingService;
+
+    @Autowired
+    @Getter
     private DeviceAuthService deviceAuthService;
 
     @Autowired
@@ -203,6 +208,10 @@ public class ActorSystemContext {
     @Getter
     private DeviceStateService deviceStateService;
 
+    @Value("${cluster.partition_id}")
+    @Getter
+    private long queuePartitionId;
+
     @Value("${actors.session.sync.timeout}")
     @Getter
     private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 4d9b8b4..73cf788 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
@@ -45,6 +47,7 @@ import scala.concurrent.duration.Duration;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class AppActor extends RuleChainManagerActor {
 
@@ -89,6 +92,9 @@ public class AppActor extends RuleChainManagerActor {
     @Override
     protected boolean process(TbActorMsg msg) {
         switch (msg.getMsgType()) {
+            case SEND_TO_CLUSTER_MSG:
+                onPossibleClusterMsg((SendToClusterMsg) msg);
+                break;
             case CLUSTER_EVENT_MSG:
                 broadcast(msg);
                 break;
@@ -112,6 +118,16 @@ public class AppActor extends RuleChainManagerActor {
         return true;
     }
 
+    private void onPossibleClusterMsg(SendToClusterMsg msg) {
+        Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
+            if (address.isPresent()) {
+                systemContext.getRpcService().tell(
+                        systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
+            } else {
+                self().tell(msg.getMsg(), ActorRef.noSender());
+            }
+        }
+
     private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
         if (SYSTEM_TENANT.equals(msg.getTenantId())) {
             //TODO: ashvayka handle this.
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index f5f5848..2f5e20d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
-import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.thingsboard.server.actors.ActorSystemContext;
@@ -47,7 +46,7 @@ import org.thingsboard.server.common.msg.core.AttributesUpdateRequest;
 import org.thingsboard.server.common.msg.core.BasicCommandAckResponse;
 import org.thingsboard.server.common.msg.core.BasicGetAttributesResponse;
 import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
-import org.thingsboard.server.common.msg.core.BasicToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.core.GetAttributesRequest;
 import org.thingsboard.server.common.msg.core.RuleEngineError;
 import org.thingsboard.server.common.msg.core.RuleEngineErrorMsg;
@@ -57,13 +56,12 @@ import org.thingsboard.server.common.msg.core.SessionOpenMsg;
 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
-import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
 import org.thingsboard.server.common.msg.session.SessionType;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
@@ -155,7 +153,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         boolean sent = rpcSubscriptions.size() > 0;
         Set<SessionId> syncSessionSet = new HashSet<>();
         rpcSubscriptions.entrySet().forEach(sub -> {
-            ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sub.getKey());
+            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
             sendMsgToSessionActor(response, sub.getValue().getServer());
             if (SessionType.SYNC == sub.getValue().getType()) {
                 syncSessionSet.add(sub.getKey());
@@ -197,7 +195,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (data != null) {
             logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId());
             ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT);
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
         }
     }
 
@@ -209,7 +207,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
             if (remainingAcks == 0) {
                 ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
-                sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
             }
         }
     }
@@ -247,7 +245,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                     body.getMethod(),
                     body.getParams()
             );
-            ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(rpcRequest, sessionId);
+            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sessionId);
             sendMsgToSessionActor(response, server);
         };
     }
@@ -301,14 +299,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
                 BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
                         request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
-                sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
+                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
             }
 
             @Override
             public void onFailure(Throwable t) {
                 if (t instanceof Exception) {
                     ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
-                    sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
+                    sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
                 } else {
                     logger.error("[{}] Failed to process attributes request", deviceId, t);
                 }
@@ -390,14 +388,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (data != null) {
             logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
             ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(SessionMsgType.TO_SERVER_RPC_REQUEST, RuleEngineError.TIMEOUT);
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
+            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServer());
         }
     }
 
     void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
         ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getMsg().getRequestId());
         if (data != null) {
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
+            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(msg.getMsg(), data.getSessionId()), data.getServer());
         }
     }
 
@@ -408,7 +406,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             pendingMsgs.put(tbMsg.getId(), pendingMsgData);
             scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
         } else {
-            ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
+            ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
             sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
         }
         context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
@@ -435,7 +433,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             if (notification != null) {
                 ToDeviceMsg finalNotification = notification;
                 attributeSubscriptions.entrySet().forEach(sub -> {
-                    ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(finalNotification, sub.getKey());
+                    ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(finalNotification, sub.getKey());
                     sendMsgToSessionActor(response, sub.getValue().getServer());
                 });
             }
@@ -461,7 +459,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                 BasicCommandAckResponse response = success
                         ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())
                         : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());
-                sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
+                sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());
             }
         }
     }
@@ -516,11 +514,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void sendMsgToSessionActor(ToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
+    private void sendMsgToSessionActor(ActorSystemToDeviceSessionActorMsg response, Optional<ServerAddress> sessionAddress) {
         if (sessionAddress.isPresent()) {
             ServerAddress address = sessionAddress.get();
             logger.debug("{} Forwarding msg: {}", address, response);
-            systemContext.getRpcService().tell(sessionAddress.get(), response);
+            systemContext.getRpcService().tell(systemContext.getEncodingService()
+                    .convertToProtoDataMessage(sessionAddress.get(), response));
         } else {
             systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
         }
@@ -528,7 +527,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     void processCredentialsUpdate() {
         sessions.forEach((k, v) -> {
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
+            sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), k), v.getServer());
         });
         attributeSubscriptions.clear();
         rpcSubscriptions.clear();
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 5e16aab..f6b30bf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -82,7 +82,8 @@ public final class PluginProcessingContext implements PluginContext {
 
     @Override
     public void sendPluginRpcMsg(RpcMsg msg) {
-        this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
+        //ToDO is this a cluster messsage?
+//        this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
index dcabedc..c43d62d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
@@ -21,6 +21,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.timeout.TimeoutMsg;
@@ -100,7 +101,7 @@ public final class SharedPluginProcessingContext {
     }
 
     public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
-        forward(msg.getDeviceId(), msg, rpcService::tell);
+        forward(msg.getDeviceId(), msg);
     }
 
     public void sendRpcRequest(ToDeviceRpcRequest msg) {
@@ -109,11 +110,11 @@ public final class SharedPluginProcessingContext {
 //        forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
     }
 
-    private <T> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
+    private <T extends TbActorMsg> void forward(DeviceId deviceId, T msg) {
         Optional<ServerAddress> instance = routingService.resolveById(deviceId);
         if (instance.isPresent()) {
             log.trace("[{}] Forwarding msg {} to remote device actor!", pluginId, msg);
-            rpcFunction.accept(instance.get(), msg);
+            rpcService.tell(systemContext.getEncodingService().convertToProtoDataMessage(instance.get(), msg));
         } else {
             log.trace("[{}] Forwarding msg {} to local device actor!", pluginId, msg);
             parentActor.tell(msg, ActorRef.noSender());
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
index f856ed6..6e47e35 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java
@@ -15,7 +15,7 @@
  */
 package org.thingsboard.server.actors.plugin;
 
-import com.hazelcast.util.function.Consumer;
+import java.util.function.Consumer;
 import org.thingsboard.server.extensions.api.exception.AccessDeniedException;
 import org.thingsboard.server.extensions.api.exception.EntityNotFoundException;
 import org.thingsboard.server.extensions.api.exception.InternalErrorException;
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index bc36dc8..14bb636 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -17,30 +17,10 @@ package org.thingsboard.server.actors.rpc;
 
 import akka.actor.ActorRef;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.util.SerializationUtils;
-import org.springframework.util.StringUtils;
-import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ActorService;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.*;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
 import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
-
-import java.io.Serializable;
-import java.util.UUID;
 
 /**
  * @author Andrew Shvayka
@@ -48,15 +28,12 @@ import java.util.UUID;
 @Slf4j
 public class BasicRpcSessionListener implements GrpcSessionListener {
 
-    public static final String SESSION_RECEIVED_SESSION_ACTOR_MSG = "{} session [{}] received session actor msg {}";
-    private final ActorSystemContext context;
     private final ActorService service;
     private final ActorRef manager;
     private final ActorRef self;
 
-    public BasicRpcSessionListener(ActorSystemContext context, ActorRef manager, ActorRef self) {
-        this.context = context;
-        this.service = context.getActorService();
+    public BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
+        this.service = service;
         this.manager = manager;
         this.self = self;
     }
@@ -76,47 +53,11 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
     }
 
     @Override
-    public void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg) {
-        if (log.isTraceEnabled()) {
-            log.trace("{} session [{}] received plugin msg {}", getType(session), session.getRemoteServer(), msg);
-        }
-        service.onMsg(convert(session.getRemoteServer(), msg));
-    }
-
-    @Override
-    public void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg) {
-        log.trace("{} session [{}] received device actor msg {}", getType(session), session.getRemoteServer(), msg);
-        service.onMsg((DeviceToDeviceActorMsg) deserialize(msg.getData().toByteArray()));
-    }
-
-    @Override
-    public void onToDeviceActorNotificationRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg) {
-        log.trace("{} session [{}] received device actor notification msg {}", getType(session), session.getRemoteServer(), msg);
-        service.onMsg((ToDeviceActorNotificationMsg) deserialize(msg.getData().toByteArray()));
-    }
-
-    @Override
-    public void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg) {
-        log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
-        service.onMsg((ToDeviceSessionActorMsg) deserialize(msg.getData().toByteArray()));
-    }
-
-    @Override
-    public void onToDeviceRpcRequestRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
-        log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
-        service.onMsg(deserialize(session.getRemoteServer(), msg));
-    }
-
-    @Override
-    public void onFromDeviceRpcResponseRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
-        log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
-        service.onMsg(deserialize(session.getRemoteServer(), msg));
-    }
-
-    @Override
-    public void onToAllNodesRpcMessage(GrpcSession session, ClusterAPIProtos.ToAllNodesRpcMessage msg) {
-        log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
-        service.onMsg((ToAllNodesMsg) deserialize(msg.getData().toByteArray()));
+    public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
+        log.trace("{} Service [{}] received session actor msg {}", getType(session),
+                session.getRemoteServer(),
+                clusterMessage);
+        service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
     }
 
     @Override
@@ -130,37 +71,5 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
         return session.isClient() ? "Client" : "Server";
     }
 
-    private static PluginRpcMsg convert(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcMessage msg) {
-        ClusterAPIProtos.PluginAddress address = msg.getAddress();
-        TenantId tenantId = new TenantId(toUUID(address.getTenantId()));
-        PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
-        RpcMsg rpcMsg = new RpcMsg(serverAddress, msg.getClazz(), msg.getData().toByteArray());
-        return new PluginRpcMsg(tenantId, pluginId, rpcMsg);
-    }
-
-    private static UUID toUUID(ClusterAPIProtos.Uid uid) {
-        return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
-    }
-
-    private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
-        TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
-        DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
-
-        ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
-        ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
-
-        return new ToDeviceRpcRequestActorMsg(serverAddress, request);
-    }
-
-    private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
-        RpcError error = !StringUtils.isEmpty(msg.getError()) ? RpcError.valueOf(msg.getError()) : null;
-        FromDeviceRpcResponse response = new FromDeviceRpcResponse(toUUID(msg.getMsgId()), msg.getResponse(), error);
-        return new ToPluginRpcResponseDeviceMsg(null, null, response);
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T extends Serializable> T deserialize(byte[] data) {
-        return (T) SerializationUtils.deserialize(data);
-    }
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
index 3718a22..2dd949e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
@@ -23,5 +23,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  */
 @Data
 public final class RpcBroadcastMsg {
-    private final ClusterAPIProtos.ToRpcServerMessage msg;
+    private final ClusterAPIProtos.ClusterMessage msg;
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index ba20013..c5c6553 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -40,7 +40,7 @@ public class RpcManagerActor extends ContextAwareActor {
 
     private final Map<ServerAddress, SessionActorInfo> sessionActors;
 
-    private final Map<ServerAddress, Queue<ClusterAPIProtos.ToRpcServerMessage>> pendingMsgs;
+    private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
 
     private final ServerAddress instance;
 
@@ -65,8 +65,8 @@ public class RpcManagerActor extends ContextAwareActor {
 
     @Override
     public void onReceive(Object msg) throws Exception {
-        if (msg instanceof RpcSessionTellMsg) {
-            onMsg((RpcSessionTellMsg) msg);
+        if (msg instanceof ClusterAPIProtos.ClusterMessage) {
+            onMsg((ClusterAPIProtos.ClusterMessage) msg);
         } else if (msg instanceof RpcBroadcastMsg) {
             onMsg((RpcBroadcastMsg) msg);
         } else if (msg instanceof RpcSessionCreateRequestMsg) {
@@ -84,27 +84,32 @@ public class RpcManagerActor extends ContextAwareActor {
 
     private void onMsg(RpcBroadcastMsg msg) {
         log.debug("Forwarding msg to session actors {}", msg);
-        sessionActors.keySet().forEach(address -> onMsg(new RpcSessionTellMsg(address, msg.getMsg())));
+        sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
         pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
     }
 
-    private void onMsg(RpcSessionTellMsg msg) {
-        ServerAddress address = msg.getServerAddress();
-        SessionActorInfo session = sessionActors.get(address);
-        if (session != null) {
-            log.debug("{} Forwarding msg to session actor", address);
-            session.actor.tell(msg, ActorRef.noSender());
-        } else {
-            log.debug("{} Storing msg to pending queue", address);
-            Queue<ClusterAPIProtos.ToRpcServerMessage> queue = pendingMsgs.get(address);
-            if (queue == null) {
-                queue = new LinkedList<>();
-                pendingMsgs.put(address, queue);
+    private void onMsg(ClusterAPIProtos.ClusterMessage msg) {
+        if (msg.hasServerAddress()) {
+            ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(),
+                    msg.getServerAddress().getPort());
+            SessionActorInfo session = sessionActors.get(address);
+            if (session != null) {
+                log.debug("{} Forwarding msg to session actor", address);
+                session.getActor().tell(msg, ActorRef.noSender());
+            } else {
+                log.debug("{} Storing msg to pending queue", address);
+                Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
+                if (queue == null) {
+                    queue = new LinkedList<>();
+                    pendingMsgs.put(new ServerAddress(
+                            msg.getServerAddress().getHost(), msg.getServerAddress().getPort()), queue);
+                }
+                queue.add(msg);
             }
-            queue.add(msg.getMsg());
+        } else {
+            logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
         }
     }
-
     @Override
     public void postStop() {
         sessionActors.clear();
@@ -167,10 +172,10 @@ public class RpcManagerActor extends ContextAwareActor {
     private void register(ServerAddress remoteAddress, UUID uuid, ActorRef sender) {
         sessionActors.put(remoteAddress, new SessionActorInfo(uuid, sender));
         log.debug("[{}][{}] Registering session actor.", remoteAddress, uuid);
-        Queue<ClusterAPIProtos.ToRpcServerMessage> data = pendingMsgs.remove(remoteAddress);
+        Queue<ClusterAPIProtos.ClusterMessage> data = pendingMsgs.remove(remoteAddress);
         if (data != null) {
             log.debug("[{}][{}] Forwarding {} pending messages.", remoteAddress, uuid, data.size());
-            data.forEach(msg -> sender.tell(new RpcSessionTellMsg(remoteAddress, msg), ActorRef.noSender()));
+            data.forEach(msg -> sender.tell(new RpcSessionTellMsg(msg), ActorRef.noSender()));
         } else {
             log.debug("[{}][{}] No pending messages to forward.", remoteAddress, uuid);
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index a187444..c9cf869 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -32,6 +32,8 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
 
 import java.util.UUID;
 
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE;
+
 /**
  * @author Andrew Shvayka
  */
@@ -56,15 +58,15 @@ public class RpcSessionActor extends ContextAwareActor {
 
     @Override
     public void onReceive(Object msg) throws Exception {
-        if (msg instanceof RpcSessionTellMsg) {
-            tell((RpcSessionTellMsg) msg);
+        if (msg instanceof ClusterAPIProtos.ClusterMessage) {
+            tell((ClusterAPIProtos.ClusterMessage) msg);
         } else if (msg instanceof RpcSessionCreateRequestMsg) {
             initSession((RpcSessionCreateRequestMsg) msg);
         }
     }
 
-    private void tell(RpcSessionTellMsg msg) {
-        session.sendMsg(msg.getMsg());
+    private void tell(ClusterAPIProtos.ClusterMessage msg) {
+        session.sendMsg(msg);
     }
 
     @Override
@@ -76,7 +78,7 @@ public class RpcSessionActor extends ContextAwareActor {
     private void initSession(RpcSessionCreateRequestMsg msg) {
         log.info("[{}] Initializing session", context().self());
         ServerAddress remoteServer = msg.getRemoteAddress();
-        listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self());
+        listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self());
         if (msg.getRemoteAddress() == null) {
             // Server session
             session = new GrpcSession(listener);
@@ -91,7 +93,7 @@ public class RpcSessionActor extends ContextAwareActor {
             session.initInputStream();
 
             ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
-            StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream = stub.handlePluginMsgs(session.getInputStream());
+            StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream = stub.handleMsgs(session.getInputStream());
 
             session.setOutputStream(outputStream);
             session.initOutputStream();
@@ -115,11 +117,10 @@ public class RpcSessionActor extends ContextAwareActor {
         }
     }
 
-    private ClusterAPIProtos.ToRpcServerMessage toConnectMsg() {
+    private ClusterAPIProtos.ClusterMessage toConnectMsg() {
         ServerAddress instance = systemContext.getDiscoveryService().getCurrentServer().getServerAddress();
-        return ClusterAPIProtos.ToRpcServerMessage.newBuilder().setConnectMsg(
-                ClusterAPIProtos.ConnectRpcMessage.newBuilder().setServerAddress(
-                        ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost()).setPort(instance.getPort()).build()).build()).build();
-
+        return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAddress(
+                ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost())
+                        .setPort(instance.getPort()).build()).build();
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
index 5bcf1d6..0c1136e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
@@ -30,6 +30,6 @@ public final class RpcSessionCreateRequestMsg {
 
     private final UUID msgUid;
     private final ServerAddress remoteAddress;
-    private final StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver;
+    private final StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver;
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
index 5a61044..858e3aa 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
@@ -24,6 +24,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  */
 @Data
 public final class RpcSessionTellMsg {
-    private final ServerAddress serverAddress;
-    private final ClusterAPIProtos.ToRpcServerMessage msg;
+    private final ClusterAPIProtos.ClusterMessage msg;
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 0508123..85d1b54 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -43,6 +43,7 @@ import org.thingsboard.server.dao.user.UserService;
 import org.thingsboard.server.service.script.NashornJsEngine;
 import scala.concurrent.duration.Duration;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -63,15 +64,24 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public void tellNext(TbMsg msg, String relationType) {
-        tellNext(msg, relationType, null);
+        tellNext(msg, Collections.singleton(relationType), null);
+    }
+
+    @Override
+    public void tellNext(TbMsg msg, Set<String> relationTypes) {
+        tellNext(msg, relationTypes, null);
     }
 
     @Override
     public void tellNext(TbMsg msg, String relationType, Throwable th) {
+        tellNext(msg, Collections.singleton(relationType), th);
+    }
+
+    private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
         if (nodeCtx.getSelf().isDebugMode()) {
-            mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th);
+            relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
         }
-        nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
+        nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg), nodeCtx.getSelfActor());
     }
 
     @Override
@@ -99,12 +109,12 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
-        return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
+        return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), mainCtx.getQueuePartitionId());
     }
 
     @Override
     public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
-        return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), 0L);
+        return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId());
     }
 
     @Override
@@ -118,11 +128,6 @@ class DefaultTbContext implements TbContext {
     }
 
     @Override
-    public void tellNext(TbMsg msg, Set<String> relationTypes) {
-        relationTypes.forEach(type -> tellNext(msg, type));
-    }
-
-    @Override
     public ListeningExecutor getJsExecutor() {
         return mainCtx.getJsExecutor();
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 4812002..3ba646a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -54,6 +54,8 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
             case RULE_CHAIN_TO_RULE_CHAIN_MSG:
                 processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
                 break;
+            case CLUSTER_EVENT_MSG:
+                break;
             default:
                 return false;
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index dda12e5..ac902a7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -95,12 +95,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     private void reprocess(List<RuleNode> ruleNodeList) {
         for (RuleNode ruleNode : ruleNodeList) {
-            for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), 0L)) {
+            for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), systemContext.getQueuePartitionId())) {
                 pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");
             }
         }
         if (firstNode != null) {
-            for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), 0L)) {
+            for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), systemContext.getQueuePartitionId())) {
                 pushMsgToNode(firstNode, tbMsg, "");
             }
         }
@@ -206,9 +206,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
         checkActive();
         RuleNodeId originator = envelope.getOriginator();
-        String targetRelationType = envelope.getRelationType();
         List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
-                .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
+                .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
                 .collect(Collectors.toList());
 
         TbMsg msg = envelope.getMsg();
@@ -237,6 +236,18 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         }
     }
 
+    private boolean contains(Set<String> relationTypes, String type) {
+        if (relationTypes == null) {
+            return true;
+        }
+        for (String relationType : relationTypes) {
+            if (relationType.equalsIgnoreCase(type)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target, String fromRelationType) {
         RuleChainId targetRCId = new RuleChainId(target.getId());
         TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
@@ -269,6 +280,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
         // We don't put firstNodeId because it may change over time;
-        return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, 0L);
+        return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, systemContext.getQueuePartitionId());
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
index 054284d..c0a475c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.TbMsg;
 
+import java.util.Set;
+
 /**
  * Created by ashvayka on 19.03.18.
  */
@@ -28,7 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg;
 final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
 
     private final RuleNodeId originator;
-    private final String relationType;
+    private final Set<String> relationTypes;
     private final TbMsg msg;
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index e15eb2f..5a097df 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.service;
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
@@ -27,10 +28,11 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
 
     void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
 
-    void onMsg(ServiceToRuleEngineMsg msg);
+    void onMsg(SendToClusterMsg msg);
 
     void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
 
     void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
 
+    void onMsg(ServiceToRuleEngineMsg serviceToRuleEngineMsg);
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 1d9c671..3624127 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -39,8 +39,12 @@ public abstract class ContextAwareActor extends UntypedActor {
             logger.debug("Processing msg: {}", msg);
         }
         if (msg instanceof TbActorMsg) {
-            if(!process((TbActorMsg) msg)){
-                logger.warning("Unknown message: {}!", msg);
+            try {
+                if (!process((TbActorMsg) msg)) {
+                    logger.warning("Unknown message: {}!", msg);
+                }
+            } catch (Exception e) {
+                throw e;
             }
         } else {
             logger.warning("Unknown message: {}!", msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index df0d122..c80e913 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -19,6 +19,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Terminated;
+import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -27,25 +28,23 @@ import org.thingsboard.server.actors.app.AppActor;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcManagerActor;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
 import org.thingsboard.server.actors.session.SessionManagerActor;
 import org.thingsboard.server.actors.stats.StatsActor;
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
 import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
@@ -55,7 +54,8 @@ import scala.concurrent.duration.Duration;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import java.util.Optional;
+
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
 
 @Service
 @Slf4j
@@ -127,7 +127,7 @@ public class DefaultActorService implements ActorService {
     }
 
     @Override
-    public void onMsg(ServiceToRuleEngineMsg msg) {
+    public void onMsg(SendToClusterMsg msg) {
         appActor.tell(msg, ActorRef.noSender());
     }
 
@@ -149,53 +149,6 @@ public class DefaultActorService implements ActorService {
         appActor.tell(msg, ActorRef.noSender());
     }
 
-    @Override
-    public void onMsg(ToPluginActorMsg msg) {
-        log.trace("Processing plugin rpc msg: {}", msg);
-        appActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(DeviceToDeviceActorMsg msg) {
-        log.trace("Processing device rpc msg: {}", msg);
-        appActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(ToDeviceActorNotificationMsg msg) {
-        log.trace("Processing notification rpc msg: {}", msg);
-        appActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(ToDeviceSessionActorMsg msg) {
-        log.trace("Processing session rpc msg: {}", msg);
-        sessionManagerActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(ToAllNodesMsg msg) {
-        log.trace("Processing broadcast rpc msg: {}", msg);
-        appActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(RpcSessionCreateRequestMsg msg) {
-        log.trace("Processing session create msg: {}", msg);
-        rpcManagerActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(RpcSessionTellMsg msg) {
-        log.trace("Processing session rpc msg: {}", msg);
-        rpcManagerActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(RpcBroadcastMsg msg) {
-        log.trace("Processing broadcast rpc msg: {}", msg);
-        rpcManagerActor.tell(msg, ActorRef.noSender());
-    }
 
     @Override
     public void onServerAdded(ServerInstance server) {
@@ -223,28 +176,29 @@ public class DefaultActorService implements ActorService {
     @Override
     public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
         DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
-        Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
-        if (address.isPresent()) {
-            rpcService.tell(address.get(), msg);
-        } else {
-            onMsg(msg);
-        }
+        appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
     }
 
     @Override
     public void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType) {
         log.trace("[{}] Processing onDeviceNameOrTypeUpdate event, deviceName: {}, deviceType: {}", deviceId, deviceName, deviceType);
         DeviceNameOrTypeUpdateMsg msg = new DeviceNameOrTypeUpdateMsg(tenantId, deviceId, deviceName, deviceType);
-        Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
-        if (address.isPresent()) {
-            rpcService.tell(address.get(), msg);
-        } else {
-            onMsg(msg);
-        }
+        appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
+    }
+
+    @Override
+    public void onMsg(ServiceToRuleEngineMsg msg) {
+        appActor.tell(msg, ActorRef.noSender());
     }
 
     public void broadcast(ToAllNodesMsg msg) {
-        rpcService.broadcast(msg);
+        actorContext.getEncodingService().encode(msg);
+        rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage
+                .newBuilder()
+                .setPayload(ByteString
+                        .copyFrom(actorContext.getEncodingService().encode(msg)))
+                .setMessageType(CLUSTER_ACTOR_MESSAGE)
+                .build()));
         appActor.tell(msg, ActorRef.noSender());
     }
 
@@ -253,4 +207,60 @@ public class DefaultActorService implements ActorService {
         this.sessionManagerActor.tell(msg, ActorRef.noSender());
         this.rpcManagerActor.tell(msg, ActorRef.noSender());
     }
+
+    @Override
+    public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
+        ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
+        log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
+        if(log.isDebugEnabled()){
+            log.info("MSG: ", msg);
+        }
+        switch (msg.getMessageType()) {
+            case CLUSTER_ACTOR_MESSAGE:
+                java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
+                        .decode(msg.getPayload().toByteArray());
+                if (decodedMsg.isPresent()) {
+                    appActor.tell(decodedMsg.get(), ActorRef.noSender());
+                } else {
+                    log.error("Error during decoding cluster proto message");
+                }
+                break;
+            case TO_ALL_NODES_MSG:
+                //TODO
+                break;
+            case CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE:
+                actorContext.getTsSubService().onNewRemoteSubscription(serverAddress, msg.getPayload().toByteArray());
+                break;
+            case CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE:
+                actorContext.getTsSubService().onRemoteSubscriptionUpdate(serverAddress, msg.getPayload().toByteArray());
+                break;
+            case CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE:
+                actorContext.getTsSubService().onRemoteSubscriptionClose(serverAddress, msg.getPayload().toByteArray());
+                break;
+            case CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE:
+                actorContext.getTsSubService().onRemoteSessionClose(serverAddress, msg.getPayload().toByteArray());
+                break;
+            case CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE:
+                actorContext.getTsSubService().onRemoteAttributesUpdate(serverAddress, msg.getPayload().toByteArray());
+                break;
+            case CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE:
+                actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
+                break;
+        }
+    }
+
+    @Override
+    public void onSendMsg(ClusterAPIProtos.ClusterMessage msg) {
+        rpcManagerActor.tell(msg, ActorRef.noSender());
+    }
+
+    @Override
+    public void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg) {
+        rpcManagerActor.tell(msg, ActorRef.noSender());
+    }
+
+    @Override
+    public void onBroadcastMsg(RpcBroadcastMsg msg) {
+        rpcManagerActor.tell(msg, ActorRef.noSender());
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
index bbaef5d..42e127e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
@@ -21,6 +21,7 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.device.BasicDeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
@@ -44,7 +45,7 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
         this.sessionId = sessionId;
     }
 
-    protected abstract void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg);
+    protected abstract void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg);
 
     protected abstract void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg);
 
@@ -62,12 +63,12 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
     protected void cleanupSession(ActorContext ctx) {
     }
 
-    protected void updateSessionCtx(ToDeviceActorSessionMsg msg, SessionType type) {
+    protected void updateSessionCtx(TransportToDeviceSessionActorMsg msg, SessionType type) {
         sessionCtx = msg.getSessionMsg().getSessionContext();
         deviceToDeviceActorMsgPrototype = new BasicDeviceToDeviceActorMsg(msg, type);
     }
 
-    protected DeviceToDeviceActorMsg toDeviceMsg(ToDeviceActorSessionMsg msg) {
+    protected DeviceToDeviceActorMsg toDeviceMsg(TransportToDeviceSessionActorMsg msg) {
         AdaptorToSessionActorMsg adaptorMsg = msg.getSessionMsg();
         return new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, adaptorMsg.getMsg());
     }
@@ -86,23 +87,20 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
         return address;
     }
 
-    protected Optional<ServerAddress> forwardToAppActorIfAdressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
+    protected Optional<ServerAddress> forwardToAppActorIfAddressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
+
         Optional<ServerAddress> newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId());
         if (!newAddress.equals(oldAddress)) {
-            if (newAddress.isPresent()) {
-                systemContext.getRpcService().tell(newAddress.get(),
-                        toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
-            } else {
-                getAppActor().tell(toForward, ctx.self());
-            }
+            getAppActor().tell(new SendToClusterMsg(toForward.getDeviceId(), toForward
+                    .toOtherAddress(systemContext.getRoutingService().getCurrentServer())), ctx.self());
         }
         return newAddress;
     }
 
     protected void forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> address) {
         if (address.isPresent()) {
-            systemContext.getRpcService().tell(address.get(),
-                    toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
+            systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(address.get(),
+                    toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer())));
         } else {
             getAppActor().tell(toForward, ctx.self());
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
index fa5287f..83d56e0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java
@@ -46,7 +46,7 @@ class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     }
 
     @Override
-    protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
+    protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
         updateSessionCtx(msg, SessionType.ASYNC);
         if (firstMsg) {
             toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
index 9d324c5..f67d46b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
 
 import akka.actor.OneForOneStrategy;
 import akka.actor.SupervisorStrategy;
-import akka.japi.Function;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ContextAwareActor;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -25,8 +24,8 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
 import org.thingsboard.server.common.msg.session.SessionMsg;
 import org.thingsboard.server.common.msg.session.SessionType;
@@ -63,38 +62,37 @@ public class SessionActor extends ContextAwareActor {
 
     @Override
     protected boolean process(TbActorMsg msg) {
-        //TODO Move everything here, to work with TbActorMsg
-        return false;
-    }
-
-    @Override
-    public void onReceive(Object msg) throws Exception {
-        logger.debug("[{}] Processing: {}.", sessionId, msg);
-        if (msg instanceof ToDeviceActorSessionMsg) {
-            processDeviceMsg((ToDeviceActorSessionMsg) msg);
-        } else if (msg instanceof ToDeviceSessionActorMsg) {
-            processToDeviceMsg((ToDeviceSessionActorMsg) msg);
-        } else if (msg instanceof SessionTimeoutMsg) {
-            processTimeoutMsg((SessionTimeoutMsg) msg);
-        } else if (msg instanceof SessionCtrlMsg) {
-            processSessionCtrlMsg((SessionCtrlMsg) msg);
-        } else if (msg instanceof ClusterEventMsg) {
-            processClusterEvent((ClusterEventMsg) msg);
-        } else {
-            logger.warning("[{}] Unknown msg: {}", sessionId, msg);
+        switch (msg.getMsgType()) {
+            case TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG:
+                processTransportToSessionMsg((TransportToDeviceSessionActorMsg) msg);
+                break;
+            case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
+                processActorsToSessionMsg((ActorSystemToDeviceSessionActorMsg) msg);
+                break;
+            case SESSION_TIMEOUT_MSG:
+                processTimeoutMsg((SessionTimeoutMsg) msg);
+                break;
+            case SESSION_CTRL_MSG:
+                processSessionCloseMsg((SessionCtrlMsg) msg);
+                break;
+            case CLUSTER_EVENT_MSG:
+                processClusterEvent((ClusterEventMsg) msg);
+                break;
+            default: return false;
         }
+        return true;
     }
 
     private void processClusterEvent(ClusterEventMsg msg) {
         processor.processClusterEvent(context(), msg);
     }
 
-    private void processDeviceMsg(ToDeviceActorSessionMsg msg) {
+    private void processTransportToSessionMsg(TransportToDeviceSessionActorMsg msg) {
         initProcessor(msg);
         processor.processToDeviceActorMsg(context(), msg);
     }
 
-    private void processToDeviceMsg(ToDeviceSessionActorMsg msg) {
+    private void processActorsToSessionMsg(ActorSystemToDeviceSessionActorMsg msg) {
         processor.processToDeviceMsg(context(), msg.getMsg());
     }
 
@@ -106,7 +104,7 @@ public class SessionActor extends ContextAwareActor {
         }
     }
 
-    private void processSessionCtrlMsg(SessionCtrlMsg msg) {
+    private void processSessionCloseMsg(SessionCtrlMsg msg) {
         if (processor != null) {
             processor.processSessionCtrlMsg(context(), msg);
         } else if (msg instanceof SessionCloseMsg) {
@@ -116,7 +114,7 @@ public class SessionActor extends ContextAwareActor {
         }
     }
 
-    private void initProcessor(ToDeviceActorSessionMsg msg) {
+    private void initProcessor(TransportToDeviceSessionActorMsg msg) {
         if (processor == null) {
             SessionMsg sessionMsg = (SessionMsg) msg.getSessionMsg();
             if (sessionMsg.getSessionContext().getSessionType() == SessionType.SYNC) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
index b5b1791..1f8bb6d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.session;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 
 import akka.actor.*;
 import org.thingsboard.server.actors.ActorSystemContext;
@@ -33,8 +32,9 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.core.SessionCloseMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
 
 public class SessionManagerActor extends ContextAwareActor {
 
@@ -104,7 +104,7 @@ public class SessionManagerActor extends ContextAwareActor {
     }
 
     private void forwardToSessionActor(SessionAwareMsg msg) {
-        if (msg instanceof ToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
+        if (msg instanceof ActorSystemToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
             String sessionIdStr = msg.getSessionId().toUidStr();
             ActorRef sessionActor = sessionActors.get(sessionIdStr);
             if (sessionActor != null) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
index 7f520b5..a0f0a88 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java
@@ -22,7 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.session.*;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.msg.session.ex.SessionException;
 
@@ -41,7 +41,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     }
 
     @Override
-    protected void processToDeviceActorMsg(ActorContext ctx, ToDeviceActorSessionMsg msg) {
+    protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
         updateSessionCtx(msg, SessionType.SYNC);
         pendingMsg = toDeviceMsg(msg);
         pendingResponse = true;
@@ -73,7 +73,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
     @Override
     public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
         if (pendingResponse) {
-            Optional<ServerAddress> newTargetServer = forwardToAppActorIfAdressChanged(context, pendingMsg, currentTargetServer);
+            Optional<ServerAddress> newTargetServer = forwardToAppActorIfAddressChanged(context, pendingMsg, currentTargetServer);
             if (logger.isDebugEnabled()) {
                 if (!newTargetServer.equals(currentTargetServer)) {
                     if (newTargetServer.isPresent()) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 4e9b8db..771b85b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -89,7 +89,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
 
     protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
         EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
-        Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
+        Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), tbMsg.getClusterPartition()), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable Void result) {
                 onSuccess.accept(tbMsg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
index 7d6dbca..d015fe0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/SessionTimeoutMsg.java
@@ -17,13 +17,20 @@ package org.thingsboard.server.actors.shared;
 
 import lombok.Data;
 import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
 
 import java.io.Serializable;
 
 @Data
-public class SessionTimeoutMsg implements Serializable {
+public class SessionTimeoutMsg implements Serializable, TbActorMsg {
 
     private static final long serialVersionUID = 1L;
 
     private final SessionId sessionId;
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SESSION_TIMEOUT_MSG;
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
index 03c9694..bc386eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
@@ -20,7 +20,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
+import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
 
 /**
  * @author Andrew Shvayka
@@ -29,8 +29,6 @@ import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
 @EqualsAndHashCode(exclude = {"serverInfo", "serverAddress"})
 public final class ServerInstance implements Comparable<ServerInstance> {
 
-    @Getter(AccessLevel.PACKAGE)
-    private final ServerInfo serverInfo;
     @Getter
     private final String host;
     @Getter
@@ -38,8 +36,13 @@ public final class ServerInstance implements Comparable<ServerInstance> {
     @Getter
     private final ServerAddress serverAddress;
 
-    public ServerInstance(ServerInfo serverInfo) {
-        this.serverInfo = serverInfo;
+    public ServerInstance(ServerAddress serverAddress) {
+        this.serverAddress = serverAddress;
+        this.host = serverAddress.getHost();
+        this.port = serverAddress.getPort();
+    }
+
+    public ServerInstance(ServerInstanceProtos.ServerInfo serverInfo) {
         this.host = serverInfo.getHost();
         this.port = serverInfo.getPort();
         this.serverAddress = new ServerAddress(host, port);
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 818d2b1..6002b0e 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -15,8 +15,9 @@
  */
 package org.thingsboard.server.service.cluster.discovery;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.ChildData;
@@ -31,15 +32,17 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.ApplicationListener;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 import org.thingsboard.server.utils.MiscUtils;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import java.io.IOException;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
@@ -67,6 +70,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     @Autowired
     private ServerInstanceService serverInstance;
 
+    @Autowired
+    @Lazy
+    private TelemetrySubscriptionService tsSubService;
+
     private final List<DiscoveryServiceListener> listeners = new CopyOnWriteArrayList<>();
 
     private CuratorFramework client;
@@ -113,7 +120,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
             log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
             nodePath = client.create()
                     .creatingParentsIfNeeded()
-                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", self.getServerInfo().toByteArray());
+                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
             log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
         } catch (Exception e) {
             log.error("Failed to create ZK node", e);
@@ -144,8 +151,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
                 .filter(cd -> !cd.getPath().equals(nodePath))
                 .map(cd -> {
                     try {
-                        return new ServerInstance(ServerInfo.parseFrom(cd.getData()));
-                    } catch (InvalidProtocolBufferException e) {
+                        return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
+                    } catch (NoSuchElementException e) {
                         log.error("Failed to decode ZK node", e);
                         throw new RuntimeException(e);
                     }
@@ -186,20 +193,23 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
         }
         ServerInstance instance;
         try {
-            instance = new ServerInstance(ServerInfo.parseFrom(data.getData()));
-        } catch (IOException e) {
+            ServerAddress serverAddress  = SerializationUtils.deserialize(data.getData());
+            instance = new ServerInstance(serverAddress);
+        } catch (SerializationException e) {
             log.error("Failed to decode server instance for node {}", data.getPath(), e);
             throw e;
         }
         log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
         switch (pathChildrenCacheEvent.getType()) {
             case CHILD_ADDED:
+                tsSubService.onClusterUpdate();
                 listeners.forEach(listener -> listener.onServerAdded(instance));
                 break;
             case CHILD_UPDATED:
                 listeners.forEach(listener -> listener.onServerUpdated(instance));
                 break;
             case CHILD_REMOVED:
+                tsSubService.onClusterUpdate();
                 listeners.forEach(listener -> listener.onServerRemoved(instance));
                 break;
             default:
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
index 4067797..8ab6f99 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
@@ -107,7 +107,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 
     @Override
     public void onServerAdded(ServerInstance server) {
-        log.debug("On server added event: {}", server);
+        log.info("On server added event: {}", server);
         addNode(server);
         logCircle();
     }
@@ -119,7 +119,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 
     @Override
     public void onServerRemoved(ServerInstance server) {
-        log.debug("On server removed event: {}", server);
+        log.info("On server removed event: {}", server);
         removeNode(server);
         logCircle();
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index 27334c6..9236219 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -22,29 +22,23 @@ import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-import org.springframework.util.SerializationUtils;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
-import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
+
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
 import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
 
 import javax.annotation.PreDestroy;
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -58,13 +52,17 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
     @Autowired
     private ServerInstanceService instanceService;
 
+    @Autowired
+    private DataDecodingEncodingService encodingService;
+
     private RpcMsgListener listener;
 
     private Server server;
 
     private ServerInstance instance;
 
-    private ConcurrentMap<UUID, RpcSessionCreationFuture> pendingSessionMap = new ConcurrentHashMap<>();
+    private ConcurrentMap<UUID, BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>>> pendingSessionMap =
+            new ConcurrentHashMap<>();
 
     public void init(RpcMsgListener listener) {
         this.listener = listener;
@@ -82,11 +80,11 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
     }
 
     @Override
-    public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> msg) {
-        RpcSessionCreationFuture future = pendingSessionMap.remove(msgUid);
-        if (future != null) {
+    public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream) {
+        BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = pendingSessionMap.remove(msgUid);
+        if (queue != null) {
             try {
-                future.onMsg(msg);
+                queue.put(inputStream);
             } catch (InterruptedException e) {
                 log.warn("Failed to report created session!");
                 Thread.currentThread().interrupt();
@@ -97,11 +95,13 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
     }
 
     @Override
-    public StreamObserver<ClusterAPIProtos.ToRpcServerMessage> handlePluginMsgs(StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver) {
+    public StreamObserver<ClusterAPIProtos.ClusterMessage> handleMsgs(
+            StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver) {
         log.info("Processing new session.");
         return createSession(new RpcSessionCreateRequestMsg(UUID.randomUUID(), null, responseObserver));
     }
 
+
     @PreDestroy
     public void stop() {
         if (server != null) {
@@ -117,65 +117,18 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
         }
     }
 
-    @Override
-    public void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToDeviceActorRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
-
-    @Override
-    public void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToDeviceActorNotificationRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
 
     @Override
-    public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
-
-    @Override
-    public void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToPluginRpcResponseRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
-
-    @Override
-    public void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToDeviceSessionActorRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
-
-    @Override
-    public void tell(PluginRpcMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToPluginRpcMsg(toProtoMsg(toForward)).build();
-        tell(toForward.getRpcMsg().getServerAddress(), msg);
-    }
-
-    @Override
-    public void broadcast(ToAllNodesMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToAllNodesRpcMsg(toProtoMsg(toForward)).build();
-        listener.onMsg(new RpcBroadcastMsg(msg));
-    }
-
-    private void tell(ServerAddress serverAddress, ClusterAPIProtos.ToRpcServerMessage msg) {
-        listener.onMsg(new RpcSessionTellMsg(serverAddress, msg));
+    public void broadcast(RpcBroadcastMsg msg) {
+        listener.onBroadcastMsg(msg);
     }
 
-    private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> createSession(RpcSessionCreateRequestMsg msg) {
-        RpcSessionCreationFuture future = new RpcSessionCreationFuture();
-        pendingSessionMap.put(msg.getMsgUid(), future);
-        listener.onMsg(msg);
+    private StreamObserver<ClusterAPIProtos.ClusterMessage> createSession(RpcSessionCreateRequestMsg msg) {
+        BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = new ArrayBlockingQueue<>(1);
+        pendingSessionMap.put(msg.getMsgUid(), queue);
+        listener.onRpcSessionCreateRequestMsg(msg);
         try {
-            StreamObserver<ClusterAPIProtos.ToRpcServerMessage> observer = future.get();
+            StreamObserver<ClusterAPIProtos.ClusterMessage> observer = queue.take();
             log.info("Processed new session.");
             return observer;
         } catch (Exception e) {
@@ -184,76 +137,27 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
         }
     }
 
-    private static ClusterAPIProtos.ToDeviceActorRpcMessage toProtoMsg(DeviceToDeviceActorMsg msg) {
-        return ClusterAPIProtos.ToDeviceActorRpcMessage.newBuilder().setData(
-                ByteString.copyFrom(SerializationUtils.serialize(msg))
-        ).build();
-    }
-
-    private static ClusterAPIProtos.ToDeviceActorNotificationRpcMessage toProtoMsg(ToDeviceActorNotificationMsg msg) {
-        return ClusterAPIProtos.ToDeviceActorNotificationRpcMessage.newBuilder().setData(
-                ByteString.copyFrom(SerializationUtils.serialize(msg))
-        ).build();
-    }
-
-    private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) {
-        ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
-        ToDeviceRpcRequest request = msg.getMsg();
-
-        builder.setDeviceTenantId(toUid(msg.getTenantId()));
-        builder.setDeviceId(toUid(msg.getDeviceId()));
-
-        builder.setMsgId(toUid(request.getId()));
-        builder.setOneway(request.isOneway());
-        builder.setExpTime(request.getExpirationTime());
-        builder.setMethod(request.getBody().getMethod());
-        builder.setParams(request.getBody().getParams());
-
-        return builder.build();
-    }
-
-    private static ClusterAPIProtos.ToPluginRpcResponseRpcMessage toProtoMsg(ToPluginRpcResponseDeviceMsg msg) {
-        ClusterAPIProtos.ToPluginRpcResponseRpcMessage.Builder builder = ClusterAPIProtos.ToPluginRpcResponseRpcMessage.newBuilder();
-        FromDeviceRpcResponse request = msg.getResponse();
-
-        builder.setMsgId(toUid(request.getId()));
-        request.getResponse().ifPresent(builder::setResponse);
-        request.getError().ifPresent(e -> builder.setError(e.name()));
-
-        return builder.build();
-    }
-
-    private ClusterAPIProtos.ToAllNodesRpcMessage toProtoMsg(ToAllNodesMsg msg) {
-        return ClusterAPIProtos.ToAllNodesRpcMessage.newBuilder().setData(
-                ByteString.copyFrom(SerializationUtils.serialize(msg))
-        ).build();
-    }
-
-
-    private ClusterAPIProtos.ToPluginRpcMessage toProtoMsg(PluginRpcMsg msg) {
-        return ClusterAPIProtos.ToPluginRpcMessage.newBuilder()
-                .setClazz(msg.getRpcMsg().getMsgClazz())
-                .setData(ByteString.copyFrom(msg.getRpcMsg().getMsgData()))
-                .setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
-                        .setTenantId(toUid(msg.getPluginTenantId().getId()))
-                        .setPluginId(toUid(msg.getPluginId().getId()))
-                        .build()
-                ).build();
-    }
-
-    private static ClusterAPIProtos.Uid toUid(EntityId uuid) {
-        return toUid(uuid.getId());
+    @Override
+    public void tell(ClusterAPIProtos.ClusterMessage message) {
+        listener.onSendMsg(message);
     }
 
-    private static ClusterAPIProtos.Uid toUid(UUID uuid) {
-        return ClusterAPIProtos.Uid.newBuilder().setPluginUuidMsb(uuid.getMostSignificantBits()).setPluginUuidLsb(
-                uuid.getLeastSignificantBits()).build();
+    @Override
+    public void tell(ServerAddress serverAddress, TbActorMsg actorMsg) {
+        listener.onSendMsg(encodingService.convertToProtoDataMessage(serverAddress, actorMsg));
     }
 
-    private static ClusterAPIProtos.ToDeviceSessionActorRpcMessage toProtoMsg(ToDeviceSessionActorMsg msg) {
-        return ClusterAPIProtos.ToDeviceSessionActorRpcMessage.newBuilder().setData(
-                ByteString.copyFrom(SerializationUtils.serialize(msg))
-        ).build();
+    @Override
+    public void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType msgType, byte[] data) {
+        ClusterAPIProtos.ClusterMessage msg = ClusterAPIProtos.ClusterMessage
+                .newBuilder()
+                .setServerAddress(ClusterAPIProtos.ServerAddress
+                        .newBuilder()
+                        .setHost(serverAddress.getHost())
+                        .setPort(serverAddress.getPort())
+                        .build())
+                .setMessageType(msgType)
+                .setPayload(ByteString.copyFrom(data)).build();
+        listener.onSendMsg(msg);
     }
-
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 6aefe46..de29b89 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -16,15 +16,10 @@
 package org.thingsboard.server.service.cluster.rpc;
 
 import io.grpc.stub.StreamObserver;
+import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 
 import java.util.UUID;
 
@@ -35,20 +30,13 @@ public interface ClusterRpcService {
 
     void init(RpcMsgListener listener);
 
-    void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward);
+    void broadcast(RpcBroadcastMsg msg);
 
-    void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward);
+    void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream);
 
-    void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
+    void tell(ClusterAPIProtos.ClusterMessage message);
 
-    void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward);
-
-    void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
-
-    void tell(PluginRpcMsg toForward);
-
-    void broadcast(ToAllNodesMsg msg);
-
-    void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream);
+    void tell(ServerAddress serverAddress, TbActorMsg actorMsg);
 
+    void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType msgType, byte[] data);
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
index c403895..7216c43 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
@@ -33,8 +33,8 @@ public final class GrpcSession implements Closeable {
     private final UUID sessionId;
     private final boolean client;
     private final GrpcSessionListener listener;
-    private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream;
-    private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream;
+    private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
+    private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
 
     private boolean connected;
     private ServerAddress remoteServer;
@@ -56,17 +56,17 @@ public final class GrpcSession implements Closeable {
     }
 
     public void initInputStream() {
-        this.inputStream = new StreamObserver<ClusterAPIProtos.ToRpcServerMessage>() {
+        this.inputStream = new StreamObserver<ClusterAPIProtos.ClusterMessage>() {
             @Override
-            public void onNext(ClusterAPIProtos.ToRpcServerMessage msg) {
-                if (!connected && msg.hasConnectMsg()) {
+            public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
+                if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
                     connected = true;
-                    ClusterAPIProtos.ServerAddress rpcAddress = msg.getConnectMsg().getServerAddress();
+                    ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort());
                     remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort());
                     listener.onConnected(GrpcSession.this);
                 }
                 if (connected) {
-                    handleToRpcServerMessage(msg);
+                    listener.onReceiveClusterGrpcMsg(GrpcSession.this, clusterMessage);
                 }
             }
 
@@ -83,37 +83,13 @@ public final class GrpcSession implements Closeable {
         };
     }
 
-    private void handleToRpcServerMessage(ClusterAPIProtos.ToRpcServerMessage msg) {
-        if (msg.hasToPluginRpcMsg()) {
-            listener.onToPluginRpcMsg(GrpcSession.this, msg.getToPluginRpcMsg());
-        }
-        if (msg.hasToDeviceActorRpcMsg()) {
-            listener.onToDeviceActorRpcMsg(GrpcSession.this, msg.getToDeviceActorRpcMsg());
-        }
-        if (msg.hasToDeviceSessionActorRpcMsg()) {
-            listener.onToDeviceSessionActorRpcMsg(GrpcSession.this, msg.getToDeviceSessionActorRpcMsg());
-        }
-        if (msg.hasToDeviceActorNotificationRpcMsg()) {
-            listener.onToDeviceActorNotificationRpcMsg(GrpcSession.this, msg.getToDeviceActorNotificationRpcMsg());
-        }
-        if (msg.hasToDeviceRpcRequestRpcMsg()) {
-            listener.onToDeviceRpcRequestRpcMsg(GrpcSession.this, msg.getToDeviceRpcRequestRpcMsg());
-        }
-        if (msg.hasToPluginRpcResponseRpcMsg()) {
-            listener.onFromDeviceRpcResponseRpcMsg(GrpcSession.this, msg.getToPluginRpcResponseRpcMsg());
-        }
-        if (msg.hasToAllNodesRpcMsg()) {
-            listener.onToAllNodesRpcMessage(GrpcSession.this, msg.getToAllNodesRpcMsg());
-        }
-    }
-
     public void initOutputStream() {
         if (client) {
             listener.onConnected(GrpcSession.this);
         }
     }
 
-    public void sendMsg(ClusterAPIProtos.ToRpcServerMessage msg) {
+    public void sendMsg(ClusterAPIProtos.ClusterMessage msg) {
         outputStream.onNext(msg);
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
index 44e0693..266b1f5 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
@@ -26,20 +26,7 @@ public interface GrpcSessionListener {
 
     void onDisconnected(GrpcSession session);
 
-    void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg);
-
-    void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg);
-
-    void onToDeviceActorNotificationRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg);
-
-    void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg);
-
-    void onToAllNodesRpcMessage(GrpcSession grpcSession, ClusterAPIProtos.ToAllNodesRpcMessage toAllNodesRpcMessage);
-
-    void onToDeviceRpcRequestRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg);
-
-    void onFromDeviceRpcResponseRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg);
+    void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage);
 
     void onError(GrpcSession session, Throwable t);
-
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
index 5d26fae..33f3847 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
@@ -17,32 +17,16 @@ package org.thingsboard.server.service.cluster.rpc;
 
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 
 /**
  * @author Andrew Shvayka
  */
-public interface RpcMsgListener {
-
-    void onMsg(DeviceToDeviceActorMsg msg);
-
-    void onMsg(ToDeviceActorNotificationMsg msg);
-
-    void onMsg(ToDeviceSessionActorMsg msg);
-
-    void onMsg(ToAllNodesMsg nodeMsg);
-
-    void onMsg(ToPluginActorMsg msg);
-
-    void onMsg(RpcSessionCreateRequestMsg msg);
-
-    void onMsg(RpcSessionTellMsg rpcSessionTellMsg);
-
-    void onMsg(RpcBroadcastMsg rpcBroadcastMsg);
 
+public interface RpcMsgListener {
+    void onReceivedMsg(ServerAddress remoteServer, ClusterAPIProtos.ClusterMessage msg);
+    void onSendMsg(ClusterAPIProtos.ClusterMessage msg);
+    void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg);
+    void onBroadcastMsg(RpcBroadcastMsg msg);
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
new file mode 100644
index 0000000..248e9f3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.encoding;
+
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
+import java.util.Optional;
+
+public interface DataDecodingEncodingService {
+
+    Optional<TbActorMsg> decode(byte[] byteArray);
+
+    byte[] encode(TbActorMsg msq);
+
+    ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
+                                                              TbActorMsg msg);
+
+}
+
diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java
new file mode 100644
index 0000000..2cf9299
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.encoding;
+
+import com.google.protobuf.ByteString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.SerializationUtils;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
+import java.util.Optional;
+
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
+
+
+@Slf4j
+@Service
+public class ProtoWithJavaSerializationDecodingEncodingService implements DataDecodingEncodingService {
+
+
+    @Override
+    public Optional<TbActorMsg> decode(byte[] byteArray) {
+        try {
+            TbActorMsg msg = (TbActorMsg) SerializationUtils.deserialize(byteArray);
+            return Optional.of(msg);
+
+        } catch (IllegalArgumentException e) {
+            log.error("Error during deserialization message, [{}]", e.getMessage());
+           return Optional.empty();
+        }
+    }
+
+    @Override
+    public byte[] encode(TbActorMsg msq) {
+        return SerializationUtils.serialize(msq);
+    }
+
+    @Override
+    public ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
+                                                                     TbActorMsg msg) {
+        return ClusterAPIProtos.ClusterMessage
+                .newBuilder()
+                .setServerAddress(ClusterAPIProtos.ServerAddress
+                        .newBuilder()
+                        .setHost(serverAddress.getHost())
+                        .setPort(serverAddress.getPort())
+                        .build())
+                .setMessageType(CLUSTER_ACTOR_MESSAGE)
+                .setPayload(ByteString.copyFrom(encode(msg))).build();
+
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index 9674490..6a1a69a 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -30,6 +30,8 @@ import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.id.UUIDBased;
 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
@@ -38,6 +40,7 @@ import org.thingsboard.server.dao.audit.AuditLogService;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.security.model.SecurityUser;
@@ -135,23 +138,16 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     @Override
     public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
         ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
-        forward(deviceId, rpcMsg, rpcService::tell);
+        forward(deviceId, rpcMsg);
     }
 
     private void sendRpcRequest(ToDeviceRpcRequest msg) {
         log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
         ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
-        forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
+        forward(msg.getDeviceId(), rpcMsg);
     }
 
-    private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
-        Optional<ServerAddress> instance = routingService.resolveById(deviceId);
-        if (instance.isPresent()) {
-            log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
-            rpcFunction.accept(instance.get(), msg);
-        } else {
-            log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg);
-            actorService.onMsg(msg);
-        }
+    private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg) {
+        actorService.onMsg(new SendToClusterMsg(deviceId, msg));
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 00a337a..f3cd32c 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -18,31 +18,42 @@ package org.thingsboard.server.service.telemetry;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
+import org.thingsboard.rule.engine.DonAsynchron;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DataType;
 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
 import org.thingsboard.server.common.data.kv.LongDataEntry;
 import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.attributes.AttributesService;
 import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
+import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.state.DefaultDeviceStateService;
 import org.thingsboard.server.service.state.DeviceStateService;
 
@@ -53,15 +64,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
  * Created by ashvayka on 27.03.18.
@@ -83,6 +97,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     private ClusterRoutingService routingService;
 
     @Autowired
+    private ClusterRpcService rpcService;
+
+    @Autowired
     @Lazy
     private DeviceStateService stateService;
 
@@ -106,7 +123,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     }
 
     private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>();
-
     private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>();
 
     @Override
@@ -117,7 +133,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
             ServerAddress address = server.get();
             log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address);
             subscription = new Subscription(sub, true, address);
-//            rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
+            tellNewSubscription(address, sessionId, subscription);
         } else {
             log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId);
             subscription = new Subscription(sub, true);
@@ -189,6 +205,174 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
                 , System.currentTimeMillis())), callback);
     }
 
+    @Override
+    public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.SubscriptionProto proto;
+        try {
+            proto = ClusterAPIProtos.SubscriptionProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(
+                Collectors.toMap(ClusterAPIProtos.SubscriptionKetStateProto::getKey, ClusterAPIProtos.SubscriptionKetStateProto::getTs));
+        Subscription subscription = new Subscription(
+                new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(),
+                        EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()),
+                        TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
+                false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort()));
+
+        addRemoteWsSubscription(serverAddress, proto.getSessionId(), subscription);
+    }
+
+    @Override
+    public void onRemoteSubscriptionUpdate(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.SubscriptionUpdateProto proto;
+        try {
+            proto = ClusterAPIProtos.SubscriptionUpdateProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        SubscriptionUpdate update = convert(proto);
+        String sessionId = proto.getSessionId();
+        log.trace("[{}] Processing remote subscription onUpdate [{}]", sessionId, update);
+        Optional<Subscription> subOpt = getSubscription(sessionId, update.getSubscriptionId());
+        if (subOpt.isPresent()) {
+            updateSubscriptionState(sessionId, subOpt.get(), update);
+            wsService.sendWsMsg(sessionId, update);
+        }
+    }
+
+    @Override
+    public void onRemoteSubscriptionClose(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.SubscriptionCloseProto proto;
+        try {
+            proto = ClusterAPIProtos.SubscriptionCloseProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        removeSubscription(proto.getSessionId(), proto.getSubscriptionId());
+    }
+
+    @Override
+    public void onRemoteSessionClose(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.SessionCloseProto proto;
+        try {
+            proto = ClusterAPIProtos.SessionCloseProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        cleanupRemoteWsSessionSubscriptions(proto.getSessionId());
+    }
+
+    @Override
+    public void onRemoteAttributesUpdate(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.AttributeUpdateProto proto;
+        try {
+            proto = ClusterAPIProtos.AttributeUpdateProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        onAttributesUpdate(EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), proto.getScope(),
+                proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList()));
+    }
+
+    @Override
+    public void onRemoteTsUpdate(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.TimeseriesUpdateProto proto;
+        try {
+            proto = ClusterAPIProtos.TimeseriesUpdateProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        onTimeseriesUpdate(EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()),
+                proto.getDataList().stream().map(this::toTimeseries).collect(Collectors.toList()));
+    }
+
+    @Override
+    public void onClusterUpdate() {
+        log.trace("Processing cluster onUpdate msg!");
+        Iterator<Map.Entry<EntityId, Set<Subscription>>> deviceIterator = subscriptionsByEntityId.entrySet().iterator();
+        while (deviceIterator.hasNext()) {
+            Map.Entry<EntityId, Set<Subscription>> e = deviceIterator.next();
+            Set<Subscription> subscriptions = e.getValue();
+            Optional<ServerAddress> newAddressOptional = routingService.resolveById(e.getKey());
+            if (newAddressOptional.isPresent()) {
+                newAddressOptional.ifPresent(serverAddress -> checkSubsciptionsNewAddress(serverAddress, subscriptions));
+            } else {
+                checkSubsciptionsPrevAddress(subscriptions);
+            }
+            if (subscriptions.size() == 0) {
+                log.trace("[{}] No more subscriptions for this device on current server.", e.getKey());
+                deviceIterator.remove();
+            }
+        }
+    }
+
+    private void checkSubsciptionsNewAddress(ServerAddress newAddress, Set<Subscription> subscriptions) {
+        Iterator<Subscription> subscriptionIterator = subscriptions.iterator();
+        while (subscriptionIterator.hasNext()) {
+            Subscription s = subscriptionIterator.next();
+            if (s.isLocal()) {
+                if (!newAddress.equals(s.getServer())) {
+                    log.trace("[{}] Local subscription is now handled on new server [{}]", s.getWsSessionId(), newAddress);
+                    s.setServer(newAddress);
+                    tellNewSubscription(newAddress, s.getWsSessionId(), s);
+                }
+            } else {
+                log.trace("[{}] Remote subscription is now handled on new server address: [{}]", s.getWsSessionId(), newAddress);
+                subscriptionIterator.remove();
+                //TODO: onUpdate state of subscription by WsSessionId and other maps.
+            }
+        }
+    }
+
+    private void checkSubsciptionsPrevAddress(Set<Subscription> subscriptions) {
+        for (Subscription s : subscriptions) {
+            if (s.isLocal() && s.getServer() != null) {
+                log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer());
+                s.setServer(null);
+            } else {
+                log.trace("[{}] Remote subscription is on up to date server address.", s.getWsSessionId());
+            }
+        }
+    }
+
+    private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) {
+        EntityId entityId = subscription.getEntityId();
+        log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
+        registerSubscription(sessionId, entityId, subscription);
+        if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
+            final Map<String, Long> keyStates = subscription.getKeyStates();
+            DonAsynchron.withCallback(attrService.find(entityId, DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> {
+                        List<TsKvEntry> missedUpdates = new ArrayList<>();
+                        values.forEach(latestEntry -> {
+                            if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) {
+                                missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry));
+                            }
+                        });
+                        if (!missedUpdates.isEmpty()) {
+                            tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+                        }
+                    },
+                    e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor);
+        } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
+            long curTs = System.currentTimeMillis();
+            List<TsKvQuery> queries = new ArrayList<>();
+            subscription.getKeyStates().entrySet().forEach(e -> {
+                queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
+            });
+
+            DonAsynchron.withCallback(tsService.findAll(entityId, queries),
+                    missedUpdates -> {
+                        if (!missedUpdates.isEmpty()) {
+                            tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+                        }
+                    },
+                    e -> log.error("Failed to fetch missed updates.", e),
+                    tsCallBackExecutor);
+        }
+    }
+
     private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
         Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
         if (!serverAddress.isPresent()) {
@@ -201,7 +385,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
                 }
             }
         } else {
-//            rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
+            tellRemoteAttributesUpdate(serverAddress.get(), entityId, scope, attributes);
         }
     }
 
@@ -210,7 +394,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
         if (!serverAddress.isPresent()) {
             onLocalTimeseriesUpdate(entityId, ts);
         } else {
-//            rpcHandler.onTimeseriesUpdate(ctx, serverAddress.get(), entityId, entries);
+            tellRemoteTimeseriesUpdate(serverAddress.get(), entityId, ts);
         }
     }
 
@@ -256,8 +440,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
                         updateSubscriptionState(sessionId, s, update);
                         wsService.sendWsMsg(sessionId, update);
                     } else {
-                        //TODO: ashvayka
-//                        rpcHandler.onSubscriptionUpdate(ctx, s.getServer(), sessionId, update);
+                        tellRemoteSubUpdate(s.getServer(), sessionId, update);
                     }
                 }
             });
@@ -278,11 +461,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
         sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
     }
 
-    public void cleanupLocalWsSessionSubscriptions(String sessionId) {
+    private void cleanupLocalWsSessionSubscriptions(String sessionId) {
         cleanupWsSessionSubscriptions(sessionId, true);
     }
 
-    public void cleanupRemoteWsSessionSubscriptions(String sessionId) {
+    private void cleanupRemoteWsSessionSubscriptions(String sessionId) {
         cleanupWsSessionSubscriptions(sessionId, false);
     }
 
@@ -320,14 +503,14 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
         }
         for (ServerAddress address : affectedServers) {
             log.debug("[{}] Going to onSubscriptionUpdate [{}] server about session close event", sessionId, address);
-//            rpcHandler.onSessionClose(ctx, address, sessionId);
+            tellRemoteSessionClose(address, sessionId);
         }
     }
 
     private void processSubscriptionRemoval(String sessionId, Map<Integer, Subscription> sessionSubscriptions, Subscription subscription) {
         EntityId entityId = subscription.getEntityId();
         if (subscription.isLocal() && subscription.getServer() != null) {
-//            rpcHandler.onSubscriptionClose(ctx, subscription.getServer(), sessionId, subscription.getSubscriptionId());
+            tellRemoteSubClose(subscription.getServer(), sessionId, subscription.getSubscriptionId());
         }
         if (sessionSubscriptions.isEmpty()) {
             log.debug("[{}] Removed last subscription for particular session.", sessionId);
@@ -379,4 +562,151 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
             }
         }, wsCallBackExecutor);
     }
+
+    private void tellNewSubscription(ServerAddress address, String sessionId, Subscription sub) {
+        ClusterAPIProtos.SubscriptionProto.Builder builder = ClusterAPIProtos.SubscriptionProto.newBuilder();
+        builder.setSessionId(sessionId);
+        builder.setSubscriptionId(sub.getSubscriptionId());
+        builder.setEntityType(sub.getEntityId().getEntityType().name());
+        builder.setEntityId(sub.getEntityId().getId().toString());
+        builder.setType(sub.getType().name());
+        builder.setAllKeys(sub.isAllKeys());
+        builder.setScope(sub.getScope());
+        sub.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(
+                ClusterAPIProtos.SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
+        rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE, builder.build().toByteArray());
+    }
+
+    private void tellRemoteSubUpdate(ServerAddress address, String sessionId, SubscriptionUpdate update) {
+        ClusterAPIProtos.SubscriptionUpdateProto.Builder builder = ClusterAPIProtos.SubscriptionUpdateProto.newBuilder();
+        builder.setSessionId(sessionId);
+        builder.setSubscriptionId(update.getSubscriptionId());
+        builder.setErrorCode(update.getErrorCode());
+        if (update.getErrorMsg() != null) {
+            builder.setErrorMsg(update.getErrorMsg());
+        }
+        update.getData().entrySet().forEach(
+                e -> {
+                    ClusterAPIProtos.SubscriptionUpdateValueListProto.Builder dataBuilder = ClusterAPIProtos.SubscriptionUpdateValueListProto.newBuilder();
+
+                    dataBuilder.setKey(e.getKey());
+                    e.getValue().forEach(v -> {
+                        Object[] array = (Object[]) v;
+                        dataBuilder.addTs((long) array[0]);
+                        dataBuilder.addValue((String) array[1]);
+                    });
+
+                    builder.addData(dataBuilder.build());
+                }
+        );
+        rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE, builder.build().toByteArray());
+    }
+
+    private void tellRemoteAttributesUpdate(ServerAddress address, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
+        ClusterAPIProtos.AttributeUpdateProto.Builder builder = ClusterAPIProtos.AttributeUpdateProto.newBuilder();
+        builder.setEntityId(entityId.getId().toString());
+        builder.setEntityType(entityId.getEntityType().name());
+        builder.setScope(scope);
+        attributes.forEach(v -> builder.addData(toKeyValueProto(v.getLastUpdateTs(), v).build()));
+        rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE, builder.build().toByteArray());
+    }
+
+    private void tellRemoteTimeseriesUpdate(ServerAddress address, EntityId entityId, List<TsKvEntry> ts) {
+        ClusterAPIProtos.TimeseriesUpdateProto.Builder builder = ClusterAPIProtos.TimeseriesUpdateProto.newBuilder();
+        builder.setEntityId(entityId.getId().toString());
+        builder.setEntityType(entityId.getEntityType().name());
+        ts.forEach(v -> builder.addData(toKeyValueProto(v.getTs(), v).build()));
+        rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE, builder.build().toByteArray());
+    }
+
+    private void tellRemoteSessionClose(ServerAddress address, String sessionId) {
+        ClusterAPIProtos.SessionCloseProto proto = ClusterAPIProtos.SessionCloseProto.newBuilder().setSessionId(sessionId).build();
+        rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE, proto.toByteArray());
+    }
+
+    private void tellRemoteSubClose(ServerAddress address, String sessionId, int subscriptionId) {
+        ClusterAPIProtos.SubscriptionCloseProto proto = ClusterAPIProtos.SubscriptionCloseProto.newBuilder().setSessionId(sessionId).setSubscriptionId(subscriptionId).build();
+        rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE, proto.toByteArray());
+    }
+
+    private ClusterAPIProtos.KeyValueProto.Builder toKeyValueProto(long ts, KvEntry attr) {
+        ClusterAPIProtos.KeyValueProto.Builder dataBuilder = ClusterAPIProtos.KeyValueProto.newBuilder();
+        dataBuilder.setKey(attr.getKey());
+        dataBuilder.setTs(ts);
+        dataBuilder.setValueType(attr.getDataType().ordinal());
+        switch (attr.getDataType()) {
+            case BOOLEAN:
+                Optional<Boolean> booleanValue = attr.getBooleanValue();
+                booleanValue.ifPresent(dataBuilder::setBoolValue);
+                break;
+            case LONG:
+                Optional<Long> longValue = attr.getLongValue();
+                longValue.ifPresent(dataBuilder::setLongValue);
+                break;
+            case DOUBLE:
+                Optional<Double> doubleValue = attr.getDoubleValue();
+                doubleValue.ifPresent(dataBuilder::setDoubleValue);
+                break;
+            case STRING:
+                Optional<String> stringValue = attr.getStrValue();
+                stringValue.ifPresent(dataBuilder::setStrValue);
+                break;
+        }
+        return dataBuilder;
+    }
+
+    private AttributeKvEntry toAttribute(ClusterAPIProtos.KeyValueProto proto) {
+        return new BaseAttributeKvEntry(getKvEntry(proto), proto.getTs());
+    }
+
+    private TsKvEntry toTimeseries(ClusterAPIProtos.KeyValueProto proto) {
+        return new BasicTsKvEntry(proto.getTs(), getKvEntry(proto));
+    }
+
+    private KvEntry getKvEntry(ClusterAPIProtos.KeyValueProto proto) {
+        KvEntry entry = null;
+        DataType type = DataType.values()[proto.getValueType()];
+        switch (type) {
+            case BOOLEAN:
+                entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue());
+                break;
+            case LONG:
+                entry = new LongDataEntry(proto.getKey(), proto.getLongValue());
+                break;
+            case DOUBLE:
+                entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue());
+                break;
+            case STRING:
+                entry = new StringDataEntry(proto.getKey(), proto.getStrValue());
+                break;
+        }
+        return entry;
+    }
+
+    private SubscriptionUpdate convert(ClusterAPIProtos.SubscriptionUpdateProto proto) {
+        if (proto.getErrorCode() > 0) {
+            return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
+        } else {
+            Map<String, List<Object>> data = new TreeMap<>();
+            proto.getDataList().forEach(v -> {
+                List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>());
+                for (int i = 0; i < v.getTsCount(); i++) {
+                    Object[] value = new Object[2];
+                    value[0] = v.getTs(i);
+                    value[1] = v.getValue(i);
+                    values.add(value);
+                }
+            });
+            return new SubscriptionUpdate(proto.getSubscriptionId(), data);
+        }
+    }
+
+    private Optional<Subscription> getSubscription(String sessionId, int subscriptionId) {
+        Subscription state = null;
+        Map<Integer, Subscription> subMap = subscriptionsByWsSessionId.get(sessionId);
+        if (subMap != null) {
+            state = subMap.get(subscriptionId);
+        }
+        return Optional.ofNullable(state);
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 57f3876..3ea0cee 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -21,7 +21,7 @@ import com.google.common.base.Function;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.hazelcast.util.function.Consumer;
+import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
index 923d06b..964c494 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -17,7 +17,10 @@ package org.thingsboard.server.service.telemetry;
 
 import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 
 /**
  * Created by ashvayka on 27.03.18.
@@ -30,4 +33,17 @@ public interface TelemetrySubscriptionService extends RuleEngineTelemetryService
 
     void removeSubscription(String sessionId, int cmdId);
 
+    void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data);
+
+    void onRemoteSubscriptionUpdate(ServerAddress serverAddress, byte[] bytes);
+
+    void onRemoteSubscriptionClose(ServerAddress serverAddress, byte[] bytes);
+
+    void onRemoteSessionClose(ServerAddress serverAddress, byte[] bytes);
+
+    void onRemoteAttributesUpdate(ServerAddress serverAddress, byte[] bytes);
+
+    void onRemoteTsUpdate(ServerAddress serverAddress, byte[] bytes);
+
+    void onClusterUpdate();
 }
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 3bb0db7..90917e1 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -19,76 +19,105 @@ package cluster;
 option java_package = "org.thingsboard.server.gen.cluster";
 option java_outer_classname = "ClusterAPIProtos";
 
+service ClusterRpcService {
+  rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
+}
+message ClusterMessage {
+  MessageType messageType = 1;
+  MessageMataInfo messageMetaInfo = 2;
+  ServerAddress serverAddress = 3;
+  bytes payload = 4;
+}
+
 message ServerAddress {
   string host = 1;
   int32 port = 2;
 }
 
-message Uid {
-  sint64 pluginUuidMsb = 1;
-  sint64 pluginUuidLsb = 2;
+message MessageMataInfo {
+  string payloadMetaInfo = 1;
+  repeated string tags = 2;
 }
 
-message PluginAddress {
-  Uid pluginId = 1;
-  Uid tenantId = 2;
+enum MessageType {
+
+  //Cluster control messages
+  RPC_SESSION_CREATE_REQUEST_MSG = 0;
+  TO_ALL_NODES_MSG = 1;
+  RPC_SESSION_TELL_MSG = 2;
+  RPC_BROADCAST_MSG = 3;
+  CONNECT_RPC_MESSAGE =4;
+
+  CLUSTER_ACTOR_MESSAGE = 5;
+  // Messages related to TelemetrySubscriptionService
+  CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE = 6;
+  CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE = 7;
+  CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE = 8;
+  CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE = 9;
+  CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE = 10;
+  CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE = 11;
 }
 
-message ToPluginRpcMessage {
-  PluginAddress address = 1;
-  int32 clazz = 2;
-  bytes data = 3;
+// Messages related to CLUSTER_TELEMETRY_MESSAGE
+message SubscriptionProto {
+  string sessionId = 1;
+  int32 subscriptionId = 2;
+  string entityType = 3;
+  string entityId = 4;
+  string type = 5;
+  bool allKeys = 6;
+  repeated SubscriptionKetStateProto keyStates = 7;
+  string scope = 8;
 }
 
-message ToDeviceActorRpcMessage {
-  bytes data = 1;
+message SubscriptionUpdateProto {
+    string sessionId = 1;
+    int32 subscriptionId = 2;
+    int32 errorCode = 3;
+    string errorMsg = 4;
+    repeated SubscriptionUpdateValueListProto data = 5;
 }
 
-message ToDeviceSessionActorRpcMessage {
-  bytes data = 1;
+message AttributeUpdateProto {
+    string entityType = 1;
+    string entityId = 2;
+    string scope = 3;
+    repeated KeyValueProto data = 4;
 }
 
-message ToDeviceActorNotificationRpcMessage {
-  bytes data = 1;
+message TimeseriesUpdateProto {
+    string entityType = 1;
+    string entityId = 2;
+    repeated KeyValueProto data = 4;
 }
 
-message ToAllNodesRpcMessage {
-  bytes data = 1;
+message SessionCloseProto {
+    string sessionId = 1;
 }
 
-message ConnectRpcMessage {
-  ServerAddress serverAddress = 1;
+message SubscriptionCloseProto {
+    string sessionId = 1;
+    int32 subscriptionId = 2;
 }
 
-message ToDeviceRpcRequestRpcMessage {
-  Uid deviceTenantId = 2;
-  Uid deviceId = 3;
-
-  Uid msgId = 4;
-  bool oneway = 5;
-  int64 expTime = 6;
-  string method = 7;
-  string params = 8;
-}
-
-message ToPluginRpcResponseRpcMessage {
-  Uid msgId = 2;
-  string response = 3;
-  string error = 4;
+message SubscriptionKetStateProto {
+    string key = 1;
+    int64 ts = 2;
 }
 
-message ToRpcServerMessage {
-  ConnectRpcMessage connectMsg = 1;
-  ToPluginRpcMessage toPluginRpcMsg = 2;
-  ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
-  ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
-  ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
-  ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
-  ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
-  ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
+message SubscriptionUpdateValueListProto {
+    string key = 1;
+    repeated int64 ts = 2;
+    repeated string value = 3;
 }
 
-service ClusterRpcService {
-  rpc handlePluginMsgs(stream ToRpcServerMessage) returns (stream ToRpcServerMessage) {}
+message KeyValueProto {
+    string key = 1;
+    int64 ts = 2;
+    int32 valueType = 3;
+    string strValue = 4;
+    int64 longValue = 5;
+    double doubleValue = 6;
+    bool boolValue = 7;
 }
 
diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml
index 1779912..978a570 100644
--- a/application/src/main/resources/logback.xml
+++ b/application/src/main/resources/logback.xml
@@ -25,7 +25,7 @@
         </encoder>
     </appender>
 
-    <logger name="org.thingsboard.server" level="TRACE" />
+    <logger name="org.thingsboard.server" level="INFO" />
     <logger name="akka" level="INFO" />
 
     <root level="INFO">
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index dda6e15..2be73d4 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -58,6 +58,8 @@ cluster:
   hash_function_name: "${CLUSTER_HASH_FUNCTION_NAME:murmur3_128}"
   # Amount of virtual nodes in consistent hash ring.
   vitrual_nodes_size: "${CLUSTER_VIRTUAL_NODES_SIZE:16}"
+  # Queue partition id for current node
+  partition_id: "${QUEUE_PARTITION_ID:0}"
 
 # Plugins configuration parameters
 plugins:
@@ -106,7 +108,7 @@ mqtt:
 # CoAP server parameters
 coap:
   # Enable/disable coap transport protocol.
-  enabled: "${COAP_ENABLED:true}"
+  enabled: "${COAP_ENABLED:false}"
   bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
   bind_port: "${COAP_BIND_PORT:5683}"
   adaptor:  "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}"
@@ -312,6 +314,7 @@ rule:
     #Message queue cleanup period in seconds
     cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"
 
+
 # PostgreSQL DAO Configuration
 #spring:
 #  data:
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java
new file mode 100644
index 0000000..4b71cae
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.mqtt;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Created by ashvayka on 11.05.18.
+ */
+public class DbConfigurationTestRule implements TestRule {
+
+    @Override
+    public Statement apply(Statement base, Description description) {
+        return null;
+    }
+}
diff --git a/base-docker-compose.yml b/base-docker-compose.yml
new file mode 100644
index 0000000..3aa5bb5
--- /dev/null
+++ b/base-docker-compose.yml
@@ -0,0 +1,45 @@
+#
+# Copyright © 2016-2018 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '3.3'
+services:
+  zookeeper:
+    image: wurstmeister/zookeeper
+    networks:
+      - core
+    ports:
+      - "2181:2181"
+
+  cassandra:
+    image: cassandra:3.11.2
+    networks:
+      - core
+    ports:
+      - "7199:7199"
+      - "9160:9160"
+      - "9042:9042"
+
+  redis:
+    image: redis:4.0
+    networks:
+      - core
+    command: redis-server --maxclients 2000
+    ports:
+      - "6379:6379"
+
+networks:
+  core:
+
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/IdBased.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/IdBased.java
index 6ac5136..e832807 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/id/IdBased.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/IdBased.java
@@ -17,9 +17,10 @@ package org.thingsboard.server.common.data.id;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
+import java.io.Serializable;
 import java.util.UUID;
 
-public abstract class IdBased<I extends UUIDBased> {
+public abstract class IdBased<I extends UUIDBased> implements Serializable {
 	
 	protected I id;
 	
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java
new file mode 100644
index 0000000..d3bc8e9
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.cluster;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+@Data
+public class SendToClusterMsg implements TbActorMsg {
+
+    private TbActorMsg msg;
+    private EntityId entityId;
+
+    public SendToClusterMsg(EntityId entityId, TbActorMsg msg) {
+        this.entityId = entityId;
+        this.msg = msg;
+    }
+
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SEND_TO_CLUSTER_MSG;
+    }
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
index 1dc33c0..877689d 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
@@ -15,10 +15,12 @@
  */
 package org.thingsboard.server.common.msg.cluster;
 
+import org.thingsboard.server.common.msg.TbActorMsg;
+
 import java.io.Serializable;
 
 /**
  * @author Andrew Shvayka
  */
-public interface ToAllNodesMsg extends Serializable {
+public interface ToAllNodesMsg extends Serializable, TbActorMsg {
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
index 1da19cb..92c5105 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java
@@ -24,7 +24,7 @@ import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 import org.thingsboard.server.common.msg.session.SessionType;
-import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
 
 import java.util.Optional;
 
@@ -45,7 +45,7 @@ public class BasicDeviceToDeviceActorMsg implements DeviceToDeviceActorMsg {
         this(null, other.getTenantId(), other.getCustomerId(), other.getDeviceId(), other.getSessionId(), other.getSessionType(), msg);
     }
 
-    public BasicDeviceToDeviceActorMsg(ToDeviceActorSessionMsg msg, SessionType sessionType) {
+    public BasicDeviceToDeviceActorMsg(TransportToDeviceSessionActorMsg msg, SessionType sessionType) {
         this(null, msg.getTenantId(), msg.getCustomerId(), msg.getDeviceId(), msg.getSessionId(), sessionType, msg.getSessionMsg().getMsg());
     }
 
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index d63456e..7702788 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -29,6 +29,11 @@ public enum MsgType {
     CLUSTER_EVENT_MSG,
 
     /**
+     * All messages, could be send  to cluster
+    */
+    SEND_TO_CLUSTER_MSG,
+
+    /**
      * ADDED/UPDATED/DELETED events for main entities.
      *
      * See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
@@ -96,6 +101,6 @@ public enum MsgType {
     /**
      * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
      */
-    RULE_ENGINE_QUEUE_PUT_ACK_MSG;
+    RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG;
 
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index c104281..2ee8432 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -20,7 +20,6 @@ import lombok.ToString;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
-import org.thingsboard.server.common.data.rule.RuleChain;
 import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
index a24bdcd..49fad13 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.common.msg.session.ctrl;
 
 import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
 
 public class SessionCloseMsg implements SessionCtrlMsg {
@@ -60,4 +61,8 @@ public class SessionCloseMsg implements SessionCtrlMsg {
         return timeout;
     }
 
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SESSION_CTRL_MSG;
+    }
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
index 19ca219..8082f72 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionCtrlMsg.java
@@ -15,8 +15,9 @@
  */
 package org.thingsboard.server.common.msg.session;
 
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 
-public interface SessionCtrlMsg extends SessionAwareMsg {
+public interface SessionCtrlMsg extends SessionAwareMsg, TbActorMsg {
 
 }

dao/pom.xml 12(+0 -12)

diff --git a/dao/pom.xml b/dao/pom.xml
index 1d2d962..75ca13c 100644
--- a/dao/pom.xml
+++ b/dao/pom.xml
@@ -153,22 +153,10 @@
             <artifactId>curator-x-discovery</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.hazelcast</groupId>
-            <artifactId>hazelcast-zookeeper</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.hazelcast</groupId>
-            <artifactId>hazelcast</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.github.ben-manes.caffeine</groupId>
             <artifactId>caffeine</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.hazelcast</groupId>
-            <artifactId>hazelcast-spring</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-autoconfigure</artifactId>
         </dependency>
diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
index c0ca37a..f84cc1d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java
@@ -222,15 +222,14 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
     public ListenableFuture<AlarmInfo> findAlarmInfoByIdAsync(AlarmId alarmId) {
         log.trace("Executing findAlarmInfoByIdAsync [{}]", alarmId);
         validateId(alarmId, "Incorrect alarmId " + alarmId);
-        return Futures.transform(alarmDao.findAlarmByIdAsync(alarmId.getId()),
-                (AsyncFunction<Alarm, AlarmInfo>) alarm1 -> {
-                    AlarmInfo alarmInfo = new AlarmInfo(alarm1);
+        return Futures.transformAsync(alarmDao.findAlarmByIdAsync(alarmId.getId()),
+                a -> {
+                    AlarmInfo alarmInfo = new AlarmInfo(a);
                     return Futures.transform(
-                            entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
-                                    originatorName -> {
-                                        alarmInfo.setOriginatorName(originatorName);
-                                        return alarmInfo;
-                                    }
+                            entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), originatorName -> {
+                                alarmInfo.setOriginatorName(originatorName);
+                                return alarmInfo;
+                            }
                     );
                 });
     }
@@ -239,18 +238,17 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
     public ListenableFuture<TimePageData<AlarmInfo>> findAlarms(AlarmQuery query) {
         ListenableFuture<List<AlarmInfo>> alarms = alarmDao.findAlarms(query);
         if (query.getFetchOriginator() != null && query.getFetchOriginator().booleanValue()) {
-            alarms = Futures.transform(alarms, (AsyncFunction<List<AlarmInfo>, List<AlarmInfo>>) input -> {
+            alarms = Futures.transformAsync(alarms, input -> {
                 List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
                 for (AlarmInfo alarmInfo : input) {
                     alarmFutures.add(Futures.transform(
-                            entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
-                                    originatorName -> {
-                                        if (originatorName == null) {
-                                            originatorName = "Deleted";
-                                        }
-                                        alarmInfo.setOriginatorName(originatorName);
-                                        return alarmInfo;
-                                    }
+                            entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), originatorName -> {
+                                if (originatorName == null) {
+                                    originatorName = "Deleted";
+                                }
+                                alarmInfo.setOriginatorName(originatorName);
+                                return alarmInfo;
+                            }
                     ));
                 }
                 return Futures.successfulAsList(alarmFutures);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java
index 1233c7f..6785f2e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java
@@ -102,12 +102,12 @@ public class CassandraAlarmDao extends CassandraAbstractModelDao<AlarmEntity, Al
         }
         String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName;
         ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink());
-        return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> {
+        return Futures.transformAsync(relations, input -> {
             List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
             for (EntityRelation relation : input) {
                 alarmFutures.add(Futures.transform(
                         findAlarmByIdAsync(relation.getTo().getId()),
-                        (Function<Alarm, AlarmInfo>) AlarmInfo::new));
+                        AlarmInfo::new));
             }
             return Futures.successfulAsList(alarmFutures);
         });
diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
index dcd9523..7bb67f0 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java
@@ -194,10 +194,10 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
     @Override
     public ListenableFuture<List<Asset>> findAssetsByQuery(AssetSearchQuery query) {
         ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
-        ListenableFuture<List<Asset>> assets = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Asset>>) relations1 -> {
+        ListenableFuture<List<Asset>> assets = Futures.transformAsync(relations, r -> {
             EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
             List<ListenableFuture<Asset>> futures = new ArrayList<>();
-            for (EntityRelation relation : relations1) {
+            for (EntityRelation relation : r) {
                 EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
                 if (entityId.getEntityType() == EntityType.ASSET) {
                     futures.add(findAssetByIdAsync(new AssetId(entityId.getId())));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java b/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java
index ca94822..fd61976 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java
@@ -16,10 +16,17 @@
 package org.thingsboard.server.dao.cassandra;
 
 
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
 import com.datastax.driver.core.ProtocolOptions.Compression;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.DefaultPropertyMapper;
 import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingConfiguration;
 import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.PropertyAccessStrategy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -149,7 +156,13 @@ public abstract class AbstractCassandraCluster {
                 } else {
                     session = cluster.connect();
                 }
-                mappingManager = new MappingManager(session);
+//                For Cassandra Driver version 3.5.0
+                DefaultPropertyMapper propertyMapper = new DefaultPropertyMapper();
+                propertyMapper.setPropertyAccessStrategy(PropertyAccessStrategy.FIELDS);
+                MappingConfiguration configuration = MappingConfiguration.builder().withPropertyMapper(propertyMapper).build();
+                mappingManager = new MappingManager(session, configuration);
+//                For Cassandra Driver version 3.0.0
+//                mappingManager = new MappingManager(session);
                 break;
             } catch (Exception e) {
                 log.warn("Failed to initialize cassandra cluster due to {}. Will retry in {} ms", e.getMessage(), initRetryInterval);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java b/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java
index 8091b2a..70afed5 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java
@@ -77,7 +77,7 @@ public class CassandraDashboardInfoDao extends CassandraAbstractSearchTextDao<Da
 
         ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new CustomerId(customerId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.DASHBOARD, EntityType.DASHBOARD, pageLink);
 
-        return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<DashboardInfo>>) input -> {
+        return Futures.transformAsync(relations, input -> {
             List<ListenableFuture<DashboardInfo>> dashboardFutures = new ArrayList<>(input.size());
             for (EntityRelation relation : input) {
                 dashboardFutures.add(findByIdAsync(relation.getTo().getId()));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
index 9120619..0d19ac1 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
@@ -227,10 +227,10 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
     @Override
     public ListenableFuture<List<Device>> findDevicesByQuery(DeviceSearchQuery query) {
         ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(query.toEntitySearchQuery());
-        ListenableFuture<List<Device>> devices = Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<Device>>) relations1 -> {
+        ListenableFuture<List<Device>> devices = Futures.transformAsync(relations, r -> {
             EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection();
             List<ListenableFuture<Device>> futures = new ArrayList<>();
-            for (EntityRelation relation : relations1) {
+            for (EntityRelation relation : r) {
                 EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom();
                 if (entityId.getEntityType() == EntityType.DEVICE) {
                     futures.add(findDeviceByIdAsync(new DeviceId(entityId.getId())));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
index d250563..d4aef86 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
@@ -36,14 +36,14 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
     private final ListenableFuture<Void> rateLimitFuture;
 
     public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
-        this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> {
+        this.rateLimitFuture = Futures.catchingAsync(rateLimiter.acquireAsync(), Throwable.class, t -> {
             if (!(t instanceof BufferLimitException)) {
                 rateLimiter.release();
             }
             return Futures.immediateFailedFuture(t);
         });
         this.originalFuture = Futures.transform(rateLimitFuture,
-                (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
+                i -> executeAsyncWithRelease(rateLimiter, session, statement));
 
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
index a0cb1fb..7b2b391 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
@@ -16,7 +16,6 @@
 package org.thingsboard.server.dao.relation;
 
 import com.google.common.base.Function;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
@@ -29,12 +28,23 @@ import org.springframework.cache.annotation.Caching;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.relation.*;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.relation.EntityRelationInfo;
+import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
+import org.thingsboard.server.common.data.relation.EntitySearchDirection;
+import org.thingsboard.server.common.data.relation.EntityTypeFilter;
+import org.thingsboard.server.common.data.relation.RelationTypeGroup;
+import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
 import org.thingsboard.server.dao.entity.EntityService;
 import org.thingsboard.server.dao.exception.DataValidationException;
 
 import javax.annotation.Nullable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
@@ -175,7 +185,7 @@ public class BaseRelationService implements RelationService {
             inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
         }
         ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
-        ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, (List<List<EntityRelation>> relations) ->
+        ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, relations ->
                 getBooleans(relations, cache, true));
 
         ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction());
@@ -191,8 +201,7 @@ public class BaseRelationService implements RelationService {
             outboundRelationsList.add(relationDao.findAllByFrom(entity, typeGroup));
         }
         ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);
-        Futures.transform(outboundRelations, (Function<List<List<EntityRelation>>, List<Boolean>>) relations ->
-                getBooleans(relations, cache, false));
+        Futures.transform(outboundRelations, relations -> getBooleans(relations, cache, false));
 
         boolean outboundDeleteResult = relationDao.deleteOutboundRelations(entity);
         return inboundDeleteResult && outboundDeleteResult;
@@ -201,7 +210,7 @@ public class BaseRelationService implements RelationService {
     private List<Boolean> getBooleans(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
         List<Boolean> results = new ArrayList<>();
         for (List<EntityRelation> relationList : relations) {
-            relationList.stream().forEach(relation -> checkFromDeleteSync(cache, results, relation, isRemove));
+            relationList.forEach(relation -> checkFromDeleteSync(cache, results, relation, isRemove));
         }
         return results;
     }
@@ -223,8 +232,8 @@ public class BaseRelationService implements RelationService {
             inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
         }
         ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
-        ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations,
-                (AsyncFunction<List<List<EntityRelation>>, List<Boolean>>) relations -> {
+        ListenableFuture<List<Boolean>> inboundDeletions = Futures.transformAsync(inboundRelations,
+                relations -> {
                     List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, true);
                     return Futures.allAsList(results);
                 });
@@ -236,7 +245,7 @@ public class BaseRelationService implements RelationService {
             outboundRelationsList.add(relationDao.findAllByFrom(entity, typeGroup));
         }
         ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);
-        Futures.transform(outboundRelations, (AsyncFunction<List<List<EntityRelation>>, List<Boolean>>) relations -> {
+        Futures.transformAsync(outboundRelations, relations -> {
             List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, false);
             return Futures.allAsList(results);
         });
@@ -248,7 +257,7 @@ public class BaseRelationService implements RelationService {
     private List<ListenableFuture<Boolean>> getListenableFutures(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
         List<ListenableFuture<Boolean>> results = new ArrayList<>();
         for (List<EntityRelation> relationList : relations) {
-            relationList.stream().forEach(relation -> checkFromDeleteAsync(cache, results, relation, isRemove));
+            relationList.forEach(relation -> checkFromDeleteAsync(cache, results, relation, isRemove));
         }
         return results;
     }
@@ -315,17 +324,16 @@ public class BaseRelationService implements RelationService {
         validate(from);
         validateTypeGroup(typeGroup);
         ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByFrom(from, typeGroup);
-        ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
-                (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
+        return Futures.transformAsync(relations,
+                relations1 -> {
                     List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
-                    relations1.stream().forEach(relation ->
+                    relations1.forEach(relation ->
                             futures.add(fetchRelationInfoAsync(relation,
-                                    relation2 -> relation2.getTo(),
-                                    (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setToName(entityName)))
+                                    EntityRelation::getTo,
+                                    EntityRelationInfo::setToName))
                     );
                     return Futures.successfulAsList(futures);
                 });
-        return relationsInfo;
     }
 
     @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}")
@@ -371,30 +379,27 @@ public class BaseRelationService implements RelationService {
         validate(to);
         validateTypeGroup(typeGroup);
         ListenableFuture<List<EntityRelation>> relations = relationDao.findAllByTo(to, typeGroup);
-        ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
-                (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
+        return Futures.transformAsync(relations,
+                relations1 -> {
                     List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
-                    relations1.stream().forEach(relation ->
+                    relations1.forEach(relation ->
                             futures.add(fetchRelationInfoAsync(relation,
-                                    relation2 -> relation2.getFrom(),
-                                    (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setFromName(entityName)))
+                                    EntityRelation::getFrom,
+                                    EntityRelationInfo::setFromName))
                     );
                     return Futures.successfulAsList(futures);
                 });
-        return relationsInfo;
     }
 
     private ListenableFuture<EntityRelationInfo> fetchRelationInfoAsync(EntityRelation relation,
                                                                         Function<EntityRelation, EntityId> entityIdGetter,
                                                                         BiConsumer<EntityRelationInfo, String> entityNameSetter) {
         ListenableFuture<String> entityName = entityService.fetchEntityNameAsync(entityIdGetter.apply(relation));
-        ListenableFuture<EntityRelationInfo> entityRelationInfo =
-                Futures.transform(entityName, (Function<String, EntityRelationInfo>) entityName1 -> {
-                    EntityRelationInfo entityRelationInfo1 = new EntityRelationInfo(relation);
-                    entityNameSetter.accept(entityRelationInfo1, entityName1);
-                    return entityRelationInfo1;
-                });
-        return entityRelationInfo;
+        return Futures.transform(entityName, entityName1 -> {
+            EntityRelationInfo entityRelationInfo1 = new EntityRelationInfo(relation);
+            entityNameSetter.accept(entityRelationInfo1, entityName1);
+            return entityRelationInfo1;
+        });
     }
 
     @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup}")
@@ -429,7 +434,7 @@ public class BaseRelationService implements RelationService {
 
         try {
             ListenableFuture<Set<EntityRelation>> relationSet = findRelationsRecursively(params.getEntityId(), params.getDirection(), maxLvl, new ConcurrentHashMap<>());
-            return Futures.transform(relationSet, (Function<Set<EntityRelation>, List<EntityRelation>>) input -> {
+            return Futures.transform(relationSet, input -> {
                 List<EntityRelation> relations = new ArrayList<>();
                 if (filters == null || filters.isEmpty()) {
                     relations.addAll(input);
@@ -453,10 +458,10 @@ public class BaseRelationService implements RelationService {
         log.trace("Executing findInfoByQuery [{}]", query);
         ListenableFuture<List<EntityRelation>> relations = findByQuery(query);
         EntitySearchDirection direction = query.getParameters().getDirection();
-        ListenableFuture<List<EntityRelationInfo>> relationsInfo = Futures.transform(relations,
-                (AsyncFunction<List<EntityRelation>, List<EntityRelationInfo>>) relations1 -> {
+        return Futures.transformAsync(relations,
+                relations1 -> {
                     List<ListenableFuture<EntityRelationInfo>> futures = new ArrayList<>();
-                    relations1.stream().forEach(relation ->
+                    relations1.forEach(relation ->
                             futures.add(fetchRelationInfoAsync(relation,
                                     relation2 -> direction == EntitySearchDirection.FROM ? relation2.getTo() : relation2.getFrom(),
                                     (EntityRelationInfo relationInfo, String entityName) -> {
@@ -469,7 +474,6 @@ public class BaseRelationService implements RelationService {
                     );
                     return Futures.successfulAsList(futures);
                 });
-        return relationsInfo;
     }
 
     protected void validate(EntityRelation relation) {
@@ -575,7 +579,7 @@ public class BaseRelationService implements RelationService {
         }
         //TODO: try to remove this blocking operation
         List<Set<EntityRelation>> relations = Futures.successfulAsList(futures).get();
-        relations.forEach(r -> r.forEach(d -> children.add(d)));
+        relations.forEach(r -> r.forEach(children::add));
         return Futures.immediateFuture(children);
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java
index 0d64d5c..e092a47 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java
@@ -102,12 +102,12 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
         }
         String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName;
         ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink());
-        return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<AlarmInfo>>) input -> {
+        return Futures.transformAsync(relations, input -> {
             List<ListenableFuture<AlarmInfo>> alarmFutures = new ArrayList<>(input.size());
             for (EntityRelation relation : input) {
                 alarmFutures.add(Futures.transform(
                         findAlarmByIdAsync(relation.getTo().getId()),
-                        (Function<Alarm, AlarmInfo>) AlarmInfo::new));
+                        AlarmInfo::new));
             }
             return Futures.successfulAsList(alarmFutures);
         });
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java
index 4d8d0b2..cc64e80 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java
@@ -86,7 +86,7 @@ public class JpaDashboardInfoDao extends JpaAbstractSearchTextDao<DashboardInfoE
 
         ListenableFuture<List<EntityRelation>> relations = relationDao.findRelations(new CustomerId(customerId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.DASHBOARD, EntityType.DASHBOARD, pageLink);
 
-        return Futures.transform(relations, (AsyncFunction<List<EntityRelation>, List<DashboardInfo>>) input -> {
+        return Futures.transformAsync(relations, input -> {
             List<ListenableFuture<DashboardInfo>> dashboardFutures = new ArrayList<>(input.size());
             for (EntityRelation relation : input) {
                 dashboardFutures.add(findByIdAsync(relation.getTo().getId()));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index cda4b16..eb1ef52 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -217,7 +217,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
         ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
 
-        ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture,
+        ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
                 getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
 
         return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts), readResultsProcessingExecutor);
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index 9150587..59dda63 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -692,4 +692,4 @@ CREATE TABLE IF NOT EXISTS  thingsboard.rule_node (
     configuration text,
     additional_info text,
     PRIMARY KEY (id)
-);
\ No newline at end of file
+);
diff --git a/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java b/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
index d214a70..fbed694 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/CustomCassandraCQLUnit.java
@@ -26,7 +26,7 @@ import com.datastax.driver.core.Session;
 import java.util.List;
 
 public class CustomCassandraCQLUnit extends BaseCassandraUnit {
-    private List<CQLDataSet> dataSets;
+    protected List<CQLDataSet> dataSets;
 
     public Session session;
     public Cluster cluster;
diff --git a/dao/src/test/resources/cassandra-test.yaml b/dao/src/test/resources/cassandra-test.yaml
index 6463f64..e60f248 100644
--- a/dao/src/test/resources/cassandra-test.yaml
+++ b/dao/src/test/resources/cassandra-test.yaml
@@ -103,6 +103,8 @@ commitlog_directory: target/embeddedCassandra/commitlog
 
 hints_directory: target/embeddedCassandra/hints
 
+cdc_raw_directory: target/embeddedCassandra/cdc
+
 # policy for data disk failures:
 # stop: shut down gossip and Thrift, leaving the node effectively dead, but
 #       can still be inspected via JMX.

pom.xml 27(+5 -22)

diff --git a/pom.xml b/pom.xml
index 297512f..dfccb7d 100755
--- a/pom.xml
+++ b/pom.xml
@@ -41,10 +41,10 @@
         <logback.version>1.2.3</logback.version>
         <mockito.version>1.9.5</mockito.version>
         <rat.version>0.10</rat.version>
-        <cassandra.version>3.0.7</cassandra.version>
-        <cassandra-unit.version>3.0.0.1</cassandra-unit.version>
+        <cassandra.version>3.5.0</cassandra.version>
+        <cassandra-unit.version>3.3.0.2</cassandra-unit.version>
         <takari-cpsuite.version>1.2.7</takari-cpsuite.version>
-        <guava.version>18.0</guava.version>
+        <guava.version>21.0</guava.version>
         <caffeine.version>2.6.1</caffeine.version>
         <commons-lang3.version>3.4</commons-lang3.version>
         <commons-validator.version>1.5.0</commons-validator.version>
@@ -59,17 +59,15 @@
         <velocity.version>1.7</velocity.version>
         <velocity-tools.version>2.0</velocity-tools.version>
         <mail.version>1.4.3</mail.version>
-        <curator.version>2.11.0</curator.version>
+        <curator.version>4.0.1</curator.version>
         <protobuf.version>3.0.2</protobuf.version>
-        <grpc.version>1.0.0</grpc.version>
+        <grpc.version>1.12.0</grpc.version>
         <lombok.version>1.16.18</lombok.version>
         <paho.client.version>1.1.0</paho.client.version>
         <netty.version>4.1.22.Final</netty.version>
         <os-maven-plugin.version>1.5.0</os-maven-plugin.version>
         <rabbitmq.version>3.6.5</rabbitmq.version>
         <kafka.version>0.9.0.0</kafka.version>
-        <hazelcast.version>3.6.6</hazelcast.version>
-        <hazelcast-zookeeper.version>3.6.1</hazelcast-zookeeper.version>
         <surfire.version>2.19.1</surfire.version>
         <jar-plugin.version>3.0.2</jar-plugin.version>
         <springfox-swagger.version>2.6.1</springfox-swagger.version>
@@ -761,26 +759,11 @@
                 <version>${paho.client.version}</version>
             </dependency>
             <dependency>
-                <groupId>com.hazelcast</groupId>
-                <artifactId>hazelcast-spring</artifactId>
-                <version>${hazelcast.version}</version>
-            </dependency>
-            <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-x-discovery</artifactId>
                 <version>${curator.version}</version>
             </dependency>
             <dependency>
-                <groupId>com.hazelcast</groupId>
-                <artifactId>hazelcast-zookeeper</artifactId>
-                <version>${hazelcast-zookeeper.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>com.hazelcast</groupId>
-                <artifactId>hazelcast</artifactId>
-                <version>${hazelcast.version}</version>
-            </dependency>
-            <dependency>
                 <groupId>io.springfox</groupId>
                 <artifactId>springfox-swagger-ui</artifactId>
                 <version>${springfox-swagger.version}</version>
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java
index d223f4d..ed54c62 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java
@@ -56,7 +56,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
     @Override
     protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
         ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
-        return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> {
+        return Futures.transformAsync(latest, a -> {
             if (a != null && !a.getStatus().isCleared()) {
                 return clearAlarm(ctx, msg, a);
             }
@@ -66,9 +66,9 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
 
     private ListenableFuture<AlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
         ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(ctx, msg, alarm.getDetails());
-        return Futures.transform(asyncDetails, (AsyncFunction<JsonNode, AlarmResult>) details -> {
+        return Futures.transformAsync(asyncDetails, details -> {
             ListenableFuture<Boolean> clearFuture = ctx.getAlarmService().clearAlarm(alarm.getId(), details, System.currentTimeMillis());
-            return Futures.transform(clearFuture, (AsyncFunction<Boolean, AlarmResult>) cleared -> {
+            return Futures.transformAsync(clearFuture, cleared -> {
                 if (cleared && details != null) {
                     alarm.setDetails(details);
                 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java
index 5c2109b..dcf9068 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java
@@ -58,7 +58,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
     @Override
     protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
         ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
-        return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> {
+        return Futures.transformAsync(latest, a -> {
             if (a == null || a.getStatus().isCleared()) {
                 return createNewAlarm(ctx, msg);
             } else {
@@ -70,10 +70,10 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
 
     private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg) {
         ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null),
-                (Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId()));
+                details -> buildAlarm(msg, details, ctx.getTenantId()));
         ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm,
-                (Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
-        return Futures.transform(asyncCreated, (Function<Alarm, AlarmResult>) alarm -> new AlarmResult(true, false, false, alarm));
+                alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
+        return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm));
     }
 
     private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
@@ -85,7 +85,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
             return ctx.getAlarmService().createOrUpdateAlarm(alarm);
         }, ctx.getDbCallbackExecutor());
 
-        return Futures.transform(asyncUpdated, (Function<Alarm, AlarmResult>) a -> new AlarmResult(false, true, false, a));
+        return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a));
     }
 
     private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
index 73e1945..be72833 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
@@ -43,8 +43,7 @@ public class EntitiesCustomerIdAsyncLoader {
     }
 
     private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
-        return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
-            return in != null ? Futures.immediateFuture(in.getCustomerId())
-                    : Futures.immediateFuture(null);});
+        return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getCustomerId())
+                : Futures.immediateFuture(null));
     }
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java
index 8a09504..9e3a639 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java
@@ -39,9 +39,8 @@ public class EntitiesRelatedDeviceIdAsyncLoader {
 
         ListenableFuture<List<Device>> asyncDevices = deviceService.findDevicesByQuery(query);
 
-        return Futures.transform(asyncDevices, (AsyncFunction<List<Device>, DeviceId>)
-                d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId())
-                        : Futures.immediateFuture(null));
+        return Futures.transformAsync(asyncDevices, d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId())
+                : Futures.immediateFuture(null));
     }
 
     private static DeviceSearchQuery buildQuery(EntityId originator, DeviceRelationsQuery deviceRelationsQuery) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
index 55be558..f4de8fc 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
@@ -38,13 +38,11 @@ public class EntitiesRelatedEntityIdAsyncLoader {
         EntityRelationsQuery query = buildQuery(originator, relationsQuery);
         ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByQuery(query);
         if (relationsQuery.getDirection() == EntitySearchDirection.FROM) {
-            return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
-                    r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
-                            : Futures.immediateFuture(null));
+            return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
+                    : Futures.immediateFuture(null));
         } else if (relationsQuery.getDirection() == EntitySearchDirection.TO) {
-            return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
-                    r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
-                            : Futures.immediateFuture(null));
+            return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
+                    : Futures.immediateFuture(null));
         }
         return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
index 3d5c64e..a681d68 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
@@ -51,7 +51,7 @@ public class EntitiesTenantIdAsyncLoader {
     }
 
     private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
-        return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
+        return Futures.transformAsync(future, in -> {
             return in != null ? Futures.immediateFuture(in.getTenantId())
                     : Futures.immediateFuture(null);});
     }
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 7674549..bac68ff 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -38,8 +38,6 @@ import org.thingsboard.server.common.transport.quota.QuotaService;
 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
 import org.thingsboard.server.transport.coap.session.CoapExchangeObserverProxy;
 import org.thingsboard.server.transport.coap.session.CoapSessionCtx;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.util.ReflectionUtils;
 
 @Slf4j
@@ -186,7 +184,7 @@ public class CoapTransportResource extends CoapResource {
                     throw new IllegalArgumentException("Unsupported msg type: " + type);
             }
             log.trace("Processing msg: {}", msg);
-            processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+            processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
         } catch (AdaptorException e) {
             log.debug("Failed to decode payload {}", e);
             exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage());
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index 60b2220..320f06e 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -108,8 +108,8 @@ public class CoapServerTest {
 
                 @Override
                 public void process(SessionAwareMsg toActorMsg) {
-                    if (toActorMsg instanceof ToDeviceActorSessionMsg) {
-                        AdaptorToSessionActorMsg sessionMsg = ((ToDeviceActorSessionMsg) toActorMsg).getSessionMsg();
+                    if (toActorMsg instanceof TransportToDeviceSessionActorMsg) {
+                        AdaptorToSessionActorMsg sessionMsg = ((TransportToDeviceSessionActorMsg) toActorMsg).getSessionMsg();
                         try {
                             FromDeviceMsg deviceMsg = sessionMsg.getMsg();
                             ToDeviceMsg toDeviceMsg = null;
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index 4d90b5f..930b442 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
 import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@@ -219,7 +219,7 @@ public class DeviceApiController {
 
     private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
         AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
-        processor.process(new BasicToDeviceActorSessionMsg(ctx.getDevice(), msg));
+        processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
     }
 
     private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 5ccce35..0b38817 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
 import org.thingsboard.server.common.data.security.DeviceX509Credentials;
 import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -207,7 +207,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
         }
         if (msg != null) {
-            processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
         } else {
             log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
             ctx.close();
@@ -227,11 +227,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             try {
                 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
                 } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
                 } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
@@ -261,10 +261,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             try {
                 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
                 } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
                 } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
                     deviceSessionCtx.setDisallowAttributeResponses();
                 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 2056452..f666bb8 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
 import org.thingsboard.server.common.data.relation.EntityRelation;
 import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
-import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@@ -96,8 +96,8 @@ public class GatewaySessionCtx {
             GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
             devices.put(deviceName, ctx);
             log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
-            processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
-            processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
+            processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
+            processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
         }
     }
 
@@ -136,7 +136,7 @@ public class GatewaySessionCtx {
                     JsonConverter.parseWithTs(request, element.getAsJsonObject());
                 }
                 GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
-                processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
                         new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
             }
         } else {
@@ -152,7 +152,7 @@ public class GatewaySessionCtx {
             Integer requestId = jsonObj.get("id").getAsInt();
             String data = jsonObj.get("data").toString();
             GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
-            processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
                     new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
         } else {
             throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
@@ -174,7 +174,7 @@ public class GatewaySessionCtx {
                 JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
                 request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
                 GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
-                processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
                         new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
             }
         } else {
@@ -207,7 +207,7 @@ public class GatewaySessionCtx {
                 request = new BasicGetAttributesRequest(requestId, null, keys);
             }
             GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
-            processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
                     new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
             ack(msg);
         } else {