thingsboard-memoizeit

Fixed consumer id uniquness

10/18/2018 10:23:35 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
index 04c4ced..f9caafa 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DiscoveryService.java
@@ -26,8 +26,6 @@ public interface DiscoveryService {
 
     void unpublishCurrentServer();
 
-    String getNodeId();
-
     ServerInstance getCurrentServer();
 
     List<ServerInstance> getOtherServers();
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
index 859a7af..358c847 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/DummyDiscoveryService.java
@@ -38,17 +38,9 @@ public class DummyDiscoveryService implements DiscoveryService {
     @Autowired
     private ServerInstanceService serverInstance;
 
-    private String nodeId;
-
     @PostConstruct
     public void init() {
         log.info("Initializing...");
-        this.nodeId = RandomStringUtils.randomAlphabetic(10);
-    }
-
-    @Override
-    public String getNodeId() {
-        return nodeId;
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index b4829f2..bb92642 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -94,13 +94,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     private CuratorFramework client;
     private PathChildrenCache cache;
     private String nodePath;
-    //TODO: make persistent?
-    private String nodeId;
 
     @PostConstruct
     public void init() {
         log.info("Initializing...");
-        this.nodeId = RandomStringUtils.randomAlphabetic(10);
         Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
         Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
         Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
@@ -184,11 +181,6 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     }
 
     @Override
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
     public ServerInstance getCurrentServer() {
         return serverInstance.getSelf();
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index b334ff1..db31bda 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -28,6 +28,7 @@ import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
 import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
 import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
 import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.kafka.TbNodeIdProvider;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 
 import javax.annotation.PostConstruct;
@@ -42,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 public class RemoteJsInvokeService extends AbstractJsInvokeService {
 
     @Autowired
-    private DiscoveryService discoveryService;
+    private TbNodeIdProvider nodeIdProvider;
 
     @Autowired
     private TbKafkaSettings kafkaSettings;
@@ -97,8 +98,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
 
         TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder();
         responseBuilder.settings(kafkaSettings);
-        responseBuilder.topic(responseTopicPrefix + "." + discoveryService.getNodeId());
-        responseBuilder.clientId(discoveryService.getNodeId());
+        responseBuilder.topic(responseTopicPrefix + "." + nodeIdProvider.getNodeId());
+        responseBuilder.clientId("js-" + nodeIdProvider.getNodeId());
         responseBuilder.groupId("rule-engine-node");
         responseBuilder.autoCommit(true);
         responseBuilder.autoCommitIntervalMs(autoCommitInterval);
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index 4f8860d..38db1a4 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -35,6 +35,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct
 import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
 import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
 import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.kafka.TbNodeIdProvider;
 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
@@ -71,7 +72,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
     private TbKafkaSettings kafkaSettings;
 
     @Autowired
-    private DiscoveryService discoveryService;
+    private TbNodeIdProvider nodeIdProvider;
 
     @Autowired
     private ActorSystemContext actorContext;
@@ -104,7 +105,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
         TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToRuleEngineMsg> ruleEngineConsumerBuilder = TBKafkaConsumerTemplate.builder();
         ruleEngineConsumerBuilder.settings(kafkaSettings);
         ruleEngineConsumerBuilder.topic(ruleEngineTopic);
-        ruleEngineConsumerBuilder.clientId(discoveryService.getNodeId());
+        ruleEngineConsumerBuilder.clientId("transport-" + nodeIdProvider.getNodeId());
         ruleEngineConsumerBuilder.groupId("tb-node");
         ruleEngineConsumerBuilder.autoCommit(true);
         ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);