thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index 37c4871..bebd2cc 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -279,9 +279,6 @@ public class TelemetryController extends BaseController {
             deleteFromTs = 0L;
             deleteToTs = System.currentTimeMillis();
         } else {
-            if (startTs == null || endTs == null) {
-                return getImmediateDeferredResult("StartTs and endTs could not be empty when deleteAllDataForKeys equals [false]", HttpStatus.BAD_REQUEST);
-            }
             deleteFromTs = startTs;
             deleteToTs = endTs;
         }
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
index a378ff1..f09a0f9 100644
--- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,10 +23,17 @@ import org.thingsboard.rule.engine.api.TbContext;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.msg.TbMsg;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -37,28 +44,45 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
 
     @Value("${actors.rule.transaction.queue_size}")
     private int finalQueueSize;
+    @Value("${actors.rule.transaction.duration}")
+    private long duration;
 
     private final Lock transactionLock = new ReentrantLock();
     private final ConcurrentMap<EntityId, BlockingQueue<TbTransactionTask>> transactionMap = new ConcurrentHashMap<>();
+    private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
 
-    //TODO: add delete on timeout from queue -> onFailure accept
+    private ExecutorService timeoutExecutor;
+
+    @PostConstruct
+    public void init() {
+        timeoutExecutor = Executors.newSingleThreadExecutor();
+        executeOnTimeout();
+    }
+
+    @PreDestroy
+    public void destroy() {
+        if (timeoutExecutor != null) {
+            timeoutExecutor.shutdownNow();
+        }
+    }
 
     @Override
     public void beginTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
-        BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
-                new LinkedBlockingQueue<>(finalQueueSize));
         transactionLock.lock();
         try {
+            BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
+                    new LinkedBlockingQueue<>(finalQueueSize));
+
             TbTransactionTask task = new TbTransactionTask(msg, onStart, onEnd, onFailure);
             int queueSize = queue.size();
             if (queueSize >= finalQueueSize) {
-                onFailure.accept(new RuntimeException("Queue has no space!"));
+                task.getOnFailure().accept(new RuntimeException("Queue has no space!"));
             } else {
-                addMsgToQueue(queue, task, onFailure);
+                addMsgToQueues(queue, task);
                 if (queueSize == 0) {
-                    onStart.accept(msg);
+                    startTransactionTask(task);
                 } else {
-                    log.info("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType());
+                    log.trace("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType());
                 }
             }
         } finally {
@@ -66,41 +90,85 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
         }
     }
 
-    private void addMsgToQueue(BlockingQueue<TbTransactionTask> queue, TbTransactionTask task, Consumer<Throwable> onFailure) {
-        try {
-            queue.add(task);
-            log.info("Added msg to queue, size: [{}]", queue.size());
-        } catch (Exception e) {
-            log.error("Error when trying to add msg [{}] to the queue", task.getMsg(), e);
-            onFailure.accept(e);
-        }
+    private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask task) {
+        queue.offer(task);
+        timeoutQueue.offer(task);
+        log.trace("Added msg to queue, size: [{}]", queue.size());
     }
 
     @Override
     public boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure) {
-        transactionLock.lock();
-        try {
-            BlockingQueue<TbTransactionTask> queue = transactionMap.get(msg.getTransactionData().getOriginatorId());
-            try {
-                TbTransactionTask currentTask = queue.element();
-                if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
-                    queue.remove();
-                    log.info("Removed msg from queue, size [{}]", queue.size());
-                    currentTask.getOnEnd().accept(currentTask.getMsg());
-
-                    TbTransactionTask nextTask = queue.peek();
-                    if (nextTask != null) {
-                        nextTask.getOnStart().accept(nextTask.getMsg());
-                    }
+        BlockingQueue<TbTransactionTask> queue = transactionMap.get(msg.getTransactionData().getOriginatorId());
+
+        TbTransactionTask currentTask = queue.peek();
+        if (currentTask != null) {
+            if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
+                currentTask.setIsCompleted(true);
+                queue.remove();
+                log.trace("Removed msg from queue, size [{}]", queue.size());
+                currentTask.getOnEnd().accept(currentTask.getMsg());
+
+                TbTransactionTask nextTask = queue.peek();
+                if (nextTask != null) {
+                    startTransactionTask(nextTask);
                 }
-            } catch (Exception e) {
-                log.error("Queue is empty!", queue);
-                onFailure.accept(e);
+            } else {
+                log.trace("Task has expired!");
+                onFailure.accept(new RuntimeException("Task has expired!"));
                 return true;
             }
-        } finally {
-            transactionLock.unlock();
+        } else {
+            log.trace("Queue is empty, previous task has expired!");
+            onFailure.accept(new RuntimeException("Queue is empty, previous task has expired!"));
+            return true;
         }
         return false;
     }
+
+    private void executeOnTimeout() {
+        timeoutExecutor.execute(() -> {
+            while (true) {
+                TbTransactionTask task = timeoutQueue.peek();
+                if (task != null) {
+                    if (task.getIsCompleted()) {
+                        timeoutQueue.poll();
+                    } else {
+                        if (System.currentTimeMillis() > task.getExpirationTime()) {
+                            log.trace("Task has expired! Deleting it...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType());
+                            timeoutQueue.poll();
+                            task.getOnFailure().accept(new RuntimeException("Task has expired!"));
+
+                            BlockingQueue<TbTransactionTask> queue = transactionMap.get(task.getMsg().getTransactionData().getOriginatorId());
+                            queue.poll();
+
+                            TbTransactionTask nextTask = queue.peek();
+                            if (nextTask != null) {
+                                startTransactionTask(nextTask);
+                            }
+                        } else {
+                            try {
+                                log.trace("Task has not expired! Continue executing...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType());
+                                TimeUnit.MILLISECONDS.sleep(duration);
+                            } catch (InterruptedException e) {
+                                throw new IllegalStateException("Thread interrupted", e);
+                            }
+                        }
+                    }
+                } else {
+                    try {
+                        log.trace("Queue is empty, waiting for tasks!");
+                        TimeUnit.SECONDS.sleep(1);
+                    } catch (InterruptedException e) {
+                        throw new IllegalStateException("Thread interrupted", e);
+                    }
+                }
+            }
+        });
+    }
+
+    private void startTransactionTask(TbTransactionTask task) {
+        task.setIsCompleted(false);
+        task.setExpirationTime(System.currentTimeMillis() + duration);
+        task.getOnStart().accept(task.getMsg());
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java
index 12168d9..a56bc05 100644
--- a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java
@@ -1,11 +1,28 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.transaction;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import org.thingsboard.server.common.msg.TbMsg;
 
 import java.util.function.Consumer;
 
 @Data
+@AllArgsConstructor
 public final class TbTransactionTask {
 
     private final TbMsg msg;
@@ -13,4 +30,14 @@ public final class TbTransactionTask {
     private final Consumer<TbMsg> onEnd;
     private final Consumer<Throwable> onFailure;
 
+    private Boolean isCompleted;
+    private Long expirationTime;
+
+    public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
+        this.msg = msg;
+        this.onStart = onStart;
+        this.onEnd = onEnd;
+        this.onFailure = onFailure;
+        this.isCompleted = false;
+    }
 }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index a837a3e..157e36b 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -215,6 +215,8 @@ actors:
     transaction:
       # Size of queues which store messages for transaction rule nodes
       queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:10}"
+      # Time in milliseconds for transaction to complete
+      duration: "${ACTORS_RULE_TRANSACTION_DURATION:10000}"
   statistics:
     # Enable/disable actor statistics
     enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
index 8e466a3..bad4585 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,7 +54,7 @@ public class TbTransactionBeginNode implements TbNode {
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
-        log.info("Msg in - [{}] [{}]", msg.getId(), msg.getType());
+        log.trace("Msg in - [{}] [{}]", msg.getId(), msg.getType());
 
         TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.randomUUID(), msg.getOriginator());
 
@@ -62,11 +62,11 @@ public class TbTransactionBeginNode implements TbNode {
                 msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
 
         ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, onStart -> {
-                    log.info("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType());
+                    log.trace("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType());
                     ctx.tellNext(tbMsg, SUCCESS);
-                }, onEnd -> log.info("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()),
+                }, onEnd -> log.trace("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()),
                 throwable -> {
-                    log.error("Transaction failed due to queue size restriction! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable);
+                    log.error("Transaction failed! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable);
                     ctx.tellFailure(tbMsg, throwable);
                 });
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
index c1d5cef..1bcd58b 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -55,7 +55,7 @@ public class TbTransactionEndNode implements TbNode {
         if (!isFailed) {
             ctx.tellNext(msg, SUCCESS);
         }
-        log.info("Msg out - [{}] [{}]", msg.getId(), msg.getType());
+        log.trace("Msg out - [{}] [{}]", msg.getId(), msg.getType());
     }
 
     @Override