thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java 19(+12 -7)
Details
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
index d7aa766..8846044 100644
--- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -182,12 +182,14 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
while (true) {
TbTransactionTask transactionTask = timeoutQueue.peek();
if (transactionTask != null) {
+ long sleepDuration = 0L;
transactionLock.lock();
try {
if (transactionTask.isCompleted()) {
timeoutQueue.poll();
} else {
- if (System.currentTimeMillis() > transactionTask.getExpirationTime()) {
+ long expIn = transactionTask.getExpirationTime() - System.currentTimeMillis();
+ if (expIn < 0) {
log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
timeoutQueue.poll();
executeOnFailure(transactionTask.getOnFailure(), "Task has expired!");
@@ -201,17 +203,20 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
}
}
} else {
- try {
- log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
- TimeUnit.MILLISECONDS.sleep(duration);
- } catch (InterruptedException e) {
- throw new IllegalStateException("Thread interrupted", e);
- }
+ sleepDuration = Math.min(expIn, duration);
}
}
} finally {
transactionLock.unlock();
}
+ if (sleepDuration > 0L) {
+ try {
+ log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
+ TimeUnit.MILLISECONDS.sleep(sleepDuration);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Thread interrupted", e);
+ }
+ }
} else {
try {
log.trace("Queue is empty, waiting for tasks!");