thingsboard-developers
Changes
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java 12(+12 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java 51(+51 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java 9(+9 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java 56(+56 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java 12(+12 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java 30(+30 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java 79(+79 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java 45(+32 -13)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java 2(+2 -0)
Details
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java
new file mode 100644
index 0000000..a233cb3
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ListeningExecutor.java
@@ -0,0 +1,12 @@
+package org.thingsboard.rule.engine.api;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.Callable;
+
+public interface ListeningExecutor {
+
+ <T> ListenableFuture<T> executeAsync(Callable<T> task);
+
+ void onDestroy();
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 541d35b..c7e2bec 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -74,4 +74,6 @@ public interface TbContext {
RelationService getRelationService();
+ ListeningExecutor getJsExecutor();
+
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
new file mode 100644
index 0000000..476695e
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
@@ -0,0 +1,51 @@
+package org.thingsboard.rule.engine.filter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.js.NashornJsEngine;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import javax.script.Bindings;
+
+import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+
+@Slf4j
+public class TbJsFilterNode implements TbNode {
+
+ private TbJsFilterNodeConfiguration config;
+ private NashornJsEngine jsEngine;
+
+ @Override
+ public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbJsFilterNodeConfiguration.class);
+ this.jsEngine = new NashornJsEngine(config.getJsScript());
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) {
+ ListeningExecutor jsExecutor = ctx.getJsExecutor();
+ withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(toBindings(msg))),
+ result -> processFilter(ctx, msg, result),
+ t -> ctx.tellError(msg, t));
+ }
+
+ private void processFilter(TbContext ctx, TbMsg msg, Boolean filterResult) {
+ if (filterResult) {
+ ctx.tellNext(msg);
+ } else {
+ log.debug("Msg filtered out {}", msg.getId());
+ }
+ }
+
+ private Bindings toBindings(TbMsg msg) {
+ return NashornJsEngine.bindMsg(msg);
+ }
+
+ @Override
+ public void destroy() {
+ if (jsEngine != null) {
+ jsEngine.destroy();
+ }
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java
new file mode 100644
index 0000000..488fa44
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeConfiguration.java
@@ -0,0 +1,9 @@
+package org.thingsboard.rule.engine.filter;
+
+import lombok.Data;
+
+@Data
+public class TbJsFilterNodeConfiguration {
+
+ private String jsScript;
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
new file mode 100644
index 0000000..5d1514a
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
@@ -0,0 +1,56 @@
+package org.thingsboard.rule.engine.filter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.js.NashornJsEngine;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import javax.script.Bindings;
+
+import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+
+@Slf4j
+public class TbJsSwitchNode implements TbNode {
+
+ private TbJsSwitchNodeConfiguration config;
+ private NashornJsEngine jsEngine;
+
+ @Override
+ public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class);
+ this.jsEngine = new NashornJsEngine(config.getJsScript());
+ if (config.getAllowedRelations().size() < 1) {
+ String message = "Switch node should have at least 1 relation";
+ log.error(message);
+ throw new IllegalStateException(message);
+ }
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) {
+ ListeningExecutor jsExecutor = ctx.getJsExecutor();
+ withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(toBindings(msg))),
+ result -> processSwitch(ctx, msg, result),
+ t -> ctx.tellError(msg, t));
+ }
+
+ private void processSwitch(TbContext ctx, TbMsg msg, String nextRelation) {
+ if (config.getAllowedRelations().contains(nextRelation)) {
+ ctx.tellNext(msg, nextRelation);
+ } else {
+ ctx.tellError(msg, new IllegalStateException("Unsupported relation for switch " + nextRelation));
+ }
+ }
+
+ private Bindings toBindings(TbMsg msg) {
+ return NashornJsEngine.bindMsg(msg);
+ }
+
+ @Override
+ public void destroy() {
+ if (jsEngine != null) {
+ jsEngine.destroy();
+ }
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
new file mode 100644
index 0000000..a179c96
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
@@ -0,0 +1,12 @@
+package org.thingsboard.rule.engine.filter;
+
+import lombok.Data;
+
+import java.util.Set;
+
+@Data
+public class TbJsSwitchNodeConfiguration {
+
+ private String jsScript;
+ private Set<String> allowedRelations;
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java
new file mode 100644
index 0000000..8991885
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/JsExecutorService.java
@@ -0,0 +1,30 @@
+package org.thingsboard.rule.engine.js;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.thingsboard.rule.engine.api.ListeningExecutor;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+public class JsExecutorService implements ListeningExecutor{
+
+ private final ListeningExecutorService service;
+
+ public JsExecutorService(int threadCount) {
+ this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
+ }
+
+ @Override
+ public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
+ return service.submit(task);
+ }
+
+ @PreDestroy
+ @Override
+ public void onDestroy() {
+ service.shutdown();
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java
new file mode 100644
index 0000000..c64cf72
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/js/NashornJsEngine.java
@@ -0,0 +1,79 @@
+package org.thingsboard.rule.engine.js;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ArrayUtils;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import javax.script.*;
+import java.util.Map;
+
+
+@Slf4j
+public class NashornJsEngine {
+
+ public static final String METADATA = "meta";
+ public static final String DATA = "msg";
+ private static NashornScriptEngineFactory factory = new NashornScriptEngineFactory();
+
+ private CompiledScript engine;
+
+ public NashornJsEngine(String script) {
+ engine = compileScript(script);
+ }
+
+ private static CompiledScript compileScript(String script) {
+ ScriptEngine engine = factory.getScriptEngine(new String[]{"--no-java"});
+ Compilable compEngine = (Compilable) engine;
+ try {
+ return compEngine.compile(script);
+ } catch (ScriptException e) {
+ log.warn("Failed to compile JS script: {}", e.getMessage(), e);
+ throw new IllegalArgumentException("Can't compile script: " + e.getMessage());
+ }
+ }
+
+ public static Bindings bindMsg(TbMsg msg) {
+ try {
+ Bindings bindings = new SimpleBindings();
+ bindings.put(METADATA, msg.getMetaData().getData());
+
+ if (ArrayUtils.isNotEmpty(msg.getData())) {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode jsonNode = mapper.readTree(msg.getData());
+ Map map = mapper.treeToValue(jsonNode, Map.class);
+ bindings.put(DATA, map);
+ }
+
+ return bindings;
+ } catch (Throwable th) {
+ throw new IllegalArgumentException("Cannot bind js args", th);
+ }
+ }
+
+ public boolean executeFilter(Bindings bindings) throws ScriptException {
+ Object eval = engine.eval(bindings);
+ if (eval instanceof Boolean) {
+ return (boolean) eval;
+ } else {
+ log.warn("Wrong result type: {}", eval);
+ throw new ScriptException("Wrong result type: " + eval);
+ }
+ }
+
+ public String executeSwitch(Bindings bindings) throws ScriptException, NoSuchMethodException {
+ Object eval = this.engine.eval(bindings);
+ if (eval instanceof String) {
+ return (String) eval;
+ } else {
+ log.warn("Wrong result type: {}", eval);
+ throw new ScriptException("Wrong result type: " + eval);
+ }
+ }
+
+ public void destroy() {
+ engine = null;
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
index 4d41921..bc6bc4a 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,13 +15,16 @@
*/
package org.thingsboard.rule.engine.metadata;
-import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.List;
@@ -44,24 +47,40 @@ public class TbGetAttributesNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
- ListenableFuture<List<Void>> future = Futures.allAsList(
- putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."),
- putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."),
- putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss."));
+ if (CollectionUtils.isNotEmpty(config.getLatestTsKeyNames())) {
+ withCallback(getLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()),
+ i -> ctx.tellNext(msg),
+ t -> ctx.tellError(msg, t));
+ } else {
+ ListenableFuture<List<Void>> future = Futures.allAsList(
+ putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."),
+ putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."),
+ putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss."));
- withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t));
+ withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t));
+ }
}
- private ListenableFuture<Void> putAttributesAsync(TbMsg msg, List<AttributeKvEntry> attributes, String prefix) {
+ private ListenableFuture<Void> putAttr(TbMsg msg, List<KvEntry> attributes, String prefix) {
attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
return Futures.immediateFuture(null);
}
private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> attributes, String prefix) {
- return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes),
- (AsyncFunction<List<AttributeKvEntry>, Void>) i -> putAttributesAsync(msg, i, prefix));
+ ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, attributes);
+ return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, Void>) l -> {
+ l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
+ return null;
+ });
}
+ private ListenableFuture<Void> getLatestTelemetry(TbContext ctx, TbMsg msg, List<String> attributes) {
+ ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), attributes);
+ return Futures.transform(latest, (Function<? super List<TsKvEntry>, Void>) l -> {
+ l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString()));
+ return null;
+ });
+ }
@Override
public void destroy() {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
index b54edef..ad92314 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
@@ -29,4 +29,6 @@ public class TbGetAttributesNodeConfiguration {
private List<String> sharedAttributeNames;
private List<String> serverAttributeNames;
+ private List<String> latestTsKeyNames;
+
}
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
new file mode 100644
index 0000000..6a4bec2
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
@@ -0,0 +1,154 @@
+package org.thingsboard.rule.engine.filter;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.thingsboard.rule.engine.api.ListeningExecutor;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+
+import javax.script.ScriptException;
+import java.util.concurrent.Callable;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TbJsFilterNodeTest {
+
+ private TbJsFilterNode node;
+
+ @Mock
+ private TbContext ctx;
+ @Mock
+ private ListeningExecutor executor;
+
+ @Test
+ public void falseEvaluationDoNotSendMsg() throws TbNodeException {
+ initWithScript("10 > 15;");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}".getBytes());
+
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ verifyNoMoreInteractions(ctx);
+ }
+
+ @Test
+ public void notValidMsgDataThrowsException() throws TbNodeException {
+ initWithScript("10 > 15;");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), new byte[4]);
+
+ when(ctx.getJsExecutor()).thenReturn(executor);
+
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verifyError(msg, "Cannot bind js args", IllegalArgumentException.class);
+ }
+
+ @Test
+ public void exceptionInJsThrowsException() throws TbNodeException {
+ initWithScript("meta.temp.curr < 15;");
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ String expectedMessage = "TypeError: Cannot get property \"curr\" of null in <eval> at line number 1";
+ verifyError(msg, expectedMessage, ScriptException.class);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void notValidScriptThrowsException() throws TbNodeException {
+ initWithScript("10 > 15 asdq out");
+ }
+
+ @Test
+ public void metadataConditionCanBeFalse() throws TbNodeException {
+ initWithScript("meta.humidity < 15;");
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "10");
+ metaData.putValue("humidity", "99");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ verifyNoMoreInteractions(ctx);
+ }
+
+ @Test
+ public void metadataConditionCanBeTrue() throws TbNodeException {
+ initWithScript("meta.temp < 15;");
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "10");
+ metaData.putValue("humidity", "99");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}".getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ verify(ctx).tellNext(msg);
+ }
+
+ @Test
+ public void msgJsonParsedAndBinded() throws TbNodeException {
+ initWithScript("msg.passed < 15 && msg.name === 'Vit' && meta.temp == 10 && msg.bigObj.prop == 42;");
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "10");
+ metaData.putValue("humidity", "99");
+ String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ verify(ctx).tellNext(msg);
+ }
+
+ private void initWithScript(String script) throws TbNodeException {
+ TbJsFilterNodeConfiguration config = new TbJsFilterNodeConfiguration();
+ config.setJsScript(script);
+ ObjectMapper mapper = new ObjectMapper();
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
+ nodeConfiguration.setData(mapper.valueToTree(config));
+
+ node = new TbJsFilterNode();
+ node.init(nodeConfiguration, null);
+ }
+
+ private void mockJsExecutor() {
+ when(ctx.getJsExecutor()).thenReturn(executor);
+ doAnswer((Answer<ListenableFuture<Boolean>>) invocationOnMock -> {
+ try {
+ Callable task = (Callable) (invocationOnMock.getArguments())[0];
+ return Futures.immediateFuture((Boolean) task.call());
+ } catch (Throwable th) {
+ return Futures.immediateFailedFuture(th);
+ }
+ }).when(executor).executeAsync(Matchers.any(Callable.class));
+ }
+
+ private void verifyError(TbMsg msg, String message, Class expectedClass) {
+ ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
+ verify(ctx).tellError(same(msg), captor.capture());
+
+ Throwable value = captor.getValue();
+ assertEquals(expectedClass, value.getClass());
+ assertEquals(message, value.getMessage());
+ }
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
new file mode 100644
index 0000000..6ffa83b
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
@@ -0,0 +1,118 @@
+package org.thingsboard.rule.engine.filter;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.thingsboard.rule.engine.api.ListeningExecutor;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TbJsSwitchNodeTest {
+
+ private TbJsSwitchNode node;
+
+ @Mock
+ private TbContext ctx;
+ @Mock
+ private ListeningExecutor executor;
+
+ @Test
+ public void allowedRelationPassed() throws TbNodeException {
+ String jsCode = "function nextRelation(meta, msg) {\n" +
+ " if(msg.passed == 5 && meta.temp == 10)\n" +
+ " return 'one'\n" +
+ " else\n" +
+ " return 'two';\n" +
+ "};\n" +
+ "\n" +
+ "nextRelation(meta, msg);";
+ initWithScript(jsCode, Sets.newHashSet("one", "two"));
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "10");
+ metaData.putValue("humidity", "99");
+ String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ verify(ctx).tellNext(msg, "one");
+ }
+
+ @Test
+ public void unknownRelationThrowsException() throws TbNodeException {
+ String jsCode = "function nextRelation(meta, msg) {\n" +
+ " return 'nine';" +
+ "};\n" +
+ "\n" +
+ "nextRelation(meta, msg);";
+ initWithScript(jsCode, Sets.newHashSet("one", "two"));
+ TbMsgMetaData metaData = new TbMsgMetaData();
+ metaData.putValue("temp", "10");
+ metaData.putValue("humidity", "99");
+ String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson.getBytes());
+ mockJsExecutor();
+
+ node.onMsg(ctx, msg);
+ verify(ctx).getJsExecutor();
+ verifyError(msg, "Unsupported relation for switch nine", IllegalStateException.class);
+ }
+
+ private void initWithScript(String script, Set<String> relations) throws TbNodeException {
+ TbJsSwitchNodeConfiguration config = new TbJsSwitchNodeConfiguration();
+ config.setJsScript(script);
+ config.setAllowedRelations(relations);
+ ObjectMapper mapper = new ObjectMapper();
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
+ nodeConfiguration.setData(mapper.valueToTree(config));
+
+ node = new TbJsSwitchNode();
+ node.init(nodeConfiguration, null);
+ }
+
+ private void mockJsExecutor() {
+ when(ctx.getJsExecutor()).thenReturn(executor);
+ doAnswer((Answer<ListenableFuture<String>>) invocationOnMock -> {
+ try {
+ Callable task = (Callable) (invocationOnMock.getArguments())[0];
+ return Futures.immediateFuture((String) task.call());
+ } catch (Throwable th) {
+ return Futures.immediateFailedFuture(th);
+ }
+ }).when(executor).executeAsync(Matchers.any(Callable.class));
+ }
+
+ private void verifyError(TbMsg msg, String message, Class expectedClass) {
+ ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
+ verify(ctx).tellError(same(msg), captor.capture());
+
+ Throwable value = captor.getValue();
+ assertEquals(expectedClass, value.getClass());
+ assertEquals(message, value.getMessage());
+ }
+}
\ No newline at end of file