thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
index d7aa766..8846044 100644
--- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -182,12 +182,14 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
             while (true) {
                 TbTransactionTask transactionTask = timeoutQueue.peek();
                 if (transactionTask != null) {
+                    long sleepDuration = 0L;
                     transactionLock.lock();
                     try {
                         if (transactionTask.isCompleted()) {
                             timeoutQueue.poll();
                         } else {
-                            if (System.currentTimeMillis() > transactionTask.getExpirationTime()) {
+                            long expIn = transactionTask.getExpirationTime() - System.currentTimeMillis();
+                            if (expIn < 0) {
                                 log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
                                 timeoutQueue.poll();
                                 executeOnFailure(transactionTask.getOnFailure(), "Task has expired!");
@@ -201,17 +203,20 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
                                     }
                                 }
                             } else {
-                                try {
-                                    log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
-                                    TimeUnit.MILLISECONDS.sleep(duration);
-                                } catch (InterruptedException e) {
-                                    throw new IllegalStateException("Thread interrupted", e);
-                                }
+                                sleepDuration = Math.min(expIn, duration);
                             }
                         }
                     } finally {
                         transactionLock.unlock();
                     }
+                    if (sleepDuration > 0L) {
+                        try {
+                            log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
+                            TimeUnit.MILLISECONDS.sleep(sleepDuration);
+                        } catch (InterruptedException e) {
+                            throw new IllegalStateException("Thread interrupted", e);
+                        }
+                    }
                 } else {
                     try {
                         log.trace("Queue is empty, waiting for tasks!");