thingsboard-memoizeit
Changes
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java 10(+1 -9)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java 21(+16 -5)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java 1(+1 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java 55(+50 -5)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java 6(+5 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java 28(+4 -24)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java 27(+3 -24)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java 39(+4 -35)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java 45(+18 -27)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java 93(+93 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java 27(+27 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java 56(+56 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java 24(+24 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java 24(+24 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java 50(+50 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java 51(+51 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java 60(+60 -0)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java 6(+4 -2)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java 59(+47 -12)
Details
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 5163b6c..261dda7 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -30,7 +30,7 @@ import java.util.UUID;
* Created by ashvayka on 13.01.18.
*/
@Data
-public final class TbMsg implements Serializable {
+public final class TbMsg implements Serializable, Cloneable {
private final UUID id;
private final String type;
@@ -39,6 +39,11 @@ public final class TbMsg implements Serializable {
private final byte[] data;
+ @Override
+ public TbMsg clone() {
+ return fromBytes(toBytes(this));
+ }
+
public static ByteBuffer toBytes(TbMsg msg) {
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
builder.setId(msg.getId().toString());
@@ -77,4 +82,5 @@ public final class TbMsg implements Serializable {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
}
+
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index c7e2bec..fdcf56a 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -29,6 +29,7 @@ import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
+import java.util.Set;
import java.util.UUID;
/**
@@ -40,6 +41,8 @@ public interface TbContext {
void tellNext(TbMsg msg, String relationType);
+ void tellNext(TbMsg msg, Set<String> relationTypes);
+
void tellSelf(TbMsg msg, long delayMs);
void tellOthers(TbMsg msg);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
index 4972bc6..089c9b1 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
@@ -41,18 +41,10 @@ public class TbJsFilterNode implements TbNode {
public void onMsg(TbContext ctx, TbMsg msg) {
ListeningExecutor jsExecutor = ctx.getJsExecutor();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))),
- result -> processFilter(ctx, msg, result),
+ filterResult -> ctx.tellNext(msg, Boolean.toString(filterResult)),
t -> ctx.tellError(msg, t));
}
- private void processFilter(TbContext ctx, TbMsg msg, Boolean filterResult) {
- if (filterResult) {
- ctx.tellNext(msg);
- } else {
- log.debug("Msg filtered out {}", msg.getId());
- }
- }
-
private Bindings toBindings(TbMsg msg) {
return NashornJsEngine.bindMsg(msg);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
index 9dcb332..afd8163 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
@@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.js.NashornJsEngine;
import org.thingsboard.server.common.msg.TbMsg;
import javax.script.Bindings;
+import java.util.Set;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@@ -34,30 +35,40 @@ public class TbJsSwitchNode implements TbNode {
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class);
- this.jsEngine = new NashornJsEngine(config.getJsScript());
if (config.getAllowedRelations().size() < 1) {
String message = "Switch node should have at least 1 relation";
log.error(message);
throw new IllegalStateException(message);
}
+ if (!config.isRouteToAllWithNoCheck()) {
+ this.jsEngine = new NashornJsEngine(config.getJsScript());
+ }
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
+ if (config.isRouteToAllWithNoCheck()) {
+ ctx.tellNext(msg, config.getAllowedRelations());
+ return;
+ }
ListeningExecutor jsExecutor = ctx.getJsExecutor();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))),
result -> processSwitch(ctx, msg, result),
t -> ctx.tellError(msg, t));
}
- private void processSwitch(TbContext ctx, TbMsg msg, String nextRelation) {
- if (config.getAllowedRelations().contains(nextRelation)) {
- ctx.tellNext(msg, nextRelation);
+ private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) {
+ if (validateRelations(nextRelations)) {
+ ctx.tellNext(msg, nextRelations);
} else {
- ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelation));
+ ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelations));
}
}
+ private boolean validateRelations(Set<String> nextRelations) {
+ return config.getAllowedRelations().containsAll(nextRelations);
+ }
+
private Bindings toBindings(TbMsg msg) {
return NashornJsEngine.bindMsg(msg);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
index d0e2a51..331302d 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
@@ -24,4 +24,5 @@ public class TbJsSwitchNodeConfiguration {
private String jsScript;
private Set<String> allowedRelations;
+ private boolean routeToAllWithNoCheck;
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java
index a40d4ec..082535f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java
@@ -15,15 +15,20 @@
*/
package org.thingsboard.rule.engine.js;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
+import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.thingsboard.server.common.msg.TbMsg;
import javax.script.*;
+import java.util.Collections;
import java.util.Map;
+import java.util.Set;
@Slf4j
@@ -68,6 +73,32 @@ public class NashornJsEngine {
}
}
+ private static TbMsg unbindMsg(Bindings bindings, TbMsg msg) throws JsonProcessingException {
+ for (Map.Entry<String, String> entry : msg.getMetaData().getData().entrySet()) {
+ Object obj = entry.getValue();
+ entry.setValue(obj.toString());
+ }
+
+ Object payload = bindings.get(DATA);
+ if (payload != null) {
+ ObjectMapper mapper = new ObjectMapper();
+ byte[] bytes = mapper.writeValueAsBytes(payload);
+ return new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), bytes);
+ }
+
+ return msg;
+ }
+
+ public TbMsg executeUpdate(Bindings bindings, TbMsg msg) throws ScriptException {
+ try {
+ engine.eval(bindings);
+ return unbindMsg(bindings, msg);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw new IllegalArgumentException("Cannot unbind js args", th);
+ }
+ }
+
public boolean executeFilter(Bindings bindings) throws ScriptException {
Object eval = engine.eval(bindings);
if (eval instanceof Boolean) {
@@ -78,14 +109,28 @@ public class NashornJsEngine {
}
}
- public String executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException {
+ public Set<String> executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException {
Object eval = this.engine.eval(bindings);
if (eval instanceof String) {
- return (String) eval;
- } else {
- log.warn("Wrong result type: {}", eval);
- throw new ScriptException("Wrong result type: " + eval);
+ return Collections.singleton((String) eval);
+ } else if (eval instanceof ScriptObjectMirror) {
+ ScriptObjectMirror mir = (ScriptObjectMirror) eval;
+ if (mir.isArray()) {
+ Set<String> nextStates = Sets.newHashSet();
+ for (Map.Entry<String, Object> entry : mir.entrySet()) {
+ if (entry.getValue() instanceof String) {
+ nextStates.add((String) entry.getValue());
+ } else {
+ log.warn("Wrong result type: {}", eval);
+ throw new ScriptException("Wrong result type: " + eval);
+ }
+ }
+ return nextStates;
+ }
}
+
+ log.warn("Wrong result type: {}", eval);
+ throw new ScriptException("Wrong result type: " + eval);
}
public void destroy() {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
index 52850be..269e40f 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
@@ -70,7 +70,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
}
- private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<KvEntry> attributes) {
+ private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<? extends KvEntry> attributes) {
attributes.forEach(r -> {
String attrName = config.getAttrMapping().get(r.getKey());
msg.getMetaData().putValue(attrName, r.getValueAsString());
@@ -85,4 +85,8 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
protected abstract ListenableFuture<T> findEntityAsync(TbContext ctx, EntityId originator);
+ public void setConfig(TbGetEntityAttrNodeConfiguration config) {
+ this.config = config;
+ }
+
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
index 57f9b79..b7b1fd7 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
@@ -15,37 +15,17 @@
*/
package org.thingsboard.rule.engine.metadata;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.TbContext;
-import org.thingsboard.rule.engine.api.TbNodeException;
-import org.thingsboard.server.common.data.HasCustomerId;
-import org.thingsboard.server.common.data.id.*;
+import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
+import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.EntityId;
public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> {
@Override
protected ListenableFuture<CustomerId> findEntityAsync(TbContext ctx, EntityId originator) {
-
- switch (originator.getEntityType()) {
- case CUSTOMER:
- return Futures.immediateFuture((CustomerId) originator);
- case USER:
- return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
- case ASSET:
- return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator));
- case DEVICE:
- return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator));
- default:
- return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator));
- }
- }
-
- private <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
- return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
- return in != null ? Futures.immediateFuture(in.getCustomerId())
- : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
+ return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, originator);
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
index 5823c18..474fb5d 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
@@ -15,23 +15,14 @@
*/
package org.thingsboard.rule.engine.metadata;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbNodeState;
+import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.relation.EntityRelation;
-import org.thingsboard.server.common.data.relation.EntitySearchDirection;
-import org.thingsboard.server.dao.relation.RelationService;
-
-import java.util.List;
-
-import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON;
public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> {
@@ -40,23 +31,11 @@ public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> {
@Override
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbGetRelatedAttrNodeConfiguration.class);
+ setConfig(config);
}
@Override
protected ListenableFuture<EntityId> findEntityAsync(TbContext ctx, EntityId originator) {
- RelationService relationService = ctx.getRelationService();
- if (config.getDirection() == EntitySearchDirection.FROM) {
- ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByFromAndTypeAsync(originator, config.getRelationType(), COMMON);
- return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
- r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
- : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
- } else if (config.getDirection() == EntitySearchDirection.TO) {
- ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByToAndTypeAsync(originator, config.getRelationType(), COMMON);
- return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
- r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
- : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
- }
-
- return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
+ return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, originator, config.getDirection(), config.getRelationType());
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java
index 75b0a65..ae0b662 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java
@@ -19,7 +19,7 @@ import lombok.Data;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@Data
-public class TbGetRelatedAttrNodeConfiguration {
+public class TbGetRelatedAttrNodeConfiguration extends TbGetEntityAttrNodeConfiguration {
private String relationType;
private EntitySearchDirection direction;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
index 2cf9a97..b97e220 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
@@ -15,50 +15,19 @@
*/
package org.thingsboard.rule.engine.metadata;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.TbContext;
-import org.thingsboard.rule.engine.api.TbNodeException;
-import org.thingsboard.server.common.data.HasTenantId;
-import org.thingsboard.server.common.data.alarm.AlarmId;
-import org.thingsboard.server.common.data.id.*;
+import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
@Slf4j
public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> {
@Override
protected ListenableFuture<TenantId> findEntityAsync(TbContext ctx, EntityId originator) {
-
- switch (originator.getEntityType()) {
- case TENANT:
- return Futures.immediateFuture((TenantId) originator);
- case CUSTOMER:
- return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator));
- case USER:
- return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
- case RULE:
- return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) originator));
- case PLUGIN:
- return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator));
- case ASSET:
- return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator));
- case DEVICE:
- return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator));
- case ALARM:
- return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) originator));
- case RULE_CHAIN:
- return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) originator));
- default:
- return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator));
- }
- }
-
- private <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
- return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
- return in != null ? Futures.immediateFuture(in.getTenantId())
- : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
+ return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, originator);
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
new file mode 100644
index 0000000..2592af2
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
@@ -0,0 +1,93 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transform;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.TbNodeState;
+import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
+import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
+import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.HashSet;
+
+@Slf4j
+public class TbChangeOriginatorNode extends TbAbstractTransformNode {
+
+ protected static final String CUSTOMER_SOURCE = "CUSTOMER";
+ protected static final String TENANT_SOURCE = "TENANT";
+ protected static final String RELATED_SOURCE = "RELATED";
+
+ private TbChangeOriginatorNodeConfiguration config;
+
+ @Override
+ public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbChangeOriginatorNodeConfiguration.class);
+ validateConfig(config);
+ setConfig(config);
+ }
+
+ @Override
+ protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
+ ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
+ return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> new TbMsg(msg.getId(), msg.getType(), n, msg.getMetaData(), msg.getData()));
+ }
+
+ private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {
+ switch (config.getOriginatorSource()) {
+ case CUSTOMER_SOURCE:
+ return EntitiesCustomerIdAsyncLoader.findEntityIdAsync(ctx, original);
+ case TENANT_SOURCE:
+ return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, original);
+ case RELATED_SOURCE:
+ return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, original, config.getDirection(), config.getRelationType());
+ default:
+ return Futures.immediateFailedFuture(new IllegalStateException("Unexpected originator source " + config.getOriginatorSource()));
+ }
+ }
+
+ private void validateConfig(TbChangeOriginatorNodeConfiguration conf) {
+ HashSet<String> knownSources = Sets.newHashSet(CUSTOMER_SOURCE, TENANT_SOURCE, RELATED_SOURCE);
+ if (!knownSources.contains(conf.getOriginatorSource())) {
+ log.error("Unsupported source [{}] for TbChangeOriginatorNode", conf.getOriginatorSource());
+ throw new IllegalArgumentException("Unsupported source TbChangeOriginatorNode" + conf.getOriginatorSource());
+ }
+
+ if (conf.getOriginatorSource().equals(RELATED_SOURCE)) {
+ if (conf.getDirection() == null || StringUtils.isBlank(conf.getRelationType())) {
+ log.error("Related source for TbChangeOriginatorNode should have direction and relationType. Actual [{}] [{}]",
+ conf.getDirection(), conf.getRelationType());
+ throw new IllegalArgumentException("Wrong config for RElated Source in TbChangeOriginatorNode" + conf.getOriginatorSource());
+ }
+ }
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java
new file mode 100644
index 0000000..cf03681
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeConfiguration.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transform;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.relation.EntitySearchDirection;
+
+@Data
+public class TbChangeOriginatorNodeConfiguration extends TbTransformNodeConfiguration{
+
+ private String originatorSource;
+ private EntitySearchDirection direction;
+ private String relationType;
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java
new file mode 100644
index 0000000..241fbfb
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transform;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.TbNodeState;
+import org.thingsboard.rule.engine.js.NashornJsEngine;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import javax.script.Bindings;
+
+public class TbTransformMsgNode extends TbAbstractTransformNode {
+
+ private TbTransformMsgNodeConfiguration config;
+ private NashornJsEngine jsEngine;
+
+ @Override
+ public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbTransformMsgNodeConfiguration.class);
+ this.jsEngine = new NashornJsEngine(config.getJsScript());
+ setConfig(config);
+ }
+
+ @Override
+ protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
+ return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(toBindings(msg), msg));
+ }
+
+ private Bindings toBindings(TbMsg msg) {
+ return NashornJsEngine.bindMsg(msg);
+ }
+
+ @Override
+ public void destroy() {
+ if (jsEngine != null) {
+ jsEngine.destroy();
+ }
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java
new file mode 100644
index 0000000..9cc926b
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeConfiguration.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transform;
+
+import lombok.Data;
+
+@Data
+public class TbTransformMsgNodeConfiguration extends TbTransformNodeConfiguration {
+
+ private String jsScript;
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java
new file mode 100644
index 0000000..d9f5780
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNodeConfiguration.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transform;
+
+import lombok.Data;
+
+@Data
+public class TbTransformNodeConfiguration {
+
+ private boolean startNewChain = false;
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
new file mode 100644
index 0000000..67eb808
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.util;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.HasCustomerId;
+import org.thingsboard.server.common.data.id.*;
+
+public class EntitiesCustomerIdAsyncLoader {
+
+
+ public static ListenableFuture<CustomerId> findEntityIdAsync(TbContext ctx, EntityId original) {
+
+ switch (original.getEntityType()) {
+ case CUSTOMER:
+ return Futures.immediateFuture((CustomerId) original);
+ case USER:
+ return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) original));
+ case ASSET:
+ return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original));
+ case DEVICE:
+ return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original));
+ default:
+ return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original));
+ }
+ }
+
+ private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
+ return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
+ return in != null ? Futures.immediateFuture(in.getCustomerId())
+ : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
new file mode 100644
index 0000000..ac69c5d
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.util;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.collections.CollectionUtils;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.relation.EntitySearchDirection;
+import org.thingsboard.server.dao.relation.RelationService;
+
+import java.util.List;
+
+import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON;
+
+public class EntitiesRelatedEntityIdAsyncLoader {
+
+ public static ListenableFuture<EntityId> findEntityAsync(TbContext ctx, EntityId originator,
+ EntitySearchDirection direction, String relationType) {
+ RelationService relationService = ctx.getRelationService();
+ if (direction == EntitySearchDirection.FROM) {
+ ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByFromAndTypeAsync(originator, relationType, COMMON);
+ return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
+ r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
+ : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
+ } else if (direction == EntitySearchDirection.TO) {
+ ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByToAndTypeAsync(originator, relationType, COMMON);
+ return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
+ r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
+ : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
+ }
+
+ return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
new file mode 100644
index 0000000..388881b
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.util;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.HasTenantId;
+import org.thingsboard.server.common.data.alarm.AlarmId;
+import org.thingsboard.server.common.data.id.*;
+
+public class EntitiesTenantIdAsyncLoader {
+
+ public static ListenableFuture<TenantId> findEntityIdAsync(TbContext ctx, EntityId original) {
+
+ switch (original.getEntityType()) {
+ case TENANT:
+ return Futures.immediateFuture((TenantId) original);
+ case CUSTOMER:
+ return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) original));
+ case USER:
+ return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) original));
+ case RULE:
+ return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) original));
+ case PLUGIN:
+ return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) original));
+ case ASSET:
+ return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) original));
+ case DEVICE:
+ return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) original));
+ case ALARM:
+ return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) original));
+ case RULE_CHAIN:
+ return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) original));
+ default:
+ return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original));
+ }
+ }
+
+ private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
+ return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
+ return in != null ? Futures.immediateFuture(in.getTenantId())
+ : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
index 6a9146d..2fd9f4e 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
@@ -58,6 +58,7 @@ public class TbJsFilterNodeTest {
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
+ verify(ctx).tellNext(msg, "false");
verifyNoMoreInteractions(ctx);
}
@@ -102,6 +103,7 @@ public class TbJsFilterNodeTest {
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
+ verify(ctx).tellNext(msg, "false");
verifyNoMoreInteractions(ctx);
}
@@ -116,7 +118,7 @@ public class TbJsFilterNodeTest {
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
- verify(ctx).tellNext(msg);
+ verify(ctx).tellNext(msg, "true");
}
@Test
@@ -132,7 +134,7 @@ public class TbJsFilterNodeTest {
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
- verify(ctx).tellNext(msg);
+ verify(ctx).tellNext(msg, "true");
}
private void initWithScript(String script) throws TbNodeException {
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
index 1b4cce9..a2f5f7d 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
@@ -34,14 +34,13 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
+import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class TbJsSwitchNodeTest {
@@ -54,6 +53,41 @@ public class TbJsSwitchNodeTest {
private ListeningExecutor executor;
@Test
+ public void routeToAllDoNotEvaluatesJs() throws TbNodeException {
+ HashSet<String> relations = Sets.newHashSet("one", "two");
+ initWithScript("test qwerty", relations, true);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes());
+
+ node.onMsg(ctx, msg);
+ verify(ctx).tellNext(msg, relations);
+ verifyNoMoreInteractions(ctx, executor);
+ }
+
+ @Test
+ public void multipleRoutesAreAllowed() throws TbNodeException {
+ String jsCode = "function nextRelation(meta, msg) {\n" +
+ " if(msg.passed == 5 && meta.temp == 10)\n" +
+ " return ['three', 'one']\n" +
+ " else\n" +
+ " return 'two';\n" +
+ "};\n" +
+ "\n" +
+ "nextRelation(meta, msg);";
+ initWithScript(jsCode, Sets.newHashSet("one", "two", "three"), false);
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "10");
+ metaData.putValue("humidity", "99");
+ String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ verify(ctx).tellNext(msg, Sets.newHashSet("one", "three"));
+ }
+
+ @Test
public void allowedRelationPassed() throws TbNodeException {
String jsCode = "function nextRelation(meta, msg) {\n" +
" if(msg.passed == 5 && meta.temp == 10)\n" +
@@ -63,7 +97,7 @@ public class TbJsSwitchNodeTest {
"};\n" +
"\n" +
"nextRelation(meta, msg);";
- initWithScript(jsCode, Sets.newHashSet("one", "two"));
+ initWithScript(jsCode, Sets.newHashSet("one", "two"), false);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10");
metaData.putValue("humidity", "99");
@@ -74,17 +108,17 @@ public class TbJsSwitchNodeTest {
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
- verify(ctx).tellNext(msg, "one");
+ verify(ctx).tellNext(msg, Sets.newHashSet("one"));
}
@Test
public void unknownRelationThrowsException() throws TbNodeException {
String jsCode = "function nextRelation(meta, msg) {\n" +
- " return 'nine';" +
+ " return ['one','nine'];" +
"};\n" +
"\n" +
"nextRelation(meta, msg);";
- initWithScript(jsCode, Sets.newHashSet("one", "two"));
+ initWithScript(jsCode, Sets.newHashSet("one", "two"), false);
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10");
metaData.putValue("humidity", "99");
@@ -95,13 +129,14 @@ public class TbJsSwitchNodeTest {
node.onMsg(ctx, msg);
verify(ctx).getJsExecutor();
- verifyError(msg, "Unsupported relation for switch nine", IllegalStateException.class);
+ verifyError(msg, "Unsupported relation for switch [nine, one]", IllegalStateException.class);
}
- private void initWithScript(String script, Set<String> relations) throws TbNodeException {
+ private void initWithScript(String script, Set<String> relations, boolean routeToAll) throws TbNodeException {
TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration();
config.setJsScript(script);
config.setAllowedRelations(relations);
+ config.setRouteToAllWithNoCheck(routeToAll);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
nodeConfiguration.setData(mapper.valueToTree(config));
@@ -112,10 +147,10 @@ public class TbJsSwitchNodeTest {
private void mockJsExecutor() {
when(ctx.getJsExecutor()).thenReturn(executor);
- doAnswer((Answer<ListenableFuture<String>>) invocationOnMock -> {
+ doAnswer((Answer<ListenableFuture<Set<String>>>) invocationOnMock -> {
try {
Callable task = (Callable) (invocationOnMock.getArguments())[0];
- return Futures.immediateFuture((String) task.call());
+ return Futures.immediateFuture((Set<String>) task.call());
} catch (Throwable th) {
return Futures.immediateFailedFuture(th);
}
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
new file mode 100644
index 0000000..77b00fb
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
@@ -0,0 +1,125 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transform;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.asset.Asset;
+import org.thingsboard.server.common.data.id.AssetId;
+import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.dao.asset.AssetService;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TbChangeOriginatorNodeTest {
+
+ private TbChangeOriginatorNode node;
+
+ @Mock
+ private TbContext ctx;
+ @Mock
+ private AssetService assetService;
+
+
+ @Test
+ public void originatorCanBeChangedToCustomerId() throws TbNodeException {
+ init(false);
+ AssetId assetId = new AssetId(UUIDs.timeBased());
+ CustomerId customerId = new CustomerId(UUIDs.timeBased());
+ Asset asset = new Asset();
+ asset.setCustomerId(customerId);
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
+
+ when(ctx.getAssetService()).thenReturn(assetService);
+ when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
+
+ node.onMsg(ctx, msg);
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).tellNext(captor.capture());
+ TbMsg actualMsg = captor.getValue();
+ assertEquals(customerId, actualMsg.getOriginator());
+ assertEquals(msg.getId(), actualMsg.getId());
+ }
+
+ @Test
+ public void newChainCanBeStarted() throws TbNodeException {
+ init(true);
+ AssetId assetId = new AssetId(UUIDs.timeBased());
+ CustomerId customerId = new CustomerId(UUIDs.timeBased());
+ Asset asset = new Asset();
+ asset.setCustomerId(customerId);
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
+
+ when(ctx.getAssetService()).thenReturn(assetService);
+ when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
+
+ node.onMsg(ctx, msg);
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).spawn(captor.capture());
+ TbMsg actualMsg = captor.getValue();
+ assertEquals(customerId, actualMsg.getOriginator());
+ assertEquals(msg.getId(), actualMsg.getId());
+ }
+
+ @Test
+ public void exceptionThrownIfCannotFindNewOriginator() throws TbNodeException {
+ init(true);
+ AssetId assetId = new AssetId(UUIDs.timeBased());
+ CustomerId customerId = new CustomerId(UUIDs.timeBased());
+ Asset asset = new Asset();
+ asset.setCustomerId(customerId);
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), new byte[4]);
+
+ when(ctx.getAssetService()).thenReturn(assetService);
+ when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong")));
+
+ node.onMsg(ctx, msg);
+ ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
+ verify(ctx).tellError(same(msg), captor.capture());
+ Throwable value = captor.getValue();
+ assertEquals("wrong", value.getMessage());
+ }
+
+ public void init(boolean startNewChain) throws TbNodeException {
+ TbChangeOriginatorNodeConfiguration config = new TbChangeOriginatorNodeConfiguration();
+ config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE);
+ config.setStartNewChain(startNewChain);
+ ObjectMapper mapper = new ObjectMapper();
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
+ nodeConfiguration.setData(mapper.valueToTree(config));
+
+ node = new TbChangeOriginatorNode();
+ node.init(nodeConfiguration, null);
+ }
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
new file mode 100644
index 0000000..876e70f
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.transform;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.thingsboard.rule.engine.api.ListeningExecutor;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+
+import java.util.concurrent.Callable;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TbTransformMsgNodeTest {
+
+ private TbTransformMsgNode node;
+
+ @Mock
+ private TbContext ctx;
+ @Mock
+ private ListeningExecutor executor;
+
+ @Test
+ public void metadataCanBeUpdated() throws TbNodeException {
+ initWithScript("meta.temp = meta.temp * 10;");
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "7");
+ metaData.putValue("humidity", "99");
+ String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).tellNext(captor.capture());
+ TbMsg actualMsg = captor.getValue();
+ assertEquals("70.0", actualMsg.getMetaData().getValue("temp"));
+ }
+
+ @Test
+ public void metadataCanBeAdded() throws TbNodeException {
+ initWithScript("meta.newAttr = meta.humidity - msg.passed;");
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "7");
+ metaData.putValue("humidity", "99");
+ String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).tellNext(captor.capture());
+ TbMsg actualMsg = captor.getValue();
+ assertEquals("94.0", actualMsg.getMetaData().getValue("newAttr"));
+ }
+
+ @Test
+ public void payloadCanBeUpdated() throws TbNodeException {
+ initWithScript("msg.passed = msg.passed * meta.temp; msg.bigObj.newProp = 'Ukraine' ");
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "7");
+ metaData.putValue("humidity", "99");
+ String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).tellNext(captor.capture());
+ TbMsg actualMsg = captor.getValue();
+ String expectedJson = "{\"name\":\"Vit\",\"passed\":35.0,\"bigObj\":{\"prop\":42,\"newProp\":\"Ukraine\"}}";
+ assertEquals(expectedJson, new String(actualMsg.getData()));
+ }
+
+ private void initWithScript(String script) throws TbNodeException {
+ TbTransformMsgNodeConfiguration config = new TbTransformMsgNodeConfiguration();
+ config.setJsScript(script);
+ ObjectMapper mapper = new ObjectMapper();
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
+ nodeConfiguration.setData(mapper.valueToTree(config));
+
+ node = new TbTransformMsgNode();
+ node.init(nodeConfiguration, null);
+ }
+
+ private void mockJsExecutor() {
+ when(ctx.getJsExecutor()).thenReturn(executor);
+ doAnswer((Answer<ListenableFuture<TbMsg>>) invocationOnMock -> {
+ try {
+ Callable task = (Callable) (invocationOnMock.getArguments())[0];
+ return Futures.immediateFuture((TbMsg) task.call());
+ } catch (Throwable th) {
+ return Futures.immediateFailedFuture(th);
+ }
+ }).when(executor).executeAsync(Matchers.any(Callable.class));
+ }
+
+ private void verifyError(TbMsg msg, String message, Class expectedClass) {
+ ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
+ verify(ctx).tellError(same(msg), captor.capture());
+
+ Throwable value = captor.getValue();
+ assertEquals(expectedClass, value.getClass());
+ assertEquals(message, value.getMessage());
+ }
+}
\ No newline at end of file