thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 2(+1 -1)
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java 5(+3 -2)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java 5(+3 -2)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java 6(+4 -2)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java 3(+2 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java 3(+2 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java 28(+21 -7)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java 32(+14 -18)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java 3(+2 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java 3(+2 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java 3(+2 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java 4(+3 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java 23(+13 -10)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java 14(+11 -3)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java 1(+0 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java 1(+0 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java 1(+0 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java 4(+2 -2)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java 2(+1 -1)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java 23(+8 -15)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 5112f22..dd270e2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -339,7 +339,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
}
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(), TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
SessionMsgType.POST_ATTRIBUTES_REQUEST, request.getRequestId(), true, 1);
pushToRuleEngineWithTimeout(context, tbMsg, msgData);
@@ -364,7 +364,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
kv.getStrValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
}
json.add("values", values);
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData.copy(), TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
pushToRuleEngineWithTimeout(context, tbMsg, msgData);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index f6c45c0..039fdf6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -62,11 +62,6 @@ class DefaultTbContext implements TbContext {
}
@Override
- public void tellNext(TbMsg msg) {
- tellNext(msg, (String) null);
- }
-
- @Override
public void tellNext(TbMsg msg, String relationType) {
tellNext(msg, relationType, null);
}
@@ -100,11 +95,6 @@ class DefaultTbContext implements TbContext {
}
@Override
- public void spawn(TbMsg msg) {
- throw new RuntimeException("Not Implemented!");
- }
-
- @Override
public void ack(TbMsg msg) {
}
@@ -124,12 +114,12 @@ class DefaultTbContext implements TbContext {
@Override
public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
- return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
+ return new TbMsg(UUIDs.timeBased(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
}
@Override
public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
- return new TbMsg(origMsg.getId(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
+ return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 2cd1755..9c60327 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -269,6 +269,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
// We don't put firstNodeId because it may change over time;
- return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData(), tbMsg.getData(), entityId, null, 0L);
+ return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getData(), entityId, null, 0L);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java b/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
index 8e689ec..b5cc5e0 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
@@ -116,7 +116,7 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn
messageType = msgData.get(MSG_TYPE).asText();
}
String newData = data != null ? data : msg.getData();
- TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData();
+ TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData().copy();
String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType();
return new TbMsg(msg.getId(), newMessageType, msg.getOriginator(), newMetadata, newData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
} catch (Throwable th) {
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
index b65e9c1..45b9696 100644
--- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
@@ -345,7 +345,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) {
DeviceState state = stateData.getState();
try {
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData(), TbMsgDataType.JSON
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON
, json.writeValueAsString(state)
, null, null, 0L);
actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg));
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 2b30872..a6eb64e 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -106,7 +106,7 @@ public final class TbMsg implements Serializable {
}
public TbMsg copy(UUID newId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
- return new TbMsg(newId, type, originator, metaData, dataType, data, ruleChainId, ruleNodeId, clusterPartition);
+ return new TbMsg(newId, type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, clusterPartition);
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
index ce579cd..2825331 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
@@ -22,7 +22,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.queue.MsgQueue;
@@ -30,12 +29,7 @@ import org.thingsboard.server.dao.util.SqlDao;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
@@ -72,13 +66,13 @@ public class InMemoryMsgQueue implements MsgQueue {
@Override
public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
- if (pendingMsgCount.get() < maxSize) {
+ if (pendingMsgCount.incrementAndGet() < maxSize) {
return queueExecutor.submit(() -> {
data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
- pendingMsgCount.incrementAndGet();
return null;
});
} else {
+ pendingMsgCount.decrementAndGet();
return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!"));
}
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 962f8aa..fcb8912 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -40,8 +40,6 @@ import java.util.Set;
*/
public interface TbContext {
- void tellNext(TbMsg msg);
-
void tellNext(TbMsg msg, String relationType);
void tellNext(TbMsg msg, String relationType, Throwable th);
@@ -54,8 +52,6 @@ public interface TbContext {
void tellSibling(TbMsg msg, ServerAddress address);
- void spawn(TbMsg msg);
-
void ack(TbMsg msg);
void tellError(TbMsg msg, Throwable th);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java
index f6c6213..595826f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java
@@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
@RuleNode(
@@ -29,7 +30,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
name = "log",
configClazz = TbLogNodeConfiguration.class,
nodeDescription = "Log incoming messages using JS script for transformation Message into String",
- nodeDetails = "Transform incoming Message with configured JS condition to String and log final value. " +
+ nodeDetails = "Transform incoming Message with configured JS function to String and log final value into Thingsboard log file. " +
"Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>",
uiResources = {"static/rulenode/rulenode-core-config.js"},
@@ -54,7 +55,7 @@ public class TbLogNode implements TbNode {
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeToString(msg)),
toString -> {
log.info(toString);
- ctx.tellNext(msg);
+ ctx.tellNext(msg, SUCCESS);
},
t -> ctx.tellError(msg, t));
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
index 7419ef7..9ad08c1 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
@RuleNode(
@@ -37,7 +38,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
name = "generator",
configClazz = TbMsgGeneratorNodeConfiguration.class,
nodeDescription = "Periodically generates messages",
- nodeDetails = "Generates messages with configurable period. ",
+ nodeDetails = "Generates messages with configurable period. Javascript function used fore message generation.",
inEnabled = false,
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbActionNodeGeneratorConfig",
@@ -72,7 +73,7 @@ public class TbMsgGeneratorNode implements TbNode {
public void onMsg(TbContext ctx, TbMsg msg) {
if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
withCallback(generate(ctx),
- m -> {ctx.tellNext(m); sentTickMsg(ctx);},
+ m -> {ctx.tellNext(m, SUCCESS); sentTickMsg(ctx);},
t -> {ctx.tellError(msg, t); sentTickMsg(ctx);});
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
index 4ee8bd5..8d60dd3 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
@@ -29,6 +29,7 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@RuleNode(
type = ComponentType.FILTER,
name = "switch", customRelations = true,
+ relationTypes = {},
configClazz = TbJsSwitchNodeConfiguration.class,
nodeDescription = "Route incoming Message to one or multiple output chains",
nodeDetails = "Node executes configured JS script. Script should return array of next Chain names where Message should be routed. " +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
index 79b0912..4c5808b 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
@@ -30,9 +30,11 @@ public class TbJsSwitchNodeConfiguration implements NodeConfiguration<TbJsSwitch
public TbJsSwitchNodeConfiguration defaultConfiguration() {
TbJsSwitchNodeConfiguration configuration = new TbJsSwitchNodeConfiguration();
configuration.setJsScript("function nextRelation(metadata, msg) {\n" +
- " return ['one','nine'];" +
+ " return ['one','nine'];\n" +
"};\n" +
- "\n" +
+ "if(msgType === 'POST_TELEMETRY') {\n" +
+ " return ['two'];\n" +
+ "}\n" +
"return nextRelation(metadata, msg);");
return configuration;
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
index 6426106..df826e6 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
@@ -31,7 +31,7 @@ import org.thingsboard.server.common.msg.TbMsg;
configClazz = TbMsgTypeFilterNodeConfiguration.class,
relationTypes = {"True", "False"},
nodeDescription = "Filter incoming messages by Message Type",
- nodeDetails = "If incoming MessageType is expected - send Message via <b>Success</b> chain, otherwise <b>Failure</b> chain is used.",
+ nodeDetails = "If incoming MessageType is expected - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbFilterNodeMessageTypeConfig")
public class TbMsgTypeFilterNode implements TbNode {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java
index 75329a0..20bbb2f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
configClazz = EmptyNodeConfiguration.class,
relationTypes = {"Post attributes", "Post telemetry", "RPC Request", "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Other"},
nodeDescription = "Route incoming messages by Message Type",
- nodeDetails = "Sends messages with message types <b>\"Post attributes\", \"Post telemetry\", \"RPC Request\"</b> via corresponding chain, otherwise <b>Other</b> chain is used.",
+ nodeDetails = "Sends messages with message types <b>\"Post attributes\", \"Post telemetry\", \"RPC Request\"</b> etc. via corresponding chain, otherwise <b>Other</b> chain is used.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbNodeEmptyConfig")
public class TbMsgTypeSwitchNode implements TbNode {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
index 3982c8e..90a4c5b 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.io.IOException;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
import static org.thingsboard.rule.engine.mail.TbSendEmailNode.SEND_EMAIL_TYPE;
@Slf4j
@@ -57,7 +58,7 @@ public class TbMsgToEmailNode implements TbNode {
try {
EmailPojo email = convert(msg);
TbMsg emailMsg = buildEmailMsg(ctx, msg, email);
- ctx.tellNext(emailMsg);
+ ctx.tellNext(emailMsg, SUCCESS);
} catch (Exception ex) {
log.warn("Can not convert message to email " + ex.getMessage());
ctx.tellError(msg, ex);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
index d590698..3c23c75 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.util.Properties;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
@RuleNode(
@@ -76,7 +77,7 @@ public class TbSendEmailNode implements TbNode {
sendEmail(ctx, email);
return null;
}),
- ok -> ctx.tellNext(msg),
+ ok -> ctx.tellNext(msg, SUCCESS),
fail -> ctx.tellError(msg, fail));
} catch (Exception ex) {
ctx.tellError(msg, ex);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
index 3bb1cff..82805eb 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
@@ -18,8 +18,12 @@ package org.thingsboard.rule.engine.metadata;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
-import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
@@ -30,8 +34,11 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
+@Slf4j
public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode {
private TbGetEntityAttrNodeConfiguration config;
@@ -46,17 +53,24 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
try {
withCallback(
findEntityAsync(ctx, msg.getOriginator()),
- entityId -> withCallback(
- config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
- attributes -> putAttributesAndTell(ctx, msg, attributes),
- t -> ctx.tellError(msg, t)
- ),
+ entityId -> safeGetAttributes(ctx, msg, entityId),
t -> ctx.tellError(msg, t));
} catch (Throwable th) {
ctx.tellError(msg, th);
}
}
+ private void safeGetAttributes(TbContext ctx, TbMsg msg, T entityId) {
+ if(entityId == null || entityId.isNullUid()) {
+ ctx.tellNext(msg, FAILURE);
+ return;
+ }
+
+ withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
+ attributes -> putAttributesAndTell(ctx, msg, attributes),
+ t -> ctx.tellError(msg, t));
+ }
+
private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
@@ -75,7 +89,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
String attrName = config.getAttrMapping().get(r.getKey());
msg.getMetaData().putValue(attrName, r.getValueAsString());
});
- ctx.tellNext(msg);
+ ctx.tellNext(msg, SUCCESS);
}
@Override
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
index 84cff22..06a0f44 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.TbMsg;
import java.util.List;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
import static org.thingsboard.server.common.data.DataConstants.*;
/**
@@ -41,9 +42,9 @@ import static org.thingsboard.server.common.data.DataConstants.*;
configClazz = TbGetAttributesNodeConfiguration.class,
nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Metadata",
nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message metadata " +
- "with specific prefix: <i>cs/shared/ss</i>. To access those attributes in other nodes this template can be used " +
- "<code>metadata.cs_temperature</code> or <code>metadata.shared_limit</code> " +
- "If Latest Telemetry enrichment configured, latest telemetry added into metadata without prefix.",
+ "with specific prefix: <i>cs/shared/ss</i>. Latest telemetry value added into metadata without prefix. " +
+ "To access those attributes in other nodes this template can be used " +
+ "<code>metadata.cs_temperature</code> or <code>metadata.shared_limit</code> ",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbEnrichmentNodeOriginatorAttributesConfig")
public class TbGetAttributesNode implements TbNode {
@@ -57,22 +58,17 @@ public class TbGetAttributesNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
- if (CollectionUtils.isNotEmpty(config.getLatestTsKeyNames())) {
- withCallback(getLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()),
- i -> ctx.tellNext(msg),
- t -> ctx.tellError(msg, t));
- } else {
- ListenableFuture<List<Void>> future = Futures.allAsList(
- putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs_"),
- putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),
- putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_"));
-
- withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t));
- }
+ ListenableFuture<List<Void>> allFutures = Futures.allAsList(
+ putLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()),
+ putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs_"),
+ putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),
+ putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_")
+ );
+ withCallback(allFutures, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellError(msg, t));
}
private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> keys, String prefix) {
- if (keys == null) {
+ if (CollectionUtils.isEmpty(keys)) {
return Futures.immediateFuture(null);
}
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, keys);
@@ -82,8 +78,8 @@ public class TbGetAttributesNode implements TbNode {
});
}
- private ListenableFuture<Void> getLatestTelemetry(TbContext ctx, TbMsg msg, List<String> keys) {
- if (keys == null) {
+ private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, TbMsg msg, List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
return Futures.immediateFuture(null);
}
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), keys);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
index b092bad..60b6b84 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
@@ -29,8 +29,9 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
configClazz = TbGetEntityAttrNodeConfiguration.class,
nodeDescription = "Add Originators Customer Attributes or Latest Telemetry into Message Metadata",
nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " +
+ "If Latest Telemetry enrichment configured, latest telemetry added into metadata. " +
"To access those attributes in other nodes this template can be used " +
- "<code>metadata.temperature</code>. If Latest Telemetry enrichment configured, latest telemetry added into metadata",
+ "<code>metadata.temperature</code>.",
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbEnrichmentNodeCustomerAttributesConfig")
public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
index 8f65c31..a648403 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
@@ -31,8 +31,9 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
"If multiple Related Entities are found, only first Entity is used for attributes enrichment, other entities are discarded. " +
"If Attributes enrichment configured, server scope attributes are added into Message metadata. " +
+ "If Latest Telemetry enrichment configured, latest telemetry added into metadata. " +
"To access those attributes in other nodes this template can be used " +
- "<code>metadata.temperature</code>. If Latest Telemetry enrichment configured, latest telemetry added into metadata",
+ "<code>metadata.temperature</code>.",
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbEnrichmentNodeRelatedAttributesConfig")
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
index f0d28d3..1ae6c68 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
@@ -31,8 +31,9 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
configClazz = TbGetEntityAttrNodeConfiguration.class,
nodeDescription = "Add Originators Tenant Attributes or Latest Telemetry into Message Metadata",
nodeDetails = "If Attributes enrichment configured, server scope attributes are added into Message metadata. " +
+ "If Latest Telemetry enrichment configured, latest telemetry added into metadata. " +
"To access those attributes in other nodes this template can be used " +
- "<code>metadata.temperature</code>. If Latest Telemetry enrichment configured, latest telemetry added into metadata",
+ "<code>metadata.temperature</code>.",
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbEnrichmentNodeTenantAttributesConfig")
public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java
index fab4942..d66b768 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java
@@ -22,6 +22,8 @@ import org.thingsboard.server.common.msg.TbMsg;
import javax.annotation.Nullable;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
+
/**
* Created by ashvayka on 02.04.18.
*/
@@ -32,7 +34,7 @@ class TelemetryNodeCallback implements FutureCallback<Void> {
@Override
public void onSuccess(@Nullable Void result) {
- ctx.tellNext(msg);
+ ctx.tellNext(msg, SUCCESS);
}
@Override
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java
index d6fcc5e..74579a8 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java
@@ -18,10 +18,15 @@ package org.thingsboard.rule.engine.transform;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
-import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.msg.TbMsg;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
/**
* Created by ashvayka on 19.01.18.
@@ -39,20 +44,18 @@ public abstract class TbAbstractTransformNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(transform(ctx, msg),
- m -> routeMsg(ctx, m),
+ m -> {
+ if (m != null) {
+ ctx.tellNext(m, SUCCESS);
+ } else {
+ ctx.tellNext(msg, FAILURE);
+ }
+ },
t -> ctx.tellError(msg, t));
}
protected abstract ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg);
- private void routeMsg(TbContext ctx, TbMsg msg) {
- if (config.isStartNewChain()) {
- ctx.spawn(msg);
- } else {
- ctx.tellNext(msg);
- }
- }
-
public void setConfig(TbTransformNodeConfiguration config) {
this.config = config;
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
index f6c4eba..220d871 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
@@ -21,7 +21,10 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils;
-import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader;
@@ -34,7 +37,7 @@ import java.util.HashSet;
@Slf4j
@RuleNode(
type = ComponentType.TRANSFORMATION,
- name="change originator",
+ name = "change originator",
configClazz = TbChangeOriginatorNodeConfiguration.class,
nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity",
nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
@@ -61,7 +64,12 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
@Override
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
- return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData()));
+ return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> {
+ if (n == null || n.isNullUid()) {
+ return null;
+ }
+ return ctx.transformMsg(msg, msg.getType(), n, msg.getMetaData(), msg.getData());
+ });
}
private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java
index 7cd77bf..32e9119 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java
@@ -43,7 +43,6 @@ public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfigur
relationsQuery.setFilters(Collections.singletonList(entityTypeFilter));
configuration.setRelationsQuery(relationsQuery);
- configuration.setStartNewChain(false);
return configuration;
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java
index bf0c9fe..6edea6b 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java
@@ -26,7 +26,7 @@ import org.thingsboard.server.common.msg.TbMsg;
name = "script",
configClazz = TbTransformMsgNodeConfiguration.class,
nodeDescription = "Change Message payload, Metadata or Message type using JavaScript",
- nodeDetails = "JavaScript function receive 3 input parameters.<br/> " +
+ nodeDetails = "JavaScript function receive 3 input parameters <br/> " +
"<code>metadata</code> - is a Message metadata.<br/>" +
"<code>msg</code> - is a Message payload.<br/>" +
"<code>msgType</code> - is a Message type.<br/>" +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java
index a710cf8..3e112ad 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java
@@ -26,7 +26,6 @@ public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguratio
@Override
public TbTransformMsgNodeConfiguration defaultConfiguration() {
TbTransformMsgNodeConfiguration configuration = new TbTransformMsgNodeConfiguration();
- configuration.setStartNewChain(false);
configuration.setJsScript("return {msg: msg, metadata: metadata, msgType: msgType};");
return configuration;
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java
index d9f5780..85d4e2c 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java
@@ -20,5 +20,4 @@ import lombok.Data;
@Data
public class TbTransformNodeConfiguration {
- private boolean startNewChain = false;
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
index 67eb808..73e1945 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
@@ -45,6 +45,6 @@ public class EntitiesCustomerIdAsyncLoader {
private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
return in != null ? Futures.immediateFuture(in.getCustomerId())
- : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
+ : Futures.immediateFuture(null);});
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
index 08ce38e..55be558 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
@@ -40,11 +40,11 @@ public class EntitiesRelatedEntityIdAsyncLoader {
if (relationsQuery.getDirection() == EntitySearchDirection.FROM) {
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
- : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
+ : Futures.immediateFuture(null));
} else if (relationsQuery.getDirection() == EntitySearchDirection.TO) {
return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
- : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
+ : Futures.immediateFuture(null));
}
return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
index 5d2aaa8..3d5c64e 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
@@ -53,6 +53,6 @@ public class EntitiesTenantIdAsyncLoader {
private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
return in != null ? Futures.immediateFuture(in.getTenantId())
- : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
+ : Futures.immediateFuture(null);});
}
}
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
index 158b7cd..997b684 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -31,12 +31,7 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.asset.Asset;
-import org.thingsboard.server.common.data.id.AssetId;
-import org.thingsboard.server.common.data.id.CustomerId;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.RuleChainId;
-import org.thingsboard.server.common.data.id.RuleNodeId;
-import org.thingsboard.server.common.data.id.UserId;
+import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -56,6 +51,8 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
@RunWith(MockitoJUnitRunner.class)
@@ -148,7 +145,7 @@ public class TbGetCustomerAttributeNodeTest {
}
@Test
- public void errorThrownIfCustomerCannotBeFound() {
+ public void failedChainUsedIfCustomerCannotBeFound() {
UserId userId = new UserId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased());
User user = new User();
@@ -159,13 +156,9 @@ public class TbGetCustomerAttributeNodeTest {
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
- node.onMsg(ctx, msg);
- final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
- verify(ctx).tellError(same(msg), captor.capture());
- Throwable value = captor.getValue();
- assertEquals(IllegalStateException.class, value.getClass());
- assertEquals("Customer not found", value.getMessage());
+ node.onMsg(ctx, msg);
+ verify(ctx).tellNext(msg, FAILURE);
assertTrue(msg.getMetaData().getData().isEmpty());
}
@@ -252,7 +245,7 @@ public class TbGetCustomerAttributeNodeTest {
.thenReturn(Futures.immediateFuture(timeseries));
node.onMsg(ctx, msg);
- verify(ctx).tellNext(msg);
+ verify(ctx).tellNext(msg, SUCCESS);
assertEquals(msg.getMetaData().getValue("tempo"), "highest");
}
@@ -264,7 +257,7 @@ public class TbGetCustomerAttributeNodeTest {
.thenReturn(Futures.immediateFuture(attributes));
node.onMsg(ctx, msg);
- verify(ctx).tellNext(msg);
+ verify(ctx).tellNext(msg, SUCCESS);
assertEquals(msg.getMetaData().getValue("tempo"), "high");
}
}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
index b61589d..a31458c 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
@@ -40,6 +40,7 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
@RunWith(MockitoJUnitRunner.class)
public class TbChangeOriginatorNodeTest {
@@ -54,7 +55,7 @@ public class TbChangeOriginatorNodeTest {
@Test
public void originatorCanBeChangedToCustomerId() throws TbNodeException {
- init(false);
+ init();
AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset();
@@ -82,7 +83,7 @@ public class TbChangeOriginatorNodeTest {
@Test
public void newChainCanBeStarted() throws TbNodeException {
- init(true);
+ init();
AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset();
@@ -109,7 +110,7 @@ public class TbChangeOriginatorNodeTest {
@Test
public void exceptionThrownIfCannotFindNewOriginator() throws TbNodeException {
- init(true);
+ init();
AssetId assetId = new AssetId(UUIDs.timeBased());
CustomerId customerId = new CustomerId(UUIDs.timeBased());
Asset asset = new Asset();
@@ -121,19 +122,15 @@ public class TbChangeOriginatorNodeTest {
TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getAssetService()).thenReturn(assetService);
- when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong")));
+ when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(null));
node.onMsg(ctx, msg);
- ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
- verify(ctx).tellError(same(msg), captor.capture());
- Throwable value = captor.getValue();
- assertEquals("wrong", value.getMessage());
+ verify(ctx).tellNext(same(msg), same(FAILURE));
}
- public void init(boolean startNewChain) throws TbNodeException {
+ public void init() throws TbNodeException {
TbChangeOriginatorNodeConfiguration config = new TbChangeOriginatorNodeConfiguration();
config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE);
- config.setStartNewChain(startNewChain);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
index a29bc90..c488fd3 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Callable;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.*;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@RunWith(MockitoJUnitRunner.class)
public class TbTransformMsgNodeTest {
@@ -53,7 +54,7 @@ public class TbTransformMsgNodeTest {
@Test
public void metadataCanBeUpdated() throws TbNodeException, ScriptException {
- initWithScript(false);
+ initWithScript();
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
String rawJson = "{\"passed\": 5}";
@@ -68,37 +69,14 @@ public class TbTransformMsgNodeTest {
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).tellNext(captor.capture());
- TbMsg actualMsg = captor.getValue();
- assertEquals(transformedMsg, actualMsg);
- }
-
-
- @Test
- public void newChainCanBeStarted() throws TbNodeException, ScriptException {
- initWithScript(true);
- TbMsgMetaData metaData = new TbMsgMetaData();
- metaData.putValue("temp", "7");
- String rawJson = "{\"passed\": 5";
-
- RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
- RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
- TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}", ruleChainId, ruleNodeId, 0L);
- mockJsExecutor();
- when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg);
-
- node.onMsg(ctx, msg);
- verify(ctx).getJsExecutor();
- ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).spawn(captor.capture());
+ verify(ctx).tellNext(captor.capture(), SUCCESS);
TbMsg actualMsg = captor.getValue();
assertEquals(transformedMsg, actualMsg);
}
@Test
public void exceptionHandledCorrectly() throws TbNodeException, ScriptException {
- initWithScript(false);
+ initWithScript();
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7");
String rawJson = "{\"passed\": 5";
@@ -113,10 +91,9 @@ public class TbTransformMsgNodeTest {
verifyError(msg, "error", IllegalStateException.class);
}
- private void initWithScript(boolean startChain) throws TbNodeException {
+ private void initWithScript() throws TbNodeException {
TbTransformMsgNodeConfiguration config = new TbTransformMsgNodeConfiguration();
config.setJsScript("scr");
- config.setStartNewChain(startChain);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));