thingsboard-memoizeit

Fix to RPC logic

10/28/2018 11:10:16 AM

Changes

msa/pom.xml 1(+0 -1)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 05902a6..021f262 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -41,7 +41,6 @@ import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgDataType;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.common.msg.session.SessionMsgType;
 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
@@ -152,7 +151,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
         if (request.isOneway() && sent) {
             logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
-            systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(msg.getMsg().getId(), msg.getServerAddress(), null, null));
+            systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
         } else {
             registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
         }
@@ -174,8 +173,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
         if (requestMd != null) {
             logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
-            systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
-                    requestMd.getMsg().getServerAddress(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
+            systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+                     null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
         }
     }
 
@@ -207,7 +206,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             ToDeviceRpcRequestBody body = request.getBody();
             if (request.isOneway()) {
                 sentOneWayIds.add(entry.getKey());
-                systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null));
+                systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null));
             }
             ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId(
                     entry.getKey()).setMethodName(body.getMethod()).setParams(body.getParams()).build();
@@ -400,8 +399,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
         boolean success = requestMd != null;
         if (success) {
-            systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
-                    requestMd.getMsg().getServerAddress(), responseMsg.getPayload(), null));
+            systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+                    responseMsg.getPayload(), null));
         } else {
             logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 4e779d4..0baecea 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.actors.ruleChain;
 
 import akka.actor.ActorRef;
 import com.datastax.driver.core.utils.UUIDs;
+import org.springframework.util.StringUtils;
 import org.thingsboard.rule.engine.api.ListeningExecutor;
 import org.thingsboard.rule.engine.api.MailService;
 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
@@ -35,6 +36,8 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
 import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.dao.alarm.AlarmService;
 import org.thingsboard.server.dao.asset.AssetService;
@@ -232,16 +235,22 @@ class DefaultTbContext implements TbContext {
         return new RuleEngineRpcService() {
             @Override
             public void sendRpcReply(DeviceId deviceId, int requestId, String body) {
-                mainCtx.getDeviceRpcService().sendRpcReplyToDevice(nodeCtx.getTenantId(), deviceId, requestId, body);
+                mainCtx.getDeviceRpcService().sendReplyToRpcCallFromDevice(nodeCtx.getTenantId(), deviceId, requestId, body);
             }
 
             @Override
             public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
                 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), nodeCtx.getTenantId(), src.getDeviceId(),
                         src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
-                mainCtx.getDeviceRpcService().processRpcRequestToDevice(request, response -> {
+                mainCtx.getDeviceRpcService().forwardServerSideRPCRequestToDeviceActor(request, response -> {
                     if (src.isRestApiCall()) {
-                        mainCtx.getDeviceRpcService().processRestAPIRpcResponseFromRuleEngine(response);
+                        ServerAddress requestOriginAddress;
+                        if (!StringUtils.isEmpty(src.getOriginHost())) {
+                            requestOriginAddress = new ServerAddress(src.getOriginHost(), src.getOriginPort(), ServerType.CORE);
+                        } else {
+                            requestOriginAddress = mainCtx.getRoutingService().getCurrentServer();
+                        }
+                        mainCtx.getDeviceRpcService().processResponseToServerSideRPCRequestFromRuleEngine(requestOriginAddress, response);
                     }
                     consumer.accept(RuleEngineDeviceRpcResponse.builder()
                             .deviceId(src.getDeviceId())
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index f7e80e4..08d1dd8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -35,5 +35,4 @@ public interface ActorService extends SessionMsgProcessor, RpcMsgListener, Disco
 
     void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
 
-    void onMsg(ServiceToRuleEngineMsg serviceToRuleEngineMsg);
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 542a82f..85b8943 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -42,7 +42,6 @@ import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
-import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
@@ -159,11 +158,6 @@ public class DefaultActorService implements ActorService {
         appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
     }
 
-    @Override
-    public void onMsg(ServiceToRuleEngineMsg msg) {
-        appActor.tell(msg, ActorRef.noSender());
-    }
-
     public void broadcast(ToAllNodesMsg msg) {
         actorContext.getEncodingService().encode(msg);
         rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage
@@ -185,7 +179,7 @@ public class DefaultActorService implements ActorService {
         ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort(), source.getServerType());
         if (log.isDebugEnabled()) {
             log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
-            log.info("MSG: ", msg);
+            log.info("MSG: {}", msg);
         }
         switch (msg.getMessageType()) {
             case CLUSTER_ACTOR_MESSAGE:
@@ -219,7 +213,7 @@ public class DefaultActorService implements ActorService {
                 actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
                 break;
             case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
-                actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray());
+                actorContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromRemoteServer(serverAddress, msg.getPayload().toByteArray());
                 break;
             case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
                 actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index 582bbed..be1b6db 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.widget.WidgetsBundle;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgDataType;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.dao.alarm.AlarmService;
 import org.thingsboard.server.dao.asset.AssetService;
@@ -673,7 +674,7 @@ public abstract class BaseController {
                 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, entityId, metaData, TbMsgDataType.JSON
                         , json.writeValueAsString(entityNode)
                         , null, null, 0L);
-                actorService.onMsg(new ServiceToRuleEngineMsg(user.getTenantId(), tbMsg));
+                actorService.onMsg(new SendToClusterMsg(entityId, new ServiceToRuleEngineMsg(user.getTenantId(), tbMsg)));
             } catch (Exception e) {
                 log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e);
             }
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index e13f865..a65c1a5 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -90,7 +90,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
 
     @Override
     public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
-        log.trace("[{}][{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
+        log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
         UUID requestId = request.getId();
         localToRuleEngineRpcRequests.put(requestId, responseConsumer);
         sendRpcRequestToRuleEngine(request);
@@ -98,31 +98,11 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     }
 
     @Override
-    public void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response) {
-        UUID requestId = response.getId();
-        Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId);
-        if (consumer != null) {
-            consumer.accept(response);
-        } else {
-            log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
-        }
-    }
-
-    @Override
-    public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
-        log.trace("[{}][{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
-        UUID requestId = request.getId();
-        localToDeviceRpcRequests.put(requestId, responseConsumer);
-        sendRpcRequestToDevice(request);
-        scheduleTimeout(request, requestId, localToDeviceRpcRequests);
-    }
-
-    @Override
-    public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
-        log.trace("[{}] Received device RPC response from server: [{}]", response.getId(), response.getServerAddress());
-        if (routingService.getCurrentServer().equals(response.getServerAddress())) {
+    public void processResponseToServerSideRPCRequestFromRuleEngine(ServerAddress requestOriginAddress, FromDeviceRpcResponse response) {
+        log.trace("[{}] Received response to server-side RPC request from rule engine: [{}]", response.getId(), requestOriginAddress);
+        if (routingService.getCurrentServer().equals(requestOriginAddress)) {
             UUID requestId = response.getId();
-            Consumer<FromDeviceRpcResponse> consumer = localToDeviceRpcRequests.remove(requestId);
+            Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId);
             if (consumer != null) {
                 consumer.accept(response);
             } else {
@@ -138,12 +118,33 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
             } else {
                 builder.setError(-1);
             }
-            rpcService.tell(response.getServerAddress(), ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray());
+            rpcService.tell(requestOriginAddress, ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray());
+        }
+    }
+
+    @Override
+    public void forwardServerSideRPCRequestToDeviceActor(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
+        log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
+        UUID requestId = request.getId();
+        localToDeviceRpcRequests.put(requestId, responseConsumer);
+        sendRpcRequestToDevice(request);
+        scheduleTimeout(request, requestId, localToDeviceRpcRequests);
+    }
+
+    @Override
+    public void processResponseToServerSideRPCRequestFromDeviceActor(FromDeviceRpcResponse response) {
+        log.trace("[{}] Received response to server-side RPC request from device actor.", response.getId());
+        UUID requestId = response.getId();
+        Consumer<FromDeviceRpcResponse> consumer = localToDeviceRpcRequests.remove(requestId);
+        if (consumer != null) {
+            consumer.accept(response);
+        } else {
+            log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
         }
     }
 
     @Override
-    public void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] data) {
+    public void processResponseToServerSideRPCRequestFromRemoteServer(ServerAddress serverAddress, byte[] data) {
         ClusterAPIProtos.FromDeviceRPCResponseProto proto;
         try {
             proto = ClusterAPIProtos.FromDeviceRPCResponseProto.parseFrom(data);
@@ -151,13 +152,12 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
             throw new RuntimeException(e);
         }
         RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
-        FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), serverAddress,
-                proto.getResponse(), error);
-        processRpcResponseFromDevice(response);
+        FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), proto.getResponse(), error);
+        processResponseToServerSideRPCRequestFromRuleEngine(routingService.getCurrentServer(), response);
     }
 
     @Override
-    public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
+    public void sendReplyToRpcCallFromDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
         ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
         forward(deviceId, rpcMsg);
     }
@@ -166,6 +166,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
         ObjectNode entityNode = json.createObjectNode();
         TbMsgMetaData metaData = new TbMsgMetaData();
         metaData.putValue("requestUUID", msg.getId().toString());
+        metaData.putValue("originHost", routingService.getCurrentServer().getHost());
+        metaData.putValue("originPort", Integer.toString(routingService.getCurrentServer().getPort()));
         metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime()));
         metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
 
@@ -176,7 +178,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
             TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON
                     , json.writeValueAsString(entityNode)
                     , null, null, 0L);
-            actorService.onMsg(new ServiceToRuleEngineMsg(msg.getTenantId(), tbMsg));
+            actorService.onMsg(new SendToClusterMsg(msg.getDeviceId(), new ServiceToRuleEngineMsg(msg.getTenantId(), tbMsg)));
         } catch (JsonProcessingException e) {
             throw new RuntimeException(e);
         }
@@ -199,7 +201,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
             log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId);
             Consumer<FromDeviceRpcResponse> consumer = requestsMap.remove(requestId);
             if (consumer != null) {
-                consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT));
+                consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
             }
         }, timeout, TimeUnit.MILLISECONDS);
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
index 4cee96e..bf12ec6 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
@@ -29,13 +29,13 @@ public interface DeviceRpcService {
 
     void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
 
-    void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response);
+    void processResponseToServerSideRPCRequestFromRuleEngine(ServerAddress requestOriginAddress, FromDeviceRpcResponse response);
 
-    void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
+    void forwardServerSideRPCRequestToDeviceActor(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
 
-    void processRpcResponseFromDevice(FromDeviceRpcResponse response);
+    void processResponseToServerSideRPCRequestFromDeviceActor(FromDeviceRpcResponse response);
 
-    void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
+    void processResponseToServerSideRPCRequestFromRemoteServer(ServerAddress serverAddress, byte[] data);
 
-    void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] bytes);
+    void sendReplyToRpcCallFromDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
index 9c3ce9a..75506df 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
@@ -32,8 +32,6 @@ import java.util.UUID;
 public class FromDeviceRpcResponse {
     @Getter
     private final UUID id;
-    @Getter
-    private final ServerAddress serverAddress;
     private final String response;
     private final RpcError error;
 
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
index f4e37db..610cd3b 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
@@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgDataType;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -457,7 +458,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
             TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON
                     , json.writeValueAsString(state)
                     , null, null, 0L);
-            actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg));
+            actorService.onMsg(new SendToClusterMsg(stateData.getDeviceId(), new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg)));
         } catch (Exception e) {
             log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
         }
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 21c963b..1940b36 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -22,6 +22,7 @@ option java_outer_classname = "ClusterAPIProtos";
 service ClusterRpcService {
   rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
 }
+
 message ClusterMessage {
   MessageType messageType = 1;
   MessageMataInfo messageMetaInfo = 2;
@@ -139,4 +140,4 @@ message DeviceStateServiceMsgProto {
     bool added = 5;
     bool updated = 6;
     bool deleted = 7;
-}
\ No newline at end of file
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index f050f3b..1de2787 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.common.data.security.Authority;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -155,7 +156,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
                 device.getId(),
                 new TbMsgMetaData(),
                 "{}", null, null, 0L);
-        actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+        actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)));
 
         Thread.sleep(3000);
 
@@ -270,7 +271,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
                 device.getId(),
                 new TbMsgMetaData(),
                 "{}", null, null, 0L);
-        actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+        actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)));
 
         Thread.sleep(3000);
 
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 24db457..f59dd63 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
 import org.thingsboard.server.common.data.security.Authority;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -142,7 +143,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
                 new TbMsgMetaData(),
                 "{}",
                 null, null, 0L);
-        actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+        actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)));
 
         Thread.sleep(3000);
 
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
index 0792b63..7859623 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
@@ -21,11 +21,13 @@ import org.thingsboard.server.common.msg.MsgType;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.TbMsg;
 
+import java.io.Serializable;
+
 /**
  * Created by ashvayka on 15.03.18.
  */
 @Data
-public final class ServiceToRuleEngineMsg implements TbActorMsg {
+public final class ServiceToRuleEngineMsg implements TbActorMsg, Serializable {
 
     private final TenantId tenantId;
     private final TbMsg tbMsg;
diff --git a/common/transport/transport-api/src/main/proto/transport.proto b/common/transport/transport-api/src/main/proto/transport.proto
index 0e36dab..ff740d4 100644
--- a/common/transport/transport-api/src/main/proto/transport.proto
+++ b/common/transport/transport-api/src/main/proto/transport.proto
@@ -172,6 +172,22 @@ message ToServerRpcResponseMsg {
   string error = 3;
 }
 
+//Used to report session state to tb-node and persist this state in the cache on the tb-node level.
+message SubscriptionInfoProto {
+  int64 lastActivityTime = 1;
+  bool attributeSubscription = 2;
+  bool rpcSubscription = 3;
+}
+
+message SessionSubscriptionInfoProto {
+  SessionInfoProto sessionInfo = 1;
+  SubscriptionInfoProto subscriptionInfo = 2;
+}
+
+message DeviceSessionsCacheEntry {
+  repeated SessionSubscriptionInfoProto sessions = 1;
+}
+
 message TransportToDeviceActorMsg {
   SessionInfoProto sessionInfo = 1;
   SessionEventMsg sessionEvent = 2;
@@ -182,6 +198,7 @@ message TransportToDeviceActorMsg {
   SubscribeToRPCMsg subscribeToRPC = 7;
   ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8;
   ToServerRpcRequestMsg toServerRPCCallRequest = 9;
+  SubscriptionInfoProto subscriptionInfo = 10;
 }
 
 message DeviceActorToTransportMsg {
@@ -214,4 +231,4 @@ message TransportApiRequestMsg {
 message TransportApiResponseMsg {
    ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
    GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2;
-}
\ No newline at end of file
+}

msa/pom.xml 1(+0 -1)

diff --git a/msa/pom.xml b/msa/pom.xml
index 5fd2212..d6d6c8d 100644
--- a/msa/pom.xml
+++ b/msa/pom.xml
@@ -23,7 +23,6 @@
         <version>2.2.0-SNAPSHOT</version>
         <artifactId>thingsboard</artifactId>
     </parent>
-    <groupId>org.thingsboard</groupId>
     <artifactId>msa</artifactId>
     <packaging>pom</packaging>
 
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
index 9a93d1e..f3d57de 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceRpcRequest.java
@@ -31,6 +31,8 @@ public final class RuleEngineDeviceRpcRequest {
     private final DeviceId deviceId;
     private final int requestId;
     private final UUID requestUUID;
+    private final String originHost;
+    private final int originPort;
     private final boolean oneway;
     private final String method;
     private final String body;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
index 5d7e124..8c95b88 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java
@@ -86,6 +86,10 @@ public class TbSendRPCRequestNode implements TbNode {
 
             tmp = msg.getMetaData().getValue("requestUUID");
             UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : UUIDs.timeBased();
+            tmp = msg.getMetaData().getValue("originHost");
+            String originHost = !StringUtils.isEmpty(tmp) ? tmp : null;
+            tmp = msg.getMetaData().getValue("originPort");
+            int originPort = !StringUtils.isEmpty(tmp) ? Integer.parseInt(tmp) : 0;
 
             tmp = msg.getMetaData().getValue("expirationTime");
             long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
@@ -105,6 +109,8 @@ public class TbSendRPCRequestNode implements TbNode {
                     .deviceId(new DeviceId(msg.getOriginator().getId()))
                     .requestId(requestId)
                     .requestUUID(requestUUID)
+                    .originHost(originHost)
+                    .originPort(originPort)
                     .expirationTime(expirationTime)
                     .restApiCall(restApiCall)
                     .build();