diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index c3ffbab..6f51cee 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
@@ -127,12 +129,38 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
+ client.getConnectionStateListenable().addListener(checkReconnect(self));
} catch (Exception e) {
log.error("Failed to create ZK node", e);
throw new RuntimeException(e);
}
}
+    /**
+     * Builds a connection-state listener for the ZooKeeper client.
+     * <p>
+     * The listener logs every state transition together with the host and port of
+     * the given instance, and calls {@link #reconnect()} when the reported state is
+     * {@link ConnectionState#LOST}.
+     *
+     * @param self the server instance whose host/port are used in the log line
+     * @return a listener that triggers {@code reconnect()} on {@code LOST}
+     */
+    private ConnectionStateListener checkReconnect(ServerInstance self) {
+        return (curatorClient, state) -> {
+            log.info("[{}:{}] ZK state changed: {}", self.getHost(), self.getPort(), state);
+            // Every state other than LOST is only logged.
+            if (state != ConnectionState.LOST) {
+                return;
+            }
+            reconnect();
+        };
+    }
+
+    /**
+     * Set while a reconnect attempt is running. In the code visible here it is only
+     * touched inside the synchronized {@link #reconnect()} method, so a second thread
+     * blocks on the monitor instead of observing {@code true}; the flag therefore
+     * only short-circuits a re-entrant call made on the same thread.
+     */
+    private boolean reconnectInProgress = false;
+
+    /**
+     * Waits until the ZooKeeper client reports a connection again and then
+     * re-publishes the current server via {@code publishCurrentServer()}.
+     * <p>
+     * If the waiting thread is interrupted, the failure is logged, the thread's
+     * interrupt status is restored so that the caller can still observe the
+     * interruption, and the server is not re-published. Runtime exceptions thrown
+     * by {@code publishCurrentServer()} propagate to the caller.
+     * <p>
+     * NOTE(review): the node-creation code just above this method registers a new
+     * connection-state listener after creating the node. If that code is
+     * {@code publishCurrentServer()}, every reconnect adds one more listener;
+     * confirm and, if so, register the listener once during initialisation.
+     */
+    private synchronized void reconnect() {
+        if (reconnectInProgress) {
+            return;
+        }
+        reconnectInProgress = true;
+        try {
+            client.blockUntilConnected();
+            publishCurrentServer();
+        } catch (InterruptedException e) {
+            log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
+            // Do not swallow the interruption: restore the flag for the calling thread.
+            Thread.currentThread().interrupt();
+        } finally {
+            reconnectInProgress = false;
+        }
+    }
+
@Override
public void unpublishCurrentServer() {
try {
@@ -156,7 +184,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
.filter(cd -> !cd.getPath().equals(nodePath))
.map(cd -> {
try {
- return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
+ return new ServerInstance((ServerAddress) SerializationUtils.deserialize(cd.getData()));
} catch (NoSuchElementException e) {
log.error("Failed to decode ZK node", e);
throw new RuntimeException(e);
@@ -198,7 +226,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
ServerInstance instance;
try {
- ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
+ ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
instance = new ServerInstance(serverAddress);
} catch (SerializationException e) {
log.error("Failed to decode server instance for node {}", data.getPath(), e);