thingsboard-memoizeit
Changes
application/pom.xml 4(+4 -0)
application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java 8(+4 -4)
application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 140(+95 -45)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java 3(+3 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java 23(+23 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java 78(+78 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java 24(+24 -0)
application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java 3(+0 -3)
application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java 31(+23 -8)
application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java 14(+14 -0)
common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java 8(+7 -1)
Details
application/pom.xml 4(+4 -0)
diff --git a/application/pom.xml b/application/pom.xml
index 8449246..95322fa 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -54,6 +54,10 @@
<artifactId>extensions-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.thingsboard.rule-engine</groupId>
+ <artifactId>rule-engine-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.thingsboard</groupId>
<artifactId>extensions-core</artifactId>
</dependency>
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index f59ec63..68d5547 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -136,6 +136,12 @@ public class ActorSystemContext {
@Value("${actors.plugin.error_persist_frequency}")
@Getter private long pluginErrorPersistFrequency;
+ @Value("${actors.rule.chain.error_persist_frequency}")
+ @Getter private long ruleChainErrorPersistFrequency;
+
+ @Value("${actors.rule.node.error_persist_frequency}")
+ @Getter private long ruleNodeErrorPersistFrequency;
+
@Value("${actors.rule.termination.delay}")
@Getter private long ruleActorTerminationDelay;
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 549d71d..5e05a60 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -33,9 +33,11 @@ import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
@@ -86,27 +88,48 @@ public class AppActor extends RuleChainManagerActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("Received message: {}", msg);
- if (msg instanceof ToDeviceActorMsg) {
- processDeviceMsg((ToDeviceActorMsg) msg);
- } else if (msg instanceof ToPluginActorMsg) {
- onToPluginMsg((ToPluginActorMsg) msg);
- } else if (msg instanceof ToDeviceActorNotificationMsg) {
- onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
- } else if (msg instanceof Terminated) {
- processTermination((Terminated) msg);
- } else if (msg instanceof ClusterEventMsg) {
- broadcast(msg);
- } else if (msg instanceof ComponentLifecycleMsg) {
- onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
- } else if (msg instanceof PluginTerminationMsg) {
- onPluginTerminated((PluginTerminationMsg) msg);
+ protected void process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case COMPONENT_LIFE_CYCLE_MSG:
+ onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+ break;
+ case SERVICE_TO_RULE_ENGINE_MSG:
+ onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+ break;
+ }
+ }
+
+ private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
+ if (SYSTEM_TENANT.equals(msg.getTenantId())) {
+ //TODO: ashvayka handle this.
} else {
- logger.warning("Unknown message: {}!", msg);
+ getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
}
}
+
+// @Override
+// public void onReceive(Object msg) throws Exception {
+// logger.debug("Received message: {}", msg);
+// if (msg instanceof ToDeviceActorMsg) {
+// processDeviceMsg((ToDeviceActorMsg) msg);
+// } else if (msg instanceof ToPluginActorMsg) {
+// onToPluginMsg((ToPluginActorMsg) msg);
+// } else if (msg instanceof ToDeviceActorNotificationMsg) {
+// onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
+// } else if (msg instanceof Terminated) {
+// processTermination((Terminated) msg);
+// } else if (msg instanceof ClusterEventMsg) {
+// broadcast(msg);
+// } else if (msg instanceof ComponentLifecycleMsg) {
+// onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+// } else if (msg instanceof PluginTerminationMsg) {
+// onPluginTerminated((PluginTerminationMsg) msg);
+// } else {
+// logger.warning("Unknown message: {}!", msg);
+// }
+// }
+
private void onPluginTerminated(PluginTerminationMsg msg) {
pluginManager.remove(msg.getId());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
index 6e78e20..09aaf80 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
@@ -57,7 +57,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
}
@Override
- public void start() throws Exception {
+ public void start(ActorContext context) throws Exception {
logger.info("[{}] Going to start plugin actor.", entityId);
pluginMd = systemContext.getPluginService().findPluginById(entityId);
if (pluginMd == null) {
@@ -76,7 +76,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
}
@Override
- public void stop() throws Exception {
+ public void stop(ActorContext context) throws Exception {
onStop();
}
@@ -191,7 +191,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
if (pluginImpl != null) {
pluginImpl.stop(trustedCtx);
}
- start();
+ start(context);
}
}
@@ -217,7 +217,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
pluginImpl.resume(trustedCtx);
logger.info("[{}] Plugin resumed.", entityId);
} else {
- start();
+ start(context);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
index 2ebebfc..236fca7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
@@ -68,7 +68,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
}
@Override
- public void start() throws Exception {
+ public void start(ActorContext context) throws Exception {
logger.info("[{}][{}] Starting rule actor.", entityId, tenantId);
ruleMd = systemContext.getRuleService().findRuleById(entityId);
if (ruleMd == null) {
@@ -86,7 +86,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
}
@Override
- public void stop() throws Exception {
+ public void stop(ActorContext context) throws Exception {
onStop();
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
new file mode 100644
index 0000000..0cd7601
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -0,0 +1,61 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+class DefaultTbContext implements TbContext {
+
+ private final ActorSystemContext mainCtx;
+ private final RuleNodeCtx nodeCtx;
+
+ public DefaultTbContext(ActorSystemContext mainCtx, RuleNodeCtx nodeCtx) {
+ this.mainCtx = mainCtx;
+ this.nodeCtx = nodeCtx;
+ }
+
+ @Override
+ public void tellNext(TbMsg msg) {
+ tellNext(msg, null);
+ }
+
+ @Override
+ public void tellNext(TbMsg msg, String relationType) {
+ nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelfId(), relationType, msg), nodeCtx.getSelf());
+ }
+
+ @Override
+ public void tellSelf(TbMsg msg, long delayMs) {
+
+ }
+
+ @Override
+ public void tellOthers(TbMsg msg) {
+
+ }
+
+ @Override
+ public void tellSibling(TbMsg msg, ServerAddress address) {
+
+ }
+
+ @Override
+ public void spawn(TbMsg msg) {
+
+ }
+
+ @Override
+ public void ack(TbMsg msg) {
+
+ }
+
+ @Override
+ public AttributesService getAttributesService() {
+ return mainCtx.getAttributesService();
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 83536f8..20ea05d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -20,6 +20,9 @@ import org.thingsboard.server.actors.service.ComponentActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> {
@@ -30,8 +33,18 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
}
@Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("[{}][{}] Unknown msg type.", tenantId, id, msg.getClass().getName());
+ protected void process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case COMPONENT_LIFE_CYCLE_MSG:
+ onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+ break;
+ case SERVICE_TO_RULE_ENGINE_MSG:
+ processor.onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+ break;
+ case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
+ processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
+ break;
+ }
}
public static class ActorCreator extends ContextBasedCreator<RuleChainActor> {
@@ -54,6 +67,6 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
@Override
protected long getErrorPersistFrequency() {
- return systemContext.getPluginErrorPersistFrequency();
+ return systemContext.getRuleChainErrorPersistFrequency();
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 3075367..5c946bd 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,77 +17,127 @@ package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
+import akka.actor.Props;
import akka.event.LoggingAdapter;
-import com.fasterxml.jackson.core.JsonProcessingException;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.plugin.*;
+import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
-import org.thingsboard.server.common.data.id.PluginId;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
-import org.thingsboard.server.common.data.plugin.ComponentType;
-import org.thingsboard.server.common.data.plugin.PluginMetaData;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.cluster.ServerAddress;
-import org.thingsboard.server.extensions.api.plugins.Plugin;
-import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
-import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
-import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
-import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
-import org.thingsboard.server.extensions.api.rules.RuleException;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* @author Andrew Shvayka
*/
public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
- private ComponentLifecycleState state;
+ private final ActorRef parent;
+ private final ActorRef self;
+ private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
+ private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
+ private final RuleChainService service;
+
+ private RuleNodeId firstId;
+ private RuleNodeCtx firstNode;
- protected RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId pluginId, ActorSystemContext systemContext
+ RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
, LoggingAdapter logger, ActorRef parent, ActorRef self) {
- super(systemContext, logger, tenantId, pluginId);
+ super(systemContext, logger, tenantId, ruleChainId);
+ this.parent = parent;
+ this.self = self;
+ this.nodeActors = new HashMap<>();
+ this.nodeRoutes = new HashMap<>();
+ this.service = systemContext.getRuleChainService();
}
@Override
- public void start() throws Exception {
-
+ public void start(ActorContext context) throws Exception {
+ RuleChain ruleChain = service.findRuleChainById(entityId);
+ List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
+ // Creating and starting the actors;
+ for (RuleNode ruleNode : ruleNodeList) {
+ String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ?
+ DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
+ ActorRef ruleNodeActor = context.actorOf(
+ Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
+ .withDispatcher(dispatcherName), ruleNode.toString());
+ nodeActors.put(ruleNode.getId(), new RuleNodeCtx(self, ruleNodeActor, ruleNode.getId()));
+ }
+ // Populating the routes map;
+ for (RuleNode ruleNode : ruleNodeList) {
+ List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
+ for (EntityRelation relation : relations) {
+ if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
+ RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
+ if (ruleNodeCtx == null) {
+ throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
+ }
+ }
+ nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
+ .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
+ }
+ }
+
+ firstId = ruleChain.getFirstRuleNodeId();
+ firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId());
}
@Override
- public void stop() throws Exception {
-
+ public void stop(ActorContext context) throws Exception {
+ nodeActors.values().stream().map(RuleNodeCtx::getSelf).forEach(context::stop);
+ nodeActors.clear();
+ nodeRoutes.clear();
}
@Override
- public void onCreated(ActorContext context) throws Exception {
+ public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
}
- @Override
- public void onUpdate(ActorContext context) throws Exception {
-
+ void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
+ TbMsg tbMsg = envelope.getTbMsg();
+ //TODO: push to queue and act on ack in async way
+ pushMstToNode(firstNode, tbMsg);
}
- @Override
- public void onActivate(ActorContext context) throws Exception {
-
+ void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
+ RuleNodeId originator = envelope.getOriginator();
+ String targetRelationType = envelope.getRelationType();
+ //TODO: log debug output
+ List<RuleNodeRelation> relations = nodeRoutes.get(originator);
+ for (RuleNodeRelation relation : relations) {
+ if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {
+ switch (relation.getOut().getEntityType()) {
+ case RULE_NODE:
+ RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
+ RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
+ pushMstToNode(targetRuleNode, envelope.getMsg());
+ break;
+ case RULE_CHAIN:
+// TODO: implement
+ break;
+ }
+ }
+ }
}
- @Override
- public void onSuspend(ActorContext context) throws Exception {
-
- }
-
- @Override
- public void onStop(ActorContext context) throws Exception {
-
+ private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
+ //TODO: log debug input
+ firstNode.getSelf().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
}
- @Override
- public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
-
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
index 2dca8dd..cb76f04 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
@@ -8,6 +8,7 @@ import org.thingsboard.server.actors.shared.rulechain.RuleChainManager;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.dao.rule.RuleChainService;
/**
* Created by ashvayka on 15.03.18.
@@ -16,11 +17,13 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
protected final RuleChainManager ruleChainManager;
protected final PluginManager pluginManager;
+ protected final RuleChainService ruleChainService;
public RuleChainManagerActor(ActorSystemContext systemContext, RuleChainManager ruleChainManager, PluginManager pluginManager) {
super(systemContext);
this.ruleChainManager = ruleChainManager;
this.pluginManager = pluginManager;
+ this.ruleChainService = systemContext.getRuleChainService();
}
protected void initRuleChains() {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
new file mode 100644
index 0000000..d6e8262
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
@@ -0,0 +1,23 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleChainToRuleNodeMsg implements TbActorMsg {
+
+ private final TbContext ctx;
+ private final TbMsg msg;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.RULE_CHAIN_TO_RULE_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
new file mode 100644
index 0000000..cba19d1
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ComponentActor;
+import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
+
+ private final RuleChainId ruleChainId;
+
+ private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
+ super(systemContext, tenantId, ruleNodeId);
+ this.ruleChainId = ruleChainId;
+ setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext,
+ logger, context().parent(), context().self()));
+ }
+
+ @Override
+ protected void process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case RULE_CHAIN_TO_RULE_MSG:
+ processor.onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
+ break;
+ }
+ }
+
+ public static class ActorCreator extends ContextBasedCreator<RuleNodeActor> {
+ private static final long serialVersionUID = 1L;
+
+ private final TenantId tenantId;
+ private final RuleChainId ruleChainId;
+ private final RuleNodeId ruleNodeId;
+
+ public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
+ super(context);
+ this.tenantId = tenantId;
+ this.ruleChainId = ruleChainId;
+ this.ruleNodeId = ruleNodeId;
+
+ }
+
+ @Override
+ public RuleNodeActor create() throws Exception {
+ return new RuleNodeActor(context, tenantId, ruleChainId, ruleNodeId);
+ }
+ }
+
+ @Override
+ protected long getErrorPersistFrequency() {
+ return systemContext.getRuleNodeErrorPersistFrequency();
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
new file mode 100644
index 0000000..99e48d8
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.event.LoggingAdapter;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.DefaultActorService;
+import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Andrew Shvayka
+ */
+public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> {
+
+ private final ActorRef parent;
+ private final ActorRef self;
+ private final RuleChainService service;
+
+ RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
+ , LoggingAdapter logger, ActorRef parent, ActorRef self) {
+ super(systemContext, logger, tenantId, ruleNodeId);
+ this.parent = parent;
+ this.self = self;
+ this.service = systemContext.getRuleChainService();
+ }
+
+ @Override
+ public void start(ActorContext context) throws Exception {
+
+ }
+
+ @Override
+ public void stop(ActorContext context) throws Exception {
+ }
+
+ @Override
+ public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+
+ }
+
+ void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
+
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
new file mode 100644
index 0000000..dd25f8b
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
@@ -0,0 +1,15 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorRef;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleNodeCtx {
+ private final ActorRef chainActor;
+ private final ActorRef self;
+ private final RuleNodeId selfId;
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
new file mode 100644
index 0000000..bd2d544
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
@@ -0,0 +1,17 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+
+@Data
+final class RuleNodeRelation {
+
+ private final EntityId in;
+ private final EntityId out;
+ private final String type;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
new file mode 100644
index 0000000..95b3625
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -0,0 +1,24 @@
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
+
+ private final RuleNodeId originator;
+ private final String relationType;
+ private final TbMsg msg;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG;
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index 0f66d25..0be0385 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -17,6 +17,8 @@ package org.thingsboard.server.actors.service;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
import org.thingsboard.server.service.cluster.rpc.RpcMsgListener;
@@ -25,6 +27,8 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
+ void onMsg(ServiceToRuleEngineMsg msg);
+
void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index 76b9be9..d0260dd 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -54,7 +54,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override
public void preStart() {
try {
- processor.start();
+ processor.start(context());
logLifecycleEvent(ComponentLifecycleEvent.STARTED);
if (systemContext.isStatisticsEnabled()) {
scheduleStatsPersistTick();
@@ -78,7 +78,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override
public void postStop() {
try {
- processor.stop();
+ processor.stop(context());
logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
} catch (Exception e) {
logger.warning("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 825c971..9cb0fc4 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -16,9 +16,13 @@
package org.thingsboard.server.actors.service;
import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.msg.TbActorMsg;
public abstract class ContextAwareActor extends UntypedActor {
+ protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
public static final int ENTITY_PACK_LIMIT = 1024;
@@ -28,4 +32,19 @@ public abstract class ContextAwareActor extends UntypedActor {
super();
this.systemContext = systemContext;
}
+
+ @Override
+ public void onReceive(Object msg) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing msg: {}", msg);
+ }
+ if (msg instanceof TbActorMsg) {
+ process((TbActorMsg) msg);
+ }
+ else {
+ logger.warning("Unknown message: {}!", msg);
+ }
+ }
+
+ protected abstract void process(TbActorMsg msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 7b62687..166f37b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -37,6 +37,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
@@ -126,6 +127,11 @@ public class DefaultActorService implements ActorService {
}
@Override
+ public void onMsg(ServiceToRuleEngineMsg msg) {
+ appActor.tell(msg, ActorRef.noSender());
+ }
+
+ @Override
public void process(SessionAwareMsg msg) {
log.debug("Processing session aware msg: {}", msg);
sessionManagerActor.tell(msg, ActorRef.noSender());
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
index 73b221f..e1313d2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -102,9 +102,6 @@ public abstract class AbstractContextAwareMsgProcessor {
case FILTER:
configurationClazz = ((Filter) componentClazz.getAnnotation(Filter.class)).configuration();
break;
- case PROCESSOR:
- configurationClazz = ((Processor) componentClazz.getAnnotation(Processor.class)).configuration();
- break;
case ACTION:
configurationClazz = ((Action) componentClazz.getAnnotation(Action.class)).configuration();
break;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 18d32d9..fd72bb1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -33,21 +33,36 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr
this.entityId = id;
}
- public abstract void start() throws Exception;
+ public abstract void start(ActorContext context) throws Exception;
- public abstract void stop() throws Exception;
+ public abstract void stop(ActorContext context) throws Exception;
- public abstract void onCreated(ActorContext context) throws Exception;
+ public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
- public abstract void onUpdate(ActorContext context) throws Exception;
+ public void onCreated(ActorContext context) throws Exception {
+ start(context);
+ }
- public abstract void onActivate(ActorContext context) throws Exception;
+ public void onUpdate(ActorContext context) throws Exception {
+ restart(context);
+ }
- public abstract void onSuspend(ActorContext context) throws Exception;
+ public void onActivate(ActorContext context) throws Exception {
+ restart(context);
+ }
- public abstract void onStop(ActorContext context) throws Exception;
+ public void onSuspend(ActorContext context) throws Exception {
+ stop(context);
+ }
- public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
+ public void onStop(ActorContext context) throws Exception {
+ stop(context);
+ }
+
+ private void restart(ActorContext context) throws Exception {
+ stop(context);
+ start(context);
+ }
public void scheduleStatsPersistTick(ActorContext context, long statsPersistFrequency) {
schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
index a98609c..917a645 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
@@ -43,13 +43,16 @@ public abstract class EntityActorsManager<T extends EntityId, A extends UntypedA
public void init(ActorContext context) {
for (M entity : new PageDataIterable<>(getFetchEntitiesFunction(), ContextAwareActor.ENTITY_PACK_LIMIT)) {
- log.debug("[{}] Creating plugin actor", entity.getId());
+ T entityId = (T) entity.getId();
+ log.debug("[{}|{}] Creating entity actor", entityId.getEntityType(), entityId.getId());
//TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa.
- getOrCreateActor(context, (T) entity.getId());
- log.debug("[{}] Plugin actor created.", entity.getId());
+ ActorRef actorRef = getOrCreateActor(context, entityId);
+ visit(entity, actorRef);
+ log.debug("[{}|{}] Entity actor created.", entityId.getEntityType(), entityId.getId());
}
}
+ protected void visit(M entity, ActorRef actorRef) {}
public ActorRef getOrCreateActor(ActorContext context, T entityId) {
return actors.computeIfAbsent(entityId, eId ->
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
index 0939611..97acb6c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
@@ -1,6 +1,8 @@
package org.thingsboard.server.actors.shared.rulechain;
+import akka.actor.ActorRef;
import akka.japi.Creator;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.ruleChain.RuleChainActor;
@@ -16,6 +18,10 @@ import org.thingsboard.server.dao.rule.RuleChainService;
public abstract class RuleChainManager extends EntityActorsManager<RuleChainId, RuleChainActor, RuleChain> {
protected final RuleChainService service;
+ @Getter
+ protected RuleChain rootChain;
+ @Getter
+ protected ActorRef rootChainActor;
public RuleChainManager(ActorSystemContext systemContext) {
super(systemContext);
@@ -27,4 +33,12 @@ public abstract class RuleChainManager extends EntityActorsManager<RuleChainId,
return new RuleChainActor.ActorCreator(systemContext, getTenantId(), entityId);
}
+ @Override
+ protected void visit(RuleChain entity, ActorRef actorRef) {
+ if (entity.isRoot()) {
+ rootChain = entity;
+ rootChainActor = actorRef;
+ }
+ }
+
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 8621179..a51e7a2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -27,10 +27,12 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.plugin.TenantPluginManager;
import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
-import org.thingsboard.server.common.data.id.*;
-import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
@@ -39,8 +41,6 @@ import java.util.Map;
public class TenantActor extends RuleChainManagerActor {
- private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
-
private final TenantId tenantId;
private final Map<DeviceId, ActorRef> deviceActors;
@@ -62,25 +62,42 @@ public class TenantActor extends RuleChainManagerActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("[{}] Received message: {}", tenantId, msg);
- if (msg instanceof ToDeviceActorMsg) {
- onToDeviceActorMsg((ToDeviceActorMsg) msg);
- } else if (msg instanceof ToPluginActorMsg) {
- onToPluginMsg((ToPluginActorMsg) msg);
- } else if (msg instanceof ToDeviceActorNotificationMsg) {
- onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
- } else if (msg instanceof ClusterEventMsg) {
- broadcast(msg);
- } else if (msg instanceof ComponentLifecycleMsg) {
- onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
- } else if (msg instanceof PluginTerminationMsg) {
- onPluginTerminated((PluginTerminationMsg) msg);
- } else {
- logger.warning("[{}] Unknown message: {}!", tenantId, msg);
+ protected void process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case COMPONENT_LIFE_CYCLE_MSG:
+ onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+ break;
+ case SERVICE_TO_RULE_ENGINE_MSG:
+ onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+ break;
}
}
+ private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
+ ruleChainManager.getRootChainActor().tell(msg, self());
+ }
+
+
+// @Override
+// public void onReceive(Object msg) throws Exception {
+// logger.debug("[{}] Received message: {}", tenantId, msg);
+// if (msg instanceof ToDeviceActorMsg) {
+// onToDeviceActorMsg((ToDeviceActorMsg) msg);
+// } else if (msg instanceof ToPluginActorMsg) {
+// onToPluginMsg((ToPluginActorMsg) msg);
+// } else if (msg instanceof ToDeviceActorNotificationMsg) {
+// onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
+// } else if (msg instanceof ClusterEventMsg) {
+// broadcast(msg);
+// } else if (msg instanceof ComponentLifecycleMsg) {
+// onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+// } else if (msg instanceof PluginTerminationMsg) {
+// onPluginTerminated((PluginTerminationMsg) msg);
+// } else {
+// logger.warning("[{}] Unknown message: {}!", tenantId, msg);
+// }
+// }
+
private void broadcast(Object msg) {
pluginManager.broadcast(msg);
deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 1c842c8..778c2a3 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -215,6 +215,12 @@ actors:
termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
+ chain:
+ # Errors for particular actor are persisted once per specified amount of milliseconds
+ error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
+ node:
+ # Errors for particular actor are persisted once per specified amount of milliseconds
+ error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index a776d7b..5624ed8 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -37,6 +37,8 @@ public class DataConstants {
public static final String ERROR = "ERROR";
public static final String LC_EVENT = "LC_EVENT";
public static final String STATS = "STATS";
+ public static final String RULE_CHAIN_DEBUG = "DEBUG_RULE_CHAIN";
+ public static final String RULE_NODE_DEBUG = "DEBUG_RULE_NODE";
public static final String ONEWAY = "ONEWAY";
public static final String TWOWAY = "TWOWAY";
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
index 45fb590..ab6acca 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
@@ -20,6 +20,6 @@ package org.thingsboard.server.common.data.plugin;
*/
public enum ComponentType {
- FILTER, PROCESSOR, ACTION, PLUGIN
+ ENRICHMENT, FILTER, PROCESSOR, TRANSFORMATION, ACTION, PLUGIN
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 55c131d..22fc428 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -4,4 +4,34 @@ package org.thingsboard.server.common.msg;
* Created by ashvayka on 15.03.18.
*/
public enum MsgType {
+
+ /**
+ * ADDED/UPDATED/DELETED events for main entities.
+ *
+ * @See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
+ */
+ COMPONENT_LIFE_CYCLE_MSG,
+
+ /**
+ * Misc messages from the REST API/SERVICE layer to the new rule engine.
+ *
+ * @See {@link org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg}
+ */
+ SERVICE_TO_RULE_ENGINE_MSG,
+
+
+ SESSION_TO_DEVICE_ACTOR_MSG,
+ DEVICE_ACTOR_TO_SESSION_MSG,
+
+
+ /**
+ * Message that is sent by RuleChainActor to RuleActor with command to process TbMsg.
+ */
+ RULE_CHAIN_TO_RULE_MSG,
+
+ /**
+ * Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain.
+ */
+ RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
+
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index b77722e..315eb86 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
@@ -30,7 +32,7 @@ import java.util.Optional;
* @author Andrew Shvayka
*/
@ToString
-public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
+public class ComponentLifecycleMsg implements TbActorMsg, TenantAwareMsg, ToAllNodesMsg {
@Getter
private final TenantId tenantId;
@Getter
@@ -56,4 +58,8 @@ public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
return entityId.getEntityType() == EntityType.RULE_CHAIN ? Optional.of((RuleChainId) entityId) : Optional.empty();
}
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.COMPONENT_LIFE_CYCLE_MSG;
+ }
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
new file mode 100644
index 0000000..3d82033
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
@@ -0,0 +1,22 @@
+package org.thingsboard.server.common.msg.system;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+@Data
+public final class ServiceToRuleEngineMsg implements TbActorMsg {
+
+ private final TenantId tenantId;
+ private final TbMsg tbMsg;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.SERVICE_TO_RULE_ENGINE_MSG;
+ }
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 5163b6c..c689b24 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -36,7 +36,7 @@ public final class TbMsg implements Serializable {
private final String type;
private final EntityId originator;
private final TbMsgMetaData metaData;
-
+ private final TbMsgDataType dataType;
private final byte[] data;
public static ByteBuffer toBytes(TbMsg msg) {
@@ -49,11 +49,10 @@ public final class TbMsg implements Serializable {
}
if (msg.getMetaData() != null) {
- MsgProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
- metadataBuilder.putAllData(msg.getMetaData().getData());
- builder.addMetaData(metadataBuilder.build());
+ builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
}
+ builder.setDataType(msg.getDataType().ordinal());
builder.setData(ByteString.copyFrom(msg.getData()));
byte[] bytes = builder.build().toByteArray();
return ByteBuffer.wrap(bytes);
@@ -63,16 +62,11 @@ public final class TbMsg implements Serializable {
try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
TbMsgMetaData metaData = new TbMsgMetaData();
- if (proto.getMetaDataCount() > 0) {
- metaData.setData(proto.getMetaData(0).getDataMap());
- }
-
- EntityId entityId = null;
- if (proto.getEntityId() != null) {
- entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
- }
+ metaData.setData(proto.getMetaData().getDataMap());
- return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
+ EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
+ TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
+ return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData().toByteArray());
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
new file mode 100644
index 0000000..b6e2d5a
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
@@ -0,0 +1,11 @@
+package org.thingsboard.server.common.msg;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+public enum TbMsgDataType {
+
+ // Do not change ordering. We use ordinal to save some bytes on serialization
+ JSON, TEXT, BINARY;
+
+}
diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto
index 90fa2bd..62acff2 100644
--- a/common/message/src/main/proto/tbmsg.proto
+++ b/common/message/src/main/proto/tbmsg.proto
@@ -19,6 +19,9 @@ package msgqueue;
option java_package = "org.thingsboard.server.common.msg.gen";
option java_outer_classname = "MsgProtos";
+message TbMsgMetaDataProto {
+ map<string, string> data = 1;
+}
message TbMsgProto {
string id = 1;
@@ -26,11 +29,8 @@ message TbMsgProto {
string entityType = 3;
string entityId = 4;
- message TbMsgMetaDataProto {
- map<string, string> data = 1;
- }
+ TbMsgMetaDataProto metaData = 5;
- repeated TbMsgMetaDataProto metaData = 5;
-
- bytes data = 6;
+ int32 dataType = 6;
+ bytes data = 7;
}
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
index f1df09e..fff3f6d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
@@ -16,7 +16,6 @@
package org.thingsboard.server.dao.rule;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -67,67 +66,7 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
@Override
public RuleMetaData saveRule(RuleMetaData rule) {
- ruleValidator.validate(rule);
- if (rule.getTenantId() == null) {
- log.trace("Save system rule metadata with predefined id {}", systemTenantId);
- rule.setTenantId(systemTenantId);
- }
- if (rule.getId() != null) {
- RuleMetaData oldVersion = ruleDao.findById(rule.getId());
- if (rule.getState() == null) {
- rule.setState(oldVersion.getState());
- } else if (rule.getState() != oldVersion.getState()) {
- throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
- }
- } else {
- if (rule.getState() == null) {
- rule.setState(ComponentLifecycleState.SUSPENDED);
- } else if (rule.getState() != ComponentLifecycleState.SUSPENDED) {
- throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
- }
- }
-
- validateFilters(rule.getFilters());
- if (rule.getProcessor() != null && !rule.getProcessor().isNull()) {
- validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR);
- }
- if (rule.getAction() != null && !rule.getAction().isNull()) {
- validateComponentJson(rule.getAction(), ComponentType.ACTION);
- }
- validateRuleAndPluginState(rule);
- return ruleDao.save(rule);
- }
-
- private void validateFilters(JsonNode filtersJson) {
- if (filtersJson == null || filtersJson.isNull()) {
- throw new IncorrectParameterException("Rule filters are required!");
- }
- if (!filtersJson.isArray()) {
- throw new IncorrectParameterException("Filters json is not an array!");
- }
- ArrayNode filtersArray = (ArrayNode) filtersJson;
- for (int i = 0; i < filtersArray.size(); i++) {
- validateComponentJson(filtersArray.get(i), ComponentType.FILTER);
- }
- }
-
- private void validateComponentJson(JsonNode json, ComponentType type) {
- if (json == null || json.isNull()) {
- throw new IncorrectParameterException(type.name() + " is required!");
- }
- String clazz = getIfValid(type.name(), json, "clazz", JsonNode::isTextual, JsonNode::asText);
- String name = getIfValid(type.name(), json, "name", JsonNode::isTextual, JsonNode::asText);
- JsonNode configuration = getIfValid(type.name(), json, "configuration", JsonNode::isObject, node -> node);
- ComponentDescriptor descriptor = componentDescriptorService.findByClazz(clazz);
- if (descriptor == null) {
- throw new IncorrectParameterException(type.name() + " clazz " + clazz + " is not a valid component!");
- }
- if (descriptor.getType() != type) {
- throw new IncorrectParameterException("Clazz " + clazz + " is not a valid " + type.name() + " component!");
- }
- if (!componentDescriptorService.validate(descriptor, configuration)) {
- throw new IncorrectParameterException(type.name() + " configuration is not valid!");
- }
+ throw new RuntimeException("Not supported since v1.5!");
}
private void validateRuleAndPluginState(RuleMetaData rule) {
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 07cd72c..a1c85e2 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -38,7 +38,7 @@ public interface TbContext {
void spawn(TbMsg msg);
- void ack(UUID msg);
+ void ack(TbMsg msg);
AttributesService getAttributesService();
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
index 9edd14c..d1a4adc 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
@@ -44,6 +44,7 @@ public class TbTransformNode implements TbNode {
try {
//TODO: refactor this to work async and fetch attributes from cache.
AttributesService service = ctx.getAttributesService();
+
fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");