thingsboard-aplcache

Merge pull request #919 from janbols/master RpcBroadcastMsg

7/9/2018 11:41:46 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 3f3f70b..9e38c17 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -29,11 +29,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import java.util.*;
 
 /**
  * @author Andrew Shvayka
@@ -88,7 +84,17 @@ public class RpcManagerActor extends ContextAwareActor {
 
     private void onMsg(RpcBroadcastMsg msg) {
         log.debug("Forwarding msg to session actors {}", msg);
-        sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
+        sessionActors.keySet().forEach(address -> {
+            ClusterAPIProtos.ClusterMessage msgWithServerAddress = msg.getMsg()
+                    .toBuilder()
+                    .setServerAddress(ClusterAPIProtos.ServerAddress
+                            .newBuilder()
+                            .setHost(address.getHost())
+                            .setPort(address.getPort())
+                            .build())
+                    .build();
+            onMsg(msgWithServerAddress);
+        });
         pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
     }