ZkDiscoveryService.java

312 lines | 12.403 kB Blame History Raw Download
/**
 * Copyright © 2016-2018 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.service.cluster.discovery;

import java.util.ArrayList;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
//import org.apache.curator.framework.recipes.cache.ChildData;
//import org.apache.curator.framework.recipes.cache.PathChildrenCache;
//import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
//import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.utils.MiscUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

//import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;

/**
 * ZooKeeper-backed {@link DiscoveryService}.
 * <p>
 * Registers the current server instance as an {@link CreateMode#EPHEMERAL_SEQUENTIAL} node under
 * {@code ${zk.zk_dir}/nodes}. The node holds the Java-serialized {@link ServerAddress}. The node is
 * re-registered after the Curator client reports {@link ConnectionState#LOST}.
 * <p>
 * NOTE: peer tracking via {@code PathChildrenCache} is currently disabled (see the commented-out
 * code in this class). As a result, {@link #getOtherServers()} always returns an empty list, and no
 * cluster membership events are forwarded to the routing, actor, telemetry or device-state services.
 *
 * @author Andrew Shvayka
 */
@Service
@ConditionalOnProperty(prefix = "zk", value = "enabled", havingValue = "true", matchIfMissing = false)
@Slf4j
//public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheListener {
public class ZkDiscoveryService implements DiscoveryService {

    @Value("${zk.url}")
    private String zkUrl;
    @Value("${zk.retry_interval_ms}")
    private Integer zkRetryInterval;
    @Value("${zk.connection_timeout_ms}")
    private Integer zkConnectionTimeout;
    @Value("${zk.session_timeout_ms}")
    private Integer zkSessionTimeout;
    @Value("${zk.zk_dir}")
    private String zkDir;

    // Parent path for the per-instance ephemeral nodes: zkDir + "/nodes".
    private String zkNodesDir;

    @Autowired
    private ServerInstanceService serverInstance;

    @Autowired
    @Lazy
    private TelemetrySubscriptionService tsSubService;

    @Autowired
    @Lazy
    private DeviceStateService deviceStateService;

    @Autowired
    @Lazy
    private ActorService actorService;

    @Autowired
    @Lazy
    private ClusterRoutingService routingService;

    private CuratorFramework client;
//    private PathChildrenCache cache;
    // Full path of the ZK node created for this instance; null until the first successful publish.
    private String nodePath;

    // Set by destroy(); stops reconnect logic from re-publishing during/after shutdown.
    private volatile boolean stopped = false;

    // Guarded by "this" (only touched from synchronized publishCurrentServer()). Ensures the
    // reconnect listener is added to the client once, no matter how often the node is re-created.
    private boolean reconnectListenerRegistered = false;

    /**
     * Validates the ZK configuration, starts the Curator client and blocks until it is connected.
     *
     * @throws RuntimeException wrapping the original failure if the client cannot be started/connected;
     *                          the client is closed before rethrowing
     */
    @PostConstruct
    public void init() {
        log.info("Initializing...");
        Assert.hasLength(zkUrl, MiscUtils.missingProperty("zk.url"));
        Assert.notNull(zkRetryInterval, MiscUtils.missingProperty("zk.retry_interval_ms"));
        Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
        Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));

        log.info("Initializing discovery service using ZK connect string: {}", zkUrl);

        zkNodesDir = zkDir + "/nodes";
        try {
            client = CuratorFrameworkFactory.newClient(zkUrl, zkSessionTimeout, zkConnectionTimeout, new RetryForever(zkRetryInterval));
            client.start();
            client.blockUntilConnected();
//            cache = new PathChildrenCache(client, zkNodesDir, true);
//            cache.getListenable().addListener(this);
//            cache.start();
        } catch (Exception e) {
            log.error("Failed to connect to ZK: {}", e.getMessage(), e);
//            CloseableUtils.closeQuietly(cache);
            CloseableUtils.closeQuietly(client);
            throw new RuntimeException(e);
        }
    }

    /**
     * Marks the service as stopped, removes this instance's ZK node and closes the client.
     * The client is closed even if removing the node fails; that failure is still propagated.
     */
    @PreDestroy
    public void destroy() {
        stopped = true;
        try {
            unpublishCurrentServer();
        } finally {
//            CloseableUtils.closeQuietly(cache);
            CloseableUtils.closeQuietly(client);
            log.info("Stopped discovery service");
        }
    }

    /**
     * Creates the ephemeral-sequential ZK node for the current instance, unless a node already
     * registered by this service still holds the current server address.
     * On first successful creation, a connection-state listener is registered that re-publishes
     * the node after {@link ConnectionState#LOST}.
     *
     * @throws RuntimeException wrapping the original failure if the node cannot be created
     */
    @Override
    public synchronized void publishCurrentServer() {
        ServerInstance self = this.serverInstance.getSelf();
        if (currentServerExists()) {
            log.info("[{}:{}] ZK node for current instance already exists, NOT created new one: {}", self.getHost(), self.getPort(), nodePath);
            return;
        }
        try {
            log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
            nodePath = client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
            log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
            // Register once: re-publishing after a reconnect must not stack additional listeners.
            if (!reconnectListenerRegistered) {
                client.getConnectionStateListenable().addListener(checkReconnect(self));
                reconnectListenerRegistered = true;
            }
        } catch (Exception e) {
            log.error("Failed to create ZK node", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Checks whether the node at {@code nodePath} exists and holds the current server address.
     *
     * @return true only if the node can be read and its deserialized address equals the current
     *         server address; false if no node was created yet, the node is gone, or reading fails
     */
    private boolean currentServerExists() {
        if (nodePath == null) {
            return false;
        }
        try {
            ServerInstance self = this.serverInstance.getSelf();
            // NOTE(review): Java-native deserialization of ZK node data; acceptable only while the
            // ZK ensemble is trusted.
            ServerAddress registeredServerAddress = SerializationUtils.deserialize(client.getData().forPath(nodePath));
            if (self.getServerAddress() != null && self.getServerAddress().equals(registeredServerAddress)) {
                return true;
            }
        } catch (KeeperException.NoNodeException e) {
            log.info("ZK node does not exist: {}", nodePath);
        } catch (Exception e) {
            log.error("Couldn't check if ZK node exists", e);
        }
        return false;
    }

    /**
     * Builds a listener that logs every connection state change and triggers
     * {@link #reconnect()} when the connection is reported LOST.
     *
     * @param self current server instance, used only for log context
     */
    private ConnectionStateListener checkReconnect(ServerInstance self) {
        return (curator, newState) -> {
            log.info("[{}:{}] ZK state changed: {}", self.getHost(), self.getPort(), newState);
            if (newState == ConnectionState.LOST) {
                reconnect();
            }
        };
    }

    // Guarded by "this"; prevents re-entrant reconnect attempts.
    private boolean reconnectInProgress = false;

    /**
     * Waits until the client is connected again and re-publishes the current server node.
     * Does nothing once the service has been stopped.
     */
    private synchronized void reconnect() {
        if (reconnectInProgress || stopped) {
            return;
        }
        reconnectInProgress = true;
        try {
            client.blockUntilConnected();
            // Shutdown may have started while we were waiting for the connection.
            if (!stopped) {
                publishCurrentServer();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the thread's owner (e.g. an executor shutting down).
            Thread.currentThread().interrupt();
            log.error("Failed to reconnect to ZK: {}", e.getMessage(), e);
        } finally {
            reconnectInProgress = false;
        }
    }

    /**
     * Deletes this instance's ZK node, if one was created. A node that is already gone
     * (e.g. removed together with an expired session) is logged and treated as success.
     *
     * @throws RuntimeException wrapping the original failure for any other deletion error
     */
    @Override
    public void unpublishCurrentServer() {
        // Read the field once so the null check and the delete use the same path.
        String path = nodePath;
        if (path == null) {
            return;
        }
        try {
            client.delete().forPath(path);
        } catch (KeeperException.NoNodeException e) {
            log.info("ZK node {} is already removed", path);
        } catch (Exception e) {
            log.error("Failed to delete ZK node {}", path, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the current server instance as provided by {@link ServerInstanceService#getSelf()}
     */
    @Override
    public ServerInstance getCurrentServer() {
        return serverInstance.getSelf();
    }

    /**
     * Returns the other cluster members.
     * <p>
     * NOTE: peer discovery via {@code PathChildrenCache} is disabled (see the commented-out body),
     * so this always returns a new, empty, mutable list.
     */
    @Override
    public List<ServerInstance> getOtherServers() {
        return new ArrayList<>();
//        return cache.getCurrentData().stream()
//                .filter(cd -> !cd.getPath().equals(nodePath))
//                .map(cd -> {
//                    try {
//                        return new ServerInstance((ServerAddress) SerializationUtils.deserialize(cd.getData()));
//                    } catch (NoSuchElementException e) {
//                        log.error("Failed to decode ZK node", e);
//                        throw new RuntimeException(e);
//                    }
//                })
//                .collect(Collectors.toList());
    }

    /**
     * Publishes the current server node once the application is ready, unless the service is
     * stopped or the ZK client is not in the STARTED state, then logs the known peers.
     */
    @EventListener(ApplicationReadyEvent.class)
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        log.info("Received application ready event. Starting current ZK node.");
        if (stopped) {
            log.debug("Ignoring application ready event. Service is stopped.");
            return;
        }
        if (client.getState() != CuratorFrameworkState.STARTED) {
            log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", client.getState());
            return;
        }
        publishCurrentServer();
        getOtherServers().forEach(
                server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort())
        );
    }

//    @Override
//    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
//        if (stopped) {
//            log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent);
//            return;
//        }
//        if (client.getState() != CuratorFrameworkState.STARTED) {
//            log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, client.getState());
//            return;
//        }
//        ChildData data = pathChildrenCacheEvent.getData();
//        if (data == null) {
//            log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent);
//            return;
//        } else if (data.getData() == null) {
//            log.debug("Ignoring {} due to empty child's data", pathChildrenCacheEvent);
//            return;
//        } else if (nodePath != null && nodePath.equals(data.getPath())) {
//            if (pathChildrenCacheEvent.getType() == CHILD_REMOVED) {
//                log.info("ZK node for current instance is somehow deleted.");
//                publishCurrentServer();
//            }
//            log.debug("Ignoring event about current server {}", pathChildrenCacheEvent);
//            return;
//        }
//        ServerInstance instance;
//        try {
//            ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
//            instance = new ServerInstance(serverAddress);
//        } catch (SerializationException e) {
//            log.error("Failed to decode server instance for node {}", data.getPath(), e);
//            throw e;
//        }
//        log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort());
//        switch (pathChildrenCacheEvent.getType()) {
//            case CHILD_ADDED:
//                routingService.onServerAdded(instance);
//                tsSubService.onClusterUpdate();
//                deviceStateService.onClusterUpdate();
//                actorService.onServerAdded(instance);
//                break;
//            case CHILD_UPDATED:
//                routingService.onServerUpdated(instance);
//                actorService.onServerUpdated(instance);
//                break;
//            case CHILD_REMOVED:
//                routingService.onServerRemoved(instance);
//                tsSubService.onClusterUpdate();
//                deviceStateService.onClusterUpdate();
//                actorService.onServerRemoved(instance);
//                break;
//            default:
//                break;
//        }
//    }
}