thingsboard-memoizeit

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
index d0eb713..baf58bf 100644
--- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -22,16 +22,15 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 import org.thingsboard.rule.engine.api.TbContext;
-import org.thingsboard.server.common.data.EntityType;
-import org.thingsboard.server.common.data.id.AssetId;
-import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -39,6 +38,7 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -60,6 +60,9 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
     @Autowired
     private ClusterRpcService clusterRpcService;
 
+    @Autowired
+    private DbCallbackExecutorService callbackExecutor;
+
     @Value("${actors.rule.transaction.queue_size}")
     private int finalQueueSize;
     @Value("${actors.rule.transaction.duration}")
@@ -70,12 +73,10 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
     private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
 
     private ExecutorService timeoutExecutor;
-    private ExecutorService executor;
 
     @PostConstruct
     public void init() {
         timeoutExecutor = Executors.newSingleThreadExecutor();
-        executor = Executors.newSingleThreadExecutor();
         executeOnTimeout();
     }
 
@@ -84,9 +85,6 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
         if (timeoutExecutor != null) {
             timeoutExecutor.shutdownNow();
         }
-        if (executor != null) {
-            executor.shutdownNow();
-        }
     }
 
     @Override
@@ -96,16 +94,16 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
             BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
                     new LinkedBlockingQueue<>(finalQueueSize));
 
-            TbTransactionTask task = new TbTransactionTask(msg, onStart, onEnd, onFailure);
+            TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration);
             int queueSize = queue.size();
             if (queueSize >= finalQueueSize) {
-                task.getOnFailure().accept(new RuntimeException("Queue has no space!"));
+                executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!");
             } else {
-                addMsgToQueues(queue, task);
+                addMsgToQueues(queue, transactionTask);
                 if (queueSize == 0) {
-                    startTransactionTask(task);
+                    executeOnSuccess(transactionTask.getOnStart(), transactionTask.getMsg());
                 } else {
-                    log.trace("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType());
+                    log.trace("Msg [{}][{}] is waiting to start transaction!", msg.getId(), msg.getType());
                 }
             }
         } finally {
@@ -113,64 +111,79 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
         }
     }
 
-    private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask task) {
-        queue.offer(task);
-        timeoutQueue.offer(task);
+    private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask transactionTask) {
+        queue.offer(transactionTask);
+        timeoutQueue.offer(transactionTask);
         log.trace("Added msg to queue, size: [{}]", queue.size());
     }
 
     @Override
-    public boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure) {
-        BlockingQueue<TbTransactionTask> queue = transactionMap.get(msg.getTransactionData().getOriginatorId());
-
-        TbTransactionTask currentTask = queue.peek();
-        if (currentTask != null) {
-            if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
-                currentTask.setIsCompleted(true);
-                queue.remove();
-                log.trace("Removed msg from queue, size [{}]", queue.size());
-                currentTask.getOnEnd().accept(currentTask.getMsg());
-
-                TbTransactionTask nextTask = queue.peek();
-                if (nextTask != null) {
-                    startTransactionTask(nextTask);
+    public void endTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
+        EntityId originatorId = msg.getTransactionData().getOriginatorId();
+
+        if (!onRemoteTransactionEndSync(ctx.getTenantId(), originatorId)) {
+            transactionLock.lock();
+            try {
+                BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(originatorId, id ->
+                        new LinkedBlockingQueue<>(finalQueueSize));
+
+                TbTransactionTask currentTransactionTask = queue.peek();
+                if (currentTransactionTask != null) {
+                    if (currentTransactionTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
+                        currentTransactionTask.setCompleted(true);
+                        queue.poll();
+                        log.trace("Removed msg from queue, size [{}]", queue.size());
+
+                        executeOnSuccess(currentTransactionTask.getOnEnd(), currentTransactionTask.getMsg());
+                        executeOnSuccess(onSuccess, currentTransactionTask.getMsg());
+
+                        TbTransactionTask nextTransactionTask = queue.peek();
+                        if (nextTransactionTask != null) {
+                            executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
+                        }
+                    } else {
+                        log.trace("Task has expired!");
+                        executeOnFailure(onFailure, "Task has expired!");
+                    }
+                } else {
+                    log.trace("Queue is empty, previous task has expired!");
+                    executeOnFailure(onFailure, "Queue is empty, previous task has expired!");
                 }
-            } else {
-                log.trace("Task has expired!");
-                onFailure.accept(new RuntimeException("Task has expired!"));
-                return true;
+            } finally {
+                transactionLock.unlock();
             }
-        } else {
-            log.trace("Queue is empty, previous task has expired!");
-            onFailure.accept(new RuntimeException("Queue is empty, previous task has expired!"));
-            return true;
         }
-        return false;
     }
 
     private void executeOnTimeout() {
         timeoutExecutor.submit(() -> {
             while (true) {
-                TbTransactionTask task = timeoutQueue.peek();
-                if (task != null) {
-                    if (task.getIsCompleted()) {
+                TbTransactionTask transactionTask = timeoutQueue.peek();
+                if (transactionTask != null) {
+                    if (transactionTask.isCompleted()) {
                         timeoutQueue.poll();
                     } else {
-                        if (System.currentTimeMillis() > task.getExpirationTime()) {
-                            log.trace("Task has expired! Deleting it...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType());
-                            timeoutQueue.poll();
-                            task.getOnFailure().accept(new RuntimeException("Task has expired!"));
-
-                            BlockingQueue<TbTransactionTask> queue = transactionMap.get(task.getMsg().getTransactionData().getOriginatorId());
-                            queue.poll();
+                        if (System.currentTimeMillis() > transactionTask.getExpirationTime()) {
+                            transactionLock.lock();
+                            try {
+                                log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
+                                timeoutQueue.poll();
+                                executeOnFailure(transactionTask.getOnFailure(), "Task has expired!");
 
-                            TbTransactionTask nextTask = queue.peek();
-                            if (nextTask != null) {
-                                startTransactionTask(nextTask);
+                                BlockingQueue<TbTransactionTask> queue = transactionMap.get(transactionTask.getMsg().getTransactionData().getOriginatorId());
+                                if (queue != null) {
+                                    queue.poll();
+                                    TbTransactionTask nextTransactionTask = queue.peek();
+                                    if (nextTransactionTask != null) {
+                                        executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
+                                    }
+                                }
+                            } finally {
+                                transactionLock.unlock();
                             }
                         } else {
                             try {
-                                log.trace("Task has not expired! Continue executing...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType());
+                                log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
                                 TimeUnit.MILLISECONDS.sleep(duration);
                             } catch (InterruptedException e) {
                                 throw new IllegalStateException("Thread interrupted", e);
@@ -189,10 +202,22 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
         });
     }
 
-    private void startTransactionTask(TbTransactionTask task) {
-        task.setIsCompleted(false);
-        task.setExpirationTime(System.currentTimeMillis() + duration);
-        task.getOnStart().accept(task.getMsg());
+    private void executeOnFailure(Consumer<Throwable> onFailure, String exception) {
+        executeCallback(() -> {
+            onFailure.accept(new RuntimeException(exception));
+            return null;
+        });
+    }
+
+    private void executeOnSuccess(Consumer<TbMsg> onSuccess, TbMsg tbMsg) {
+        executeCallback(() -> {
+            onSuccess.accept(tbMsg);
+            return null;
+        });
+    }
+
+    private void executeCallback(Callable<Void> task) {
+        callbackExecutor.executeAsync(task);
     }
 
     @Override
@@ -204,25 +229,21 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
             throw new RuntimeException(e);
         }
         TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
-
-        String entityTypeStr = proto.getEntityType();
-        EntityId entityId;
-        if (entityTypeStr.equals(EntityType.ASSET.name())) {
-            entityId = new AssetId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
-        } else {
-            entityId = new DeviceId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
-        }
+        EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
         onTransactionEnd(tenantId, entityId);
     }
 
-    @Override
-    public void onTransactionEnd(TenantId tenantId, EntityId entityId) {
-        executor.submit(() -> onTransactionEndSync(tenantId, entityId));
+    private void onTransactionEnd(TenantId tenantId, EntityId entityId) {
+        callbackExecutor.executeAsync(() -> onRemoteTransactionEndSync(tenantId, entityId));
     }
 
-    private void onTransactionEndSync(TenantId tenantId, EntityId entityId) {
+    private boolean onRemoteTransactionEndSync(TenantId tenantId, EntityId entityId) {
         Optional<ServerAddress> address = routingService.resolveById(entityId);
-        address.ifPresent(serverAddress -> sendTransactionEvent(tenantId, entityId, serverAddress));
+        if (address.isPresent()) {
+            sendTransactionEvent(tenantId, entityId, address.get());
+            return true;
+        }
+        return false;
     }
 
     private void sendTransactionEvent(TenantId tenantId, EntityId entityId, ServerAddress address) {
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java
index a56bc05..49e29ba 100644
--- a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java
@@ -29,15 +29,16 @@ public final class TbTransactionTask {
     private final Consumer<TbMsg> onStart;
     private final Consumer<TbMsg> onEnd;
     private final Consumer<Throwable> onFailure;
+    private final long expirationTime;
 
-    private Boolean isCompleted;
-    private Long expirationTime;
+    private boolean isCompleted;
 
-    public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
+    public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure, long expirationTime) {
         this.msg = msg;
         this.onStart = onStart;
         this.onEnd = onEnd;
         this.onFailure = onFailure;
+        this.expirationTime = expirationTime;
         this.isCompleted = false;
     }
 }
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
index 51aebea..dbfc021 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
@@ -15,8 +15,6 @@
  */
 package org.thingsboard.rule.engine.api;
 
-import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 
@@ -26,10 +24,8 @@ public interface RuleChainTransactionService {
 
     void beginTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure);
 
-    boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure);
+    void endTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure);
 
     void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes);
 
-    void onTransactionEnd(TenantId tenantId, EntityId entityId);
-
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
index 2452f71..b086140 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
@@ -28,7 +28,6 @@ import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgDataType;
 import org.thingsboard.server.common.msg.TbMsgTransactionData;
 
-import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@@ -54,19 +53,18 @@ public class TbTransactionBeginNode implements TbNode {
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
-        log.trace("Msg enters transaction - [{}] [{}]", msg.getId(), msg.getType());
+        log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType());
 
         TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator());
-
         TbMsg tbMsg = new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), TbMsgDataType.JSON,
                 msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
 
-        ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, onStart -> {
-                    log.trace("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType());
-                    ctx.tellNext(tbMsg, SUCCESS);
-                }, onEnd -> log.trace("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()),
+        ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, startMsg -> {
+                    log.trace("Transaction starting... [{}][{}]", startMsg.getId(), startMsg.getType());
+                    ctx.tellNext(startMsg, SUCCESS);
+                }, endMsg -> log.trace("Transaction ended successfully... [{}][{}]", endMsg.getId(), endMsg.getType()),
                 throwable -> {
-                    log.error("Transaction failed! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable);
+                    log.error("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable);
                     ctx.tellFailure(tbMsg, throwable);
                 });
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
index dc6ce6e..ea98bcc 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
@@ -51,12 +51,10 @@ public class TbTransactionEndNode implements TbNode {
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
-        ctx.getRuleChainTransactionService().onTransactionEnd(ctx.getTenantId(), msg.getTransactionData().getOriginatorId());
-        boolean isFailed = ctx.getRuleChainTransactionService().endTransaction(ctx, msg, throwable -> ctx.tellFailure(msg, throwable));
-        if (!isFailed) {
-            ctx.tellNext(msg, SUCCESS);
-        }
-        log.trace("Msg left transaction - [{}] [{}]", msg.getId(), msg.getType());
+        ctx.getRuleChainTransactionService().endTransaction(ctx, msg,
+                successMsg -> ctx.tellNext(successMsg, SUCCESS),
+                throwable -> ctx.tellFailure(msg, throwable));
+        log.trace("Msg left transaction - [{}][{}]", msg.getId(), msg.getType());
     }
 
     @Override