thingsboard-aplcache

Delay node implementation

6/22/2018 1:32:13 PM

Details

diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java
new file mode 100644
index 0000000..9330136
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java
@@ -0,0 +1,84 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.delay;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
+
+@Slf4j
+@RuleNode(
+        type = ComponentType.ACTION,
+        name = "delay",
+        configClazz = TbMsgDelayNodeConfiguration.class,
+        nodeDescription = "Delays incoming message",
+        nodeDetails = "Delays messages for configurable period.",
+        icon = "repeat"
+)
+
+public class TbMsgDelayNode implements TbNode {
+
+    private static final String TB_MSG_DELAY_NODE_MSG = "TbMsgDelayNodeMsg";
+
+    private TbMsgDelayNodeConfiguration config;
+    private long delay;
+    private Map<UUID, TbMsg> pendingMsgs;
+
+    @Override
+    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbMsgDelayNodeConfiguration.class);
+        this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds());
+        this.pendingMsgs = new HashMap<>();
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) {
+        if (msg.getType().equals(TB_MSG_DELAY_NODE_MSG)) {
+            TbMsg pendingMsg = pendingMsgs.remove(UUID.fromString(msg.getData()));
+            if (pendingMsg != null) {
+                ctx.tellNext(pendingMsg, SUCCESS);
+            }
+        } else {
+            if(pendingMsgs.size() < config.getMaxPendingMsgs()) {
+                pendingMsgs.put(msg.getId(), msg);
+                TbMsg tickMsg = ctx.newMsg(TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString());
+                ctx.tellSelf(tickMsg, delay);
+            } else {
+                ctx.tellNext(msg, FAILURE, new RuntimeException("Max limit of pending messages reached!"));
+            }
+        }
+    }
+
+    @Override
+    public void destroy() {
+        pendingMsgs.clear();
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java
new file mode 100644
index 0000000..411a1a5
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.delay;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.server.common.data.EntityType;
+
+@Data
+public class TbMsgDelayNodeConfiguration implements NodeConfiguration<TbMsgDelayNodeConfiguration> {
+
+    private int periodInSeconds;
+    private int maxPendingMsgs;
+
+    @Override
+    public TbMsgDelayNodeConfiguration defaultConfiguration() {
+        TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration();
+        configuration.setPeriodInSeconds(60);
+        configuration.setMaxPendingMsgs(1000);
+        return configuration;
+    }
+}