thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index e1c7e08..d453e59 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -38,6 +38,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
@@ -112,21 +113,27 @@ public class AppActor extends RuleChainManagerActor {
             case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
                 onToDeviceActorMsg((TenantAwareMsg) msg);
                 break;
+            case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
+                onToDeviceSessionMsg((BasicActorSystemToDeviceSessionActorMsg) msg);
             default:
                 return false;
         }
         return true;
     }
 
+    private void onToDeviceSessionMsg(BasicActorSystemToDeviceSessionActorMsg msg) {
+        systemContext.getSessionManagerActor().tell(msg, self());
+    }
+
     private void onPossibleClusterMsg(SendToClusterMsg msg) {
         Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
-            if (address.isPresent()) {
-                systemContext.getRpcService().tell(
-                        systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
-            } else {
-                self().tell(msg.getMsg(), ActorRef.noSender());
-            }
+        if (address.isPresent()) {
+            systemContext.getRpcService().tell(
+                    systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
+        } else {
+            self().tell(msg.getMsg(), ActorRef.noSender());
         }
+    }
 
     private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
         if (SYSTEM_TENANT.equals(msg.getTenantId())) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 625d6e9..d416497 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -163,7 +163,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
         if (request.isOneway() && sent) {
             logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
-            systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
+            systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(msg.getMsg().getId(), msg.getServerAddress(), null, null));
         } else {
             registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
         }
@@ -185,8 +185,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
         if (requestMd != null) {
             logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
-            systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
-                    null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
+            systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+                    requestMd.getMsg().getServerAddress(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
         }
     }
 
@@ -234,11 +234,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
         return entry -> {
+            ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
             ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
             ToDeviceRpcRequestBody body = request.getBody();
             if (request.isOneway()) {
                 sentOneWayIds.add(entry.getKey());
-                systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(request.getId(), null, null));
+                systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null));
             }
             ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
                     entry.getKey(),
@@ -360,7 +361,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                 kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
             }
             TbMsgMetaData metaData = defaultMetaData.copy();
-            metaData.putValue("ts", entry.getKey()+"");
+            metaData.putValue("ts", entry.getKey() + "");
             TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
             pushToRuleEngineWithTimeout(context, tbMsg, msgData);
         }
@@ -451,7 +452,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
             boolean success = requestMd != null;
             if (success) {
-                systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getData(), null));
+                systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
+                        requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));
             } else {
                 logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
             }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index dcc6e5c..3f3f70b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -114,6 +114,7 @@ public class RpcManagerActor extends ContextAwareActor {
             logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
         }
     }
+
     @Override
     public void postStop() {
         sessionActors.clear();
@@ -157,7 +158,7 @@ public class RpcManagerActor extends ContextAwareActor {
     private void onSessionClose(boolean reconnect, ServerAddress remoteAddress) {
         log.debug("[{}] session closed. Should reconnect: {}", remoteAddress, reconnect);
         SessionActorInfo sessionRef = sessionActors.get(remoteAddress);
-        if (context().sender().equals(sessionRef.actor)) {
+        if (context().sender() != null && context().sender().equals(sessionRef.actor)) {
             sessionActors.remove(remoteAddress);
             pendingMsgs.remove(remoteAddress);
             if (reconnect) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index b888bc3..17ffd16 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -27,7 +27,6 @@ import org.thingsboard.rule.engine.api.ScriptEngine;
 import org.thingsboard.rule.engine.api.TbContext;
 import org.thingsboard.rule.engine.api.TbRelationTypes;
 import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -224,7 +223,7 @@ class DefaultTbContext implements TbContext {
             public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
                 ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(),
                         src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
-                mainCtx.getDeviceRpcService().process(request, response -> {
+                mainCtx.getDeviceRpcService().processRpcRequestToDevice(request, response -> {
                     consumer.accept(RuleEngineDeviceRpcResponse.builder()
                             .deviceId(src.getDeviceId())
                             .requestId(src.getRequestId())
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index cc73eae..7d560db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 82ca95f..3507f24 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -233,6 +233,9 @@ public class DefaultActorService implements ActorService {
             case CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE:
                 actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
                 break;
+            case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
+                actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray());
+                break;
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
index 1422a34..e39a3f6 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java
@@ -128,7 +128,7 @@ public class RpcController extends BaseController {
                             timeout,
                             body
                     );
-                    deviceRpcService.process(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
+                    deviceRpcService.processRpcRequestToDevice(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
                 }
 
                 @Override
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
index bd0071a..6630d7c 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.service.rpc;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -28,11 +29,15 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
 import org.thingsboard.server.dao.audit.AuditLogService;
+import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.telemetry.sub.Subscription;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -57,14 +62,10 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     @Autowired
     private ActorService actorService;
 
-    @Autowired
-    private AuditLogService auditLogService;
-
     private ScheduledExecutorService rpcCallBackExecutor;
 
     private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>();
 
-
     @PostConstruct
     public void initExecutor() {
         rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor();
@@ -78,7 +79,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     }
 
     @Override
-    public void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
+    public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
         log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
         sendRpcRequest(request);
         UUID requestId = request.getId();
@@ -89,33 +90,48 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
             log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
             Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
             if (consumer != null) {
-                consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
+                consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT));
             }
         }, timeout, TimeUnit.MILLISECONDS);
     }
 
     @Override
-    public void process(ToDeviceRpcRequest request, ServerAddress originator) {
-//        if (pluginServerAddress.isPresent()) {
-//            systemContext.getRpcService().tell(pluginServerAddress.get(), responsePluginMsg);
-//            logger.debug("[{}] Rpc command response sent to remote plugin actor [{}]!", deviceId, requestMd.getMsg().getMsg().getId());
-//        } else {
-//            context.parent().tell(responsePluginMsg, ActorRef.noSender());
-//            logger.debug("[{}] Rpc command response sent to local plugin actor [{}]!", deviceId, requestMd.getMsg().getMsg().getId());
-//        }
+    public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
+        log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
+        if (routingService.getCurrentServer().equals(response.getServerAddress())) {
+            UUID requestId = response.getId();
+            Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
+            if (consumer != null) {
+                consumer.accept(response);
+            } else {
+                log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
+            }
+        } else {
+            ClusterAPIProtos.FromDeviceRPCResponseProto.Builder builder = ClusterAPIProtos.FromDeviceRPCResponseProto.newBuilder();
+            builder.setRequestIdMSB(response.getId().getMostSignificantBits());
+            builder.setRequestIdLSB(response.getId().getLeastSignificantBits());
+            response.getResponse().ifPresent(builder::setResponse);
+            if (response.getError().isPresent()) {
+                builder.setError(response.getError().get().ordinal());
+            } else {
+                builder.setError(-1);
+            }
+            rpcService.tell(response.getServerAddress(), ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray());
+        }
     }
 
     @Override
-    public void process(FromDeviceRpcResponse response) {
-        log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
-        //TODO: send to another server if needed.
-        UUID requestId = response.getId();
-        Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
-        if (consumer != null) {
-            consumer.accept(response);
-        } else {
-            log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
+    public void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] data) {
+        ClusterAPIProtos.FromDeviceRPCResponseProto proto;
+        try {
+            proto = ClusterAPIProtos.FromDeviceRPCResponseProto.parseFrom(data);
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
         }
+        RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
+        FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), routingService.getCurrentServer(),
+                proto.getResponse(), error);
+        processRpcResponseFromDevice(response);
     }
 
     @Override
@@ -125,8 +141,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
     }
 
     private void sendRpcRequest(ToDeviceRpcRequest msg) {
+        ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(routingService.getCurrentServer(), msg);
         log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
-        ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
         forward(msg.getDeviceId(), rpcMsg);
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
index f6e3543..8b4d47b 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/DeviceRpcService.java
@@ -27,12 +27,11 @@ import java.util.function.Consumer;
  */
 public interface DeviceRpcService {
 
-    void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
+    void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
 
-    void process(ToDeviceRpcRequest request, ServerAddress originator);
-
-    void process(FromDeviceRpcResponse response);
+    void processRpcResponseFromDevice(FromDeviceRpcResponse response);
 
     void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
 
+    void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] bytes);
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
index 855aed4..9c3ce9a 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponse.java
@@ -19,6 +19,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import org.thingsboard.rule.engine.api.RpcError;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
 
 import java.util.Optional;
 import java.util.UUID;
@@ -31,6 +32,8 @@ import java.util.UUID;
 public class FromDeviceRpcResponse {
     @Getter
     private final UUID id;
+    @Getter
+    private final ServerAddress serverAddress;
     private final String response;
     private final RpcError error;
 
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
index acdd9fc..37c4c00 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java
@@ -34,18 +34,11 @@ import java.util.Optional;
 @RequiredArgsConstructor
 public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg {
 
+    @Getter
     private final ServerAddress serverAddress;
     @Getter
     private final ToDeviceRpcRequest msg;
 
-    public ToDeviceRpcRequestActorMsg(ToDeviceRpcRequest msg) {
-        this(null, msg);
-    }
-
-    public Optional<ServerAddress> getServerAddress() {
-        return Optional.ofNullable(serverAddress);
-    }
-
     @Override
     public DeviceId getDeviceId() {
         return msg.getDeviceId();
diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
index 9875312..201f656 100644
--- a/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
+++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToServerRpcResponseActorMsg.java
@@ -34,8 +34,6 @@ import java.util.Optional;
 @RequiredArgsConstructor
 public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg {
 
-    private final ServerAddress serverAddress;
-
     @Getter
     private final TenantId tenantId;
 
@@ -45,14 +43,6 @@ public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg
     @Getter
     private final ToServerRpcResponseMsg msg;
 
-    public ToServerRpcResponseActorMsg(TenantId tenantId, DeviceId deviceId, ToServerRpcResponseMsg msg) {
-        this(null, tenantId, deviceId, msg);
-    }
-
-    public Optional<ServerAddress> getServerAddress() {
-        return Optional.ofNullable(serverAddress);
-    }
-
     @Override
     public MsgType getMsgType() {
         return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG;
diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto
index 90917e1..ac96010 100644
--- a/application/src/main/proto/cluster.proto
+++ b/application/src/main/proto/cluster.proto
@@ -56,6 +56,7 @@ enum MessageType {
   CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE = 9;
   CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE = 10;
   CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE = 11;
+  CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
 }
 
 // Messages related to CLUSTER_TELEMETRY_MESSAGE
@@ -121,3 +122,9 @@ message KeyValueProto {
     bool boolValue = 7;
 }
 
+message FromDeviceRPCResponseProto {
+    int64 requestIdMSB = 1;
+    int64 requestIdLSB = 2;
+    string response = 3;
+    int32 error = 4;
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java
index cb931f4..cc29891 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicActorSystemToDeviceSessionActorMsg.java
@@ -42,7 +42,7 @@ public class BasicActorSystemToDeviceSessionActorMsg implements ActorSystemToDev
 
     @Override
     public String toString() {
-        return "BasicToSessionResponseMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
+        return "BasicActorSystemToDeviceSessionActorMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
     }
 
     @Override