thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java 6(+3 -3)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java 4(+0 -4)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java 8(+0 -8)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java 7(+0 -7)
application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java 61(+44 -17)
application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java 9(+8 -1)
application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java 38(+29 -9)
application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentHashCircle.java 63(+63 -0)
application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/service/script/NashornJsInvokeService.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java 2(+1 -1)
application/src/main/proto/jsinvoke.proto 37(+37 -0)
application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java 5(+2 -3)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index d33734e..2f78672 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -65,7 +65,7 @@ import org.thingsboard.server.service.mail.MailExecutorService;
import org.thingsboard.server.service.queue.MsgQueueService;
import org.thingsboard.server.service.rpc.DeviceRpcService;
import org.thingsboard.server.service.script.JsExecutorService;
-import org.thingsboard.server.service.script.JsSandboxService;
+import org.thingsboard.server.service.script.JsInvokeService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
@@ -169,7 +169,7 @@ public class ActorSystemContext {
@Autowired
@Getter
- private JsSandboxService jsSandbox;
+ private JsInvokeService jsSandbox;
@Autowired
@Getter
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 9e38c17..608a77f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -26,6 +26,7 @@ import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
@@ -100,8 +101,7 @@ public class RpcManagerActor extends ContextAwareActor {
private void onMsg(ClusterAPIProtos.ClusterMessage msg) {
if (msg.hasServerAddress()) {
- ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(),
- msg.getServerAddress().getPort());
+ ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE);
SessionActorInfo session = sessionActors.get(address);
if (session != null) {
log.debug("{} Forwarding msg to session actor", address);
@@ -112,7 +112,7 @@ public class RpcManagerActor extends ContextAwareActor {
if (queue == null) {
queue = new LinkedList<>();
pendingMsgs.put(new ServerAddress(
- msg.getServerAddress().getHost(), msg.getServerAddress().getPort()), queue);
+ msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE), queue);
}
queue.add(msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index fac5d97..1e9e23b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -115,8 +115,6 @@ public class DefaultActorService implements ActorService {
actorContext.setStatsActor(statsActor);
rpcService.init(this);
-
- discoveryService.addListener(this);
log.info("Actor system initialized.");
}
@@ -202,7 +200,7 @@ public class DefaultActorService implements ActorService {
@Override
public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
- ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
+ ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort(), source.getServerType());
log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
if (log.isDebugEnabled()) {
log.info("MSG: ", msg);
diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
index 82ab170..fbd5faf 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
@@ -50,7 +50,7 @@ import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.event.EventService;
-import org.thingsboard.server.service.script.JsSandboxService;
+import org.thingsboard.server.service.script.JsInvokeService;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import java.util.List;
@@ -71,7 +71,7 @@ public class RuleChainController extends BaseController {
private EventService eventService;
@Autowired
- private JsSandboxService jsSandboxService;
+ private JsInvokeService jsInvokeService;
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/ruleChain/{ruleChainId}", method = RequestMethod.GET)
@@ -276,7 +276,7 @@ public class RuleChainController extends BaseController {
String errorText = "";
ScriptEngine engine = null;
try {
- engine = new RuleNodeJsScriptEngine(jsSandboxService, getCurrentUser().getId(), script, argNames);
+ engine = new RuleNodeJsScriptEngine(jsInvokeService, getCurrentUser().getId(), script, argNames);
TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L);
switch (scriptType) {
case "update":
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java
index 2232ef8..3ae7b16 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/CurrentServerInstanceService.java
@@ -19,7 +19,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
import javax.annotation.PostConstruct;
@@ -43,8 +44,7 @@ public class CurrentServerInstanceService implements ServerInstanceService {
public void init() {
Assert.hasLength(rpcHost, missingProperty("rpc.bind_host"));
Assert.notNull(rpcPort, missingProperty("rpc.bind_port"));
-
- self = new ServerInstance(ServerInstanceProtos.ServerInfo.newBuilder().setHost(rpcHost).setPort(rpcPort).setTs(System.currentTimeMillis()).build());
+ self = new ServerInstance(new ServerAddress(rpcHost, rpcPort, ServerType.CORE));
log.info("Current server instance: [{};{}]", self.getHost(), self.getPort());
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
index 516fca9..f9caafa 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
@@ -30,8 +30,4 @@ public interface DiscoveryService {
List<ServerInstance> getOtherServers();
- boolean addListener(DiscoveryServiceListener listener);
-
- boolean removeListener(DiscoveryServiceListener listener);
-
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
index c21f1aa..fd91643 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
@@ -62,13 +62,5 @@ public class DummyDiscoveryService implements DiscoveryService {
return Collections.emptyList();
}
- @Override
- public boolean addListener(DiscoveryServiceListener listener) {
- return false;
- }
- @Override
- public boolean removeListener(DiscoveryServiceListener listener) {
- return false;
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
index 6eee5f3..8f4525c 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ServerInstance.java
@@ -19,7 +19,6 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
/**
* @author Andrew Shvayka
@@ -41,12 +40,6 @@ public final class ServerInstance implements Comparable<ServerInstance> {
this.port = serverAddress.getPort();
}
- public ServerInstance(ServerInstanceProtos.ServerInfo serverInfo) {
- this.host = serverInfo.getHost();
- this.port = serverInfo.getPort();
- this.serverAddress = new ServerAddress(host, port);
- }
-
@Override
public int compareTo(ServerInstance o) {
return this.serverAddress.compareTo(o.serverAddress);
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index c3ffbab..19a04d8 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
@@ -35,7 +37,9 @@ import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
+import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.utils.MiscUtils;
@@ -44,7 +48,6 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
/**
@@ -79,7 +82,13 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
@Lazy
private DeviceStateService deviceStateService;
- private final List<DiscoveryServiceListener> listeners = new CopyOnWriteArrayList<>();
+ @Autowired
+ @Lazy
+ private ActorService actorService;
+
+ @Autowired
+ @Lazy
+ private ClusterRoutingService routingService;
private CuratorFramework client;
private PathChildrenCache cache;
@@ -127,12 +136,37 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
+ client.getConnectionStateListenable().addListener(checkReconnect(self));
} catch (Exception e) {
log.error("Failed to create ZK node", e);
throw new RuntimeException(e);
}
}
+ private boolean reconnectInProgress = false;
+
+ private synchronized ConnectionStateListener checkReconnect(ServerInstance self) {
+ return (client, newState) -> {
+ log.info("[{}:{}] ZK state changed: {}", self.getHost(), self.getPort(), newState);
+ if (newState == ConnectionState.LOST) {
+ if (!reconnectInProgress) {
+ reconnectInProgress = true;
+ reconnect();
+ }
+ }
+ };
+ }
+
+ private void reconnect() {
+ try {
+ client.blockUntilConnected();
+ } catch (InterruptedException e) {
+ log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
+ }
+ publishCurrentServer();
+ reconnectInProgress = false;
+ }
+
@Override
public void unpublishCurrentServer() {
try {
@@ -156,7 +190,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
.filter(cd -> !cd.getPath().equals(nodePath))
.map(cd -> {
try {
- return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
+ return new ServerInstance((ServerAddress) SerializationUtils.deserialize(cd.getData()));
} catch (NoSuchElementException e) {
log.error("Failed to decode ZK node", e);
throw new RuntimeException(e);
@@ -166,16 +200,6 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
@Override
- public boolean addListener(DiscoveryServiceListener listener) {
- return listeners.add(listener);
- }
-
- @Override
- public boolean removeListener(DiscoveryServiceListener listener) {
- return listeners.remove(listener);
- }
-
- @Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
publishCurrentServer();
getOtherServers().forEach(
@@ -198,7 +222,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
ServerInstance instance;
try {
- ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
+ ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
instance = new ServerInstance(serverAddress);
} catch (SerializationException e) {
log.error("Failed to decode server instance for node {}", data.getPath(), e);
@@ -207,17 +231,20 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
+ routingService.onServerAdded(instance);
tsSubService.onClusterUpdate();
deviceStateService.onClusterUpdate();
- listeners.forEach(listener -> listener.onServerAdded(instance));
+ actorService.onServerAdded(instance);
break;
case CHILD_UPDATED:
- listeners.forEach(listener -> listener.onServerUpdated(instance));
+ routingService.onServerUpdated(instance);
+ actorService.onServerUpdated(instance);
break;
case CHILD_REMOVED:
+ routingService.onServerRemoved(instance);
tsSubService.onClusterUpdate();
deviceStateService.onClusterUpdate();
- listeners.forEach(listener -> listener.onServerRemoved(instance));
+ actorService.onServerRemoved(instance);
break;
default:
break;
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java
index 272073d..bb76c74 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ClusterRoutingService.java
@@ -17,6 +17,8 @@ package org.thingsboard.server.service.cluster.routing;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
import java.util.Optional;
import java.util.UUID;
@@ -24,7 +26,7 @@ import java.util.UUID;
/**
* @author Andrew Shvayka
*/
-public interface ClusterRoutingService {
+public interface ClusterRoutingService extends DiscoveryServiceListener {
ServerAddress getCurrentServer();
@@ -32,4 +34,9 @@ public interface ClusterRoutingService {
Optional<ServerAddress> resolveById(EntityId entityId);
+ Optional<ServerAddress> resolveByUuid(ServerType server, UUID uuid);
+
+ Optional<ServerAddress> resolveById(ServerType server, EntityId entityId);
+
+
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
index 5087b5c..f47f6d2 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java
@@ -24,12 +24,14 @@ import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.utils.MiscUtils;
import javax.annotation.PostConstruct;
+import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -41,7 +43,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
@Service
@Slf4j
-public class ConsistentClusterRoutingService implements ClusterRoutingService, DiscoveryServiceListener {
+public class ConsistentClusterRoutingService implements ClusterRoutingService {
@Autowired
private DiscoveryService discoveryService;
@@ -55,15 +57,19 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
private HashFunction hashFunction;
- private final ConcurrentNavigableMap<Long, ServerInstance> circle =
- new ConcurrentSkipListMap<>();
+ private ConsistentHashCircle[] circles;
+ private ConsistentHashCircle rootCircle;
@PostConstruct
public void init() {
log.info("Initializing Cluster routing service!");
- hashFunction = MiscUtils.forName(hashFunctionName);
- discoveryService.addListener(this);
+ this.hashFunction = MiscUtils.forName(hashFunctionName);
this.currentServer = discoveryService.getCurrentServer();
+ this.circles = new ConsistentHashCircle[ServerType.values().length];
+ for (ServerType serverType : ServerType.values()) {
+ circles[serverType.ordinal()] = new ConsistentHashCircle();
+ }
+ rootCircle = circles[ServerType.CORE.ordinal()];
addNode(discoveryService.getCurrentServer());
for (ServerInstance instance : discoveryService.getOtherServers()) {
addNode(instance);
@@ -79,11 +85,25 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
@Override
public Optional<ServerAddress> resolveById(EntityId entityId) {
- return resolveByUuid(entityId.getId());
+ return resolveByUuid(rootCircle, entityId.getId());
}
@Override
public Optional<ServerAddress> resolveByUuid(UUID uuid) {
+ return resolveByUuid(rootCircle, uuid);
+ }
+
+ @Override
+ public Optional<ServerAddress> resolveByUuid(ServerType server, UUID uuid) {
+ return resolveByUuid(circles[server.ordinal()], uuid);
+ }
+
+ @Override
+ public Optional<ServerAddress> resolveById(ServerType server, EntityId entityId) {
+ return resolveByUuid(circles[server.ordinal()], entityId.getId());
+ }
+
+ private Optional<ServerAddress> resolveByUuid(ConsistentHashCircle circle, UUID uuid) {
Assert.notNull(uuid);
if (circle.isEmpty()) {
return Optional.empty();
@@ -125,13 +145,13 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
private void addNode(ServerInstance instance) {
for (int i = 0; i < virtualNodesSize; i++) {
- circle.put(hash(instance, i).asLong(), instance);
+ circles[instance.getServerAddress().getServerType().ordinal()].put(hash(instance, i).asLong(), instance);
}
}
private void removeNode(ServerInstance instance) {
for (int i = 0; i < virtualNodesSize; i++) {
- circle.remove(hash(instance, i).asLong());
+ circles[instance.getServerAddress().getServerType().ordinal()].remove(hash(instance, i).asLong());
}
}
@@ -141,7 +161,7 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService, D
private void logCircle() {
log.trace("Consistent Hash Circle Start");
- circle.entrySet().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
+ Arrays.asList(circles).forEach(ConsistentHashCircle::log);
log.trace("Consistent Hash Circle End");
}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentHashCircle.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentHashCircle.java
new file mode 100644
index 0000000..a8c8f37
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentHashCircle.java
@@ -0,0 +1,63 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.cluster.routing;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.service.cluster.discovery.ServerInstance;
+
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Created by ashvayka on 23.09.18.
+ */
+@Slf4j
+public class ConsistentHashCircle {
+ private final ConcurrentNavigableMap<Long, ServerInstance> circle =
+ new ConcurrentSkipListMap<>();
+
+ public void put(long hash, ServerInstance instance) {
+ circle.put(hash, instance);
+ }
+
+ public void remove(long hash) {
+ circle.remove(hash);
+ }
+
+ public boolean isEmpty() {
+ return circle.isEmpty();
+ }
+
+ public boolean containsKey(Long hash) {
+ return circle.containsKey(hash);
+ }
+
+ public ConcurrentNavigableMap<Long, ServerInstance> tailMap(Long hash) {
+ return circle.tailMap(hash);
+ }
+
+ public Long firstKey() {
+ return circle.firstKey();
+ }
+
+ public ServerInstance get(Long hash) {
+ return circle.get(hash);
+ }
+
+ public void log() {
+ circle.entrySet().forEach((e) -> log.debug("{} -> {}", e.getKey(), e.getValue().getServerAddress()));
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
index 7216c43..55cf52c 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java
@@ -19,6 +19,7 @@ import io.grpc.stub.StreamObserver;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.cluster.ServerType;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import java.io.Closeable;
@@ -61,8 +62,8 @@ public final class GrpcSession implements Closeable {
public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
connected = true;
- ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort());
- remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort());
+ ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort(), ServerType.CORE);
+ remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort(), ServerType.CORE);
listener.onConnected(GrpcSession.this);
}
if (connected) {
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
index 767dc05..c001a1a 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java
@@ -37,12 +37,12 @@ import java.util.concurrent.ExecutionException;
public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.ScriptEngine {
private static final ObjectMapper mapper = new ObjectMapper();
- private final JsSandboxService sandboxService;
+ private final JsInvokeService sandboxService;
private final UUID scriptId;
private final EntityId entityId;
- public RuleNodeJsScriptEngine(JsSandboxService sandboxService, EntityId entityId, String script, String... argNames) {
+ public RuleNodeJsScriptEngine(JsInvokeService sandboxService, EntityId entityId, String script, String... argNames) {
this.sandboxService = sandboxService;
this.entityId = entityId;
try {
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 548c417..5c3b88a 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -232,7 +232,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(),
EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()),
TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
- false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort()));
+ false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort(), serverAddress.getServerType()));
addRemoteWsSubscription(serverAddress, proto.getSessionId(), subscription);
}
application/src/main/proto/jsinvoke.proto 37(+37 -0)
diff --git a/application/src/main/proto/jsinvoke.proto b/application/src/main/proto/jsinvoke.proto
new file mode 100644
index 0000000..c465309
--- /dev/null
+++ b/application/src/main/proto/jsinvoke.proto
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+package cluster;
+
+option java_package = "org.thingsboard.server.gen.js";
+option java_outer_classname = "JsInvokeProtos";
+
+service JsInvokeRpcService {
+ rpc handleMsgs(stream JsInvokeRequest) returns (stream JsInvokeResponse) {}
+}
+
+message JsInvokeRequest {
+ string scriptId = 1;
+ string scriptBody = 2;
+ repeated string args = 3;
+}
+
+message JsInvokeResponse {
+ string result = 1;
+ string errorName = 2;
+ string errorDetails = 3;
+}
+
diff --git a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
index 88961dc..730121f 100644
--- a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java
@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
@@ -39,13 +38,13 @@ import static org.junit.Assert.*;
public class RuleNodeJsScriptEngineTest {
private ScriptEngine scriptEngine;
- private TestNashornJsSandboxService jsSandboxService;
+ private TestNashornJsInvokeService jsSandboxService;
private EntityId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
@Before
public void beforeTest() throws Exception {
- jsSandboxService = new TestNashornJsSandboxService(false, 1, 100, 3);
+ jsSandboxService = new TestNashornJsInvokeService(false, 1, 100, 3);
}
@After
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ServerAddress.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ServerAddress.java
index 4b65d6f..60c9d12 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ServerAddress.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cluster/ServerAddress.java
@@ -29,6 +29,7 @@ public class ServerAddress implements Comparable<ServerAddress>, Serializable {
private final String host;
private final int port;
+ private final ServerType serverType;
@Override
public int compareTo(ServerAddress o) {