thingsboard-aplcache

init cluster refactoring

5/9/2018 12:45:53 PM

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 54737f8..a1e6f06 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -60,6 +60,7 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.component.ComponentDiscoveryService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 import org.thingsboard.server.service.executors.ExternalCallExecutorService;
 import org.thingsboard.server.service.mail.MailExecutorService;
@@ -103,6 +104,10 @@ public class ActorSystemContext {
 
     @Autowired
     @Getter
+    private DataDecodingEncodingService encodingService;
+
+    @Autowired
+    @Getter
     private DeviceAuthService deviceAuthService;
 
     @Autowired
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 4d9b8b4..73cf788 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
@@ -45,6 +47,7 @@ import scala.concurrent.duration.Duration;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class AppActor extends RuleChainManagerActor {
 
@@ -89,6 +92,9 @@ public class AppActor extends RuleChainManagerActor {
     @Override
     protected boolean process(TbActorMsg msg) {
         switch (msg.getMsgType()) {
+            case SEND_TO_CLUSTER_MSG:
+                onPossibleClusterMsg((SendToClusterMsg) msg);
+                break;
             case CLUSTER_EVENT_MSG:
                 broadcast(msg);
                 break;
@@ -112,6 +118,16 @@ public class AppActor extends RuleChainManagerActor {
         return true;
     }
 
+    private void onPossibleClusterMsg(SendToClusterMsg msg) {
+        Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
+            if (address.isPresent()) {
+                systemContext.getRpcService().tell(
+                        systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
+            } else {
+                self().tell(msg.getMsg(), ActorRef.noSender());
+            }
+        }
+
     private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
         if (SYSTEM_TENANT.equals(msg.getTenantId())) {
             //TODO: ashvayka handle this.
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 5112f22..12164e6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -74,6 +74,7 @@ import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotific
 import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
 
@@ -521,7 +522,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (sessionAddress.isPresent()) {
             ServerAddress address = sessionAddress.get();
             logger.debug("{} Forwarding msg: {}", address, response);
-            systemContext.getRpcService().tell(sessionAddress.get(), response);
+            systemContext.getRpcService().tell(systemContext.getEncodingService()
+                    .convertToProtoDataMessage(sessionAddress.get(), response));
         } else {
             systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 5e16aab..f6b30bf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -82,7 +82,8 @@ public final class PluginProcessingContext implements PluginContext {
 
     @Override
     public void sendPluginRpcMsg(RpcMsg msg) {
-        this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
+        //ToDO is this a cluster messsage?
+//        this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
index dcabedc..c43d62d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/SharedPluginProcessingContext.java
@@ -21,6 +21,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.timeout.TimeoutMsg;
@@ -100,7 +101,7 @@ public final class SharedPluginProcessingContext {
     }
 
     public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
-        forward(msg.getDeviceId(), msg, rpcService::tell);
+        forward(msg.getDeviceId(), msg);
     }
 
     public void sendRpcRequest(ToDeviceRpcRequest msg) {
@@ -109,11 +110,11 @@ public final class SharedPluginProcessingContext {
 //        forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
     }
 
-    private <T> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
+    private <T extends TbActorMsg> void forward(DeviceId deviceId, T msg) {
         Optional<ServerAddress> instance = routingService.resolveById(deviceId);
         if (instance.isPresent()) {
             log.trace("[{}] Forwarding msg {} to remote device actor!", pluginId, msg);
-            rpcFunction.accept(instance.get(), msg);
+            rpcService.tell(systemContext.getEncodingService().convertToProtoDataMessage(instance.get(), msg));
         } else {
             log.trace("[{}] Forwarding msg {} to local device actor!", pluginId, msg);
             parentActor.tell(msg, ActorRef.noSender());
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
index bc36dc8..d5c9dab 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java
@@ -17,30 +17,11 @@ package org.thingsboard.server.actors.rpc;
 
 import akka.actor.ActorRef;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.util.SerializationUtils;
-import org.springframework.util.StringUtils;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.service.ActorService;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.*;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
 import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
-
-import java.io.Serializable;
-import java.util.UUID;
 
 /**
  * @author Andrew Shvayka
@@ -76,48 +57,13 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
     }
 
     @Override
-    public void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg) {
-        if (log.isTraceEnabled()) {
-            log.trace("{} session [{}] received plugin msg {}", getType(session), session.getRemoteServer(), msg);
-        }
-        service.onMsg(convert(session.getRemoteServer(), msg));
-    }
-
-    @Override
-    public void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg) {
-        log.trace("{} session [{}] received device actor msg {}", getType(session), session.getRemoteServer(), msg);
-        service.onMsg((DeviceToDeviceActorMsg) deserialize(msg.getData().toByteArray()));
-    }
-
-    @Override
-    public void onToDeviceActorNotificationRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg) {
-        log.trace("{} session [{}] received device actor notification msg {}", getType(session), session.getRemoteServer(), msg);
-        service.onMsg((ToDeviceActorNotificationMsg) deserialize(msg.getData().toByteArray()));
-    }
-
-    @Override
-    public void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg) {
-        log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
-        service.onMsg((ToDeviceSessionActorMsg) deserialize(msg.getData().toByteArray()));
-    }
-
-    @Override
-    public void onToDeviceRpcRequestRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
-        log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
-        service.onMsg(deserialize(session.getRemoteServer(), msg));
-    }
-
-    @Override
-    public void onFromDeviceRpcResponseRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
-        log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
-        service.onMsg(deserialize(session.getRemoteServer(), msg));
+    public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
+        log.trace("{} Service [{}] received session actor msg {}", getType(session),
+                session.getRemoteServer(),
+                clusterMessage);
+        service.onRecievedMsg(clusterMessage);
     }
 
-    @Override
-    public void onToAllNodesRpcMessage(GrpcSession session, ClusterAPIProtos.ToAllNodesRpcMessage msg) {
-        log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
-        service.onMsg((ToAllNodesMsg) deserialize(msg.getData().toByteArray()));
-    }
 
     @Override
     public void onError(GrpcSession session, Throwable t) {
@@ -130,37 +76,5 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
         return session.isClient() ? "Client" : "Server";
     }
 
-    private static PluginRpcMsg convert(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcMessage msg) {
-        ClusterAPIProtos.PluginAddress address = msg.getAddress();
-        TenantId tenantId = new TenantId(toUUID(address.getTenantId()));
-        PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
-        RpcMsg rpcMsg = new RpcMsg(serverAddress, msg.getClazz(), msg.getData().toByteArray());
-        return new PluginRpcMsg(tenantId, pluginId, rpcMsg);
-    }
-
-    private static UUID toUUID(ClusterAPIProtos.Uid uid) {
-        return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
-    }
-
-    private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
-        TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
-        DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
-
-        ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
-        ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
-
-        return new ToDeviceRpcRequestActorMsg(serverAddress, request);
-    }
-
-    private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
-        RpcError error = !StringUtils.isEmpty(msg.getError()) ? RpcError.valueOf(msg.getError()) : null;
-        FromDeviceRpcResponse response = new FromDeviceRpcResponse(toUUID(msg.getMsgId()), msg.getResponse(), error);
-        return new ToPluginRpcResponseDeviceMsg(null, null, response);
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T extends Serializable> T deserialize(byte[] data) {
-        return (T) SerializationUtils.deserialize(data);
-    }
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
index 3718a22..2dd949e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcBroadcastMsg.java
@@ -23,5 +23,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  */
 @Data
 public final class RpcBroadcastMsg {
-    private final ClusterAPIProtos.ToRpcServerMessage msg;
+    private final ClusterAPIProtos.ClusterMessage msg;
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index ba20013..ab21e5d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -40,7 +40,7 @@ public class RpcManagerActor extends ContextAwareActor {
 
     private final Map<ServerAddress, SessionActorInfo> sessionActors;
 
-    private final Map<ServerAddress, Queue<ClusterAPIProtos.ToRpcServerMessage>> pendingMsgs;
+    private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
 
     private final ServerAddress instance;
 
@@ -65,8 +65,8 @@ public class RpcManagerActor extends ContextAwareActor {
 
     @Override
     public void onReceive(Object msg) throws Exception {
-        if (msg instanceof RpcSessionTellMsg) {
-            onMsg((RpcSessionTellMsg) msg);
+        if (msg instanceof ClusterAPIProtos.ClusterMessage) {
+            onMsg((ClusterAPIProtos.ClusterMessage) msg);
         } else if (msg instanceof RpcBroadcastMsg) {
             onMsg((RpcBroadcastMsg) msg);
         } else if (msg instanceof RpcSessionCreateRequestMsg) {
@@ -84,27 +84,32 @@ public class RpcManagerActor extends ContextAwareActor {
 
     private void onMsg(RpcBroadcastMsg msg) {
         log.debug("Forwarding msg to session actors {}", msg);
-        sessionActors.keySet().forEach(address -> onMsg(new RpcSessionTellMsg(address, msg.getMsg())));
+        sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
         pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
     }
 
-    private void onMsg(RpcSessionTellMsg msg) {
-        ServerAddress address = msg.getServerAddress();
-        SessionActorInfo session = sessionActors.get(address);
-        if (session != null) {
-            log.debug("{} Forwarding msg to session actor", address);
-            session.actor.tell(msg, ActorRef.noSender());
-        } else {
-            log.debug("{} Storing msg to pending queue", address);
-            Queue<ClusterAPIProtos.ToRpcServerMessage> queue = pendingMsgs.get(address);
-            if (queue == null) {
-                queue = new LinkedList<>();
-                pendingMsgs.put(address, queue);
+    private void onMsg(ClusterAPIProtos.ClusterMessage msg) {
+        if (msg.hasServerAdresss()) {
+            ServerAddress address = new ServerAddress(msg.getServerAdresss().getHost(),
+                    msg.getServerAdresss().getPort());
+            SessionActorInfo session = sessionActors.get(address);
+            if (session != null) {
+                log.debug("{} Forwarding msg to session actor", address);
+                session.getActor().tell(msg, ActorRef.noSender());
+            } else {
+                log.debug("{} Storing msg to pending queue", address);
+                Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
+                if (queue == null) {
+                    queue = new LinkedList<>();
+                    pendingMsgs.put(new ServerAddress(
+                            msg.getServerAdresss().getHost(), msg.getServerAdresss().getPort()), queue);
+                }
+                queue.add(msg);
             }
-            queue.add(msg.getMsg());
+        } else {
+            logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
         }
     }
-
     @Override
     public void postStop() {
         sessionActors.clear();
@@ -167,10 +172,10 @@ public class RpcManagerActor extends ContextAwareActor {
     private void register(ServerAddress remoteAddress, UUID uuid, ActorRef sender) {
         sessionActors.put(remoteAddress, new SessionActorInfo(uuid, sender));
         log.debug("[{}][{}] Registering session actor.", remoteAddress, uuid);
-        Queue<ClusterAPIProtos.ToRpcServerMessage> data = pendingMsgs.remove(remoteAddress);
+        Queue<ClusterAPIProtos.ClusterMessage> data = pendingMsgs.remove(remoteAddress);
         if (data != null) {
             log.debug("[{}][{}] Forwarding {} pending messages.", remoteAddress, uuid, data.size());
-            data.forEach(msg -> sender.tell(new RpcSessionTellMsg(remoteAddress, msg), ActorRef.noSender()));
+            data.forEach(msg -> sender.tell(new RpcSessionTellMsg(msg), ActorRef.noSender()));
         } else {
             log.debug("[{}][{}] No pending messages to forward.", remoteAddress, uuid);
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index a187444..9d0f3e8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -32,6 +32,8 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
 
 import java.util.UUID;
 
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE;
+
 /**
  * @author Andrew Shvayka
  */
@@ -56,15 +58,15 @@ public class RpcSessionActor extends ContextAwareActor {
 
     @Override
     public void onReceive(Object msg) throws Exception {
-        if (msg instanceof RpcSessionTellMsg) {
-            tell((RpcSessionTellMsg) msg);
+        if (msg instanceof ClusterAPIProtos.ClusterMessage) {
+            tell((ClusterAPIProtos.ClusterMessage) msg);
         } else if (msg instanceof RpcSessionCreateRequestMsg) {
             initSession((RpcSessionCreateRequestMsg) msg);
         }
     }
 
-    private void tell(RpcSessionTellMsg msg) {
-        session.sendMsg(msg.getMsg());
+    private void tell(ClusterAPIProtos.ClusterMessage msg) {
+        session.sendMsg(msg);
     }
 
     @Override
@@ -91,7 +93,7 @@ public class RpcSessionActor extends ContextAwareActor {
             session.initInputStream();
 
             ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
-            StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream = stub.handlePluginMsgs(session.getInputStream());
+            StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream = stub.handleMsgs(session.getInputStream());
 
             session.setOutputStream(outputStream);
             session.initOutputStream();
@@ -115,11 +117,10 @@ public class RpcSessionActor extends ContextAwareActor {
         }
     }
 
-    private ClusterAPIProtos.ToRpcServerMessage toConnectMsg() {
+    private ClusterAPIProtos.ClusterMessage toConnectMsg() {
         ServerAddress instance = systemContext.getDiscoveryService().getCurrentServer().getServerAddress();
-        return ClusterAPIProtos.ToRpcServerMessage.newBuilder().setConnectMsg(
-                ClusterAPIProtos.ConnectRpcMessage.newBuilder().setServerAddress(
-                        ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost()).setPort(instance.getPort()).build()).build()).build();
-
+        return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAdresss(
+                ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost())
+                        .setPort(instance.getPort()).build()).build();
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
index 5bcf1d6..0c1136e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionCreateRequestMsg.java
@@ -30,6 +30,6 @@ public final class RpcSessionCreateRequestMsg {
 
     private final UUID msgUid;
     private final ServerAddress remoteAddress;
-    private final StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver;
+    private final StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver;
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
index 5a61044..858e3aa 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionTellMsg.java
@@ -24,6 +24,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  */
 @Data
 public final class RpcSessionTellMsg {
-    private final ServerAddress serverAddress;
-    private final ClusterAPIProtos.ToRpcServerMessage msg;
+    private final ClusterAPIProtos.ClusterMessage msg;
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index e15eb2f..5a097df 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.service;
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
@@ -27,10 +28,11 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
 
     void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
 
-    void onMsg(ServiceToRuleEngineMsg msg);
+    void onMsg(SendToClusterMsg msg);
 
     void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
 
     void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
 
+    void onMsg(ServiceToRuleEngineMsg serviceToRuleEngineMsg);
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index df0d122..05eda75 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -19,6 +19,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Terminated;
+import com.google.protobuf.ByteString;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -32,8 +33,10 @@ import org.thingsboard.server.actors.session.SessionManagerActor;
 import org.thingsboard.server.actors.stats.StatsActor;
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
@@ -46,6 +49,7 @@ import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg
 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
 import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
@@ -57,6 +61,9 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.Optional;
 
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE;
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.RPC_BROADCAST_MSG;
+
 @Service
 @Slf4j
 public class DefaultActorService implements ActorService {
@@ -127,7 +134,7 @@ public class DefaultActorService implements ActorService {
     }
 
     @Override
-    public void onMsg(ServiceToRuleEngineMsg msg) {
+    public void onMsg(SendToClusterMsg msg) {
         appActor.tell(msg, ActorRef.noSender());
     }
 
@@ -149,53 +156,7 @@ public class DefaultActorService implements ActorService {
         appActor.tell(msg, ActorRef.noSender());
     }
 
-    @Override
-    public void onMsg(ToPluginActorMsg msg) {
-        log.trace("Processing plugin rpc msg: {}", msg);
-        appActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(DeviceToDeviceActorMsg msg) {
-        log.trace("Processing device rpc msg: {}", msg);
-        appActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(ToDeviceActorNotificationMsg msg) {
-        log.trace("Processing notification rpc msg: {}", msg);
-        appActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(ToDeviceSessionActorMsg msg) {
-        log.trace("Processing session rpc msg: {}", msg);
-        sessionManagerActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(ToAllNodesMsg msg) {
-        log.trace("Processing broadcast rpc msg: {}", msg);
-        appActor.tell(msg, ActorRef.noSender());
-    }
 
-    @Override
-    public void onMsg(RpcSessionCreateRequestMsg msg) {
-        log.trace("Processing session create msg: {}", msg);
-        rpcManagerActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(RpcSessionTellMsg msg) {
-        log.trace("Processing session rpc msg: {}", msg);
-        rpcManagerActor.tell(msg, ActorRef.noSender());
-    }
-
-    @Override
-    public void onMsg(RpcBroadcastMsg msg) {
-        log.trace("Processing broadcast rpc msg: {}", msg);
-        rpcManagerActor.tell(msg, ActorRef.noSender());
-    }
 
     @Override
     public void onServerAdded(ServerInstance server) {
@@ -223,28 +184,29 @@ public class DefaultActorService implements ActorService {
     @Override
     public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
         DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
-        Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
-        if (address.isPresent()) {
-            rpcService.tell(address.get(), msg);
-        } else {
-            onMsg(msg);
-        }
+        appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
     }
 
     @Override
     public void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType) {
         log.trace("[{}] Processing onDeviceNameOrTypeUpdate event, deviceName: {}, deviceType: {}", deviceId, deviceName, deviceType);
         DeviceNameOrTypeUpdateMsg msg = new DeviceNameOrTypeUpdateMsg(tenantId, deviceId, deviceName, deviceType);
-        Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
-        if (address.isPresent()) {
-            rpcService.tell(address.get(), msg);
-        } else {
-            onMsg(msg);
-        }
+        appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
+    }
+
+    @Override
+    public void onMsg(ServiceToRuleEngineMsg msg) {
+        appActor.tell(msg, ActorRef.noSender());
     }
 
     public void broadcast(ToAllNodesMsg msg) {
-        rpcService.broadcast(msg);
+        actorContext.getEncodingService().encode(msg);
+        rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage
+                .newBuilder()
+                .setPayload(ByteString
+                .copyFrom(actorContext.getEncodingService().encode(msg)))
+                .setMessageType(CLUSTER_NETWORK_SERVER_DATA_MESSAGE)
+                .build()));
         appActor.tell(msg, ActorRef.noSender());
     }
 
@@ -253,4 +215,37 @@ public class DefaultActorService implements ActorService {
         this.sessionManagerActor.tell(msg, ActorRef.noSender());
         this.rpcManagerActor.tell(msg, ActorRef.noSender());
     }
+
+    @Override
+    public void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg) {
+        switch(msg.getMessageType()) {
+            case CLUSTER_NETWORK_SERVER_DATA_MESSAGE:
+                java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
+                        .decode(msg.getPayload().toByteArray());
+                if (decodedMsg.isPresent()) {
+                    appActor.tell(decodedMsg.get(), ActorRef.noSender());
+                } else {
+                    log.error("Error during decoding cluster proto message");
+                }
+                break;
+            case TO_ALL_NODES_MSG:
+                //ToDo
+                break;
+        }
+    }
+
+    @Override
+    public void onSendMsg(ClusterAPIProtos.ClusterMessage msg) {
+        rpcManagerActor.tell(msg, ActorRef.noSender());
+    }
+
+    @Override
+    public void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg) {
+        rpcManagerActor.tell(msg, ActorRef.noSender());
+    }
+
+    @Override
+    public void onBroadcastMsg(RpcBroadcastMsg msg) {
+        rpcManagerActor.tell(msg, ActorRef.noSender());
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
index bbaef5d..0e0175b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java
@@ -21,6 +21,7 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.device.BasicDeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
@@ -87,22 +88,19 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
     }
 
     protected Optional<ServerAddress> forwardToAppActorIfAdressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
+
         Optional<ServerAddress> newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId());
         if (!newAddress.equals(oldAddress)) {
-            if (newAddress.isPresent()) {
-                systemContext.getRpcService().tell(newAddress.get(),
-                        toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
-            } else {
-                getAppActor().tell(toForward, ctx.self());
-            }
+            getAppActor().tell(new SendToClusterMsg(toForward.getDeviceId(), toForward
+                    .toOtherAddress(systemContext.getRoutingService().getCurrentServer())), ctx.self());
         }
         return newAddress;
     }
 
     protected void forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> address) {
         if (address.isPresent()) {
-            systemContext.getRpcService().tell(address.get(),
-                    toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
+            systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(address.get(),
+                    toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer())));
         } else {
             getAppActor().tell(toForward, ctx.self());
         }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
index 03c9694..bc386eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
@@ -20,7 +20,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
+import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
 
 /**
  * @author Andrew Shvayka
@@ -29,8 +29,6 @@ import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
 @EqualsAndHashCode(exclude = {"serverInfo", "serverAddress"})
 public final class ServerInstance implements Comparable<ServerInstance> {
 
-    @Getter(AccessLevel.PACKAGE)
-    private final ServerInfo serverInfo;
     @Getter
     private final String host;
     @Getter
@@ -38,8 +36,13 @@ public final class ServerInstance implements Comparable<ServerInstance> {
     @Getter
     private final ServerAddress serverAddress;
 
-    public ServerInstance(ServerInfo serverInfo) {
-        this.serverInfo = serverInfo;
+    public ServerInstance(ServerAddress serverAddress) {
+        this.serverAddress = serverAddress;
+        this.host = serverAddress.getHost();
+        this.port = serverAddress.getPort();
+    }
+
+    public ServerInstance(ServerInstanceProtos.ServerInfo serverInfo) {
         this.host = serverInfo.getHost();
         this.port = serverInfo.getPort();
         this.serverAddress = new ServerAddress(host, port);
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 818d2b1..fbda119 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -15,8 +15,9 @@
  */
 package org.thingsboard.server.service.cluster.discovery;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.ChildData;
@@ -33,13 +34,13 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.utils.MiscUtils;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import java.io.IOException;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
@@ -113,7 +114,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
             log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
             nodePath = client.create()
                     .creatingParentsIfNeeded()
-                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", self.getServerInfo().toByteArray());
+                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
             log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
         } catch (Exception e) {
             log.error("Failed to create ZK node", e);
@@ -144,8 +145,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
                 .filter(cd -> !cd.getPath().equals(nodePath))
                 .map(cd -> {
                     try {
-                        return new ServerInstance(ServerInfo.parseFrom(cd.getData()));
-                    } catch (InvalidProtocolBufferException e) {
+                        return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
+                    } catch (NoSuchElementException e) {
                         log.error("Failed to decode ZK node", e);
                         throw new RuntimeException(e);
                     }
@@ -186,8 +187,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
         }
         ServerInstance instance;
         try {
-            instance = new ServerInstance(ServerInfo.parseFrom(data.getData()));
-        } catch (IOException e) {
+            ServerAddress serverAddress  = SerializationUtils.deserialize(data.getData());
+            instance = new ServerInstance(serverAddress);
+        } catch (SerializationException e) {
             log.error("Failed to decode server instance for node {}", data.getPath(), e);
             throw e;
         }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
index 27334c6..49c1a23 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java
@@ -25,26 +25,20 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.SerializationUtils;
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
 import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
+
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
 import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
-import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
 
 import javax.annotation.PreDestroy;
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -64,7 +58,8 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
 
     private ServerInstance instance;
 
-    private ConcurrentMap<UUID, RpcSessionCreationFuture> pendingSessionMap = new ConcurrentHashMap<>();
+    private ConcurrentMap<UUID, BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>>> pendingSessionMap =
+            new ConcurrentHashMap<>();
 
     public void init(RpcMsgListener listener) {
         this.listener = listener;
@@ -82,11 +77,11 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
     }
 
     @Override
-    public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> msg) {
-        RpcSessionCreationFuture future = pendingSessionMap.remove(msgUid);
-        if (future != null) {
+    public void onSessionCreated(UUID msgUid,  StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream) {
+        BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue  = pendingSessionMap.remove(msgUid);
+        if (queue != null) {
             try {
-                future.onMsg(msg);
+                queue.put(inputStream);
             } catch (InterruptedException e) {
                 log.warn("Failed to report created session!");
                 Thread.currentThread().interrupt();
@@ -97,11 +92,13 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
     }
 
     @Override
-    public StreamObserver<ClusterAPIProtos.ToRpcServerMessage> handlePluginMsgs(StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver) {
+    public StreamObserver<ClusterAPIProtos.ClusterMessage> handleMsgs(
+            StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver) {
         log.info("Processing new session.");
         return createSession(new RpcSessionCreateRequestMsg(UUID.randomUUID(), null, responseObserver));
     }
 
+
     @PreDestroy
     public void stop() {
         if (server != null) {
@@ -117,65 +114,18 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
         }
     }
 
-    @Override
-    public void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToDeviceActorRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
-
-    @Override
-    public void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToDeviceActorNotificationRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
-
-    @Override
-    public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
-
-    @Override
-    public void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToPluginRpcResponseRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
-
-    @Override
-    public void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToDeviceSessionActorRpcMsg(toProtoMsg(toForward)).build();
-        tell(serverAddress, msg);
-    }
 
     @Override
-    public void tell(PluginRpcMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToPluginRpcMsg(toProtoMsg(toForward)).build();
-        tell(toForward.getRpcMsg().getServerAddress(), msg);
-    }
-
-    @Override
-    public void broadcast(ToAllNodesMsg toForward) {
-        ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
-                .setToAllNodesRpcMsg(toProtoMsg(toForward)).build();
-        listener.onMsg(new RpcBroadcastMsg(msg));
-    }
-
-    private void tell(ServerAddress serverAddress, ClusterAPIProtos.ToRpcServerMessage msg) {
-        listener.onMsg(new RpcSessionTellMsg(serverAddress, msg));
+    public void broadcast(RpcBroadcastMsg msg) {
+        listener.onBroadcastMsg(msg);
     }
 
-    private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> createSession(RpcSessionCreateRequestMsg msg) {
-        RpcSessionCreationFuture future = new RpcSessionCreationFuture();
-        pendingSessionMap.put(msg.getMsgUid(), future);
-        listener.onMsg(msg);
+    private  StreamObserver<ClusterAPIProtos.ClusterMessage> createSession(RpcSessionCreateRequestMsg msg) {
+        BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = new ArrayBlockingQueue<>(1);
+        pendingSessionMap.put(msg.getMsgUid(), queue);
+        listener.onRpcSessionCreateRequestMsg(msg);
         try {
-            StreamObserver<ClusterAPIProtos.ToRpcServerMessage> observer = future.get();
+            StreamObserver<ClusterAPIProtos.ClusterMessage> observer = queue.take();
             log.info("Processed new session.");
             return observer;
         } catch (Exception e) {
@@ -184,76 +134,10 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
         }
     }
 
-    private static ClusterAPIProtos.ToDeviceActorRpcMessage toProtoMsg(DeviceToDeviceActorMsg msg) {
-        return ClusterAPIProtos.ToDeviceActorRpcMessage.newBuilder().setData(
-                ByteString.copyFrom(SerializationUtils.serialize(msg))
-        ).build();
-    }
-
-    private static ClusterAPIProtos.ToDeviceActorNotificationRpcMessage toProtoMsg(ToDeviceActorNotificationMsg msg) {
-        return ClusterAPIProtos.ToDeviceActorNotificationRpcMessage.newBuilder().setData(
-                ByteString.copyFrom(SerializationUtils.serialize(msg))
-        ).build();
-    }
-
-    private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) {
-        ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
-        ToDeviceRpcRequest request = msg.getMsg();
-
-        builder.setDeviceTenantId(toUid(msg.getTenantId()));
-        builder.setDeviceId(toUid(msg.getDeviceId()));
-
-        builder.setMsgId(toUid(request.getId()));
-        builder.setOneway(request.isOneway());
-        builder.setExpTime(request.getExpirationTime());
-        builder.setMethod(request.getBody().getMethod());
-        builder.setParams(request.getBody().getParams());
-
-        return builder.build();
-    }
-
-    private static ClusterAPIProtos.ToPluginRpcResponseRpcMessage toProtoMsg(ToPluginRpcResponseDeviceMsg msg) {
-        ClusterAPIProtos.ToPluginRpcResponseRpcMessage.Builder builder = ClusterAPIProtos.ToPluginRpcResponseRpcMessage.newBuilder();
-        FromDeviceRpcResponse request = msg.getResponse();
-
-        builder.setMsgId(toUid(request.getId()));
-        request.getResponse().ifPresent(builder::setResponse);
-        request.getError().ifPresent(e -> builder.setError(e.name()));
-
-        return builder.build();
-    }
-
-    private ClusterAPIProtos.ToAllNodesRpcMessage toProtoMsg(ToAllNodesMsg msg) {
-        return ClusterAPIProtos.ToAllNodesRpcMessage.newBuilder().setData(
-                ByteString.copyFrom(SerializationUtils.serialize(msg))
-        ).build();
-    }
-
-
-    private ClusterAPIProtos.ToPluginRpcMessage toProtoMsg(PluginRpcMsg msg) {
-        return ClusterAPIProtos.ToPluginRpcMessage.newBuilder()
-                .setClazz(msg.getRpcMsg().getMsgClazz())
-                .setData(ByteString.copyFrom(msg.getRpcMsg().getMsgData()))
-                .setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
-                        .setTenantId(toUid(msg.getPluginTenantId().getId()))
-                        .setPluginId(toUid(msg.getPluginId().getId()))
-                        .build()
-                ).build();
-    }
-
-    private static ClusterAPIProtos.Uid toUid(EntityId uuid) {
-        return toUid(uuid.getId());
-    }
-
-    private static ClusterAPIProtos.Uid toUid(UUID uuid) {
-        return ClusterAPIProtos.Uid.newBuilder().setPluginUuidMsb(uuid.getMostSignificantBits()).setPluginUuidLsb(
-                uuid.getLeastSignificantBits()).build();
+    @Override
+    public void tell(ClusterAPIProtos.ClusterMessage message) {
+        listener.onSendMsg(message);
     }
 
-    private static ClusterAPIProtos.ToDeviceSessionActorRpcMessage toProtoMsg(ToDeviceSessionActorMsg msg) {
-        return ClusterAPIProtos.ToDeviceSessionActorRpcMessage.newBuilder().setData(
-                ByteString.copyFrom(SerializationUtils.serialize(msg))
-        ).build();
-    }
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
index 6aefe46..075857c 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterRpcService.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.service.cluster.rpc;
 
 import io.grpc.stub.StreamObserver;
+import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
@@ -35,20 +36,10 @@ public interface ClusterRpcService {
 
     void init(RpcMsgListener listener);
 
-    void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward);
+    void broadcast(RpcBroadcastMsg msg);
 
-    void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward);
+    void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream);
 
-    void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
-
-    void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward);
-
-    void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
-
-    void tell(PluginRpcMsg toForward);
-
-    void broadcast(ToAllNodesMsg msg);
-
-    void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream);
+    void tell(ClusterAPIProtos.ClusterMessage message);
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
index c403895..e56fb75 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
@@ -33,8 +33,8 @@ public final class GrpcSession implements Closeable {
     private final UUID sessionId;
     private final boolean client;
     private final GrpcSessionListener listener;
-    private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream;
-    private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream;
+    private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
+    private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
 
     private boolean connected;
     private ServerAddress remoteServer;
@@ -56,17 +56,17 @@ public final class GrpcSession implements Closeable {
     }
 
     public void initInputStream() {
-        this.inputStream = new StreamObserver<ClusterAPIProtos.ToRpcServerMessage>() {
+        this.inputStream = new StreamObserver<ClusterAPIProtos.ClusterMessage>() {
             @Override
-            public void onNext(ClusterAPIProtos.ToRpcServerMessage msg) {
-                if (!connected && msg.hasConnectMsg()) {
+            public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
+                if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
                     connected = true;
-                    ClusterAPIProtos.ServerAddress rpcAddress = msg.getConnectMsg().getServerAddress();
+                    ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAdresss().getHost(), clusterMessage.getServerAdresss().getPort());
                     remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort());
                     listener.onConnected(GrpcSession.this);
                 }
                 if (connected) {
-                    handleToRpcServerMessage(msg);
+                    listener.onReceiveClusterGrpcMsg(GrpcSession.this, clusterMessage);
                 }
             }
 
@@ -83,37 +83,13 @@ public final class GrpcSession implements Closeable {
         };
     }
 
-    private void handleToRpcServerMessage(ClusterAPIProtos.ToRpcServerMessage msg) {
-        if (msg.hasToPluginRpcMsg()) {
-            listener.onToPluginRpcMsg(GrpcSession.this, msg.getToPluginRpcMsg());
-        }
-        if (msg.hasToDeviceActorRpcMsg()) {
-            listener.onToDeviceActorRpcMsg(GrpcSession.this, msg.getToDeviceActorRpcMsg());
-        }
-        if (msg.hasToDeviceSessionActorRpcMsg()) {
-            listener.onToDeviceSessionActorRpcMsg(GrpcSession.this, msg.getToDeviceSessionActorRpcMsg());
-        }
-        if (msg.hasToDeviceActorNotificationRpcMsg()) {
-            listener.onToDeviceActorNotificationRpcMsg(GrpcSession.this, msg.getToDeviceActorNotificationRpcMsg());
-        }
-        if (msg.hasToDeviceRpcRequestRpcMsg()) {
-            listener.onToDeviceRpcRequestRpcMsg(GrpcSession.this, msg.getToDeviceRpcRequestRpcMsg());
-        }
-        if (msg.hasToPluginRpcResponseRpcMsg()) {
-            listener.onFromDeviceRpcResponseRpcMsg(GrpcSession.this, msg.getToPluginRpcResponseRpcMsg());
-        }
-        if (msg.hasToAllNodesRpcMsg()) {
-            listener.onToAllNodesRpcMessage(GrpcSession.this, msg.getToAllNodesRpcMsg());
-        }
-    }
-
     public void initOutputStream() {
         if (client) {
             listener.onConnected(GrpcSession.this);
         }
     }
 
-    public void sendMsg(ClusterAPIProtos.ToRpcServerMessage msg) {
+    public void sendMsg(ClusterAPIProtos.ClusterMessage msg) {
         outputStream.onNext(msg);
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
index 44e0693..266b1f5 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSessionListener.java
@@ -26,20 +26,7 @@ public interface GrpcSessionListener {
 
     void onDisconnected(GrpcSession session);
 
-    void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg);
-
-    void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg);
-
-    void onToDeviceActorNotificationRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg);
-
-    void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg);
-
-    void onToAllNodesRpcMessage(GrpcSession grpcSession, ClusterAPIProtos.ToAllNodesRpcMessage toAllNodesRpcMessage);
-
-    void onToDeviceRpcRequestRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg);
-
-    void onFromDeviceRpcResponseRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg);
+    void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage);
 
     void onError(GrpcSession session, Throwable t);
-
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
index 5d26fae..88d17e0 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java
@@ -17,32 +17,15 @@ package org.thingsboard.server.service.cluster.rpc;
 
 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
-import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
-import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
-import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
-import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
-import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 
 /**
  * @author Andrew Shvayka
  */
-public interface RpcMsgListener {
-
-    void onMsg(DeviceToDeviceActorMsg msg);
-
-    void onMsg(ToDeviceActorNotificationMsg msg);
-
-    void onMsg(ToDeviceSessionActorMsg msg);
-
-    void onMsg(ToAllNodesMsg nodeMsg);
-
-    void onMsg(ToPluginActorMsg msg);
-
-    void onMsg(RpcSessionCreateRequestMsg msg);
-
-    void onMsg(RpcSessionTellMsg rpcSessionTellMsg);
-
-    void onMsg(RpcBroadcastMsg rpcBroadcastMsg);
 
+public interface RpcMsgListener {
+    void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg);
+    void onSendMsg(ClusterAPIProtos.ClusterMessage msg);
+    void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg);
+    void onBroadcastMsg(RpcBroadcastMsg msg);
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
new file mode 100644
index 0000000..248e9f3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/encoding/DataDecodingEncodingService.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.encoding;
+
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
+import java.util.Optional;
+
+public interface DataDecodingEncodingService {
+
+    Optional<TbActorMsg> decode(byte[] byteArray);
+
+    byte[] encode(TbActorMsg msq);
+
+    ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
+                                                              TbActorMsg msg);
+
+}
+
diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java
new file mode 100644
index 0000000..f6a365e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.encoding;
+
+import com.google.protobuf.ByteString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.SerializationUtils;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
+
+import java.util.Optional;
+
+import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE;
+
+
+@Slf4j
+@Service
+public class ProtoWithJavaSerializationDecodingEncodingService implements DataDecodingEncodingService {
+
+
+    @Override
+    public Optional<TbActorMsg> decode(byte[] byteArray) {
+        try {
+            TbActorMsg msg = (TbActorMsg) SerializationUtils.deserialize(byteArray);
+            return Optional.of(msg);
+
+        } catch (IllegalArgumentException e) {
+            log.error("Error during deserialization message, [{}]", e.getMessage());
+           return Optional.empty();
+        }
+    }
+
+    @Override
+    public byte[] encode(TbActorMsg msq) {
+        return SerializationUtils.serialize(msq);
+    }
+
+    @Override
+    public ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
+                                                                     TbActorMsg msg) {
+        return ClusterAPIProtos.ClusterMessage
+                .newBuilder()
+                .setServerAdresss(ClusterAPIProtos.ServerAddress
+                        .newBuilder()
+                        .setHost(serverAddress.getHost())
+                        .setPort(serverAddress.getPort())
+                        .build())
+                .setMessageType(CLUSTER_NETWORK_SERVER_DATA_MESSAGE)
+                .setPayload(ByteString.copyFrom(encode(msg))).build();
+
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index 9674490..6a1a69a 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -30,6 +30,8 @@ import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.id.UUIDBased;
 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
@@ -38,6 +40,7 @@ import org.thingsboard.server.dao.audit.AuditLogService;
 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 import org.thingsboard.server.service.security.model.SecurityUser;
@@ -135,23 +138,16 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     @Override
     public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
         ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
-        forward(deviceId, rpcMsg, rpcService::tell);
+        forward(deviceId, rpcMsg);
     }
 
     private void sendRpcRequest(ToDeviceRpcRequest msg) {
         log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
         ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
-        forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
+        forward(msg.getDeviceId(), rpcMsg);
     }
 
-    private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
-        Optional<ServerAddress> instance = routingService.resolveById(deviceId);
-        if (instance.isPresent()) {
-            log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
-            rpcFunction.accept(instance.get(), msg);
-        } else {
-            log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg);
-            actorService.onMsg(msg);
-        }
+    private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg) {
+        actorService.onMsg(new SendToClusterMsg(deviceId, msg));
     }
 }
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index fc92487..2ecffe7 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -19,79 +19,73 @@ package cluster;
 option java_package = "org.thingsboard.server.gen.cluster";
 option java_outer_classname = "ClusterAPIProtos";
 
-message ServerAddress {
-  string host = 1;
-  int32 port = 2;
-}
-
-message Uid {
-  sint64 pluginUuidMsb = 1;
-  sint64 pluginUuidLsb = 2;
-}
-
-message PluginAddress {
-  Uid pluginId = 1;
-  Uid tenantId = 2;
-}
-
-message ToPluginRpcMessage {
-  PluginAddress address = 1;
-  int32 clazz = 2;
-  bytes data = 3;
-}
-
-message ToDeviceActorRpcMessage {
-  bytes data = 1;
-}
-
-message ToDeviceSessionActorRpcMessage {
-  bytes data = 1;
-}
-
-message ToDeviceActorNotificationRpcMessage {
-  bytes data = 1;
-}
-
-message ToAllNodesRpcMessage {
-  bytes data = 1;
-}
-
-message ConnectRpcMessage {
-  ServerAddress serverAddress = 1;
-}
-
-message ToDeviceRpcRequestRpcMessage {
-  Uid deviceTenantId = 2;
-  Uid deviceId = 3;
-
-  Uid msgId = 4;
-  bool oneway = 5;
-  int64 expTime = 6;
-  string method = 7;
-  string params = 8;
-}
-
-message ToPluginRpcResponseRpcMessage {
-  Uid msgId = 2;
-  string response = 3;
-  string error = 4;
-}
-
-message ToRpcServerMessage {
-  ConnectRpcMessage connectMsg = 1;
-  ToPluginRpcMessage toPluginRpcMsg = 2;
-  ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
-  ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
-  ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
-  ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
-  ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
-  ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
-}
+//message Uid {
+//  sint64 pluginUuidMsb = 1;
+//  sint64 pluginUuidLsb = 2;
+//}
+//
+//message PluginAddress {
+//  Uid pluginId = 1;
+//  Uid tenantId = 2;
+//}
+//
+//message ToPluginRpcMessage {
+//  PluginAddress address = 1;
+//  int32 clazz = 2;
+//  bytes data = 3;
+//}
+//
+//message ToDeviceActorRpcMessage {
+//  bytes data = 1;
+//}
+//
+//message ToDeviceSessionActorRpcMessage {
+//  bytes data = 1;
+//}
+//
+//message ToDeviceActorNotificationRpcMessage {
+//  bytes data = 1;
+//}
+//
+//message ToAllNodesRpcMessage {
+//  bytes data = 1;
+//}
+//
+//message ConnectRpcMessage {
+//  ServerAddress serverAddress = 1;
+//}
+//
+//message ToDeviceRpcRequestRpcMessage {
+//  Uid deviceTenantId = 2;
+//  Uid deviceId = 3;
+//
+//  Uid msgId = 4;
+//  bool oneway = 5;
+//  int64 expTime = 6;
+//  string method = 7;
+//  string params = 8;
+//}
+//
+//message ToPluginRpcResponseRpcMessage {
+//  Uid msgId = 2;
+//  string response = 3;
+//  string error = 4;
+//}
+//
+//message ToRpcServerMessage {
+//  ConnectRpcMessage connectMsg = 1;
+//  ToPluginRpcMessage toPluginRpcMsg = 2;
+//  ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
+//  ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
+//  ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
+//  ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
+//  ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
+//  ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
+//}
 
 service ClusterRpcService {
-  rpc handlePluginMsgs(stream ToRpcServerMessage) returns (stream ToRpcServerMessage) {}
+  rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
 }
-
 message ClusterMessage {
   MessageType messageType = 1;
   MessageMataInfo messageMetaInfo = 2;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java
new file mode 100644
index 0000000..d3bc8e9
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/SendToClusterMsg.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg.cluster;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+@Data
+public class SendToClusterMsg implements TbActorMsg {
+
+    private TbActorMsg msg;
+    private EntityId entityId;
+
+    public SendToClusterMsg(EntityId entityId, TbActorMsg msg) {
+        this.entityId = entityId;
+        this.msg = msg;
+    }
+
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SEND_TO_CLUSTER_MSG;
+    }
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
index 1dc33c0..877689d 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ToAllNodesMsg.java
@@ -15,10 +15,12 @@
  */
 package org.thingsboard.server.common.msg.cluster;
 
+import org.thingsboard.server.common.msg.TbActorMsg;
+
 import java.io.Serializable;
 
 /**
  * @author Andrew Shvayka
  */
-public interface ToAllNodesMsg extends Serializable {
+public interface ToAllNodesMsg extends Serializable, TbActorMsg {
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java
index 6a70891..d01c2b4 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicToDeviceSessionActorMsg.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.common.msg.core;
 
 import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
 
 public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg {
@@ -44,4 +45,8 @@ public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg {
         return "BasicToSessionResponseMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
     }
 
+    @Override
+    public MsgType getMsgType() {
+        return null;
+    }
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java
index f3e0c0f..d190579 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/ToDeviceSessionActorMsg.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.common.msg.core;
 
+import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
 
@@ -23,7 +24,7 @@ import java.io.Serializable;
 /**
  * @author Andrew Shvayka
  */
-public interface ToDeviceSessionActorMsg extends SessionAwareMsg, Serializable {
+public interface ToDeviceSessionActorMsg extends SessionAwareMsg, Serializable, TbActorMsg {
 
     ToDeviceMsg getMsg();
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index d63456e..c7a96db 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -29,6 +29,11 @@ public enum MsgType {
     CLUSTER_EVENT_MSG,
 
     /**
+     * All messages, could be send  to cluster
+    */
+    SEND_TO_CLUSTER_MSG,
+
+    /**
      * ADDED/UPDATED/DELETED events for main entities.
      *
      * See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index c104281..2ee8432 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -20,7 +20,6 @@ import lombok.ToString;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
-import org.thingsboard.server.common.data.rule.RuleChain;
 import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;