thingsboard-aplcache

When firing an RpcBroadcastMsg message or a ToAllNodesMsg

7/6/2018 12:19:33 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 3f3f70b..2034d6f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -29,11 +29,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import java.util.*;
 
 /**
  * @author Andrew Shvayka
@@ -88,7 +84,17 @@ public class RpcManagerActor extends ContextAwareActor {
 
     private void onMsg(RpcBroadcastMsg msg) {
         log.debug("Forwarding msg to session actors {}", msg);
-        sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
+        sessionActors.keySet().forEach(address -> {
+            ClusterAPIProtos.ClusterMessage msgWithServerAddress = msg.getMsg()
+                    .toBuilder()
+                    .setServerAddress(ClusterAPIProtos.ServerAddress
+                            .newBuilder()
+                            .setHost(address.getHost())
+                            .setPort(address.getPort())
+                            .build())
+                    .build();
+            onMsg(msgWithServerAddress);
+        });
         pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
     }