thingsboard-memoizeit

Fixed issue with Zookeeper reconnect

9/23/2018 10:22:04 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index c3ffbab..6f51cee 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryForever;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
@@ -127,12 +129,38 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
                     .creatingParentsIfNeeded()
                     .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
             log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
+            client.getConnectionStateListenable().addListener(checkReconnect(self));
         } catch (Exception e) {
             log.error("Failed to create ZK node", e);
             throw new RuntimeException(e);
         }
     }
 
+    private ConnectionStateListener checkReconnect(ServerInstance self) {
+        return (client, newState) -> {
+            log.info("[{}:{}] ZK state changed: {}", self.getHost(), self.getPort(), newState);
+            if (newState == ConnectionState.LOST) {
+                reconnect();
+            }
+        };
+    }
+
+    private boolean reconnectInProgress = false;
+
+    private synchronized void reconnect() {
+        if (!reconnectInProgress) {
+            reconnectInProgress = true;
+            try {
+                client.blockUntilConnected();
+                publishCurrentServer();
+            } catch (InterruptedException e) {
+                log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
+            } finally {
+                reconnectInProgress = false;
+            }
+        }
+    }
+
     @Override
     public void unpublishCurrentServer() {
         try {
@@ -156,7 +184,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
                 .filter(cd -> !cd.getPath().equals(nodePath))
                 .map(cd -> {
                     try {
-                        return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
+                        return new ServerInstance((ServerAddress) SerializationUtils.deserialize(cd.getData()));
                     } catch (NoSuchElementException e) {
                         log.error("Failed to decode ZK node", e);
                         throw new RuntimeException(e);
@@ -198,7 +226,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
         }
         ServerInstance instance;
         try {
-            ServerAddress serverAddress  = SerializationUtils.deserialize(data.getData());
+            ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
             instance = new ServerInstance(serverAddress);
         } catch (SerializationException e) {
             log.error("Failed to decode server instance for node {}", data.getPath(), e);