thingsboard-aplcache

Refactoring of Generator Node

7/12/2018 6:51:42 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index cae377c..fe02335 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -248,7 +248,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         int relationsCount = relations.size();
         EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
         if (relationsCount == 0) {
-            queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+            if (ackId != null) {
+                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+            }
         } else if (relationsCount == 1) {
             for (RuleNodeRelation relation : relations) {
                 pushToTarget(msg, relation.getOut(), relation.getType());
@@ -266,7 +268,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
                 }
             }
             //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
-            queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+            if (ackId != null) {
+                queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
+            }
         }
     }
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
index 5a30dcb..e7a54af 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
@@ -47,11 +47,12 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 
 public class TbMsgGeneratorNode implements TbNode {
 
-    public static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
+    private static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
 
     private TbMsgGeneratorNodeConfiguration config;
     private ScriptEngine jsEngine;
     private long delay;
+    private long lastScheduledTs;
     private EntityId originatorId;
     private UUID nextTickId;
     private TbMsg prevMsg;
@@ -66,28 +67,40 @@ public class TbMsgGeneratorNode implements TbNode {
             originatorId = ctx.getSelfId();
         }
         this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType");
-        sentTickMsg(ctx);
+        scheduleTickMsg(ctx);
     }
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) {
         if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
             withCallback(generate(ctx),
-                    m -> {ctx.tellNext(m, SUCCESS); sentTickMsg(ctx);},
-                    t -> {ctx.tellFailure(msg, t); sentTickMsg(ctx);});
+                    m -> {
+                        ctx.tellNext(m, SUCCESS);
+                        scheduleTickMsg(ctx);
+                    },
+                    t -> {
+                        ctx.tellFailure(msg, t);
+                        scheduleTickMsg(ctx);
+                    });
         }
     }
 
-    private void sentTickMsg(TbContext ctx) {
+    private void scheduleTickMsg(TbContext ctx) {
+        long curTs = System.currentTimeMillis();
+        if (lastScheduledTs == 0L) {
+            lastScheduledTs = curTs;
+        }
+        lastScheduledTs = lastScheduledTs + delay;
+        long curDelay = Math.max(0L, (lastScheduledTs - curTs));
         TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
         nextTickId = tickMsg.getId();
-        ctx.tellSelf(tickMsg, delay);
+        ctx.tellSelf(tickMsg, curDelay);
     }
 
     private ListenableFuture<TbMsg> generate(TbContext ctx) {
         return ctx.getJsExecutor().executeAsync(() -> {
             if (prevMsg == null) {
-                prevMsg = ctx.newMsg( "", originatorId, new TbMsgMetaData(), "{}");
+                prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}");
             }
             TbMsg generated = jsEngine.executeGenerate(prevMsg);
             prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());