thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
index 997058c..f5321e5 100644
--- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -98,6 +99,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
     private PathChildrenCache cache;
     private String nodePath;
 
+    private volatile boolean stopped = false;
+
     @PostConstruct
     public void init() {
         log.info("Initializing...");
@@ -118,6 +121,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
             cache.start();
         } catch (Exception e) {
             log.error("Failed to connect to ZK: {}", e.getMessage(), e);
+            CloseableUtils.closeQuietly(cache);
             CloseableUtils.closeQuietly(client);
             throw new RuntimeException(e);
         }
@@ -125,7 +129,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 
     @PreDestroy
     public void destroy() {
+        stopped = true;
         unpublishCurrentServer();
+        CloseableUtils.closeQuietly(cache);
         CloseableUtils.closeQuietly(client);
         log.info("Stopped discovery service");
     }
@@ -228,6 +234,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 
     @Override
     public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+        if (stopped) {
+            log.debug("Ignoring application ready event. Service is stopped.");
+            return;
+        }
+        if (client.getState() != CuratorFrameworkState.STARTED) {
+            log.debug("Ignoring application ready event, ZK client is not started, ZK client state [{}]", client.getState());
+            return;
+        }
         publishCurrentServer();
         getOtherServers().forEach(
                 server -> log.info("Found active server: [{}:{}]", server.getHost(), server.getPort())
@@ -236,6 +250,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
 
     @Override
     public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
+        if (stopped) {
+            log.debug("Ignoring {}. Service is stopped.", pathChildrenCacheEvent);
+            return;
+        }
+        if (client.getState() != CuratorFrameworkState.STARTED) {
+            log.debug("Ignoring {}, ZK client is not started, ZK client state [{}]", pathChildrenCacheEvent, client.getState());
+            return;
+        }
         ChildData data = pathChildrenCacheEvent.getData();
         if (data == null) {
             log.debug("Ignoring {} due to empty child data", pathChildrenCacheEvent);
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 4c23ac2..e10cd3c 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.kafka;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
@@ -127,6 +128,10 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
                             log.warn("[{}] Failed to process the request: {}", requestId, request, e);
                         }
                     });
+                } catch (InterruptException ie) {
+                    if (!stopped) {
+                        log.warn("Fetching data from kafka was interrupted.", ie);
+                    }
                 } catch (Throwable e) {
                     log.warn("Failed to obtain messages from queue.", e);
                     try {
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 0a09d8a..b9d8fd1 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -64,6 +64,7 @@ services:
     depends_on:
       - kafka
       - redis
+      - tb-js-executor
   tb2:
     restart: always
     image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
@@ -84,6 +85,7 @@ services:
     depends_on:
       - kafka
       - redis
+      - tb-js-executor
   tb-mqtt-transport1:
     restart: always
     image: "${DOCKER_REPO}/${MQTT_TRANSPORT_DOCKER_NAME}:${TB_VERSION}"