thingsboard-memoizeit
Changes
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java 5(+5 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java 215(+215 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNodeConfiguration.java 44(+44 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java 64(+64 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index d3fdc94..9b6e4ba 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -24,6 +24,7 @@ import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
@@ -120,6 +121,11 @@ class DefaultTbContext implements TbContext {
}
@Override
+ public TenantId getTenantId() {
+ return nodeCtx.getTenantId();
+ }
+
+ @Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {
relationTypes.forEach(type -> tellNext(msg, type));
}
diff --git a/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java b/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
index 38add44..b9583f0 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
@@ -146,6 +146,17 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn
}
@Override
+ public JsonNode executeJson(TbMsg msg) throws ScriptException {
+ return executeScript(msg);
+ }
+
+ @Override
+ public String executeToString(TbMsg msg) throws ScriptException {
+ JsonNode result = executeScript(msg);
+ return result.asText();
+ }
+
+ @Override
public boolean executeFilter(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg);
if (!result.isBoolean()) {
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java
index c2c8210..1db046a 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.rule.engine.api;
+import com.fasterxml.jackson.databind.JsonNode;
import org.thingsboard.server.common.msg.TbMsg;
import javax.script.ScriptException;
@@ -30,6 +31,10 @@ public interface ScriptEngine {
Set<String> executeSwitch(TbMsg msg) throws ScriptException;
+ JsonNode executeJson(TbMsg msg) throws ScriptException;
+
+ String executeToString(TbMsg msg) throws ScriptException;
+
void destroy();
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index b8c2b38..258b173 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
@@ -59,6 +60,8 @@ public interface TbContext {
RuleNodeId getSelfId();
+ TenantId getTenantId();
+
AttributesService getAttributesService();
CustomerService getCustomerService();
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
new file mode 100644
index 0000000..97d5477
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
@@ -0,0 +1,215 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.action;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.alarm.Alarm;
+import org.thingsboard.server.common.data.alarm.AlarmStatus;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+
+import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+
+@Slf4j
+@RuleNode(
+ type = ComponentType.ACTION,
+ name = "alarm", relationTypes = {"Created", "Updated", "Cleared", "False"},
+ configClazz = TbAlarmNodeConfiguration.class,
+ nodeDescription = "Create/Update/Clear Alarm",
+ nodeDetails = "isAlarm - JS function that verifies if Alarm should be CREATED for incoming message.\n" +
+ "isCleared - JS function that verifies if Alarm should be CLEARED for incoming message.\n" +
+ "Details - JS function that creates JSON object based on incoming message. This object will be added into Alarm.details field.\n" +
+ "Node output:\n" +
+ "If alarm was not created, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'matadata' will contains one of those properties 'isNewAlarm/isExistingAlarm/isClearedAlarm' " +
+ "Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" +
+ "Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>",
+ uiResources = {"static/rulenode/rulenode-core-config.js"})
+
+public class TbAlarmNode implements TbNode {
+
+ static final String IS_NEW_ALARM = "isNewAlarm";
+ static final String IS_EXISTING_ALARM = "isExistingAlarm";
+ static final String IS_CLEARED_ALARM = "isClearedAlarm";
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private TbAlarmNodeConfiguration config;
+ private ScriptEngine createJsEngine;
+ private ScriptEngine clearJsEngine;
+ private ScriptEngine buildDetailsJsEngine;
+
+ @Override
+ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbAlarmNodeConfiguration.class);
+ this.createJsEngine = ctx.createJsScriptEngine(config.getCreateConditionJs(), "isAlarm");
+ this.clearJsEngine = ctx.createJsScriptEngine(config.getClearConditionJs(), "isCleared");
+ this.buildDetailsJsEngine = ctx.createJsScriptEngine(config.getAlarmDetailsBuildJs(), "Details");
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) {
+ ListeningExecutor jsExecutor = ctx.getJsExecutor();
+
+ ListenableFuture<Boolean> shouldCreate = jsExecutor.executeAsync(() -> createJsEngine.executeFilter(msg));
+ ListenableFuture<AlarmResult> transform = Futures.transform(shouldCreate, (AsyncFunction<Boolean, AlarmResult>) create -> {
+ if (create) {
+ return createOrUpdate(ctx, msg);
+ } else {
+ return checkForClearIfExist(ctx, msg);
+ }
+ });
+
+ withCallback(transform,
+ alarmResult -> {
+ if (alarmResult.alarm == null) {
+ ctx.tellNext(msg, "False");
+ } else if (alarmResult.isCreated) {
+ ctx.tellNext(toAlarmMsg(alarmResult, msg), "Created");
+ } else if (alarmResult.isUpdated) {
+ ctx.tellNext(toAlarmMsg(alarmResult, msg), "Updated");
+ } else if (alarmResult.isCleared) {
+ ctx.tellNext(toAlarmMsg(alarmResult, msg), "Cleared");
+ }
+ },
+ t -> ctx.tellError(msg, t));
+
+ }
+
+ private ListenableFuture<AlarmResult> createOrUpdate(TbContext ctx, TbMsg msg) {
+ ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
+ return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> {
+ if (a == null || a.getStatus().isCleared()) {
+ return createNewAlarm(ctx, msg);
+ } else {
+ return updateAlarm(ctx, msg, a);
+ }
+ });
+ }
+
+ private ListenableFuture<AlarmResult> checkForClearIfExist(TbContext ctx, TbMsg msg) {
+ ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
+ return Futures.transform(latest, (AsyncFunction<Alarm, AlarmResult>) a -> {
+ if (a != null && !a.getStatus().isCleared()) {
+ return clearAlarm(ctx, msg, a);
+ }
+ return Futures.immediateFuture(new AlarmResult(false, false, false, null));
+ });
+ }
+
+ private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg) {
+ ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg), (Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId()));
+ ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm, (Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm));
+ return Futures.transform(asyncCreated, (Function<Alarm, AlarmResult>) alarm -> new AlarmResult(true, false, false, alarm));
+ }
+
+ private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
+ ListenableFuture<Alarm> asyncUpdated = Futures.transform(buildAlarmDetails(ctx, msg), (Function<JsonNode, Alarm>) details -> {
+ alarm.setSeverity(config.getSeverity());
+ alarm.setPropagate(config.isPropagate());
+ alarm.setDetails(details);
+ alarm.setEndTs(System.currentTimeMillis());
+ return ctx.getAlarmService().createOrUpdateAlarm(alarm);
+ });
+
+ return Futures.transform(asyncUpdated, (Function<Alarm, AlarmResult>) a -> new AlarmResult(false, true, false, a));
+ }
+
+ private ListenableFuture<AlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
+ ListenableFuture<Boolean> shouldClear = ctx.getJsExecutor().executeAsync(() -> clearJsEngine.executeFilter(msg));
+ return Futures.transform(shouldClear, (AsyncFunction<Boolean, AlarmResult>) clear -> {
+ if (clear) {
+ ListenableFuture<Boolean> clearFuture = ctx.getAlarmService().clearAlarm(alarm.getId(), System.currentTimeMillis());
+ return Futures.transform(clearFuture, (Function<Boolean, AlarmResult>) cleared -> {
+ alarm.setStatus(alarm.getStatus().isAck() ? AlarmStatus.CLEARED_ACK : AlarmStatus.CLEARED_UNACK);
+ return new AlarmResult(false, false, true, alarm);
+ });
+ }
+ return Futures.immediateFuture(new AlarmResult(false, false, false, null));
+ });
+ }
+
+ private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) {
+ return Alarm.builder()
+ .tenantId(tenantId)
+ .originator(msg.getOriginator())
+ .status(AlarmStatus.ACTIVE_UNACK)
+ .severity(config.getSeverity())
+ .propagate(config.isPropagate())
+ .type(config.getAlarmType())
+ //todo-vp: alarm date should be taken from Message or current Time should be used?
+// .startTs(System.currentTimeMillis())
+// .endTs(System.currentTimeMillis())
+ .details(details)
+ .build();
+ }
+
+ private ListenableFuture<JsonNode> buildAlarmDetails(TbContext ctx, TbMsg msg) {
+ return ctx.getJsExecutor().executeAsync(() -> buildDetailsJsEngine.executeJson(msg));
+ }
+
+ private TbMsg toAlarmMsg(AlarmResult alarmResult, TbMsg originalMsg) {
+ JsonNode jsonNodes = mapper.valueToTree(alarmResult.alarm);
+ String data = jsonNodes.toString();
+ TbMsgMetaData metaData = originalMsg.getMetaData().copy();
+ if (alarmResult.isCreated) {
+ metaData.putValue(IS_NEW_ALARM, Boolean.TRUE.toString());
+ } else if (alarmResult.isUpdated) {
+ metaData.putValue(IS_EXISTING_ALARM, Boolean.TRUE.toString());
+ } else if (alarmResult.isCleared) {
+ metaData.putValue(IS_CLEARED_ALARM, Boolean.TRUE.toString());
+ }
+ return new TbMsg(UUIDs.timeBased(), "ALARM", originalMsg.getOriginator(), metaData, data);
+ }
+
+
+ @Override
+ public void destroy() {
+ if (createJsEngine != null) {
+ createJsEngine.destroy();
+ }
+ if (clearJsEngine != null) {
+ clearJsEngine.destroy();
+ }
+ if (buildDetailsJsEngine != null) {
+ buildDetailsJsEngine.destroy();
+ }
+ }
+
+ private static class AlarmResult {
+ boolean isCreated;
+ boolean isUpdated;
+ boolean isCleared;
+ Alarm alarm;
+
+ AlarmResult(boolean isCreated, boolean isUpdated, boolean isCleared, Alarm alarm) {
+ this.isCreated = isCreated;
+ this.isUpdated = isUpdated;
+ this.isCleared = isCleared;
+ this.alarm = alarm;
+ }
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNodeConfiguration.java
new file mode 100644
index 0000000..3575459
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNodeConfiguration.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.action;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.server.common.data.alarm.AlarmSeverity;
+
+@Data
+public class TbAlarmNodeConfiguration implements NodeConfiguration {
+
+ private String createConditionJs;
+ private String clearConditionJs;
+ private String alarmDetailsBuildJs;
+ private String alarmType;
+ private AlarmSeverity severity;
+ private boolean propagate;
+
+
+ @Override
+ public TbAlarmNodeConfiguration defaultConfiguration() {
+ TbAlarmNodeConfiguration configuration = new TbAlarmNodeConfiguration();
+ configuration.setCreateConditionJs("return 'incoming message = ' + msg + meta;");
+ configuration.setClearConditionJs("return 'incoming message = ' + msg + meta;");
+ configuration.setAlarmDetailsBuildJs("return 'incoming message = ' + msg + meta;");
+ configuration.setAlarmType("General Alarm");
+ configuration.setSeverity(AlarmSeverity.CRITICAL);
+ configuration.setPropagate(false);
+ return configuration;
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java
new file mode 100644
index 0000000..8dbbf08
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.action;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+
+@Slf4j
+@RuleNode(
+ type = ComponentType.ACTION,
+ name = "log",
+ configClazz = TbLogNodeConfiguration.class,
+ nodeDescription = "Log incoming messages using JS script for transformation Message into String",
+ nodeDetails = "Transform incoming Message with configured JS condition to String and log final value. " +
+ "Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" +
+ "Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>")
+
+public class TbLogNode implements TbNode {
+
+ private TbLogNodeConfiguration config;
+ private ScriptEngine jsEngine;
+
+ @Override
+ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbLogNodeConfiguration.class);
+ this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "ToString");
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) {
+ ListeningExecutor jsExecutor = ctx.getJsExecutor();
+ withCallback(jsExecutor.executeAsync(() -> jsEngine.executeToString(msg)),
+ toString -> {
+ log.info(toString);
+ ctx.tellNext(msg);
+ },
+ t -> ctx.tellError(msg, t));
+ }
+
+ @Override
+ public void destroy() {
+ if (jsEngine != null) {
+ jsEngine.destroy();
+ }
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNodeConfiguration.java
new file mode 100644
index 0000000..aafb7f1
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNodeConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.action;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+
+@Data
+public class TbLogNodeConfiguration implements NodeConfiguration {
+
+ private String jsScript;
+
+ @Override
+ public TbLogNodeConfiguration defaultConfiguration() {
+ TbLogNodeConfiguration configuration = new TbLogNodeConfiguration();
+ configuration.setJsScript("return 'incoming message = ' + msg + meta;");
+ return configuration;
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
new file mode 100644
index 0000000..69c3aca
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
@@ -0,0 +1,341 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.action;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.NotImplementedException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.alarm.Alarm;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.dao.alarm.AlarmService;
+
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.thingsboard.rule.engine.action.TbAlarmNode.*;
+import static org.thingsboard.server.common.data.alarm.AlarmSeverity.CRITICAL;
+import static org.thingsboard.server.common.data.alarm.AlarmSeverity.WARNING;
+import static org.thingsboard.server.common.data.alarm.AlarmStatus.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TbAlarmNodeTest {
+
+ private TbAlarmNode node;
+
+ @Mock
+ private TbContext ctx;
+ @Mock
+ private ListeningExecutor executor;
+ @Mock
+ private AlarmService alarmService;
+
+ @Mock
+ private ScriptEngine createJs;
+ @Mock
+ private ScriptEngine clearJs;
+ @Mock
+ private ScriptEngine detailsJs;
+
+ private EntityId originator = new DeviceId(UUIDs.timeBased());
+ private TenantId tenantId = new TenantId(UUIDs.timeBased());
+ private TbMsgMetaData metaData = new TbMsgMetaData();
+ private String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
+
+ @Test
+ public void newAlarmCanBeCreated() throws ScriptException, IOException {
+ initWithScript();
+ metaData.putValue("key", "value");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+
+ when(createJs.executeFilter(msg)).thenReturn(true);
+ when(detailsJs.executeJson(msg)).thenReturn(null);
+ when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null));
+
+ doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class));
+
+ node.onMsg(ctx, msg);
+
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).tellNext(captor.capture(), eq("Created"));
+ TbMsg actualMsg = captor.getValue();
+
+ assertEquals("ALARM", actualMsg.getType());
+ assertEquals(originator, actualMsg.getOriginator());
+ assertEquals("value", actualMsg.getMetaData().getValue("key"));
+ assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));
+ assertNotSame(metaData, actualMsg.getMetaData());
+
+ Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+ Alarm expectedAlarm = Alarm.builder()
+ .tenantId(tenantId)
+ .originator(originator)
+ .status(ACTIVE_UNACK)
+ .severity(CRITICAL)
+ .propagate(true)
+ .type("SomeType")
+ .details(null)
+ .build();
+
+ assertEquals(expectedAlarm, actualAlarm);
+
+ verify(executor, times(2)).executeAsync(any(Callable.class));
+ }
+
+ @Test
+ public void shouldCreateScriptThrowsException() throws ScriptException {
+ initWithScript();
+ metaData.putValue("key", "value");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+
+ when(createJs.executeFilter(msg)).thenThrow(new NotImplementedException("message"));
+
+ node.onMsg(ctx, msg);
+
+ verifyError(msg, "message", NotImplementedException.class);
+
+
+ verify(ctx).createJsScriptEngine("CREATE", "isAlarm");
+ verify(ctx).createJsScriptEngine("CLEAR", "isCleared");
+ verify(ctx).createJsScriptEngine("DETAILS", "Details");
+ verify(ctx).getJsExecutor();
+
+ verifyNoMoreInteractions(ctx, alarmService, clearJs, detailsJs);
+ }
+
+ @Test
+ public void buildDetailsThrowsException() throws ScriptException, IOException {
+ initWithScript();
+ metaData.putValue("key", "value");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+
+ when(createJs.executeFilter(msg)).thenReturn(true);
+ when(detailsJs.executeJson(msg)).thenThrow(new NotImplementedException("message"));
+ when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null));
+
+ node.onMsg(ctx, msg);
+
+ verifyError(msg, "message", NotImplementedException.class);
+
+ verify(ctx).createJsScriptEngine("CREATE", "isAlarm");
+ verify(ctx).createJsScriptEngine("CLEAR", "isCleared");
+ verify(ctx).createJsScriptEngine("DETAILS", "Details");
+ verify(ctx, times(2)).getJsExecutor();
+ verify(ctx).getAlarmService();
+ verify(ctx).getTenantId();
+ verify(alarmService).findLatestByOriginatorAndType(tenantId, originator, "SomeType");
+
+ verifyNoMoreInteractions(ctx, alarmService, clearJs);
+ }
+
+ @Test
+ public void ifAlarmClearedCreateNew() throws ScriptException, IOException {
+ initWithScript();
+ metaData.putValue("key", "value");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+
+ Alarm clearedAlarm = Alarm.builder().status(CLEARED_ACK).build();
+
+ when(createJs.executeFilter(msg)).thenReturn(true);
+ when(detailsJs.executeJson(msg)).thenReturn(null);
+ when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(clearedAlarm));
+
+ doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class));
+
+ node.onMsg(ctx, msg);
+
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).tellNext(captor.capture(), eq("Created"));
+ TbMsg actualMsg = captor.getValue();
+
+ assertEquals("ALARM", actualMsg.getType());
+ assertEquals(originator, actualMsg.getOriginator());
+ assertEquals("value", actualMsg.getMetaData().getValue("key"));
+ assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));
+ assertNotSame(metaData, actualMsg.getMetaData());
+
+ Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+ Alarm expectedAlarm = Alarm.builder()
+ .tenantId(tenantId)
+ .originator(originator)
+ .status(ACTIVE_UNACK)
+ .severity(CRITICAL)
+ .propagate(true)
+ .type("SomeType")
+ .details(null)
+ .build();
+
+ assertEquals(expectedAlarm, actualAlarm);
+
+ verify(executor, times(2)).executeAsync(any(Callable.class));
+ }
+
+ @Test
+ public void alarmCanBeUpdated() throws ScriptException, IOException {
+ initWithScript();
+ metaData.putValue("key", "value");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+
+ long oldEndDate = System.currentTimeMillis();
+ Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
+
+ when(createJs.executeFilter(msg)).thenReturn(true);
+ when(clearJs.executeFilter(msg)).thenReturn(false);
+ when(detailsJs.executeJson(msg)).thenReturn(null);
+ when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm));
+
+ doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm);
+
+ node.onMsg(ctx, msg);
+
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).tellNext(captor.capture(), eq("Updated"));
+ TbMsg actualMsg = captor.getValue();
+
+ assertEquals("ALARM", actualMsg.getType());
+ assertEquals(originator, actualMsg.getOriginator());
+ assertEquals("value", actualMsg.getMetaData().getValue("key"));
+ assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_EXISTING_ALARM));
+ assertNotSame(metaData, actualMsg.getMetaData());
+
+ Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+ assertTrue(activeAlarm.getEndTs() > oldEndDate);
+ Alarm expectedAlarm = Alarm.builder()
+ .tenantId(tenantId)
+ .originator(originator)
+ .status(ACTIVE_UNACK)
+ .severity(CRITICAL)
+ .propagate(true)
+ .type("SomeType")
+ .details(null)
+ .endTs(activeAlarm.getEndTs())
+ .build();
+
+ assertEquals(expectedAlarm, actualAlarm);
+
+ verify(executor, times(2)).executeAsync(any(Callable.class));
+ }
+
+ @Test
+ public void alarmCanBeCleared() throws ScriptException, IOException {
+ initWithScript();
+ metaData.putValue("key", "value");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+
+ long oldEndDate = System.currentTimeMillis();
+ Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
+
+ when(createJs.executeFilter(msg)).thenReturn(false);
+ when(clearJs.executeFilter(msg)).thenReturn(true);
+// when(detailsJs.executeJson(msg)).thenReturn(null);
+ when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm));
+ when(alarmService.clearAlarm(eq(activeAlarm.getId()), anyLong())).thenReturn(Futures.immediateFuture(true));
+// doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm);
+
+ node.onMsg(ctx, msg);
+
+ ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
+ verify(ctx).tellNext(captor.capture(), eq("Cleared"));
+ TbMsg actualMsg = captor.getValue();
+
+ assertEquals("ALARM", actualMsg.getType());
+ assertEquals(originator, actualMsg.getOriginator());
+ assertEquals("value", actualMsg.getMetaData().getValue("key"));
+ assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_CLEARED_ALARM));
+ assertNotSame(metaData, actualMsg.getMetaData());
+
+ Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+ Alarm expectedAlarm = Alarm.builder()
+ .tenantId(tenantId)
+ .originator(originator)
+ .status(CLEARED_UNACK)
+ .severity(WARNING)
+ .propagate(false)
+ .type("SomeType")
+ .details(null)
+ .endTs(oldEndDate)
+ .build();
+
+ assertEquals(expectedAlarm, actualAlarm);
+ }
+
+ private void initWithScript() {
+ try {
+ TbAlarmNodeConfiguration config = new TbAlarmNodeConfiguration();
+ config.setPropagate(true);
+ config.setSeverity(CRITICAL);
+ config.setAlarmType("SomeType");
+ config.setCreateConditionJs("CREATE");
+ config.setClearConditionJs("CLEAR");
+ config.setAlarmDetailsBuildJs("DETAILS");
+ ObjectMapper mapper = new ObjectMapper();
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
+
+ when(ctx.createJsScriptEngine("CREATE", "isAlarm")).thenReturn(createJs);
+ when(ctx.createJsScriptEngine("CLEAR", "isCleared")).thenReturn(clearJs);
+ when(ctx.createJsScriptEngine("DETAILS", "Details")).thenReturn(detailsJs);
+
+ when(ctx.getTenantId()).thenReturn(tenantId);
+ when(ctx.getJsExecutor()).thenReturn(executor);
+ when(ctx.getAlarmService()).thenReturn(alarmService);
+
+ mockJsExecutor();
+
+ node = new TbAlarmNode();
+ node.init(ctx, nodeConfiguration);
+ } catch (TbNodeException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ private void mockJsExecutor() {
+ when(ctx.getJsExecutor()).thenReturn(executor);
+ doAnswer((Answer<ListenableFuture<Boolean>>) invocationOnMock -> {
+ try {
+ Callable task = (Callable) (invocationOnMock.getArguments())[0];
+ return Futures.immediateFuture((Boolean) task.call());
+ } catch (Throwable th) {
+ return Futures.immediateFailedFuture(th);
+ }
+ }).when(executor).executeAsync(any(Callable.class));
+ }
+
+ private void verifyError(TbMsg msg, String message, Class expectedClass) {
+ ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
+ verify(ctx).tellError(same(msg), captor.capture());
+
+ Throwable value = captor.getValue();
+ assertEquals(expectedClass, value.getClass());
+ assertEquals(message, value.getMessage());
+ }
+
+}
\ No newline at end of file