thingsboard-memoizeit

Fixed timeout logic

9/27/2018 8:50:24 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index ffd06c0..2952cfa 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -96,6 +96,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
         builder.maxRequestTimeout(maxRequestsTimeout);
         builder.pollInterval(responsePollDuration);
         kafkaTemplate = builder.build();
+        kafkaTemplate.init();
     }
 
     @PreDestroy
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index dd1a47a..a851e6b 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -48,6 +48,7 @@ public class TbKafkaRequestTemplate<Request, Response> {
     private final TBKafkaProducerTemplate<Request> requestTemplate;
     private final TBKafkaConsumerTemplate<Response> responseTemplate;
     private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests;
+    private final boolean internalExecutor;
     private final ExecutorService executor;
     private final long maxRequestTimeout;
     private final long maxPendingRequests;
@@ -69,8 +70,10 @@ public class TbKafkaRequestTemplate<Request, Response> {
         this.maxPendingRequests = maxPendingRequests;
         this.pollInterval = pollInterval;
         if (executor != null) {
+            internalExecutor = false;
             this.executor = executor;
         } else {
+            internalExecutor = true;
             this.executor = Executors.newSingleThreadExecutor();
         }
     }
@@ -126,6 +129,9 @@ public class TbKafkaRequestTemplate<Request, Response> {
 
     public void stop() {
         stopped = true;
+        if (internalExecutor) {
+            executor.shutdownNow();
+        }
     }
 
     public ListenableFuture<Response> post(String key, Request request) {