thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java 54(+54 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java 27(+27 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 6ed400a..a2e0aed 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.MailService;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event;
@@ -222,6 +223,11 @@ public class ActorSystemContext {
@Getter
private RuleEngineTransportService ruleEngineTransportService;
+ @Lazy
+ @Autowired
+ @Getter
+ private RuleChainTransactionService ruleChainTransactionService;
+
@Value("${cluster.partition_id}")
@Getter
private long queuePartitionId;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 27a766e..5491a16 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -20,6 +20,7 @@ import com.datastax.driver.core.utils.UUIDs;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import org.thingsboard.rule.engine.api.MailService;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
@@ -233,6 +234,11 @@ class DefaultTbContext implements TbContext {
}
@Override
+ public RuleChainTransactionService getRuleChainTransactionService() {
+ return mainCtx.getRuleChainTransactionService();
+ }
+
+ @Override
public MailService getMailService() {
if (mainCtx.isAllowSystemMailService()) {
return mainCtx.getMailService();
diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
new file mode 100644
index 0000000..f2e88e2
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transaction;
+
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Service
+@Slf4j
+public class BaseRuleChainTransactionService implements RuleChainTransactionService {
+
+ @Value("${actors.rule.transaction.queue_size}")
+ private int queueSize;
+
+ private final ConcurrentMap<EntityId, Queue<TbMsg>> transactionMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void beginTransaction(EntityId entityId, FutureCallback<Void> callback) {
+
+
+ transactionMap.computeIfAbsent(entityId, id -> new LinkedBlockingQueue<>(queueSize));
+
+ log.info("[{}]", queueSize);
+
+ }
+
+ @Override
+ public void endTransaction() {
+
+ }
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 08808f9..a837a3e 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -212,6 +212,9 @@ actors:
node:
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
+ transaction:
+ # Size of queues which store messages for transaction rule nodes
+ queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:10}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
new file mode 100644
index 0000000..8ad1e67
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.server.common.data.id.EntityId;
+
+public interface RuleChainTransactionService {
+
+ void beginTransaction(EntityId entityId, FutureCallback<Void> callback);
+
+ void endTransaction();
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 37c4a51..29e7b26 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -103,4 +103,6 @@ public interface TbContext {
ScriptEngine createJsScriptEngine(String script, String... argNames);
String getNodeId();
+
+ RuleChainTransactionService getRuleChainTransactionService();
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
new file mode 100644
index 0000000..5c267da
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transaction;
+
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.concurrent.ExecutionException;
+
+@Slf4j
+@RuleNode(
+ type = ComponentType.ACTION,
+ name = "transaction start",
+ configClazz = EmptyNodeConfiguration.class,
+ nodeDescription = "Something",
+ nodeDetails = "Something more",
+ uiResources = {"static/rulenode/rulenode-core-config.js"},
+ configDirective = ("tbNodeEmptyConfig")
+)
+public class TbTransactionBeginNode implements TbNode {
+
+ private EmptyNodeConfiguration config;
+
+ @Override
+ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
+ FutureCallback<Void> callback = null;
+
+ ctx.getRuleChainTransactionService().beginTransaction(msg.getOriginator(), callback);
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
new file mode 100644
index 0000000..05c5508
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transaction;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
+import org.thingsboard.rule.engine.api.RuleChainTransactionService;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNodeConfiguration;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.concurrent.ExecutionException;
+
+@Slf4j
+@RuleNode(
+ type = ComponentType.ACTION,
+ name = "transaction end",
+ configClazz = EmptyNodeConfiguration.class,
+ nodeDescription = "Something",
+ nodeDetails = "Something more",
+ uiResources = {"static/rulenode/rulenode-core-config.js"},
+ configDirective = ("tbNodeEmptyConfig")
+)
+public class TbTransactionEndNode implements TbNode {
+
+ private EmptyNodeConfiguration config;
+
+ @Override
+ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+}