thingsboard-aplcache
Merge pull request #1191 from ymucahit/zookeeper-reconnect-bug-fix Zookeeper …
10/31/2018 8:00:14 AM
Changes
Details
diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index bb92642..997058c 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -30,6 +30,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -51,6 +52,8 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
+import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;
+
/**
* @author Andrew Shvayka
*/
@@ -128,19 +131,42 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
}
@Override
- public void publishCurrentServer() {
+ public synchronized void publishCurrentServer() {
+ ServerInstance self = this.serverInstance.getSelf();
+ if (currentServerExists()) {
+ log.info("[{}:{}] ZK node for current instance already exists, NOT created new one: {}", self.getHost(), self.getPort(), nodePath);
+ } else {
+ try {
+ log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
+ nodePath = client.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
+ log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
+ client.getConnectionStateListenable().addListener(checkReconnect(self));
+ } catch (Exception e) {
+ log.error("Failed to create ZK node", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private boolean currentServerExists() {
+ if (nodePath == null) {
+ return false;
+ }
try {
ServerInstance self = this.serverInstance.getSelf();
- log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
- nodePath = client.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
- log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
- client.getConnectionStateListenable().addListener(checkReconnect(self));
+ ServerAddress registeredServerAdress = null;
+ registeredServerAdress = SerializationUtils.deserialize(client.getData().forPath(nodePath));
+ if (self.getServerAddress() != null && self.getServerAddress().equals(registeredServerAdress)) {
+ return true;
+ }
+ } catch (KeeperException.NoNodeException e) {
+ log.info("ZK node does not exist: {}", nodePath);
} catch (Exception e) {
- log.error("Failed to create ZK node", e);
- throw new RuntimeException(e);
+ log.error("Couldn't check if ZK node exists", e);
}
+ return false;
}
private ConnectionStateListener checkReconnect(ServerInstance self) {
@@ -218,6 +244,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
return;
} else if (nodePath != null && nodePath.equals(data.getPath())) {
+ if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) {
+ log.info("ZK node for current instance is somehow deleted.");
+ publishCurrentServer();
+ }
log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
return;
}