thingsboard-memoizeit

Initial refactoring

9/23/2018 10:11:09 AM

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index d33734e..2f78672 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -65,7 +65,7 @@ import org.thingsboard.server.service.mail.MailExecutorService;
 import org.thingsboard.server.service.queue.MsgQueueService;
 import org.thingsboard.server.service.rpc.DeviceRpcService;
 import org.thingsboard.server.service.script.JsExecutorService;
-import org.thingsboard.server.service.script.JsSandboxService;
+import org.thingsboard.server.service.script.JsInvokeService;
 import org.thingsboard.server.service.state.DeviceStateService;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 
@@ -169,7 +169,7 @@ public class ActorSystemContext {
 
     @Autowired
     @Getter
-    private JsSandboxService jsSandbox;
+    private JsInvokeService jsSandbox;
 
     @Autowired
     @Getter
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 9e38c17..608a77f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -26,6 +26,7 @@ import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.common.msg.TbActorMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 
@@ -100,8 +101,7 @@ public class RpcManagerActor extends ContextAwareActor {
 
     private void onMsg(ClusterAPIProtos.ClusterMessage msg) {
         if (msg.hasServerAddress()) {
-            ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(),
-                    msg.getServerAddress().getPort());
+            ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE);
             SessionActorInfo session = sessionActors.get(address);
             if (session != null) {
                 log.debug("{} Forwarding msg to session actor", address);
@@ -112,7 +112,7 @@ public class RpcManagerActor extends ContextAwareActor {
                 if (queue == null) {
                     queue = new LinkedList<>();
                     pendingMsgs.put(new ServerAddress(
-                            msg.getServerAddress().getHost(), msg.getServerAddress().getPort()), queue);
+                            msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE), queue);
                 }
                 queue.add(msg);
             }
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index fac5d97..1e9e23b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -115,8 +115,6 @@ public class DefaultActorService implements ActorService {
         actorContext.setStatsActor(statsActor);
 
         rpcService.init(this);
-
-        discoveryService.addListener(this);
         log.info("Actor system initialized.");
     }
 
@@ -202,7 +200,7 @@ public class DefaultActorService implements ActorService {
 
     @Override
     public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
-        ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
+        ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort(), source.getServerType());
         log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
         if (log.isDebugEnabled()) {
             log.info("MSG: ", msg);
diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
index 82ab170..fbd5faf 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
@@ -50,7 +50,7 @@ import org.thingsboard.server.common.data.rule.RuleChainMetaData;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 import org.thingsboard.server.dao.event.EventService;
-import org.thingsboard.server.service.script.JsSandboxService;
+import org.thingsboard.server.service.script.JsInvokeService;
 import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
 
 import java.util.List;
@@ -71,7 +71,7 @@ public class RuleChainController extends BaseController {
     private EventService eventService;
 
     @Autowired
-    private JsSandboxService jsSandboxService;
+    private JsInvokeService jsInvokeService;
 
     @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
     @RequestMapping(value = "/ruleChain/{ruleChainId}", method = RequestMethod.GET)
@@ -276,7 +276,7 @@ public class RuleChainController extends BaseController {
             String errorText = "";
             ScriptEngine engine = null;
             try {
-                engine = new RuleNodeJsScriptEngine(jsSandboxService, getCurrentUser().getId(), script, argNames);
+                engine = new RuleNodeJsScriptEngine(jsInvokeService, getCurrentUser().getId(), script, argNames);
                 TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L);
                 switch (scriptType) {
                     case "update":
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java
index 2232ef8..3ae7b16 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java
@@ -19,7 +19,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
 
 import javax.annotation.PostConstruct;
 
@@ -43,8 +44,7 @@ public class CurrentServerInstanceService implements ServerInstanceService {
     public void init() {
         Assert.hasLength(rpcHost, missingProperty("rpc.bind_host"));
         Assert.notNull(rpcPort, missingProperty("rpc.bind_port"));
-
-        self = new ServerInstance(ServerInstanceProtos.ServerInfo.newBuilder().setHost(rpcHost).setPort(rpcPort).setTs(System.currentTimeMillis()).build());
+        self = new ServerInstance(new ServerAddress(rpcHost, rpcPort, ServerType.CORE));
         log.info("Current server instance: [{};{}]", self.getHost(), self.getPort());
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
index 516fca9..f9caafa 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
@@ -30,8 +30,4 @@ public interface DiscoveryService {
 
     List<ServerInstance> getOtherServers();
 
-    boolean addListener(DiscoveryServiceListener listener);
-
-    boolean removeListener(DiscoveryServiceListener listener);
-
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
index c21f1aa..fd91643 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
@@ -62,13 +62,5 @@ public class DummyDiscoveryService implements DiscoveryService {
         return Collections.emptyList();
     }
 
-    @Override
-    public boolean addListener(DiscoveryServiceListener listener) {
-        return false;
-    }
 
-    @Override
-    public boolean removeListener(DiscoveryServiceListener listener) {
-        return false;
-    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
index 6eee5f3..8f4525c 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
@@ -19,7 +19,6 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
 
 /**
  * @author Andrew Shvayka
@@ -41,12 +40,6 @@ public final class ServerInstance implements Comparable<ServerInstance> {
         this.port = serverAddress.getPort();
     }
 
-    public ServerInstance(ServerInstanceProtos.ServerInfo serverInfo) {
-        this.host = serverInfo.getHost();
-        this.port = serverInfo.getPort();
-        this.serverAddress = new ServerAddress(host, port);
-    }
-
     @Override
     public int compareTo(ServerInstance o) {
         return this.serverAddress.compareTo(o.serverAddress);
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index c3ffbab..19a04d8 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryForever;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
@@ -35,7 +37,9 @@ import org.springframework.context.ApplicationListener;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
+import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.state.DeviceStateService;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 import org.thingsboard.server.utils.MiscUtils;
@@ -44,7 +48,6 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 /**
@@ -79,7 +82,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     @Lazy
     private DeviceStateService deviceStateService;
 
-    private final List<DiscoveryServiceListener> listeners = new CopyOnWriteArrayList<>();
+    @Autowired
+    @Lazy
+    private ActorService actorService;
+
+    @Autowired
+    @Lazy
+    private ClusterRoutingService routingService;
 
     private CuratorFramework client;
     private PathChildrenCache cache;
@@ -127,12 +136,37 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
                     .creatingParentsIfNeeded()
                     .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
             log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
+            client.getConnectionStateListenable().addListener(checkReconnect(self));
         } catch (Exception e) {
             log.error("Failed to create ZK node", e);
             throw new RuntimeException(e);
         }
     }
 
+    private boolean reconnectInProgress = false;
+
+    private synchronized ConnectionStateListener checkReconnect(ServerInstance self) {
+        return (client, newState) -> {
+            log.info("[{}:{}] ZK state changed: {}", self.getHost(), self.getPort(), newState);
+            if (newState == ConnectionState.LOST) {
+                if (!reconnectInProgress) {
+                    reconnectInProgress = true;
+                    reconnect();
+                }
+            }
+        };
+    }
+
+    private void reconnect() {
+        try {
+            client.blockUntilConnected();
+        } catch (InterruptedException e) {
+            log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
+        }
+        publishCurrentServer();
+        reconnectInProgress = false;
+    }
+
     @Override
     public void unpublishCurrentServer() {
         try {
@@ -156,7 +190,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
                 .filter(cd -> !cd.getPath().equals(nodePath))
                 .map(cd -> {
                     try {
-                        return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
+                        return new ServerInstance((ServerAddress) SerializationUtils.deserialize(cd.getData()));
                     } catch (NoSuchElementException e) {
                         log.error("Failed to decode ZK node", e);
                         throw new RuntimeException(e);
@@ -166,16 +200,6 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     }
 
     @Override
-    public boolean addListener(DiscoveryServiceListener listener) {
-        return listeners.add(listener);
-    }
-
-    @Override
-    public boolean removeListener(DiscoveryServiceListener listener) {
-        return listeners.remove(listener);
-    }
-
-    @Override
     public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
         publishCurrentServer();
         getOtherServers().forEach(
@@ -198,7 +222,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
         }
         ServerInstance instance;
         try {
-            ServerAddress serverAddress  = SerializationUtils.deserialize(data.getData());
+            ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
             instance = new ServerInstance(serverAddress);
         } catch (SerializationException e) {
             log.error("Failed to decode server instance for node {}", data.getPath(), e);
@@ -207,17 +231,20 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
         log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
         switch (pathChildrenCacheEvent.getType()) {
             case CHILD_ADDED:
+                routingService.onServerAdded(instance);
                 tsSubService.onClusterUpdate();
                 deviceStateService.onClusterUpdate();
-                listeners.forEach(listener -> listener.onServerAdded(instance));
+                actorService.onServerAdded(instance);
                 break;
             case CHILD_UPDATED:
-                listeners.forEach(listener -> listener.onServerUpdated(instance));
+                routingService.onServerUpdated(instance);
+                actorService.onServerUpdated(instance);
                 break;
             case CHILD_REMOVED:
+                routingService.onServerRemoved(instance);
                 tsSubService.onClusterUpdate();
                 deviceStateService.onClusterUpdate();
-                listeners.forEach(listener -> listener.onServerRemoved(instance));
+                actorService.onServerRemoved(instance);
                 break;
             default:
                 break;
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java
index 272073d..bb76c74 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java
@@ -17,6 +17,8 @@ package org.thingsboard.server.service.cluster.routing;
 
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
 
 import java.util.Optional;
 import java.util.UUID;
@@ -24,7 +26,7 @@ import java.util.UUID;
 /**
  * @author Andrew Shvayka
  */
-public interface ClusterRoutingService {
+public interface ClusterRoutingService extends DiscoveryServiceListener {
 
     ServerAddress getCurrentServer();
 
@@ -32,4 +34,9 @@ public interface ClusterRoutingService {
 
     Optional<ServerAddress> resolveById(EntityId entityId);
 
+    Optional<ServerAddress> resolveByUuid(ServerType server, UUID uuid);
+
+    Optional<ServerAddress> resolveById(ServerType server, EntityId entityId);
+
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
index 5087b5c..f47f6d2 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
@@ -24,12 +24,14 @@ import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
 import org.thingsboard.server.utils.MiscUtils;
 
 import javax.annotation.PostConstruct;
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -41,7 +43,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 
 @Service
 @Slf4j
-public class ConsistentClusterRoutingService implements ClusterRoutingService, DiscoveryServiceListener {
+public class ConsistentClusterRoutingService implements ClusterRoutingService {
 
     @Autowired
     private DiscoveryService discoveryService;
@@ -55,15 +57,19 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 
     private HashFunction hashFunction;
 
-    private final ConcurrentNavigableMap<Long, ServerInstance> circle =
-            new ConcurrentSkipListMap<>();
+    private ConsistentHashCircle[] circles;
+    private ConsistentHashCircle rootCircle;
 
     @PostConstruct
     public void init() {
         log.info("Initializing Cluster routing service!");
-        hashFunction = MiscUtils.forName(hashFunctionName);
-        discoveryService.addListener(this);
+        this.hashFunction = MiscUtils.forName(hashFunctionName);
         this.currentServer = discoveryService.getCurrentServer();
+        this.circles = new ConsistentHashCircle[ServerType.values().length];
+        for (ServerType serverType : ServerType.values()) {
+            circles[serverType.ordinal()] = new ConsistentHashCircle();
+        }
+        rootCircle = circles[ServerType.CORE.ordinal()];
         addNode(discoveryService.getCurrentServer());
         for (ServerInstance instance : discoveryService.getOtherServers()) {
             addNode(instance);
@@ -79,11 +85,25 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 
     @Override
     public Optional<ServerAddress> resolveById(EntityId entityId) {
-        return resolveByUuid(entityId.getId());
+        return resolveByUuid(rootCircle, entityId.getId());
     }
 
     @Override
     public Optional<ServerAddress> resolveByUuid(UUID uuid) {
+        return resolveByUuid(rootCircle, uuid);
+    }
+
+    @Override
+    public Optional<ServerAddress> resolveByUuid(ServerType server, UUID uuid) {
+        return resolveByUuid(circles[server.ordinal()], uuid);
+    }
+
+    @Override
+    public Optional<ServerAddress> resolveById(ServerType server, EntityId entityId) {
+        return resolveByUuid(circles[server.ordinal()], entityId.getId());
+    }
+
+    private Optional<ServerAddress> resolveByUuid(ConsistentHashCircle circle, UUID uuid) {
         Assert.notNull(uuid);
         if (circle.isEmpty()) {
             return Optional.empty();
@@ -125,13 +145,13 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 
     private void addNode(ServerInstance instance) {
         for (int i = 0; i < virtualNodesSize; i++) {
-            circle.put(hash(instance, i).asLong(), instance);
+            circles[instance.getServerAddress().getServerType().ordinal()].put(hash(instance, i).asLong(), instance);
         }
     }
 
     private void removeNode(ServerInstance instance) {
         for (int i = 0; i < virtualNodesSize; i++) {
-            circle.remove(hash(instance, i).asLong());
+            circles[instance.getServerAddress().getServerType().ordinal()].remove(hash(instance, i).asLong());
         }
     }
 
@@ -141,7 +161,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
 
     private void logCircle() {
         log.trace("Consistent Hash Circle Start");
-        circle.entrySet().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
+        Arrays.asList(circles).forEach(ConsistentHashCircle::log);
         log.trace("Consistent Hash Circle End");
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentHashCircle.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentHashCircle.java
new file mode 100644
index 0000000..a8c8f37
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentHashCircle.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.cluster.routing;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.service.cluster.discovery.ServerInstance;
+
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Created by ashvayka on 23.09.18.
+ */
+@Slf4j
+public class ConsistentHashCircle {
+    private final ConcurrentNavigableMap<Long, ServerInstance> circle =
+            new ConcurrentSkipListMap<>();
+
+    public void put(long hash, ServerInstance instance) {
+        circle.put(hash, instance);
+    }
+
+    public void remove(long hash) {
+        circle.remove(hash);
+    }
+
+    public boolean isEmpty() {
+        return circle.isEmpty();
+    }
+
+    public boolean containsKey(Long hash) {
+        return circle.containsKey(hash);
+    }
+
+    public ConcurrentNavigableMap<Long, ServerInstance> tailMap(Long hash) {
+        return circle.tailMap(hash);
+    }
+
+    public Long firstKey() {
+        return circle.firstKey();
+    }
+
+    public ServerInstance get(Long hash) {
+        return circle.get(hash);
+    }
+
+    public void log() {
+        circle.entrySet().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
index 7216c43..55cf52c 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
@@ -19,6 +19,7 @@ import io.grpc.stub.StreamObserver;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 
 import java.io.Closeable;
@@ -61,8 +62,8 @@ public final class GrpcSession implements Closeable {
             public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
                 if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
                     connected = true;
-                    ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort());
-                    remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort());
+                    ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort(), ServerType.CORE);
+                    remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort(), ServerType.CORE);
                     listener.onConnected(GrpcSession.this);
                 }
                 if (connected) {
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
index 767dc05..c001a1a 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
@@ -37,12 +37,12 @@ import java.util.concurrent.ExecutionException;
 public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.ScriptEngine {
 
     private static final ObjectMapper mapper = new ObjectMapper();
-    private final JsSandboxService sandboxService;
+    private final JsInvokeService sandboxService;
 
     private final UUID scriptId;
     private final EntityId entityId;
 
-    public RuleNodeJsScriptEngine(JsSandboxService sandboxService, EntityId entityId, String script, String... argNames) {
+    public RuleNodeJsScriptEngine(JsInvokeService sandboxService, EntityId entityId, String script, String... argNames) {
         this.sandboxService = sandboxService;
         this.entityId = entityId;
         try {
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 548c417..5c3b88a 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -232,7 +232,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
                 new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(),
                         EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()),
                         TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
-                false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort()));
+                false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort(), serverAddress.getServerType()));
 
         addRemoteWsSubscription(serverAddress, proto.getSessionId(), subscription);
     }
diff --git a/application/src/main/proto/jsinvoke.proto b/application/src/main/proto/jsinvoke.proto
new file mode 100644
index 0000000..c465309
--- /dev/null
+++ b/application/src/main/proto/jsinvoke.proto
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+package cluster;
+
+option java_package = "org.thingsboard.server.gen.js";
+option java_outer_classname = "JsInvokeProtos";
+
+service JsInvokeRpcService {
+  rpc handleMsgs(stream JsInvokeRequest) returns (stream JsInvokeResponse) {}
+}
+
+message JsInvokeRequest {
+  string scriptId = 1;
+  string scriptBody = 2;
+  repeated string args = 3;
+}
+
+message JsInvokeResponse {
+  string result = 1;
+  string errorName = 2;
+  string errorDetails = 3;
+}
+
diff --git a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
index 88961dc..730121f 100644
--- a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
@@ -39,13 +38,13 @@ import static org.junit.Assert.*;
 public class RuleNodeJsScriptEngineTest {
 
     private ScriptEngine scriptEngine;
-    private TestNashornJsSandboxService jsSandboxService;
+    private TestNashornJsInvokeService jsSandboxService;
 
     private EntityId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
 
     @Before
     public void beforeTest() throws Exception {
-        jsSandboxService = new TestNashornJsSandboxService(false, 1, 100, 3);
+        jsSandboxService = new TestNashornJsInvokeService(false, 1, 100, 3);
     }
 
     @After
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ServerAddress.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ServerAddress.java
index 4b65d6f..60c9d12 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ServerAddress.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ServerAddress.java
@@ -29,6 +29,7 @@ public class ServerAddress implements Comparable<ServerAddress>, Serializable {
 
     private final String host;
     private final int port;
+    private final ServerType serverType;
 
     @Override
     public int compareTo(ServerAddress o) {