thingsboard-aplcache
Details
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index db31bda..05ed285 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -136,6 +136,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.setCompileRequest(jsRequest)
.build();
+ log.trace("Post compile request for scriptId [{}]", scriptId);
ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
return Futures.transform(future, response -> {
JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index 77ad033..b0aca0f 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -92,6 +92,9 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
long nextCleanupMs = 0L;
while (!stopped) {
ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
+ if (responses.count() > 0) {
+ log.trace("Polling responses completed, consumer records count [{}]", responses.count());
+ }
responses.forEach(response -> {
Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
Response decodedResponse = null;
@@ -109,6 +112,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
if (requestId == null) {
log.error("[{}] Missing requestId in header and body", response);
} else {
+ log.trace("[{}] Response received", requestId);
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) {
log.trace("[{}] Invalid or stale request", requestId);
@@ -132,6 +136,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
if (kv.getValue().expTime < tickTs) {
ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
if (staleRequest != null) {
+ log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs);
staleRequest.future.setException(new TimeoutException());
}
}
@@ -158,8 +163,10 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
SettableFuture<Response> future = SettableFuture.create();
- pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
+ ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
+ pendingRequests.putIfAbsent(requestId, responseMetaData);
request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
+ log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime);
requestTemplate.send(key, request, headers, null);
return future;
}
docker/.env 2(+0 -2)
diff --git a/docker/.env b/docker/.env
index 0138501..ef0bf40 100644
--- a/docker/.env
+++ b/docker/.env
@@ -16,5 +16,3 @@ TB_VERSION=latest
DATABASE=postgres
LOAD_BALANCER_NAME=haproxy-certbot
-
-KAFKA_TOPICS="js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1"
docker/kafka.env 2(+1 -1)
diff --git a/docker/kafka.env b/docker/kafka.env
index 69fbdf6..87dad07 100644
--- a/docker/kafka.env
+++ b/docker/kafka.env
@@ -4,7 +4,7 @@ KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092
KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
-KAFKA_CREATE_TOPICS=${KAFKA_TOPICS}
+KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1
KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_LOG_RETENTION_BYTES=1073741824
KAFKA_LOG_SEGMENT_BYTES=268435456
diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java
index 0a902b9..fef69b5 100644
--- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java
+++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java
@@ -87,10 +87,17 @@ public class ThingsBoardDbInstaller extends ExternalResource {
@Override
protected void after() {
- File tbLogsDir = new File("./target/tb-logs/");
+ copyLogs(tbLogVolume, "./target/tb-logs/");
+
+ dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume);
+ dockerCompose.invokeDocker();
+ }
+
+ private void copyLogs(String volumeName, String targetDir) {
+ File tbLogsDir = new File(targetDir);
tbLogsDir.mkdirs();
- dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + tbLogVolume + ":/root alpine tail -f /dev/null");
+ dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + volumeName + ":/root alpine tail -f /dev/null");
dockerCompose.invokeDocker();
dockerCompose.withCommand("cp tb-logs-container:/root/. "+tbLogsDir.getAbsolutePath());
@@ -98,9 +105,6 @@ public class ThingsBoardDbInstaller extends ExternalResource {
dockerCompose.withCommand("rm -f tb-logs-container");
dockerCompose.invokeDocker();
-
- dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume);
- dockerCompose.invokeDocker();
}
}
msa/js-executor/server.js 18(+18 -0)
diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js
index 03fac2e..17f70cb 100644
--- a/msa/js-executor/server.js
+++ b/msa/js-executor/server.js
@@ -45,6 +45,24 @@ var kafkaClient;
kafkaRequestTopic
);
+ consumer.on('error', (err) => {
+ logger.error('Unexpected kafka consumer error: %s', err.message);
+ logger.error(err.stack);
+ });
+
+ consumer.on('offsetOutOfRange', (err) => {
+ logger.error('Offset out of range error: %s', err.message);
+ logger.error(err.stack);
+ });
+
+ consumer.on('rebalancing', () => {
+ logger.info('Rebalancing event received.');
+ })
+
+ consumer.on('rebalanced', () => {
+ logger.info('Rebalanced event received.');
+ });
+
var producer = new Producer(kafkaClient);
producer.on('error', (err) => {
logger.error('Unexpected kafka producer error: %s', err.message);