thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index bb92642..997058c 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -30,6 +30,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryForever;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -51,6 +52,8 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 
+import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
+
 /**
  * @author Andrew Shvayka
  */
@@ -128,19 +131,42 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     }
 
     @Override
-    public void publishCurrentServer() {
+    public synchronized void publishCurrentServer() {
+        ServerInstance self = this.serverInstance.getSelf();
+        if (currentServerExists()) {
+            log.info("[{}:{}] ZK node for current instance already exists, NOT created new one: {}", self.getHost(), self.getPort(), nodePath);
+        } else {
+            try {
+                log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
+                nodePath = client.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
+                log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
+                client.getConnectionStateListenable().addListener(checkReconnect(self));
+            } catch (Exception e) {
+                log.error("Failed to create ZK node", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private boolean currentServerExists() {
+        if (nodePath == null) {
+            return false;
+        }
         try {
             ServerInstance self = this.serverInstance.getSelf();
-            log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
-            nodePath = client.create()
-                    .creatingParentsIfNeeded()
-                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
-            log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
-            client.getConnectionStateListenable().addListener(checkReconnect(self));
+            ServerAddress registeredServerAdress = null;
+            registeredServerAdress = SerializationUtils.deserialize(client.getData().forPath(nodePath));
+            if (self.getServerAddress() != null && self.getServerAddress().equals(registeredServerAdress)) {
+                return true;
+            }
+        } catch (KeeperException.NoNodeException e) {
+            log.info("ZK node does not exist: {}", nodePath);
         } catch (Exception e) {
-            log.error("Failed to create ZK node", e);
-            throw new RuntimeException(e);
+            log.error("Couldn't check if ZK node exists", e);
         }
+        return false;
     }
 
     private ConnectionStateListener checkReconnect(ServerInstance self) {
@@ -218,6 +244,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
             log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
             return;
         } else if (nodePath != null && nodePath.equals(data.getPath())) {
+            if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) {
+                log.info("ZK node for current instance is somehow deleted.");
+                publishCurrentServer();
+            }
             log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
             return;
         }