thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 87(+58 -29)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java 1(+0 -1)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java 27(+17 -10)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java 1(+0 -1)
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java 2(+1 -1)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 70(+52 -18)
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java 158(+158 -0)
application/src/test/java/org/thingsboard/server/rules/lifecycle/RuleEngineLifecycleSqlIntegrationTest.java 9(+6 -3)
common/data/src/main/java/org/thingsboard/server/common/data/rule/NodeConnectionInfo.java 15(+10 -5)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 9c8de22..f539e32 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -15,6 +15,8 @@
*/
package org.thingsboard.server.actors.ruleChain;
+import akka.actor.OneForOneStrategy;
+import akka.actor.SupervisorStrategy;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ComponentActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@@ -23,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import scala.concurrent.duration.Duration;
public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> {
@@ -73,5 +76,13 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
return systemContext.getRuleChainErrorPersistFrequency();
}
- //TODO: failover strategy
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return strategy;
+ }
+
+ private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
+ logAndPersist("Unknown Failure", ActorSystemContext.toException(t));
+ return SupervisorStrategy.resume();
+ });
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 7636152..a853b15 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -27,12 +27,14 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.rule.RuleChainService;
@@ -40,6 +42,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* @author Andrew Shvayka
@@ -54,7 +58,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private RuleNodeId firstId;
private RuleNodeCtx firstNode;
- private ComponentLifecycleState state;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
, LoggingAdapter logger, ActorRef parent, ActorRef self) {
@@ -68,20 +71,64 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
@Override
public void start(ActorContext context) throws Exception {
- if (state == ComponentLifecycleState.ACTIVE) {
- return;
- }
RuleChain ruleChain = service.findRuleChainById(entityId);
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
- String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ?
- DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
- ActorRef ruleNodeActor = context.actorOf(
- Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
- .withDispatcher(dispatcherName), ruleNode.getId().toString());
+ ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
+ initRoutes(ruleChain, ruleNodeList);
+ }
+
+ @Override
+ public void onUpdate(ActorContext context) throws Exception {
+ RuleChain ruleChain = service.findRuleChainById(entityId);
+ List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
+
+ for (RuleNode ruleNode : ruleNodeList) {
+ RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
+ if (existing == null) {
+ ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
+ nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
+ } else {
+ existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
+ }
+ }
+
+ Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet());
+ List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList());
+ removedRules.forEach(ruleNodeId -> {
+ RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
+ removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self);
+ });
+
+ initRoutes(ruleChain, ruleNodeList);
+ }
+
+ @Override
+ public void stop(ActorContext context) throws Exception {
+ nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop);
+ nodeActors.clear();
+ nodeRoutes.clear();
+ context.stop(self);
+ }
+
+ @Override
+ public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+
+ }
+
+ private ActorRef createRuleNodeActor(ActorContext context, RuleNode ruleNode) {
+ String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ?
+ DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
+ return context.actorOf(
+ Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
+ .withDispatcher(dispatcherName), ruleNode.getId().toString());
+ }
+
+ private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
+ nodeRoutes.clear();
// Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) {
List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
@@ -102,19 +149,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
state = ComponentLifecycleState.ACTIVE;
}
- @Override
- public void stop(ActorContext context) throws Exception {
- nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop);
- nodeActors.clear();
- nodeRoutes.clear();
- state = ComponentLifecycleState.SUSPENDED;
- }
-
- @Override
- public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
-
- }
-
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
checkActive();
TbMsg tbMsg = envelope.getTbMsg();
@@ -126,7 +160,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
checkActive();
RuleNodeId originator = envelope.getOriginator();
String targetRelationType = envelope.getRelationType();
- //TODO: log debug output
List<RuleNodeRelation> relations = nodeRoutes.get(originator);
if (relations == null) {
return;
@@ -153,12 +186,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
- firstNode.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
- }
-
- private void checkActive() {
- if (state != ComponentLifecycleState.ACTIVE) {
- throw new IllegalStateException("Rule chain is not active!");
+ if (nodeCtx != null) {
+ nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
index 7640210..e7d866c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
import org.thingsboard.rule.engine.api.TbContext;
-import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
index 465da73..268c597 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
@@ -37,6 +38,9 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
@Override
protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
+ case COMPONENT_LIFE_CYCLE_MSG:
+ onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+ break;
case RULE_CHAIN_TO_RULE_MSG:
onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
break;
@@ -89,6 +93,4 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
return systemContext.getRuleNodeErrorPersistFrequency();
}
- //TODO: failover strategy
-
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index f5c014e..a86e2b9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -18,28 +18,19 @@ package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.event.LoggingAdapter;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.springframework.util.Base64Utils;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeState;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
-import org.thingsboard.server.common.data.DataConstants;
-import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.data.rule.RuleNode;
-import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.dao.rule.RuleChainService;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
-
/**
* @author Andrew Shvayka
*/
@@ -63,11 +54,25 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
@Override
public void start(ActorContext context) throws Exception {
tbNode = initComponent(ruleNode);
+ state = ComponentLifecycleState.ACTIVE;
+ }
+
+ @Override
+ public void onUpdate(ActorContext context) throws Exception {
+ RuleNode newRuleNode = systemContext.getRuleChainService().findRuleNodeById(entityId);
+ boolean restartRequired = !(ruleNode.getType().equals(newRuleNode.getType())
+ && ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
+ this.ruleNode = newRuleNode;
+ if (restartRequired) {
+ tbNode.destroy();
+ start(context);
+ }
}
@Override
public void stop(ActorContext context) throws Exception {
tbNode.destroy();
+ context.stop(self);
}
@Override
@@ -76,6 +81,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
+ checkActive();
if (ruleNode.isDebugMode()) {
systemContext.persistDebugInput(tenantId, entityId, msg.getMsg());
}
@@ -89,4 +95,5 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
return tbNode;
}
+
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
index 90c67c2..f4f733b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
@@ -17,7 +17,6 @@ package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorRef;
import lombok.Data;
-import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java
index 4fc03ac..e6248f1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java
@@ -16,7 +16,6 @@
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
-import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index fd72bb1..e25d3a7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -20,12 +20,14 @@ import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.stats.StatsPersistTick;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgProcessor {
protected final TenantId tenantId;
protected final T entityId;
+ protected ComponentLifecycleState state;
protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
super(systemContext, logger);
@@ -67,4 +69,10 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr
public void scheduleStatsPersistTick(ActorContext context, long statsPersistFrequency) {
schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
}
+
+ protected void checkActive() {
+ if (state != ComponentLifecycleState.ACTIVE) {
+ throw new IllegalStateException("Rule chain is not active!");
+ }
+ }
}
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 7997024..bbcb98f 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -29,7 +29,7 @@ import org.thingsboard.server.common.data.rule.RuleChainMetaData;
/**
* Created by ashvayka on 20.03.18.
*/
-public class AbstractRuleEngineControllerTest extends AbstractControllerTest{
+public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
return doPost("/api/ruleChain", ruleChain, RuleChain.class);
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index 5b8216b..acbace6 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -38,7 +38,9 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.rule.RuleChainService;
+import java.util.Arrays;
import java.util.Collections;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -49,17 +51,17 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
@Slf4j
public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRuleEngineControllerTest {
- private static final String MQTT_URL = "tcp://localhost:1883";
- private static final Long TIME_TO_HANDLE_REQUEST = 500L;
+ protected Tenant savedTenant;
+ protected User tenantAdmin;
- private Tenant savedTenant;
- private User tenantAdmin;
+ @Autowired
+ protected ActorService actorService;
@Autowired
- private ActorService actorService;
+ protected AttributesService attributesService;
@Autowired
- private AttributesService attributesService;
+ protected RuleChainService ruleChainService;
@Before
public void beforeTest() throws Exception {
@@ -89,7 +91,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
}
@Test
- public void testSimpleRuleChainCreation() throws Exception {
+ public void testRuleChainWithTwoRules() throws Exception {
// Creating Rule Chain
RuleChain ruleChain = new RuleChain();
ruleChain.setName("Simple Rule Chain");
@@ -102,17 +104,26 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
RuleChainMetaData metaData = new RuleChainMetaData();
metaData.setRuleChainId(ruleChain.getId());
- RuleNode ruleNode = new RuleNode();
- ruleNode.setName("Simple Rule Node");
- ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
- ruleNode.setDebugMode(true);
- TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
- configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
- ruleNode.setConfiguration(mapper.valueToTree(configuration));
+ RuleNode ruleNode1 = new RuleNode();
+ ruleNode1.setName("Simple Rule Node 1");
+ ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode1.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
+ configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
+ ruleNode1.setConfiguration(mapper.valueToTree(configuration1));
- metaData.setNodes(Collections.singletonList(ruleNode));
- metaData.setFirstNodeIndex(0);
+ RuleNode ruleNode2 = new RuleNode();
+ ruleNode2.setName("Simple Rule Node 2");
+ ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode2.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
+ configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
+ ruleNode2.setConfiguration(mapper.valueToTree(configuration2));
+
+ metaData.setNodes(Arrays.asList(ruleNode1, ruleNode2));
+ metaData.setFirstNodeIndex(0);
+ metaData.addConnectionInfo(0, 1, "Success");
metaData = saveRuleChainMetaData(metaData);
Assert.assertNotNull(metaData);
@@ -126,7 +137,12 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
device = doPost("/api/device", device, Device.class);
attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
- Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey1", "serverAttributeValue1"), System.currentTimeMillis())));
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey2", "serverAttributeValue2"), System.currentTimeMillis())));
+
+
+ Thread.sleep(1000);
// Pushing Message to the system
TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
@@ -150,7 +166,25 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
- Assert.assertEquals("serverAttributeValue", outEvent.getBody().get("metadata").get("ss.serverAttributeKey").asText());
+ Assert.assertEquals("serverAttributeValue1", outEvent.getBody().get("metadata").get("ss.serverAttributeKey1").asText());
+
+ RuleChain finalRuleChain = ruleChain;
+ RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
+
+ events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue1", outEvent.getBody().get("metadata").get("ss.serverAttributeKey1").asText());
+ Assert.assertEquals("serverAttributeValue2", outEvent.getBody().get("metadata").get("ss.serverAttributeKey2").asText());
}
}
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
new file mode 100644
index 0000000..de690a7
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -0,0 +1,158 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules.lifecycle;
+
+import com.datastax.driver.core.utils.UUIDs;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.Event;
+import org.thingsboard.server.common.data.Tenant;
+import org.thingsboard.server.common.data.User;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.data.page.TimePageData;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.data.security.Authority;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+import java.util.Collections;
+
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+/**
+ * @author Valerii Sosliuk
+ */
+@Slf4j
+public abstract class AbstractRuleEngineLifecycleIntegrationTest extends AbstractRuleEngineControllerTest {
+
+ protected Tenant savedTenant;
+ protected User tenantAdmin;
+
+ @Autowired
+ protected ActorService actorService;
+
+ @Autowired
+ protected AttributesService attributesService;
+
+ @Before
+ public void beforeTest() throws Exception {
+ loginSysAdmin();
+
+ Tenant tenant = new Tenant();
+ tenant.setTitle("My tenant");
+ savedTenant = doPost("/api/tenant", tenant, Tenant.class);
+ Assert.assertNotNull(savedTenant);
+
+ tenantAdmin = new User();
+ tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
+ tenantAdmin.setTenantId(savedTenant.getId());
+ tenantAdmin.setEmail("tenant2@thingsboard.org");
+ tenantAdmin.setFirstName("Joe");
+ tenantAdmin.setLastName("Downs");
+
+ createUserAndLogin(tenantAdmin, "testPassword1");
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ loginSysAdmin();
+ if (savedTenant != null) {
+ doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk());
+ }
+ }
+
+ @Test
+ public void testRuleChainWithOneRule() throws Exception {
+ // Creating Rule Chain
+ RuleChain ruleChain = new RuleChain();
+ ruleChain.setName("Simple Rule Chain");
+ ruleChain.setTenantId(savedTenant.getId());
+ ruleChain.setRoot(true);
+ ruleChain.setDebugMode(true);
+ ruleChain = saveRuleChain(ruleChain);
+ Assert.assertNull(ruleChain.getFirstRuleNodeId());
+
+ RuleChainMetaData metaData = new RuleChainMetaData();
+ metaData.setRuleChainId(ruleChain.getId());
+
+ RuleNode ruleNode = new RuleNode();
+ ruleNode.setName("Simple Rule Node");
+ ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
+ configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
+ ruleNode.setConfiguration(mapper.valueToTree(configuration));
+
+ metaData.setNodes(Collections.singletonList(ruleNode));
+ metaData.setFirstNodeIndex(0);
+
+ metaData = saveRuleChainMetaData(metaData);
+ Assert.assertNotNull(metaData);
+
+ ruleChain = getRuleChain(ruleChain.getId());
+ Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
+
+ // Saving the device
+ Device device = new Device();
+ device.setName("My device");
+ device.setType("default");
+ device = doPost("/api/device", device, Device.class);
+
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
+
+ Thread.sleep(1000);
+
+ // Pushing Message to the system
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
+ "CUSTOM",
+ device.getId(),
+ new TbMsgMetaData(),
+ new byte[]{});
+ actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+
+ Thread.sleep(3000);
+
+ TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue", outEvent.getBody().get("metadata").get("ss.serverAttributeKey").asText());
+ }
+
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
index 136f76b..65b4293 100644
--- a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
+++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({
- "org.thingsboard.server.rules.flow.*Test", "org.thingsboard.server.rules.lifecycle.*Test"})
+ "org.thingsboard.server.rules.flow.*Test"})
public class RuleEngineSqlTestSuite {
@ClassRule
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainConnectionInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainConnectionInfo.java
new file mode 100644
index 0000000..35cf6aa
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainConnectionInfo.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.rule;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleChainId;
+
+/**
+ * Created by ashvayka on 21.03.18.
+ */
+@Data
+public class RuleChainConnectionInfo {
+ private int fromIndex;
+ private RuleChainId targetRuleChainId;
+ private String type;
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainMetaData.java
index af141d6..1be1518 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainMetaData.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainMetaData.java
@@ -58,18 +58,4 @@ public class RuleChainMetaData {
ruleChainConnections.add(connectionInfo);
}
- @Data
- public class NodeConnectionInfo {
- private int fromIndex;
- private int toIndex;
- private String type;
- }
-
- @Data
- public class RuleChainConnectionInfo {
- private int fromIndex;
- private RuleChainId targetRuleChainId;
- private String type;
- }
-
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
index 3daed33..7d6cd00 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
@@ -31,7 +31,9 @@ import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
+import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.entity.AbstractEntityService;
@@ -148,7 +150,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
ruleChainDao.save(ruleChain);
}
if (ruleChainMetaData.getConnections() != null) {
- for (RuleChainMetaData.NodeConnectionInfo nodeConnection : ruleChainMetaData.getConnections()) {
+ for (NodeConnectionInfo nodeConnection : ruleChainMetaData.getConnections()) {
EntityId from = nodes.get(nodeConnection.getFromIndex()).getId();
EntityId to = nodes.get(nodeConnection.getToIndex()).getId();
String type = nodeConnection.getType();
@@ -161,7 +163,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
}
if (ruleChainMetaData.getRuleChainConnections() != null) {
- for (RuleChainMetaData.RuleChainConnectionInfo nodeToRuleChainConnection : ruleChainMetaData.getRuleChainConnections()) {
+ for (RuleChainConnectionInfo nodeToRuleChainConnection : ruleChainMetaData.getRuleChainConnections()) {
EntityId from = nodes.get(nodeToRuleChainConnection.getFromIndex()).getId();
EntityId to = nodeToRuleChainConnection.getTargetRuleChainId();
String type = nodeToRuleChainConnection.getType();
@@ -314,7 +316,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
private void createRelation(EntityRelation relation) throws ExecutionException, InterruptedException {
log.debug("Creating relation: {}", relation);
- relationService.saveRelationAsync(relation).get();
+ relationService.saveRelation(relation);
}
private DataValidator<RuleChain> ruleChainValidator =