diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index ffd06c0..2952cfa 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -96,6 +96,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
builder.maxRequestTimeout(maxRequestsTimeout);
builder.pollInterval(responsePollDuration);
kafkaTemplate = builder.build();
+ kafkaTemplate.init();
}
@PreDestroy
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index dd1a47a..a851e6b 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -48,6 +48,7 @@ public class TbKafkaRequestTemplate<Request, Response> {
private final TBKafkaProducerTemplate<Request> requestTemplate;
private final TBKafkaConsumerTemplate<Response> responseTemplate;
private final ConcurrentMap<UUID, ResponseMetaData<Response>> pendingRequests;
+ private final boolean internalExecutor;
private final ExecutorService executor;
private final long maxRequestTimeout;
private final long maxPendingRequests;
@@ -69,8 +70,10 @@ public class TbKafkaRequestTemplate<Request, Response> {
this.maxPendingRequests = maxPendingRequests;
this.pollInterval = pollInterval;
if (executor != null) {
+ internalExecutor = false;
this.executor = executor;
} else {
+ internalExecutor = true;
this.executor = Executors.newSingleThreadExecutor();
}
}
@@ -126,6 +129,9 @@ public class TbKafkaRequestTemplate<Request, Response> {
public void stop() {
stopped = true;
+ if (internalExecutor) {
+ executor.shutdownNow();
+ }
}
public ListenableFuture<Response> post(String key, Request request) {