thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index db31bda..05ed285 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -136,6 +136,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
                 .setCompileRequest(jsRequest)
                 .build();
 
+        log.trace("Post compile request for scriptId [{}]", scriptId);
         ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
         return Futures.transform(future, response -> {
             JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index 77ad033..b0aca0f 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -92,6 +92,9 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
             long nextCleanupMs = 0L;
             while (!stopped) {
                 ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
+                if (responses.count() > 0) {
+                    log.trace("Polling responses completed, consumer records count [{}]", responses.count());
+                }
                 responses.forEach(response -> {
                     Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
                     Response decodedResponse = null;
@@ -109,6 +112,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
                     if (requestId == null) {
                         log.error("[{}] Missing requestId in header and body", response);
                     } else {
+                        log.trace("[{}] Response received", requestId);
                         ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
                         if (expectedResponse == null) {
                             log.trace("[{}] Invalid or stale request", requestId);
@@ -132,6 +136,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
                         if (kv.getValue().expTime < tickTs) {
                             ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
                             if (staleRequest != null) {
+                                log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs);
                                 staleRequest.future.setException(new TimeoutException());
                             }
                         }
@@ -158,8 +163,10 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
         headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
         headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
         SettableFuture<Response> future = SettableFuture.create();
-        pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
+        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
+        pendingRequests.putIfAbsent(requestId, responseMetaData);
         request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
+        log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime);
         requestTemplate.send(key, request, headers, null);
         return future;
     }

docker/.env 2(+0 -2)

diff --git a/docker/.env b/docker/.env
index 0138501..ef0bf40 100644
--- a/docker/.env
+++ b/docker/.env
@@ -16,5 +16,3 @@ TB_VERSION=latest
 DATABASE=postgres
 
 LOAD_BALANCER_NAME=haproxy-certbot
-
-KAFKA_TOPICS="js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1"

docker/kafka.env 2(+1 -1)

diff --git a/docker/kafka.env b/docker/kafka.env
index 69fbdf6..87dad07 100644
--- a/docker/kafka.env
+++ b/docker/kafka.env
@@ -4,7 +4,7 @@ KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092
 KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092
 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
 KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
-KAFKA_CREATE_TOPICS=${KAFKA_TOPICS}
+KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1
 KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
 KAFKA_LOG_RETENTION_BYTES=1073741824
 KAFKA_LOG_SEGMENT_BYTES=268435456
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java
index 0a902b9..fef69b5 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java
@@ -87,10 +87,17 @@ public class ThingsBoardDbInstaller extends ExternalResource {
 
     @Override
     protected void after() {
-        File tbLogsDir = new File("./target/tb-logs/");
+        copyLogs(tbLogVolume, "./target/tb-logs/");
+
+        dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume);
+        dockerCompose.invokeDocker();
+    }
+
+    private void copyLogs(String volumeName, String targetDir) {
+        File tbLogsDir = new File(targetDir);
         tbLogsDir.mkdirs();
 
-        dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + tbLogVolume + ":/root alpine tail -f /dev/null");
+        dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + volumeName + ":/root alpine tail -f /dev/null");
         dockerCompose.invokeDocker();
 
         dockerCompose.withCommand("cp tb-logs-container:/root/. "+tbLogsDir.getAbsolutePath());
@@ -98,9 +105,6 @@ public class ThingsBoardDbInstaller extends ExternalResource {
 
         dockerCompose.withCommand("rm -f tb-logs-container");
         dockerCompose.invokeDocker();
-
-        dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume);
-        dockerCompose.invokeDocker();
     }
 
 }
diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js
index 03fac2e..17f70cb 100644
--- a/msa/js-executor/server.js
+++ b/msa/js-executor/server.js
@@ -45,6 +45,24 @@ var kafkaClient;
             kafkaRequestTopic
         );
 
+        consumer.on('error', (err) => {
+            logger.error('Unexpected kafka consumer error: %s', err.message);
+            logger.error(err.stack);
+        });
+
+        consumer.on('offsetOutOfRange', (err) => {
+            logger.error('Offset out of range error: %s', err.message);
+            logger.error(err.stack);
+        });
+
+        consumer.on('rebalancing', () => {
+            logger.info('Rebalancing event received.');
+        })
+
+        consumer.on('rebalanced', () => {
+            logger.info('Rebalanced event received.');
+        });
+
         var producer = new Producer(kafkaClient);
         producer.on('error', (err) => {
             logger.error('Unexpected kafka producer error: %s', err.message);