thingsboard-memoizeit

transaction rule nodes init

11/26/2018 7:49:46 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 6ed400a..a2e0aed 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 import org.thingsboard.rule.engine.api.MailService;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 import org.thingsboard.server.actors.service.ActorService;
 import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.Event;
@@ -222,6 +223,11 @@ public class ActorSystemContext {
     @Getter
     private RuleEngineTransportService ruleEngineTransportService;
 
+    @Lazy
+    @Autowired
+    @Getter
+    private RuleChainTransactionService ruleChainTransactionService;
+
     @Value("${cluster.partition_id}")
     @Getter
     private long queuePartitionId;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 27a766e..5491a16 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -20,6 +20,7 @@ import com.datastax.driver.core.utils.UUIDs;
 import org.springframework.util.StringUtils;
 import org.thingsboard.rule.engine.api.ListeningExecutor;
 import org.thingsboard.rule.engine.api.MailService;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
 import org.thingsboard.rule.engine.api.RuleEngineRpcService;
@@ -233,6 +234,11 @@ class DefaultTbContext implements TbContext {
     }
 
     @Override
+    public RuleChainTransactionService getRuleChainTransactionService() {
+        return mainCtx.getRuleChainTransactionService();
+    }
+
+    @Override
     public MailService getMailService() {
         if (mainCtx.isAllowSystemMailService()) {
             return mainCtx.getMailService();
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
new file mode 100644
index 0000000..f2e88e2
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transaction;
+
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Service
+@Slf4j
+public class BaseRuleChainTransactionService implements RuleChainTransactionService {
+
+    @Value("${actors.rule.transaction.queue_size}")
+    private int queueSize;
+
+    private final ConcurrentMap<EntityId, Queue<TbMsg>> transactionMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void beginTransaction(EntityId entityId, FutureCallback<Void> callback) {
+
+
+        transactionMap.computeIfAbsent(entityId, id -> new LinkedBlockingQueue<>(queueSize));
+
+        log.info("[{}]", queueSize);
+
+    }
+
+    @Override
+    public void endTransaction() {
+
+    }
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 08808f9..a837a3e 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -212,6 +212,9 @@ actors:
     node:
       # Errors for particular actor are persisted once per specified amount of milliseconds
       error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
+    transaction:
+      # Size of queues which store messages for transaction rule nodes
+      queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:10}"
   statistics:
     # Enable/disable actor statistics
     enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
new file mode 100644
index 0000000..8ad1e67
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.server.common.data.id.EntityId;
+
+public interface RuleChainTransactionService {
+
+    void beginTransaction(EntityId entityId, FutureCallback<Void> callback);
+
+    void endTransaction();
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 37c4a51..29e7b26 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -103,4 +103,6 @@ public interface TbContext {
     ScriptEngine createJsScriptEngine(String script, String... argNames);
 
     String getNodeId();
+
+    RuleChainTransactionService getRuleChainTransactionService();
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
new file mode 100644
index 0000000..5c267da
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transaction;
+
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.concurrent.ExecutionException;
+
+@Slf4j
+@RuleNode(
+        type = ComponentType.ACTION,
+        name = "transaction start",
+        configClazz = EmptyNodeConfiguration.class,
+        nodeDescription = "Something",
+        nodeDetails = "Something more",
+        uiResources = {"static/rulenode/rulenode-core-config.js"},
+        configDirective = ("tbNodeEmptyConfig")
+)
+public class TbTransactionBeginNode implements TbNode {
+
+    private EmptyNodeConfiguration config;
+
+    @Override
+    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
+        FutureCallback<Void> callback = null;
+
+        ctx.getRuleChainTransactionService().beginTransaction(msg.getOriginator(), callback);
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
new file mode 100644
index 0000000..05c5508
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transaction;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.concurrent.ExecutionException;
+
+@Slf4j
+@RuleNode(
+        type = ComponentType.ACTION,
+        name = "transaction end",
+        configClazz = EmptyNodeConfiguration.class,
+        nodeDescription = "Something",
+        nodeDetails = "Something more",
+        uiResources = {"static/rulenode/rulenode-core-config.js"},
+        configDirective = ("tbNodeEmptyConfig")
+)
+public class TbTransactionEndNode implements TbNode {
+
+    private EmptyNodeConfiguration config;
+
+    @Override
+    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
+
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}