thingsboard-memoizeit
Changes
application/pom.xml 8(+8 -0)
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 63(+28 -35)
application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java 8(+4 -4)
application/src/main/java/org/thingsboard/server/actors/rule/ChainProcessingContext.java 117(+0 -117)
application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java 345(+0 -345)
application/src/main/java/org/thingsboard/server/actors/rule/RuleProcessingContext.java 115(+0 -115)
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java 154(+154 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 194(+194 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java 61(+61 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java 31(+16 -15)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java 99(+99 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java 34(+17 -17)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java 27(+15 -12)
application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java 23(+10 -13)
application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java 3(+0 -3)
application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java 39(+31 -8)
application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java 4(+2 -2)
application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java 5(+3 -2)
application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java 59(+59 -0)
application/src/main/java/org/thingsboard/server/actors/shared/rulechain/SystemRuleChainManager.java 25(+16 -9)
application/src/main/java/org/thingsboard/server/actors/shared/rulechain/TenantRuleChainManager.java 25(+16 -9)
application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java 49(+34 -15)
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java 56(+56 -0)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 190(+190 -0)
application/src/test/java/org/thingsboard/server/rules/flow/RuleEngineFlowSqlIntegrationTest.java 9(+6 -3)
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java 158(+158 -0)
application/src/test/java/org/thingsboard/server/rules/lifecycle/RuleEngineLifecycleSqlIntegrationTest.java 16(+6 -10)
common/data/src/main/java/org/thingsboard/server/common/data/rule/NodeConnectionInfo.java 15(+10 -5)
common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainConnectionInfo.java 27(+9 -18)
common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java 42(+21 -21)
common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java 28(+16 -12)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java 7(+4 -3)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java 4(+2 -2)
pom.xml 5(+5 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ActionNode.java 43(+43 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/EnrichmentNode.java 42(+42 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/FilterNode.java 43(+43 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java 4(+2 -2)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TransformationNode.java 43(+43 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java 28(+17 -11)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java 2(+2 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java 3(+3 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java 2(+2 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java 2(+0 -2)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java 3(+1 -2)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java 3(+1 -2)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java 6(+2 -4)
Details
application/pom.xml 8(+8 -0)
diff --git a/application/pom.xml b/application/pom.xml
index 8449246..6da40a9 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -54,6 +54,14 @@
<artifactId>extensions-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.thingsboard.rule-engine</groupId>
+ <artifactId>rule-engine-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.thingsboard.rule-engine</groupId>
+ <artifactId>rule-engine-components</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.thingsboard</groupId>
<artifactId>extensions-core</artifactId>
</dependency>
diff --git a/application/src/main/data/upgrade/1.5.0/schema_update.cql b/application/src/main/data/upgrade/1.5.0/schema_update.cql
index aa8b10b..ab68846 100644
--- a/application/src/main/data/upgrade/1.5.0/schema_update.cql
+++ b/application/src/main/data/upgrade/1.5.0/schema_update.cql
@@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_chain (
search_text text,
first_rule_node_id uuid,
root boolean,
+ debug_mode boolean,
configuration text,
additional_info text,
PRIMARY KEY (id, tenant_id)
@@ -85,6 +86,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
id uuid,
type text,
name text,
+ debug_mode boolean,
search_text text,
configuration text,
additional_info text,
diff --git a/application/src/main/data/upgrade/1.5.0/schema_update.sql b/application/src/main/data/upgrade/1.5.0/schema_update.sql
index 0043ed5..2bed6ad 100644
--- a/application/src/main/data/upgrade/1.5.0/schema_update.sql
+++ b/application/src/main/data/upgrade/1.5.0/schema_update.sql
@@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS rule_chain (
name varchar(255),
first_rule_node_id varchar(31),
root boolean,
+ debug_mode boolean,
search_text varchar(255),
tenant_id varchar(31)
);
@@ -31,5 +32,6 @@ CREATE TABLE IF NOT EXISTS rule_node (
configuration varchar(10000000),
type varchar(255),
name varchar(255),
+ debug_mode boolean,
search_text varchar(255)
);
\ No newline at end of file
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index f59ec63..a5a20b8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -28,12 +28,16 @@ import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
+import org.springframework.util.Base64Utils;
+import org.thingsboard.rule.engine.api.ListeningExecutor;
+import org.thingsboard.rule.engine.js.JsExecutorService;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
@@ -50,6 +54,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
@@ -57,6 +62,7 @@ import org.thingsboard.server.service.component.ComponentDiscoveryService;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
import java.util.Optional;
@Component
@@ -65,101 +71,154 @@ public class ActorSystemContext {
protected final ObjectMapper mapper = new ObjectMapper();
- @Getter @Setter private ActorService actorService;
+ @Getter
+ @Setter
+ private ActorService actorService;
@Autowired
- @Getter private DiscoveryService discoveryService;
+ @Getter
+ private DiscoveryService discoveryService;
@Autowired
- @Getter @Setter private ComponentDiscoveryService componentService;
+ @Getter
+ @Setter
+ private ComponentDiscoveryService componentService;
@Autowired
- @Getter private ClusterRoutingService routingService;
+ @Getter
+ private ClusterRoutingService routingService;
@Autowired
- @Getter private ClusterRpcService rpcService;
+ @Getter
+ private ClusterRpcService rpcService;
@Autowired
- @Getter private DeviceAuthService deviceAuthService;
+ @Getter
+ private DeviceAuthService deviceAuthService;
@Autowired
- @Getter private DeviceService deviceService;
+ @Getter
+ private DeviceService deviceService;
@Autowired
- @Getter private AssetService assetService;
+ @Getter
+ private AssetService assetService;
@Autowired
- @Getter private TenantService tenantService;
+ @Getter
+ private TenantService tenantService;
@Autowired
- @Getter private CustomerService customerService;
+ @Getter
+ private CustomerService customerService;
@Autowired
- @Getter private RuleService ruleService;
+ @Getter
+ private UserService userService;
@Autowired
- @Getter private RuleChainService ruleChainService;
+ @Getter
+ private RuleService ruleService;
@Autowired
- @Getter private PluginService pluginService;
+ @Getter
+ private RuleChainService ruleChainService;
@Autowired
- @Getter private TimeseriesService tsService;
+ @Getter
+ private PluginService pluginService;
@Autowired
- @Getter private AttributesService attributesService;
+ @Getter
+ private TimeseriesService tsService;
@Autowired
- @Getter private EventService eventService;
+ @Getter
+ private AttributesService attributesService;
@Autowired
- @Getter private AlarmService alarmService;
+ @Getter
+ private EventService eventService;
@Autowired
- @Getter private RelationService relationService;
+ @Getter
+ private AlarmService alarmService;
@Autowired
- @Getter private AuditLogService auditLogService;
+ @Getter
+ private RelationService relationService;
@Autowired
- @Getter @Setter private PluginWebSocketMsgEndpoint wsMsgEndpoint;
+ @Getter
+ private AuditLogService auditLogService;
+
+ @Autowired
+ @Getter
+ @Setter
+ private PluginWebSocketMsgEndpoint wsMsgEndpoint;
@Value("${actors.session.sync.timeout}")
- @Getter private long syncSessionTimeout;
+ @Getter
+ private long syncSessionTimeout;
@Value("${actors.plugin.termination.delay}")
- @Getter private long pluginActorTerminationDelay;
+ @Getter
+ private long pluginActorTerminationDelay;
@Value("${actors.plugin.processing.timeout}")
- @Getter private long pluginProcessingTimeout;
+ @Getter
+ private long pluginProcessingTimeout;
@Value("${actors.plugin.error_persist_frequency}")
- @Getter private long pluginErrorPersistFrequency;
+ @Getter
+ private long pluginErrorPersistFrequency;
+
+ @Value("${actors.rule.chain.error_persist_frequency}")
+ @Getter
+ private long ruleChainErrorPersistFrequency;
+
+ @Value("${actors.rule.node.error_persist_frequency}")
+ @Getter
+ private long ruleNodeErrorPersistFrequency;
@Value("${actors.rule.termination.delay}")
- @Getter private long ruleActorTerminationDelay;
+ @Getter
+ private long ruleActorTerminationDelay;
@Value("${actors.rule.error_persist_frequency}")
- @Getter private long ruleErrorPersistFrequency;
+ @Getter
+ private long ruleErrorPersistFrequency;
@Value("${actors.statistics.enabled}")
- @Getter private boolean statisticsEnabled;
+ @Getter
+ private boolean statisticsEnabled;
@Value("${actors.statistics.persist_frequency}")
- @Getter private long statisticsPersistFrequency;
+ @Getter
+ private long statisticsPersistFrequency;
@Value("${actors.tenant.create_components_on_init}")
- @Getter private boolean tenantComponentsInitEnabled;
+ @Getter
+ private boolean tenantComponentsInitEnabled;
- @Getter @Setter private ActorSystem actorSystem;
+ @Getter
+ @Setter
+ private ActorSystem actorSystem;
- @Getter @Setter private ActorRef appActor;
+ @Getter
+ @Setter
+ private ActorRef appActor;
- @Getter @Setter private ActorRef sessionManagerActor;
+ @Getter
+ @Setter
+ private ActorRef sessionManagerActor;
- @Getter @Setter private ActorRef statsActor;
+ @Getter
+ @Setter
+ private ActorRef statsActor;
- @Getter private final Config config;
+ @Getter
+ private final Config config;
public ActorSystemContext() {
config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
@@ -191,7 +250,7 @@ public class ActorSystemContext {
eventService.save(event);
}
- private String toString(Exception e) {
+ private String toString(Throwable e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
return sw.toString();
@@ -211,4 +270,69 @@ public class ActorSystemContext {
private JsonNode toBodyJson(ServerAddress server, String method, String body) {
return mapper.createObjectNode().put("server", server.toString()).put("method", method).put("error", body);
}
+
+ public String getServerAddress() {
+ return discoveryService.getCurrentServer().getServerAddress().toString();
+ }
+
+ public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
+ persistDebug(tenantId, entityId, "IN", tbMsg, null);
+ }
+
+ public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, Throwable error) {
+ persistDebug(tenantId, entityId, "IN", tbMsg, error);
+ }
+
+ public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, Throwable error) {
+ persistDebug(tenantId, entityId, "OUT", tbMsg, error);
+ }
+
+ public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
+ persistDebug(tenantId, entityId, "OUT", tbMsg, null);
+ }
+
+ private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, Throwable error) {
+ Event event = new Event();
+ event.setTenantId(tenantId);
+ event.setEntityId(entityId);
+ event.setType(DataConstants.DEBUG);
+
+ ObjectNode node = mapper.createObjectNode()
+ .put("type", type)
+ .put("server", getServerAddress())
+ .put("entityId", tbMsg.getOriginator().getId().toString())
+ .put("entityName", tbMsg.getOriginator().getEntityType().name())
+ .put("msgId", tbMsg.getId().toString())
+ .put("msgType", tbMsg.getType())
+ .put("dataType", tbMsg.getDataType().name());
+
+ ObjectNode mdNode = node.putObject("metadata");
+ tbMsg.getMetaData().getData().forEach(mdNode::put);
+
+ switch (tbMsg.getDataType()) {
+ case BINARY:
+ node.put("data", Base64Utils.encodeUrlSafe(tbMsg.getData()));
+ break;
+ default:
+ node.put("data", new String(tbMsg.getData(), StandardCharsets.UTF_8));
+ break;
+ }
+
+ if (error != null) {
+ node = node.put("error", toString(error));
+ }
+
+ event.setBody(node);
+ eventService.save(event);
+ }
+
+ public static Exception toException(Throwable error) {
+ return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
+ }
+
+ public ListeningExecutor getExecutor() {
+ //TODO: take thread count from yml.
+ return new JsExecutorService(1);
+ }
+
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index b475277..a75158f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -22,48 +22,41 @@ import akka.event.LoggingAdapter;
import akka.japi.Function;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.plugin.PluginTerminationMsg;
-import org.thingsboard.server.actors.service.ContextAwareActor;
+import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
-import org.thingsboard.server.actors.shared.plugin.PluginManager;
import org.thingsboard.server.actors.shared.plugin.SystemPluginManager;
-import org.thingsboard.server.actors.shared.rule.RuleManager;
-import org.thingsboard.server.actors.shared.rule.SystemRuleManager;
-import org.thingsboard.server.actors.tenant.RuleChainDeviceMsg;
+import org.thingsboard.server.actors.shared.rulechain.SystemRuleChainManager;
import org.thingsboard.server.actors.tenant.TenantActor;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.RuleId;
+import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
-import org.thingsboard.server.extensions.api.rules.ToRuleActorMsg;
import scala.concurrent.duration.Duration;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
-public class AppActor extends ContextAwareActor {
+public class AppActor extends RuleChainManagerActor {
private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
- private final RuleManager ruleManager;
- private final PluginManager pluginManager;
private final TenantService tenantService;
private final Map<TenantId, ActorRef> tenantActors;
private AppActor(ActorSystemContext systemContext) {
- super(systemContext);
- this.ruleManager = new SystemRuleManager(systemContext);
- this.pluginManager = new SystemPluginManager(systemContext);
+ super(systemContext, new SystemRuleChainManager(systemContext), new SystemPluginManager(systemContext));
this.tenantService = systemContext.getTenantService();
this.tenantActors = new HashMap<>();
}
@@ -77,8 +70,7 @@ public class AppActor extends ContextAwareActor {
public void preStart() {
logger.info("Starting main system actor.");
try {
- ruleManager.init(this.context());
- pluginManager.init(this.context());
+ initRuleChains();
if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
@@ -96,29 +88,51 @@ public class AppActor extends ContextAwareActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("Received message: {}", msg);
- if (msg instanceof ToDeviceActorMsg) {
- processDeviceMsg((ToDeviceActorMsg) msg);
- } else if (msg instanceof ToPluginActorMsg) {
- onToPluginMsg((ToPluginActorMsg) msg);
- } else if (msg instanceof ToRuleActorMsg) {
- onToRuleMsg((ToRuleActorMsg) msg);
- } else if (msg instanceof ToDeviceActorNotificationMsg) {
- onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
- } else if (msg instanceof Terminated) {
- processTermination((Terminated) msg);
- } else if (msg instanceof ClusterEventMsg) {
- broadcast(msg);
- } else if (msg instanceof ComponentLifecycleMsg) {
- onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
- } else if (msg instanceof PluginTerminationMsg) {
- onPluginTerminated((PluginTerminationMsg) msg);
+ protected boolean process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case COMPONENT_LIFE_CYCLE_MSG:
+ onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+ break;
+ case SERVICE_TO_RULE_ENGINE_MSG:
+ onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+ break;
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
+ if (SYSTEM_TENANT.equals(msg.getTenantId())) {
+ //TODO: ashvayka handle this.
} else {
- logger.warning("Unknown message: {}!", msg);
+ getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
}
}
+
+// @Override
+// public void onReceive(Object msg) throws Exception {
+// logger.debug("Received message: {}", msg);
+// if (msg instanceof ToDeviceActorMsg) {
+// processDeviceMsg((ToDeviceActorMsg) msg);
+// } else if (msg instanceof ToPluginActorMsg) {
+// onToPluginMsg((ToPluginActorMsg) msg);
+// } else if (msg instanceof ToDeviceActorNotificationMsg) {
+// onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
+// } else if (msg instanceof Terminated) {
+// processTermination((Terminated) msg);
+// } else if (msg instanceof ClusterEventMsg) {
+// broadcast(msg);
+// } else if (msg instanceof ComponentLifecycleMsg) {
+// onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+// } else if (msg instanceof PluginTerminationMsg) {
+// onPluginTerminated((PluginTerminationMsg) msg);
+// } else {
+// logger.warning("Unknown message: {}!", msg);
+// }
+// }
+
private void onPluginTerminated(PluginTerminationMsg msg) {
pluginManager.remove(msg.getId());
}
@@ -128,20 +142,10 @@ public class AppActor extends ContextAwareActor {
tenantActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
}
- private void onToRuleMsg(ToRuleActorMsg msg) {
- ActorRef target;
- if (SYSTEM_TENANT.equals(msg.getTenantId())) {
- target = ruleManager.getOrCreateRuleActor(this.context(), msg.getRuleId());
- } else {
- target = getOrCreateTenantActor(msg.getTenantId());
- }
- target.tell(msg, ActorRef.noSender());
- }
-
private void onToPluginMsg(ToPluginActorMsg msg) {
ActorRef target;
if (SYSTEM_TENANT.equals(msg.getPluginTenantId())) {
- target = pluginManager.getOrCreatePluginActor(this.context(), msg.getPluginId());
+ target = pluginManager.getOrCreateActor(this.context(), msg.getPluginId());
} else {
target = getOrCreateTenantActor(msg.getPluginTenantId());
}
@@ -149,26 +153,16 @@ public class AppActor extends ContextAwareActor {
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
- ActorRef target = null;
+ ActorRef target;
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
- Optional<PluginId> pluginId = msg.getPluginId();
- Optional<RuleId> ruleId = msg.getRuleId();
- if (pluginId.isPresent()) {
- target = pluginManager.getOrCreatePluginActor(this.context(), pluginId.get());
- } else if (ruleId.isPresent()) {
- Optional<ActorRef> ref = ruleManager.update(this.context(), ruleId.get(), msg.getEvent());
- if (ref.isPresent()) {
- target = ref.get();
- } else {
- logger.debug("Failed to find actor for rule: [{}]", ruleId);
- return;
- }
- }
+ target = getEntityActorRef(msg.getEntityId());
} else {
target = getOrCreateTenantActor(msg.getTenantId());
}
if (target != null) {
target.tell(msg, ActorRef.noSender());
+ } else {
+ logger.debug("Invalid component lifecycle msg: {}", msg);
}
}
@@ -180,7 +174,7 @@ public class AppActor extends ContextAwareActor {
TenantId tenantId = toDeviceActorMsg.getTenantId();
ActorRef tenantActor = getOrCreateTenantActor(tenantId);
if (toDeviceActorMsg.getPayload().getMsgType().requiresRulesProcessing()) {
- tenantActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self());
+// tenantActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self());
} else {
tenantActor.tell(toDeviceActorMsg, context().self());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 861c405..87bc992 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -18,19 +18,19 @@ package org.thingsboard.server.actors.device;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.rule.RulesProcessedMsg;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
-import org.thingsboard.server.actors.tenant.RuleChainDeviceMsg;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.*;
+import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
public class DeviceActor extends ContextAwareActor {
@@ -48,12 +48,17 @@ public class DeviceActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
- if (msg instanceof RuleChainDeviceMsg) {
- processor.process(context(), (RuleChainDeviceMsg) msg);
- } else if (msg instanceof RulesProcessedMsg) {
- processor.onRulesProcessedMsg(context(), (RulesProcessedMsg) msg);
- } else if (msg instanceof ToDeviceActorMsg) {
+// if (msg instanceof RuleChainDeviceMsg) {
+// processor.process(context(), (RuleChainDeviceMsg) msg);
+// } else if (msg instanceof RulesProcessedMsg) {
+// processor.onRulesProcessedMsg(context(), (RulesProcessedMsg) msg);
+ if (msg instanceof ToDeviceActorMsg) {
processor.process(context(), (ToDeviceActorMsg) msg);
} else if (msg instanceof ToDeviceActorNotificationMsg) {
if (msg instanceof DeviceAttributesEventNotificationMsg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 21112bf..3644a49 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -19,9 +19,7 @@ import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.rule.*;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
-import org.thingsboard.server.actors.tenant.RuleChainDeviceMsg;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
@@ -37,15 +35,10 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
-import org.thingsboard.server.extensions.api.device.*;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
-import org.thingsboard.server.extensions.api.plugins.msg.TimeoutIntMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestBody;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
+import org.thingsboard.server.extensions.api.device.DeviceAttributes;
+import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
+import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -230,18 +223,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- void process(ActorContext context, RuleChainDeviceMsg srcMsg) {
- ChainProcessingMetaData md = new ChainProcessingMetaData(srcMsg.getRuleChain(),
- srcMsg.getToDeviceActorMsg(), new DeviceMetaData(deviceId, deviceName, deviceType, deviceAttributes), context.self());
- ChainProcessingContext ctx = new ChainProcessingContext(md);
- if (ctx.getChainLength() > 0) {
- RuleProcessingMsg msg = new RuleProcessingMsg(ctx);
- ActorRef ruleActorRef = ctx.getCurrentActor();
- ruleActorRef.tell(msg, ActorRef.noSender());
- } else {
- context.self().tell(new RulesProcessedMsg(ctx), context.self());
- }
- }
+// void process(ActorContext context, RuleChainDeviceMsg srcMsg) {
+// ChainProcessingMetaData md = new ChainProcessingMetaData(srcMsg.getRuleChain(),
+// srcMsg.getToDeviceActorMsg(), new DeviceMetaData(deviceId, deviceName, deviceType, deviceAttributes), context.self());
+// ChainProcessingContext ctx = new ChainProcessingContext(md);
+// if (ctx.getChainLength() > 0) {
+// RuleProcessingMsg msg = new RuleProcessingMsg(ctx);
+// ActorRef ruleActorRef = ctx.getCurrentActor();
+// ruleActorRef.tell(msg, ActorRef.noSender());
+// } else {
+// context.self().tell(new RulesProcessedMsg(ctx), context.self());
+// }
+// }
void processRpcResponses(ActorContext context, ToDeviceActorMsg msg) {
SessionId sessionId = msg.getSessionId();
@@ -302,18 +295,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
);
}
- void onRulesProcessedMsg(ActorContext context, RulesProcessedMsg msg) {
- ChainProcessingContext ctx = msg.getCtx();
- ToDeviceActorMsg inMsg = ctx.getInMsg();
- SessionId sid = inMsg.getSessionId();
- ToDeviceSessionActorMsg response;
- if (ctx.getResponse() != null) {
- response = new BasicToDeviceSessionActorMsg(ctx.getResponse(), sid);
- } else {
- response = new BasicToDeviceSessionActorMsg(ctx.getError(), sid);
- }
- sendMsgToSessionActor(response, inMsg.getServerAddress());
- }
+// void onRulesProcessedMsg(ActorContext context, RulesProcessedMsg msg) {
+// ChainProcessingContext ctx = msg.getCtx();
+// ToDeviceActorMsg inMsg = ctx.getInMsg();
+// SessionId sid = inMsg.getSessionId();
+// ToDeviceSessionActorMsg response;
+// if (ctx.getResponse() != null) {
+// response = new BasicToDeviceSessionActorMsg(ctx.getResponse(), sid);
+// } else {
+// response = new BasicToDeviceSessionActorMsg(ctx.getError(), sid);
+// }
+// sendMsgToSessionActor(response, inMsg.getServerAddress());
+// }
private void processSubscriptionCommands(ActorContext context, ToDeviceActorMsg msg) {
SessionId sessionId = msg.getSessionId();
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActor.java
index 265da38..88278f3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActor.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.stats.StatsPersistTick;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
@@ -41,6 +42,12 @@ public class PluginActor extends ComponentActor<PluginId, PluginActorMessageProc
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof PluginWebsocketMsg) {
onWebsocketMsg((PluginWebsocketMsg<?>) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
index 6e78e20..09aaf80 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
@@ -57,7 +57,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
}
@Override
- public void start() throws Exception {
+ public void start(ActorContext context) throws Exception {
logger.info("[{}] Going to start plugin actor.", entityId);
pluginMd = systemContext.getPluginService().findPluginById(entityId);
if (pluginMd == null) {
@@ -76,7 +76,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
}
@Override
- public void stop() throws Exception {
+ public void stop(ActorContext context) throws Exception {
onStop();
}
@@ -191,7 +191,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
if (pluginImpl != null) {
pluginImpl.stop(trustedCtx);
}
- start();
+ start(context);
}
}
@@ -217,7 +217,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
pluginImpl.resume(trustedCtx);
logger.info("[{}] Plugin resumed.", entityId);
} else {
- start();
+ start(context);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 9290a8f..ba20013 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
@@ -57,6 +58,12 @@ public class RpcManagerActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof RpcSessionTellMsg) {
onMsg((RpcSessionTellMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index db029fa..a187444 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -23,6 +23,7 @@ import io.grpc.stub.StreamObserver;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
@@ -48,6 +49,12 @@ public class RpcSessionActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof RpcSessionTellMsg) {
tell((RpcSessionTellMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
new file mode 100644
index 0000000..012a09f
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import org.thingsboard.rule.engine.api.ListeningExecutor;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.dao.alarm.AlarmService;
+import org.thingsboard.server.dao.asset.AssetService;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.customer.CustomerService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.plugin.PluginService;
+import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.dao.rule.RuleChainService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.dao.user.UserService;
+
+import java.util.Set;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+class DefaultTbContext implements TbContext {
+
+ private final ActorSystemContext mainCtx;
+ private final RuleNodeCtx nodeCtx;
+
+ public DefaultTbContext(ActorSystemContext mainCtx, RuleNodeCtx nodeCtx) {
+ this.mainCtx = mainCtx;
+ this.nodeCtx = nodeCtx;
+ }
+
+ @Override
+ public void tellNext(TbMsg msg) {
+ tellNext(msg, (String) null);
+ }
+
+ @Override
+ public void tellNext(TbMsg msg, String relationType) {
+ if (nodeCtx.getSelf().isDebugMode()) {
+ mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg);
+ }
+ nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
+ }
+
+ @Override
+ public void tellSelf(TbMsg msg, long delayMs) {
+ throw new RuntimeException("Not Implemented!");
+ }
+
+ @Override
+ public void tellOthers(TbMsg msg) {
+ throw new RuntimeException("Not Implemented!");
+ }
+
+ @Override
+ public void tellSibling(TbMsg msg, ServerAddress address) {
+ throw new RuntimeException("Not Implemented!");
+ }
+
+ @Override
+ public void spawn(TbMsg msg) {
+ throw new RuntimeException("Not Implemented!");
+ }
+
+ @Override
+ public void ack(TbMsg msg) {
+
+ }
+
+ @Override
+ public void tellError(TbMsg msg, Throwable th) {
+ if (nodeCtx.getSelf().isDebugMode()) {
+ mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, th);
+ }
+ nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor());
+ }
+
+ @Override
+ public void tellNext(TbMsg msg, Set<String> relationTypes) {
+ relationTypes.forEach(type -> tellNext(msg, type));
+ }
+
+ @Override
+ public ListeningExecutor getJsExecutor() {
+ return mainCtx.getExecutor();
+ }
+
+ @Override
+ public AttributesService getAttributesService() {
+ return mainCtx.getAttributesService();
+ }
+
+ @Override
+ public CustomerService getCustomerService() {
+ return mainCtx.getCustomerService();
+ }
+
+ @Override
+ public UserService getUserService() {
+ return mainCtx.getUserService();
+ }
+
+ @Override
+ public PluginService getPluginService() {
+ return mainCtx.getPluginService();
+ }
+
+ @Override
+ public AssetService getAssetService() {
+ return mainCtx.getAssetService();
+ }
+
+ @Override
+ public DeviceService getDeviceService() {
+ return mainCtx.getDeviceService();
+ }
+
+ @Override
+ public AlarmService getAlarmService() {
+ return mainCtx.getAlarmService();
+ }
+
+ @Override
+ public RuleChainService getRuleChainService() {
+ return mainCtx.getRuleChainService();
+ }
+
+ @Override
+ public TimeseriesService getTimeseriesService() {
+ return mainCtx.getTsService();
+ }
+
+ @Override
+ public RelationService getRelationService() {
+ return mainCtx.getRelationService();
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
new file mode 100644
index 0000000..f539e32
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.OneForOneStrategy;
+import akka.actor.SupervisorStrategy;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ComponentActor;
+import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import scala.concurrent.duration.Duration;
+
+public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> {
+
+ private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId) {
+ super(systemContext, tenantId, ruleChainId);
+ setProcessor(new RuleChainActorMessageProcessor(tenantId, ruleChainId, systemContext,
+ logger, context().parent(), context().self()));
+ }
+
+ @Override
+ protected boolean process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case COMPONENT_LIFE_CYCLE_MSG:
+ onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+ break;
+ case SERVICE_TO_RULE_ENGINE_MSG:
+ processor.onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+ break;
+ case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
+ processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
+ break;
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ public static class ActorCreator extends ContextBasedCreator<RuleChainActor> {
+ private static final long serialVersionUID = 1L;
+
+ private final TenantId tenantId;
+ private final RuleChainId ruleChainId;
+
+ public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId pluginId) {
+ super(context);
+ this.tenantId = tenantId;
+ this.ruleChainId = pluginId;
+ }
+
+ @Override
+ public RuleChainActor create() throws Exception {
+ return new RuleChainActor(context, tenantId, ruleChainId);
+ }
+ }
+
+ @Override
+ protected long getErrorPersistFrequency() {
+ return systemContext.getRuleChainErrorPersistFrequency();
+ }
+
+ @Override
+ public SupervisorStrategy supervisorStrategy() {
+ return strategy;
+ }
+
+ private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
+ logAndPersist("Unknown Failure", ActorSystemContext.toException(t));
+ return SupervisorStrategy.resume();
+ });
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
new file mode 100644
index 0000000..a853b15
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -0,0 +1,194 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.event.LoggingAdapter;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.DefaultActorService;
+import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * @author Andrew Shvayka
+ */
+public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
+
+ private final ActorRef parent;
+ private final ActorRef self;
+ private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
+ private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
+ private final RuleChainService service;
+
+ private RuleNodeId firstId;
+ private RuleNodeCtx firstNode;
+
+ RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
+ , LoggingAdapter logger, ActorRef parent, ActorRef self) {
+ super(systemContext, logger, tenantId, ruleChainId);
+ this.parent = parent;
+ this.self = self;
+ this.nodeActors = new HashMap<>();
+ this.nodeRoutes = new HashMap<>();
+ this.service = systemContext.getRuleChainService();
+ }
+
+ @Override
+ public void start(ActorContext context) throws Exception {
+ RuleChain ruleChain = service.findRuleChainById(entityId);
+ List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
+ // Creating and starting the actors;
+ for (RuleNode ruleNode : ruleNodeList) {
+ ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
+ nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
+ }
+ initRoutes(ruleChain, ruleNodeList);
+ }
+
+ @Override
+ public void onUpdate(ActorContext context) throws Exception {
+ RuleChain ruleChain = service.findRuleChainById(entityId);
+ List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
+
+ for (RuleNode ruleNode : ruleNodeList) {
+ RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
+ if (existing == null) {
+ ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
+ nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
+ } else {
+ existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
+ }
+ }
+
+ Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet());
+ List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList());
+ removedRules.forEach(ruleNodeId -> {
+ RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
+ removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self);
+ });
+
+ initRoutes(ruleChain, ruleNodeList);
+ }
+
+ @Override
+ public void stop(ActorContext context) throws Exception {
+ nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop);
+ nodeActors.clear();
+ nodeRoutes.clear();
+ context.stop(self);
+ }
+
+ @Override
+ public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+
+ }
+
+ private ActorRef createRuleNodeActor(ActorContext context, RuleNode ruleNode) {
+ String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ?
+ DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
+ return context.actorOf(
+ Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
+ .withDispatcher(dispatcherName), ruleNode.getId().toString());
+ }
+
+ private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
+ nodeRoutes.clear();
+ // Populating the routes map;
+ for (RuleNode ruleNode : ruleNodeList) {
+ List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
+ for (EntityRelation relation : relations) {
+ if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
+ RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
+ if (ruleNodeCtx == null) {
+ throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
+ }
+ }
+ nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
+ .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
+ }
+ }
+
+ firstId = ruleChain.getFirstRuleNodeId();
+ firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId());
+ state = ComponentLifecycleState.ACTIVE;
+ }
+
+ void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
+ checkActive();
+ TbMsg tbMsg = envelope.getTbMsg();
+ //TODO: push to queue and act on ack in async way
+ pushMstToNode(firstNode, tbMsg);
+ }
+
+ void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
+ checkActive();
+ RuleNodeId originator = envelope.getOriginator();
+ String targetRelationType = envelope.getRelationType();
+ List<RuleNodeRelation> relations = nodeRoutes.get(originator);
+ if (relations == null) {
+ return;
+ }
+ boolean copy = relations.size() > 1;
+ for (RuleNodeRelation relation : relations) {
+ TbMsg msg = envelope.getMsg();
+ if (copy) {
+ msg = msg.copy();
+ }
+ if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {
+ switch (relation.getOut().getEntityType()) {
+ case RULE_NODE:
+ RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
+ RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
+ pushMstToNode(targetRuleNode, msg);
+ break;
+ case RULE_CHAIN:
+// TODO: implement
+ break;
+ }
+ }
+ }
+ }
+
+ private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
+ if (nodeCtx != null) {
+ nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
+ }
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
new file mode 100644
index 0000000..940bd5b
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorRef;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ContextAwareActor;
+import org.thingsboard.server.actors.shared.plugin.PluginManager;
+import org.thingsboard.server.actors.shared.rulechain.RuleChainManager;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.PluginId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+public abstract class RuleChainManagerActor extends ContextAwareActor {
+
+ protected final RuleChainManager ruleChainManager;
+ protected final PluginManager pluginManager;
+ protected final RuleChainService ruleChainService;
+
+ public RuleChainManagerActor(ActorSystemContext systemContext, RuleChainManager ruleChainManager, PluginManager pluginManager) {
+ super(systemContext);
+ this.ruleChainManager = ruleChainManager;
+ this.pluginManager = pluginManager;
+ this.ruleChainService = systemContext.getRuleChainService();
+ }
+
+ protected void initRuleChains() {
+ pluginManager.init(this.context());
+ ruleChainManager.init(this.context());
+ }
+
+ protected ActorRef getEntityActorRef(EntityId entityId) {
+ ActorRef target = null;
+ switch (entityId.getEntityType()) {
+ case PLUGIN:
+ target = pluginManager.getOrCreateActor(this.context(), (PluginId) entityId);
+ break;
+ case RULE_CHAIN:
+ target = ruleChainManager.getOrCreateActor(this.context(), (RuleChainId) entityId);
+ break;
+ }
+ return target;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
new file mode 100644
index 0000000..a86e2b9
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.event.LoggingAdapter;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeState;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+/**
+ * @author Andrew Shvayka
+ */
+public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> {
+
+ private final ActorRef parent;
+ private final ActorRef self;
+ private final RuleChainService service;
+ private RuleNode ruleNode;
+ private TbNode tbNode;
+
+ RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
+ , LoggingAdapter logger, ActorRef parent, ActorRef self) {
+ super(systemContext, logger, tenantId, ruleNodeId);
+ this.parent = parent;
+ this.self = self;
+ this.service = systemContext.getRuleChainService();
+ this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(entityId);
+ }
+
+ @Override
+ public void start(ActorContext context) throws Exception {
+ tbNode = initComponent(ruleNode);
+ state = ComponentLifecycleState.ACTIVE;
+ }
+
+ @Override
+ public void onUpdate(ActorContext context) throws Exception {
+ RuleNode newRuleNode = systemContext.getRuleChainService().findRuleNodeById(entityId);
+ boolean restartRequired = !(ruleNode.getType().equals(newRuleNode.getType())
+ && ruleNode.getConfiguration().equals(newRuleNode.getConfiguration()));
+ this.ruleNode = newRuleNode;
+ if (restartRequired) {
+ tbNode.destroy();
+ start(context);
+ }
+ }
+
+ @Override
+ public void stop(ActorContext context) throws Exception {
+ tbNode.destroy();
+ context.stop(self);
+ }
+
+ @Override
+ public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
+
+ }
+
+ void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
+ checkActive();
+ if (ruleNode.isDebugMode()) {
+ systemContext.persistDebugInput(tenantId, entityId, msg.getMsg());
+ }
+ tbNode.onMsg(msg.getCtx(), msg.getMsg());
+ }
+
+ private TbNode initComponent(RuleNode ruleNode) throws Exception {
+ Class<?> componentClazz = Class.forName(ruleNode.getType());
+ TbNode tbNode = (TbNode) (componentClazz.newInstance());
+ tbNode.init(new TbNodeConfiguration(ruleNode.getConfiguration()), new TbNodeState());
+ return tbNode;
+ }
+
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index baae376..0be0385 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -15,20 +15,19 @@
*/
package org.thingsboard.server.actors.service;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.RuleId;
-import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
import org.thingsboard.server.service.cluster.rpc.RpcMsgListener;
public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor, RestMsgProcessor, RpcMsgListener, DiscoveryServiceListener {
- void onPluginStateChange(TenantId tenantId, PluginId pluginId, ComponentLifecycleEvent state);
+ void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
- void onRuleStateChange(TenantId tenantId, RuleId ruleId, ComponentLifecycleEvent state);
+ void onMsg(ServiceToRuleEngineMsg msg);
void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index 76b9be9..6aa68d3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -54,7 +54,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override
public void preStart() {
try {
- processor.start();
+ processor.start(context());
logLifecycleEvent(ComponentLifecycleEvent.STARTED);
if (systemContext.isStatisticsEnabled()) {
scheduleStatsPersistTick();
@@ -78,7 +78,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
@Override
public void postStop() {
try {
- processor.stop();
+ processor.stop(context());
logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
} catch (Exception e) {
logger.warning("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
@@ -141,7 +141,6 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
messagesProcessed++;
}
-
protected void logAndPersist(String method, Exception e) {
logAndPersist(method, e, false);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 825c971..1d9c671 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -16,9 +16,13 @@
package org.thingsboard.server.actors.service;
import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.msg.TbActorMsg;
public abstract class ContextAwareActor extends UntypedActor {
+ protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
public static final int ENTITY_PACK_LIMIT = 1024;
@@ -28,4 +32,20 @@ public abstract class ContextAwareActor extends UntypedActor {
super();
this.systemContext = systemContext;
}
+
+ @Override
+ public void onReceive(Object msg) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing msg: {}", msg);
+ }
+ if (msg instanceof TbActorMsg) {
+ if(!process((TbActorMsg) msg)){
+ logger.warning("Unknown message: {}!", msg);
+ }
+ } else {
+ logger.warning("Unknown message: {}!", msg);
+ }
+ }
+
+ protected abstract boolean process(TbActorMsg msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index bb84a30..fbb5a14 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -30,16 +30,14 @@ import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
import org.thingsboard.server.actors.session.SessionManagerActor;
import org.thingsboard.server.actors.stats.StatsActor;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.RuleId;
-import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
@@ -129,6 +127,11 @@ public class DefaultActorService implements ActorService {
}
@Override
+ public void onMsg(ServiceToRuleEngineMsg msg) {
+ appActor.tell(msg, ActorRef.noSender());
+ }
+
+ @Override
public void process(SessionAwareMsg msg) {
log.debug("Processing session aware msg: {}", msg);
sessionManagerActor.tell(msg, ActorRef.noSender());
@@ -212,15 +215,9 @@ public class DefaultActorService implements ActorService {
}
@Override
- public void onPluginStateChange(TenantId tenantId, PluginId pluginId, ComponentLifecycleEvent state) {
- log.trace("[{}] Processing onPluginStateChange event: {}", pluginId, state);
- broadcast(ComponentLifecycleMsg.forPlugin(tenantId, pluginId, state));
- }
-
- @Override
- public void onRuleStateChange(TenantId tenantId, RuleId ruleId, ComponentLifecycleEvent state) {
- log.trace("[{}] Processing onRuleStateChange event: {}", ruleId, state);
- broadcast(ComponentLifecycleMsg.forRule(tenantId, ruleId, state));
+ public void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state) {
+ log.trace("[{}] Processing {} state change event: {}", tenantId, entityId.getEntityType(), state);
+ broadcast(new ComponentLifecycleMsg(tenantId, entityId, state));
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
index 37827d6..9d324c5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
@@ -61,6 +62,12 @@ public class SessionActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
logger.debug("[{}] Processing: {}.", sessionId, msg);
if (msg instanceof ToDeviceActorSessionMsg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
index 9d67dab..b5b1791 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
@@ -26,6 +26,7 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import akka.event.Logging;
@@ -49,6 +50,12 @@ public class SessionManagerActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof SessionCtrlMsg) {
onSessionCtrlMsg((SessionCtrlMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
index 73b221f..e1313d2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -102,9 +102,6 @@ public abstract class AbstractContextAwareMsgProcessor {
case FILTER:
configurationClazz = ((Filter) componentClazz.getAnnotation(Filter.class)).configuration();
break;
- case PROCESSOR:
- configurationClazz = ((Processor) componentClazz.getAnnotation(Processor.class)).configuration();
- break;
case ACTION:
configurationClazz = ((Action) componentClazz.getAnnotation(Action.class)).configuration();
break;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 18d32d9..e25d3a7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -20,12 +20,14 @@ import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.stats.StatsPersistTick;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgProcessor {
protected final TenantId tenantId;
protected final T entityId;
+ protected ComponentLifecycleState state;
protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
super(systemContext, logger);
@@ -33,23 +35,44 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr
this.entityId = id;
}
- public abstract void start() throws Exception;
+ public abstract void start(ActorContext context) throws Exception;
- public abstract void stop() throws Exception;
+ public abstract void stop(ActorContext context) throws Exception;
- public abstract void onCreated(ActorContext context) throws Exception;
+ public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
- public abstract void onUpdate(ActorContext context) throws Exception;
+ public void onCreated(ActorContext context) throws Exception {
+ start(context);
+ }
- public abstract void onActivate(ActorContext context) throws Exception;
+ public void onUpdate(ActorContext context) throws Exception {
+ restart(context);
+ }
- public abstract void onSuspend(ActorContext context) throws Exception;
+ public void onActivate(ActorContext context) throws Exception {
+ restart(context);
+ }
- public abstract void onStop(ActorContext context) throws Exception;
+ public void onSuspend(ActorContext context) throws Exception {
+ stop(context);
+ }
- public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
+ public void onStop(ActorContext context) throws Exception {
+ stop(context);
+ }
+
+ private void restart(ActorContext context) throws Exception {
+ stop(context);
+ start(context);
+ }
public void scheduleStatsPersistTick(ActorContext context, long statsPersistFrequency) {
schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
}
+
+ protected void checkActive() {
+ if (state != ComponentLifecycleState.ACTIVE) {
+ throw new IllegalStateException("Rule chain is not active!");
+ }
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
new file mode 100644
index 0000000..d4a1f34
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.shared;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ContextAwareActor;
+import org.thingsboard.server.common.data.SearchTextBased;
+import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.page.PageDataIterable;
+import org.thingsboard.server.common.data.plugin.PluginMetaData;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+@Slf4j
+public abstract class EntityActorsManager<T extends EntityId, A extends UntypedActor, M extends SearchTextBased<? extends UUIDBased>> {
+
+ protected final ActorSystemContext systemContext;
+ protected final Map<T, ActorRef> actors;
+
+ public EntityActorsManager(ActorSystemContext systemContext) {
+ this.systemContext = systemContext;
+ this.actors = new HashMap<>();
+ }
+
+ protected abstract TenantId getTenantId();
+
+ protected abstract String getDispatcherName();
+
+ protected abstract Creator<A> creator(T entityId);
+
+ protected abstract PageDataIterable.FetchFunction<M> getFetchEntitiesFunction();
+
+ public void init(ActorContext context) {
+ for (M entity : new PageDataIterable<>(getFetchEntitiesFunction(), ContextAwareActor.ENTITY_PACK_LIMIT)) {
+ T entityId = (T) entity.getId();
+ log.debug("[{}|{}] Creating entity actor", entityId.getEntityType(), entityId.getId());
+ //TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa.
+ ActorRef actorRef = getOrCreateActor(context, entityId);
+ visit(entity, actorRef);
+ log.debug("[{}|{}] Entity actor created.", entityId.getEntityType(), entityId.getId());
+ }
+ }
+
+ protected void visit(M entity, ActorRef actorRef) {}
+
+ public ActorRef getOrCreateActor(ActorContext context, T entityId) {
+ return actors.computeIfAbsent(entityId, eId ->
+ context.actorOf(Props.create(creator(eId))
+ .withDispatcher(getDispatcherName()), eId.toString()));
+ }
+
+ public void broadcast(Object msg) {
+ actors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
+ }
+
+ public void remove(T id) {
+ actors.remove(id);
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
index 4f5871f..3345e5f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
@@ -15,63 +15,28 @@
*/
package org.thingsboard.server.actors.shared.plugin;
-import akka.actor.ActorContext;
-import akka.actor.ActorRef;
-import akka.actor.Props;
+import akka.japi.Creator;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.plugin.PluginActor;
-import org.thingsboard.server.actors.service.ContextAwareActor;
+import org.thingsboard.server.actors.shared.EntityActorsManager;
import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.page.PageDataIterable;
-import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.dao.plugin.PluginService;
-import java.util.HashMap;
-import java.util.Map;
-
@Slf4j
-public abstract class PluginManager {
+public abstract class PluginManager extends EntityActorsManager<PluginId, PluginActor, PluginMetaData> {
- protected final ActorSystemContext systemContext;
protected final PluginService pluginService;
- protected final Map<PluginId, ActorRef> pluginActors;
public PluginManager(ActorSystemContext systemContext) {
- this.systemContext = systemContext;
+ super(systemContext);
this.pluginService = systemContext.getPluginService();
- this.pluginActors = new HashMap<>();
}
- public void init(ActorContext context) {
- PageDataIterable<PluginMetaData> pluginIterator = new PageDataIterable<>(getFetchPluginsFunction(),
- ContextAwareActor.ENTITY_PACK_LIMIT);
- for (PluginMetaData plugin : pluginIterator) {
- log.debug("[{}] Creating plugin actor", plugin.getId());
- getOrCreatePluginActor(context, plugin.getId());
- log.debug("Plugin actor created.");
- }
+ @Override
+ public Creator<PluginActor> creator(PluginId entityId){
+ return new PluginActor.ActorCreator(systemContext, getTenantId(), entityId);
}
- abstract FetchFunction<PluginMetaData> getFetchPluginsFunction();
-
- abstract TenantId getTenantId();
-
- abstract String getDispatcherName();
-
- public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) {
- return pluginActors.computeIfAbsent(pluginId, pId ->
- context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pId))
- .withDispatcher(getDispatcherName()), pId.toString()));
- }
-
- public void broadcast(Object msg) {
- pluginActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
- }
-
- public void remove(PluginId id) {
- pluginActors.remove(id);
- }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
index 0888e23..88c52a6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/SystemPluginManager.java
@@ -29,12 +29,12 @@ public class SystemPluginManager extends PluginManager {
}
@Override
- FetchFunction<PluginMetaData> getFetchPluginsFunction() {
+ protected FetchFunction<PluginMetaData> getFetchEntitiesFunction() {
return pluginService::findSystemPlugins;
}
@Override
- TenantId getTenantId() {
+ protected TenantId getTenantId() {
return BasePluginService.SYSTEM_TENANT;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
index 14ea2aa..09115f0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/TenantPluginManager.java
@@ -19,6 +19,7 @@ import akka.actor.ActorContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
@@ -39,12 +40,12 @@ public class TenantPluginManager extends PluginManager {
}
@Override
- FetchFunction<PluginMetaData> getFetchPluginsFunction() {
+ protected FetchFunction<PluginMetaData> getFetchEntitiesFunction() {
return link -> pluginService.findTenantPlugins(tenantId, link);
}
@Override
- TenantId getTenantId() {
+ protected TenantId getTenantId() {
return tenantId;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
new file mode 100644
index 0000000..ff0c52e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.shared.rulechain;
+
+import akka.actor.ActorRef;
+import akka.japi.Creator;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.ruleChain.RuleChainActor;
+import org.thingsboard.server.actors.shared.EntityActorsManager;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+@Slf4j
+public abstract class RuleChainManager extends EntityActorsManager<RuleChainId, RuleChainActor, RuleChain> {
+
+ protected final RuleChainService service;
+ @Getter
+ protected RuleChain rootChain;
+ @Getter
+ protected ActorRef rootChainActor;
+
+ public RuleChainManager(ActorSystemContext systemContext) {
+ super(systemContext);
+ this.service = systemContext.getRuleChainService();
+ }
+
+ @Override
+ public Creator<RuleChainActor> creator(RuleChainId entityId) {
+ return new RuleChainActor.ActorCreator(systemContext, getTenantId(), entityId);
+ }
+
+ @Override
+ protected void visit(RuleChain entity, ActorRef actorRef) {
+ if (entity.isRoot()) {
+ rootChain = entity;
+ rootChainActor = actorRef;
+ }
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
index ccc31cc..8623370 100644
--- a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
@@ -24,6 +24,7 @@ import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
public class StatsActor extends ContextAwareActor {
@@ -36,6 +37,12 @@ public class StatsActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg\
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
logger.debug("Received message: {}", msg);
if (msg instanceof StatsPersistMsg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index b923fe1..d53c054 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -15,52 +15,38 @@
*/
package org.thingsboard.server.actors.tenant;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.device.DeviceActor;
import org.thingsboard.server.actors.plugin.PluginTerminationMsg;
-import org.thingsboard.server.actors.rule.ComplexRuleActorChain;
-import org.thingsboard.server.actors.rule.RuleActorChain;
-import org.thingsboard.server.actors.service.ContextAwareActor;
+import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
-import org.thingsboard.server.actors.shared.plugin.PluginManager;
import org.thingsboard.server.actors.shared.plugin.TenantPluginManager;
-import org.thingsboard.server.actors.shared.rule.RuleManager;
-import org.thingsboard.server.actors.shared.rule.TenantRuleManager;
+import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
-import org.thingsboard.server.extensions.api.rules.ToRuleActorMsg;
-public class TenantActor extends ContextAwareActor {
+import java.util.HashMap;
+import java.util.Map;
- private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
+public class TenantActor extends RuleChainManagerActor {
private final TenantId tenantId;
- private final RuleManager ruleManager;
- private final PluginManager pluginManager;
private final Map<DeviceId, ActorRef> deviceActors;
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
- super(systemContext);
+ super(systemContext, new TenantRuleChainManager(systemContext, tenantId), new TenantPluginManager(systemContext, tenantId));
this.tenantId = tenantId;
- this.ruleManager = new TenantRuleManager(systemContext, tenantId);
- this.pluginManager = new TenantPluginManager(systemContext, tenantId);
this.deviceActors = new HashMap<>();
}
@@ -68,8 +54,7 @@ public class TenantActor extends ContextAwareActor {
public void preStart() {
logger.info("[{}] Starting tenant actor.", tenantId);
try {
- ruleManager.init(this.context());
- pluginManager.init(this.context());
+ initRuleChains();
logger.info("[{}] Tenant actor started.", tenantId);
} catch (Exception e) {
logger.error(e, "[{}] Unknown failure", tenantId);
@@ -77,29 +62,45 @@ public class TenantActor extends ContextAwareActor {
}
@Override
- public void onReceive(Object msg) throws Exception {
- logger.debug("[{}] Received message: {}", tenantId, msg);
- if (msg instanceof RuleChainDeviceMsg) {
- process((RuleChainDeviceMsg) msg);
- } else if (msg instanceof ToDeviceActorMsg) {
- onToDeviceActorMsg((ToDeviceActorMsg) msg);
- } else if (msg instanceof ToPluginActorMsg) {
- onToPluginMsg((ToPluginActorMsg) msg);
- } else if (msg instanceof ToRuleActorMsg) {
- onToRuleMsg((ToRuleActorMsg) msg);
- } else if (msg instanceof ToDeviceActorNotificationMsg) {
- onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
- } else if (msg instanceof ClusterEventMsg) {
- broadcast(msg);
- } else if (msg instanceof ComponentLifecycleMsg) {
- onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
- } else if (msg instanceof PluginTerminationMsg) {
- onPluginTerminated((PluginTerminationMsg) msg);
- } else {
- logger.warning("[{}] Unknown message: {}!", tenantId, msg);
+ protected boolean process(TbActorMsg msg) {
+ switch (msg.getMsgType()) {
+ case COMPONENT_LIFE_CYCLE_MSG:
+ onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+ break;
+ case SERVICE_TO_RULE_ENGINE_MSG:
+ onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
+ break;
+ default:
+ return false;
}
+ return true;
}
+ private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
+ ruleChainManager.getRootChainActor().tell(msg, self());
+ }
+
+
+// @Override
+// public void onReceive(Object msg) throws Exception {
+// logger.debug("[{}] Received message: {}", tenantId, msg);
+// if (msg instanceof ToDeviceActorMsg) {
+// onToDeviceActorMsg((ToDeviceActorMsg) msg);
+// } else if (msg instanceof ToPluginActorMsg) {
+// onToPluginMsg((ToPluginActorMsg) msg);
+// } else if (msg instanceof ToDeviceActorNotificationMsg) {
+// onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
+// } else if (msg instanceof ClusterEventMsg) {
+// broadcast(msg);
+// } else if (msg instanceof ComponentLifecycleMsg) {
+// onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
+// } else if (msg instanceof PluginTerminationMsg) {
+// onPluginTerminated((PluginTerminationMsg) msg);
+// } else {
+// logger.warning("[{}] Unknown message: {}!", tenantId, msg);
+// }
+// }
+
private void broadcast(Object msg) {
pluginManager.broadcast(msg);
deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
@@ -113,14 +114,9 @@ public class TenantActor extends ContextAwareActor {
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
}
- private void onToRuleMsg(ToRuleActorMsg msg) {
- ActorRef target = ruleManager.getOrCreateRuleActor(this.context(), msg.getRuleId());
- target.tell(msg, ActorRef.noSender());
- }
-
private void onToPluginMsg(ToPluginActorMsg msg) {
if (msg.getPluginTenantId().equals(tenantId)) {
- ActorRef pluginActor = pluginManager.getOrCreatePluginActor(this.context(), msg.getPluginId());
+ ActorRef pluginActor = pluginManager.getOrCreateActor(this.context(), msg.getPluginId());
pluginActor.tell(msg, ActorRef.noSender());
} else {
context().parent().tell(msg, ActorRef.noSender());
@@ -128,23 +124,11 @@ public class TenantActor extends ContextAwareActor {
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
- Optional<PluginId> pluginId = msg.getPluginId();
- Optional<RuleId> ruleId = msg.getRuleId();
- if (pluginId.isPresent()) {
- ActorRef pluginActor = pluginManager.getOrCreatePluginActor(this.context(), pluginId.get());
- pluginActor.tell(msg, ActorRef.noSender());
- } else if (ruleId.isPresent()) {
- ActorRef target;
- Optional<ActorRef> ref = ruleManager.update(this.context(), ruleId.get(), msg.getEvent());
- if (ref.isPresent()) {
- target = ref.get();
- } else {
- logger.debug("Failed to find actor for rule: [{}]", ruleId);
- return;
- }
+ ActorRef target = getEntityActorRef(msg.getEntityId());
+ if (target != null) {
target.tell(msg, ActorRef.noSender());
} else {
- logger.debug("[{}] Invalid component lifecycle msg.", tenantId);
+ logger.debug("Invalid component lifecycle msg: {}", msg);
}
}
@@ -152,13 +136,6 @@ public class TenantActor extends ContextAwareActor {
pluginManager.remove(msg.getId());
}
- private void process(RuleChainDeviceMsg msg) {
- ToDeviceActorMsg toDeviceActorMsg = msg.getToDeviceActorMsg();
- ActorRef deviceActor = getOrCreateDeviceActor(toDeviceActorMsg.getDeviceId());
- RuleActorChain tenantChain = ruleManager.getRuleChain(this.context());
- RuleActorChain chain = new ComplexRuleActorChain(msg.getRuleChain(), tenantChain);
- deviceActor.tell(new RuleChainDeviceMsg(toDeviceActorMsg, chain), context().self());
- }
private ActorRef getOrCreateDeviceActor(DeviceId deviceId) {
return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId))
diff --git a/application/src/main/java/org/thingsboard/server/controller/PluginController.java b/application/src/main/java/org/thingsboard/server/controller/PluginController.java
index 2c69248..ed17600 100644
--- a/application/src/main/java/org/thingsboard/server/controller/PluginController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/PluginController.java
@@ -71,7 +71,7 @@ public class PluginController extends BaseController {
boolean created = source.getId() == null;
source.setTenantId(getCurrentUser().getTenantId());
PluginMetaData plugin = checkNotNull(pluginService.savePlugin(source));
- actorService.onPluginStateChange(plugin.getTenantId(), plugin.getId(),
+ actorService.onEntityStateChange(plugin.getTenantId(), plugin.getId(),
created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
logEntityAction(plugin.getId(), plugin,
@@ -97,7 +97,7 @@ public class PluginController extends BaseController {
PluginId pluginId = new PluginId(toUUID(strPluginId));
PluginMetaData plugin = checkPlugin(pluginService.findPluginById(pluginId));
pluginService.activatePluginById(pluginId);
- actorService.onPluginStateChange(plugin.getTenantId(), plugin.getId(), ComponentLifecycleEvent.ACTIVATED);
+ actorService.onEntityStateChange(plugin.getTenantId(), plugin.getId(), ComponentLifecycleEvent.ACTIVATED);
logEntityAction(plugin.getId(), plugin,
null,
@@ -123,7 +123,7 @@ public class PluginController extends BaseController {
PluginId pluginId = new PluginId(toUUID(strPluginId));
PluginMetaData plugin = checkPlugin(pluginService.findPluginById(pluginId));
pluginService.suspendPluginById(pluginId);
- actorService.onPluginStateChange(plugin.getTenantId(), plugin.getId(), ComponentLifecycleEvent.SUSPENDED);
+ actorService.onEntityStateChange(plugin.getTenantId(), plugin.getId(), ComponentLifecycleEvent.SUSPENDED);
logEntityAction(plugin.getId(), plugin,
null,
@@ -221,7 +221,7 @@ public class PluginController extends BaseController {
PluginId pluginId = new PluginId(toUUID(strPluginId));
PluginMetaData plugin = checkPlugin(pluginService.findPluginById(pluginId));
pluginService.deletePluginById(pluginId);
- actorService.onPluginStateChange(plugin.getTenantId(), plugin.getId(), ComponentLifecycleEvent.DELETED);
+ actorService.onEntityStateChange(plugin.getTenantId(), plugin.getId(), ComponentLifecycleEvent.DELETED);
logEntityAction(pluginId, plugin,
null,
diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
index f24646b..c6befdd 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
@@ -78,6 +78,9 @@ public class RuleChainController extends BaseController {
ruleChain.setTenantId(getCurrentUser().getTenantId());
RuleChain savedRuleChain = checkNotNull(ruleChainService.saveRuleChain(ruleChain));
+ actorService.onEntityStateChange(ruleChain.getTenantId(), savedRuleChain.getId(),
+ created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
+
logEntityAction(savedRuleChain.getId(), savedRuleChain,
null,
created ? ActionType.ADDED : ActionType.UPDATED, null);
@@ -100,6 +103,8 @@ public class RuleChainController extends BaseController {
RuleChain ruleChain = checkRuleChain(ruleChainMetaData.getRuleChainId());
RuleChainMetaData savedRuleChainMetaData = checkNotNull(ruleChainService.saveRuleChainMetaData(ruleChainMetaData));
+ actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(), ComponentLifecycleEvent.UPDATED);
+
logEntityAction(ruleChain.getId(), ruleChain,
null,
ActionType.UPDATED, null, ruleChainMetaData);
@@ -183,6 +188,8 @@ public class RuleChainController extends BaseController {
RuleChain ruleChain = checkRuleChain(ruleChainId);
ruleChainService.deleteRuleChainById(ruleChainId);
+ actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(), ComponentLifecycleEvent.DELETED);
+
logEntityAction(ruleChainId, ruleChain,
null,
ActionType.DELETED, null, strRuleChainId);
diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleController.java b/application/src/main/java/org/thingsboard/server/controller/RuleController.java
index e498c8f..9a26902 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RuleController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RuleController.java
@@ -73,7 +73,7 @@ public class RuleController extends BaseController {
boolean created = source.getId() == null;
source.setTenantId(getCurrentUser().getTenantId());
RuleMetaData rule = checkNotNull(ruleService.saveRule(source));
- actorService.onRuleStateChange(rule.getTenantId(), rule.getId(),
+ actorService.onEntityStateChange(rule.getTenantId(), rule.getId(),
created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
logEntityAction(rule.getId(), rule,
@@ -99,7 +99,7 @@ public class RuleController extends BaseController {
RuleId ruleId = new RuleId(toUUID(strRuleId));
RuleMetaData rule = checkRule(ruleService.findRuleById(ruleId));
ruleService.activateRuleById(ruleId);
- actorService.onRuleStateChange(rule.getTenantId(), rule.getId(), ComponentLifecycleEvent.ACTIVATED);
+ actorService.onEntityStateChange(rule.getTenantId(), rule.getId(), ComponentLifecycleEvent.ACTIVATED);
logEntityAction(rule.getId(), rule,
null,
@@ -125,7 +125,7 @@ public class RuleController extends BaseController {
RuleId ruleId = new RuleId(toUUID(strRuleId));
RuleMetaData rule = checkRule(ruleService.findRuleById(ruleId));
ruleService.suspendRuleById(ruleId);
- actorService.onRuleStateChange(rule.getTenantId(), rule.getId(), ComponentLifecycleEvent.SUSPENDED);
+ actorService.onEntityStateChange(rule.getTenantId(), rule.getId(), ComponentLifecycleEvent.SUSPENDED);
logEntityAction(rule.getId(), rule,
null,
@@ -219,7 +219,7 @@ public class RuleController extends BaseController {
RuleId ruleId = new RuleId(toUUID(strRuleId));
RuleMetaData rule = checkRule(ruleService.findRuleById(ruleId));
ruleService.deleteRuleById(ruleId);
- actorService.onRuleStateChange(rule.getTenantId(), rule.getId(), ComponentLifecycleEvent.DELETED);
+ actorService.onEntityStateChange(rule.getTenantId(), rule.getId(), ComponentLifecycleEvent.DELETED);
logEntityAction(ruleId, rule,
null,
diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
index 0a6081d..910b459 100644
--- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
@@ -26,6 +26,10 @@ import org.springframework.context.annotation.ClassPathScanningCandidateComponen
import org.springframework.core.env.Environment;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.ActionNode;
+import org.thingsboard.rule.engine.api.EnrichmentNode;
+import org.thingsboard.rule.engine.api.FilterNode;
+import org.thingsboard.rule.engine.api.TransformationNode;
import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.dao.component.ComponentDescriptorService;
@@ -79,8 +83,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
private List<ComponentDescriptor> persist(Set<BeanDefinition> filterDefs, ComponentType type) {
List<ComponentDescriptor> result = new ArrayList<>();
for (BeanDefinition def : filterDefs) {
- ComponentDescriptor scannedComponent = scanAndPersistComponent(def, type);
- result.add(scannedComponent);
+ result.add(scanAndPersistComponent(def, type));
}
return result;
}
@@ -93,24 +96,36 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
Class<?> clazz = Class.forName(clazzName);
String descriptorResourceName;
switch (type) {
+ case ENRICHMENT:
+ EnrichmentNode enrichmentAnnotation = clazz.getAnnotation(EnrichmentNode.class);
+ scannedComponent.setName(enrichmentAnnotation.name());
+ scannedComponent.setScope(enrichmentAnnotation.scope());
+ descriptorResourceName = enrichmentAnnotation.descriptor();
+ break;
case FILTER:
- Filter filterAnnotation = clazz.getAnnotation(Filter.class);
+ FilterNode filterAnnotation = clazz.getAnnotation(FilterNode.class);
scannedComponent.setName(filterAnnotation.name());
scannedComponent.setScope(filterAnnotation.scope());
descriptorResourceName = filterAnnotation.descriptor();
break;
- case PROCESSOR:
- Processor processorAnnotation = clazz.getAnnotation(Processor.class);
- scannedComponent.setName(processorAnnotation.name());
- scannedComponent.setScope(processorAnnotation.scope());
- descriptorResourceName = processorAnnotation.descriptor();
+ case TRANSFORMATION:
+ TransformationNode trAnnotation = clazz.getAnnotation(TransformationNode.class);
+ scannedComponent.setName(trAnnotation.name());
+ scannedComponent.setScope(trAnnotation.scope());
+ descriptorResourceName = trAnnotation.descriptor();
break;
case ACTION:
- Action actionAnnotation = clazz.getAnnotation(Action.class);
+ ActionNode actionAnnotation = clazz.getAnnotation(ActionNode.class);
scannedComponent.setName(actionAnnotation.name());
scannedComponent.setScope(actionAnnotation.scope());
descriptorResourceName = actionAnnotation.descriptor();
break;
+ case OLD_ACTION:
+ Action oldActionAnnotation = clazz.getAnnotation(Action.class);
+ scannedComponent.setName(oldActionAnnotation.name());
+ scannedComponent.setScope(oldActionAnnotation.scope());
+ descriptorResourceName = oldActionAnnotation.descriptor();
+ break;
case PLUGIN:
Plugin pluginAnnotation = clazz.getAnnotation(Plugin.class);
scannedComponent.setName(pluginAnnotation.name());
@@ -122,12 +137,12 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
log.error("Can't initialize plugin {}, due to missing action {}!", def.getBeanClassName(), actionClazz.getName());
return new ClassNotFoundException("Action: " + actionClazz.getName() + "is missing!");
});
- if (actionComponent.getType() != ComponentType.ACTION) {
+ if (actionComponent.getType() != ComponentType.OLD_ACTION) {
log.error("Plugin {} action {} has wrong component type!", def.getBeanClassName(), actionClazz.getName(), actionComponent.getType());
throw new RuntimeException("Plugin " + def.getBeanClassName() + "action " + actionClazz.getName() + " has wrong component type!");
}
}
- scannedComponent.setActions(Arrays.stream(pluginAnnotation.actions()).map(action -> action.getName()).collect(Collectors.joining(",")));
+ scannedComponent.setActions(Arrays.stream(pluginAnnotation.actions()).map(Class::getName).collect(Collectors.joining(",")));
break;
default:
throw new RuntimeException(type + " is not supported yet!");
@@ -168,11 +183,15 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
@Override
public void discoverComponents() {
- registerComponents(ComponentType.FILTER, Filter.class);
+ registerComponents(ComponentType.ENRICHMENT, EnrichmentNode.class);
+
+ registerComponents(ComponentType.FILTER, FilterNode.class);
+
+ registerComponents(ComponentType.TRANSFORMATION, TransformationNode.class);
- registerComponents(ComponentType.PROCESSOR, Processor.class);
+ registerComponents(ComponentType.ACTION, ActionNode.class);
- registerComponents(ComponentType.ACTION, Action.class);
+ registerComponents(ComponentType.OLD_ACTION, Action.class);
registerComponents(ComponentType.PLUGIN, Plugin.class);
@@ -199,7 +218,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
}
List<ComponentDescriptor> result = new ArrayList<>();
for (String action : plugin.getActions().split(",")) {
- getComponent(action).ifPresent(v -> result.add(v));
+ getComponent(action).ifPresent(result::add);
}
return result;
} else {
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 1c842c8..2758521 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -62,7 +62,7 @@ cluster:
# Plugins configuration parameters
plugins:
# Comma seperated package list used during classpath scanning for plugins
- scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions}"
+ scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine}"
# JWT Token parameters
security.jwt:
@@ -215,6 +215,12 @@ actors:
termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
+ chain:
+ # Errors for particular actor are persisted once per specified amount of milliseconds
+ error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
+ node:
+ # Errors for particular actor are persisted once per specified amount of milliseconds
+ error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
index b92e464..3ec4dc8 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
@@ -96,6 +96,8 @@ import static org.springframework.test.web.servlet.setup.MockMvcBuilders.webAppC
@Slf4j
public abstract class AbstractControllerTest {
+ protected ObjectMapper mapper = new ObjectMapper();
+
protected static final String TEST_TENANT_NAME = "TEST TENANT";
protected static final String SYS_ADMIN_EMAIL = "sysadmin@thingsboard.org";
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
new file mode 100644
index 0000000..bbcb98f
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.controller;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.Event;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.page.TimePageData;
+import org.thingsboard.server.common.data.page.TimePageLink;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+
+/**
+ * Created by ashvayka on 20.03.18.
+ */
+public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
+
+ protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
+ return doPost("/api/ruleChain", ruleChain, RuleChain.class);
+ }
+
+ protected RuleChain getRuleChain(RuleChainId ruleChainId) throws Exception {
+ return doGet("/api/ruleChain/" + ruleChainId.getId().toString(), RuleChain.class);
+ }
+
+ protected RuleChainMetaData saveRuleChainMetaData(RuleChainMetaData ruleChainMD) throws Exception {
+ return doPost("/api/ruleChain/metadata", ruleChainMD, RuleChainMetaData.class);
+ }
+
+ protected RuleChainMetaData getRuleChainMetaData(RuleChainId ruleChainId) throws Exception {
+ return doGet("/api/ruleChain/metadata/" + ruleChainId.getId().toString(), RuleChainMetaData.class);
+ }
+
+ protected TimePageData<Event> getDebugEvents(TenantId tenantId, EntityId entityId, int limit) throws Exception {
+ TimePageLink pageLink = new TimePageLink(limit);
+ return doGetTypedWithTimePageLink("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&",
+ new TypeReference<TimePageData<Event>>() {
+ }, pageLink, entityId.getEntityType(), entityId.getId(), DataConstants.DEBUG, tenantId.getId());
+ }
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
new file mode 100644
index 0000000..acbace6
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -0,0 +1,190 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules.flow;
+
+import com.datastax.driver.core.utils.UUIDs;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.*;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.data.page.TimePageData;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.data.security.Authority;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.rule.RuleChainService;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+/**
+ * @author Valerii Sosliuk
+ */
+@Slf4j
+public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRuleEngineControllerTest {
+
+ protected Tenant savedTenant;
+ protected User tenantAdmin;
+
+ @Autowired
+ protected ActorService actorService;
+
+ @Autowired
+ protected AttributesService attributesService;
+
+ @Autowired
+ protected RuleChainService ruleChainService;
+
+ @Before
+ public void beforeTest() throws Exception {
+ loginSysAdmin();
+
+ Tenant tenant = new Tenant();
+ tenant.setTitle("My tenant");
+ savedTenant = doPost("/api/tenant", tenant, Tenant.class);
+ Assert.assertNotNull(savedTenant);
+
+ tenantAdmin = new User();
+ tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
+ tenantAdmin.setTenantId(savedTenant.getId());
+ tenantAdmin.setEmail("tenant2@thingsboard.org");
+ tenantAdmin.setFirstName("Joe");
+ tenantAdmin.setLastName("Downs");
+
+ createUserAndLogin(tenantAdmin, "testPassword1");
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ loginSysAdmin();
+ if (savedTenant != null) {
+ doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk());
+ }
+ }
+
+ @Test
+ public void testRuleChainWithTwoRules() throws Exception {
+ // Creating Rule Chain
+ RuleChain ruleChain = new RuleChain();
+ ruleChain.setName("Simple Rule Chain");
+ ruleChain.setTenantId(savedTenant.getId());
+ ruleChain.setRoot(true);
+ ruleChain.setDebugMode(true);
+ ruleChain = saveRuleChain(ruleChain);
+ Assert.assertNull(ruleChain.getFirstRuleNodeId());
+
+ RuleChainMetaData metaData = new RuleChainMetaData();
+ metaData.setRuleChainId(ruleChain.getId());
+
+ RuleNode ruleNode1 = new RuleNode();
+ ruleNode1.setName("Simple Rule Node 1");
+ ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode1.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
+ configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
+ ruleNode1.setConfiguration(mapper.valueToTree(configuration1));
+
+ RuleNode ruleNode2 = new RuleNode();
+ ruleNode2.setName("Simple Rule Node 2");
+ ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode2.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
+ configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
+ ruleNode2.setConfiguration(mapper.valueToTree(configuration2));
+
+
+ metaData.setNodes(Arrays.asList(ruleNode1, ruleNode2));
+ metaData.setFirstNodeIndex(0);
+ metaData.addConnectionInfo(0, 1, "Success");
+ metaData = saveRuleChainMetaData(metaData);
+ Assert.assertNotNull(metaData);
+
+ ruleChain = getRuleChain(ruleChain.getId());
+ Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
+
+ // Saving the device
+ Device device = new Device();
+ device.setName("My device");
+ device.setType("default");
+ device = doPost("/api/device", device, Device.class);
+
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey1", "serverAttributeValue1"), System.currentTimeMillis())));
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey2", "serverAttributeValue2"), System.currentTimeMillis())));
+
+
+ Thread.sleep(1000);
+
+ // Pushing Message to the system
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
+ "CUSTOM",
+ device.getId(),
+ new TbMsgMetaData(),
+ new byte[]{});
+ actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+
+ Thread.sleep(3000);
+
+ TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue1", outEvent.getBody().get("metadata").get("ss.serverAttributeKey1").asText());
+
+ RuleChain finalRuleChain = ruleChain;
+ RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
+
+ events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue1", outEvent.getBody().get("metadata").get("ss.serverAttributeKey1").asText());
+ Assert.assertEquals("serverAttributeValue2", outEvent.getBody().get("metadata").get("ss.serverAttributeKey2").asText());
+ }
+
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
new file mode 100644
index 0000000..de690a7
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -0,0 +1,158 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules.lifecycle;
+
+import com.datastax.driver.core.utils.UUIDs;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.Event;
+import org.thingsboard.server.common.data.Tenant;
+import org.thingsboard.server.common.data.User;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.data.page.TimePageData;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.data.security.Authority;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+import java.util.Collections;
+
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+/**
+ * @author Valerii Sosliuk
+ */
+@Slf4j
+public abstract class AbstractRuleEngineLifecycleIntegrationTest extends AbstractRuleEngineControllerTest {
+
+ protected Tenant savedTenant;
+ protected User tenantAdmin;
+
+ @Autowired
+ protected ActorService actorService;
+
+ @Autowired
+ protected AttributesService attributesService;
+
+ @Before
+ public void beforeTest() throws Exception {
+ loginSysAdmin();
+
+ Tenant tenant = new Tenant();
+ tenant.setTitle("My tenant");
+ savedTenant = doPost("/api/tenant", tenant, Tenant.class);
+ Assert.assertNotNull(savedTenant);
+
+ tenantAdmin = new User();
+ tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
+ tenantAdmin.setTenantId(savedTenant.getId());
+ tenantAdmin.setEmail("tenant2@thingsboard.org");
+ tenantAdmin.setFirstName("Joe");
+ tenantAdmin.setLastName("Downs");
+
+ createUserAndLogin(tenantAdmin, "testPassword1");
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ loginSysAdmin();
+ if (savedTenant != null) {
+ doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk());
+ }
+ }
+
+ @Test
+ public void testRuleChainWithOneRule() throws Exception {
+ // Creating Rule Chain
+ RuleChain ruleChain = new RuleChain();
+ ruleChain.setName("Simple Rule Chain");
+ ruleChain.setTenantId(savedTenant.getId());
+ ruleChain.setRoot(true);
+ ruleChain.setDebugMode(true);
+ ruleChain = saveRuleChain(ruleChain);
+ Assert.assertNull(ruleChain.getFirstRuleNodeId());
+
+ RuleChainMetaData metaData = new RuleChainMetaData();
+ metaData.setRuleChainId(ruleChain.getId());
+
+ RuleNode ruleNode = new RuleNode();
+ ruleNode.setName("Simple Rule Node");
+ ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
+ configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
+ ruleNode.setConfiguration(mapper.valueToTree(configuration));
+
+ metaData.setNodes(Collections.singletonList(ruleNode));
+ metaData.setFirstNodeIndex(0);
+
+ metaData = saveRuleChainMetaData(metaData);
+ Assert.assertNotNull(metaData);
+
+ ruleChain = getRuleChain(ruleChain.getId());
+ Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
+
+ // Saving the device
+ Device device = new Device();
+ device.setName("My device");
+ device.setType("default");
+ device = doPost("/api/device", device, Device.class);
+
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
+
+ Thread.sleep(1000);
+
+ // Pushing Message to the system
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
+ "CUSTOM",
+ device.getId(),
+ new TbMsgMetaData(),
+ new byte[]{});
+ actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+
+ Thread.sleep(3000);
+
+ TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue", outEvent.getBody().get("metadata").get("ss.serverAttributeKey").asText());
+ }
+
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
new file mode 100644
index 0000000..65b4293
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules;
+
+import org.junit.ClassRule;
+import org.junit.extensions.cpsuite.ClasspathSuite;
+import org.junit.runner.RunWith;
+import org.thingsboard.server.dao.CustomSqlUnit;
+
+import java.util.Arrays;
+
+@RunWith(ClasspathSuite.class)
+@ClasspathSuite.ClassnameFilters({
+ "org.thingsboard.server.rules.flow.*Test"})
+public class RuleEngineSqlTestSuite {
+
+ @ClassRule
+ public static CustomSqlUnit sqlUnit = new CustomSqlUnit(
+ Arrays.asList("sql/schema.sql", "sql/system-data.sql"),
+ "sql/drop-all-tables.sql",
+ "sql-test.properties");
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index a776d7b..659a242 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -37,7 +37,12 @@ public class DataConstants {
public static final String ERROR = "ERROR";
public static final String LC_EVENT = "LC_EVENT";
public static final String STATS = "STATS";
+ public static final String DEBUG = "DEBUG";
public static final String ONEWAY = "ONEWAY";
public static final String TWOWAY = "TWOWAY";
+
+ public static final String IN = "IN";
+ public static final String OUT = "OUT";
+
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/page/PageDataIterable.java b/common/data/src/main/java/org/thingsboard/server/common/data/page/PageDataIterable.java
index 34f8c3a..ffd7822 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/page/PageDataIterable.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/page/PageDataIterable.java
@@ -20,6 +20,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import org.thingsboard.server.common.data.SearchTextBased;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.UUIDBased;
public class PageDataIterable<T extends SearchTextBased<? extends UUIDBased>> implements Iterable<T>, Iterator<T> {
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
index 45fb590..a103064 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
@@ -20,6 +20,6 @@ package org.thingsboard.server.common.data.plugin;
*/
public enum ComponentType {
- FILTER, PROCESSOR, ACTION, PLUGIN
+ ENRICHMENT, FILTER, TRANSFORMATION, ACTION, OLD_ACTION, PLUGIN
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
index f2ba0cc..218061a 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
@@ -38,6 +38,7 @@ public class RuleChain extends SearchTextBasedWithAdditionalInfo<RuleChainId> im
private String name;
private RuleNodeId firstRuleNodeId;
private boolean root;
+ private boolean debugMode;
private transient JsonNode configuration;
@JsonIgnore
private byte[] configurationBytes;
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainMetaData.java
index af141d6..1be1518 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainMetaData.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChainMetaData.java
@@ -58,18 +58,4 @@ public class RuleChainMetaData {
ruleChainConnections.add(connectionInfo);
}
- @Data
- public class NodeConnectionInfo {
- private int fromIndex;
- private int toIndex;
- private String type;
- }
-
- @Data
- public class RuleChainConnectionInfo {
- private int fromIndex;
- private RuleChainId targetRuleChainId;
- private String type;
- }
-
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
index d044000..fbc1103 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
@@ -34,6 +34,7 @@ public class RuleNode extends SearchTextBasedWithAdditionalInfo<RuleNodeId> impl
private String type;
private String name;
+ private boolean debugMode;
private transient JsonNode configuration;
@JsonIgnore
private byte[] configurationBytes;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
new file mode 100644
index 0000000..7c00ee6
--- /dev/null
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.msg;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+public enum MsgType {
+
+ /**
+ * ADDED/UPDATED/DELETED events for main entities.
+ *
+ * @See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
+ */
+ COMPONENT_LIFE_CYCLE_MSG,
+
+ /**
+ * Misc messages from the REST API/SERVICE layer to the new rule engine.
+ *
+ * @See {@link org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg}
+ */
+ SERVICE_TO_RULE_ENGINE_MSG,
+
+
+ SESSION_TO_DEVICE_ACTOR_MSG,
+ DEVICE_ACTOR_TO_SESSION_MSG,
+
+
+ /**
+ * Message that is sent by RuleChainActor to RuleActor with command to process TbMsg.
+ */
+ RULE_CHAIN_TO_RULE_MSG,
+
+ /**
+ * Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain.
+ */
+ RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
+
+ /**
+ * Message that is sent by RuleActor implementation to RuleActor itself to log the error.
+ */
+ RULE_TO_SELF_ERROR_MSG,
+
+}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index d48c3fe..c104281 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -15,14 +15,14 @@
*/
package org.thingsboard.server.common.msg.plugin;
-import lombok.Data;
import lombok.Getter;
import lombok.ToString;
-import org.thingsboard.server.common.data.id.PluginId;
-import org.thingsboard.server.common.data.id.RuleId;
-import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
-import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
@@ -32,34 +32,34 @@ import java.util.Optional;
* @author Andrew Shvayka
*/
@ToString
-public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
+public class ComponentLifecycleMsg implements TbActorMsg, TenantAwareMsg, ToAllNodesMsg {
@Getter
private final TenantId tenantId;
- private final PluginId pluginId;
- private final RuleId ruleId;
+ @Getter
+ private final EntityId entityId;
@Getter
private final ComponentLifecycleEvent event;
- public static ComponentLifecycleMsg forPlugin(TenantId tenantId, PluginId pluginId, ComponentLifecycleEvent event) {
- return new ComponentLifecycleMsg(tenantId, pluginId, null, event);
- }
-
- public static ComponentLifecycleMsg forRule(TenantId tenantId, RuleId ruleId, ComponentLifecycleEvent event) {
- return new ComponentLifecycleMsg(tenantId, null, ruleId, event);
- }
-
- private ComponentLifecycleMsg(TenantId tenantId, PluginId pluginId, RuleId ruleId, ComponentLifecycleEvent event) {
+ public ComponentLifecycleMsg(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent event) {
this.tenantId = tenantId;
- this.pluginId = pluginId;
- this.ruleId = ruleId;
+ this.entityId = entityId;
this.event = event;
}
public Optional<PluginId> getPluginId() {
- return Optional.ofNullable(pluginId);
+ return entityId.getEntityType() == EntityType.PLUGIN ? Optional.of((PluginId) entityId) : Optional.empty();
}
public Optional<RuleId> getRuleId() {
- return Optional.ofNullable(ruleId);
+ return entityId.getEntityType() == EntityType.RULE ? Optional.of((RuleId) entityId) : Optional.empty();
+ }
+
+ public Optional<RuleChainId> getRuleChainId() {
+ return entityId.getEntityType() == EntityType.RULE_CHAIN ? Optional.of((RuleChainId) entityId) : Optional.empty();
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.COMPONENT_LIFE_CYCLE_MSG;
}
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 261dda7..524cc5f 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.common.msg;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
@@ -30,18 +31,23 @@ import java.util.UUID;
* Created by ashvayka on 13.01.18.
*/
@Data
-public final class TbMsg implements Serializable, Cloneable {
+@AllArgsConstructor
+public final class TbMsg implements Serializable {
private final UUID id;
private final String type;
private final EntityId originator;
private final TbMsgMetaData metaData;
-
+ private final TbMsgDataType dataType;
private final byte[] data;
- @Override
- public TbMsg clone() {
- return fromBytes(toBytes(this));
+ public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, byte[] data) {
+ this.id = id;
+ this.type = type;
+ this.originator = originator;
+ this.metaData = metaData;
+ this.dataType = TbMsgDataType.JSON;
+ this.data = data;
}
public static ByteBuffer toBytes(TbMsg msg) {
@@ -54,11 +60,10 @@ public final class TbMsg implements Serializable, Cloneable {
}
if (msg.getMetaData() != null) {
- MsgProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
- metadataBuilder.putAllData(msg.getMetaData().getData());
- builder.addMetaData(metadataBuilder.build());
+ builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
}
+ builder.setDataType(msg.getDataType().ordinal());
builder.setData(ByteString.copyFrom(msg.getData()));
byte[] bytes = builder.build().toByteArray();
return ByteBuffer.wrap(bytes);
@@ -67,20 +72,19 @@ public final class TbMsg implements Serializable, Cloneable {
public static TbMsg fromBytes(ByteBuffer buffer) {
try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
- TbMsgMetaData metaData = new TbMsgMetaData();
- if (proto.getMetaDataCount() > 0) {
- metaData.setData(proto.getMetaData(0).getDataMap());
- }
-
- EntityId entityId = null;
- if (proto.getEntityId() != null) {
- entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
- }
-
- return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
+ TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
+ EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
+ TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
+ return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData().toByteArray());
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
}
+ public TbMsg copy() {
+ int dataSize = data.length;
+ byte[] dataCopy = new byte[dataSize];
+ System.arraycopy( data, 0, dataCopy, 0, data.length );
+ return new TbMsg(id, type, originator, metaData.copy(), dataType, dataCopy);
+ }
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java
index 1bbc792..eca153b 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java
@@ -15,9 +15,12 @@
*/
package org.thingsboard.server.common.msg;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -25,10 +28,15 @@ import java.util.concurrent.ConcurrentHashMap;
* Created by ashvayka on 13.01.18.
*/
@Data
+@NoArgsConstructor
public final class TbMsgMetaData implements Serializable {
private Map<String, String> data = new ConcurrentHashMap<>();
+ TbMsgMetaData(Map<String, String> data) {
+ this.data = data;
+ }
+
public String getValue(String key) {
return data.get(key);
}
@@ -37,4 +45,7 @@ public final class TbMsgMetaData implements Serializable {
data.put(key, value);
}
+ public TbMsgMetaData copy() {
+ return new TbMsgMetaData(new ConcurrentHashMap<>(data));
+ }
}
diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto
index 90fa2bd..62acff2 100644
--- a/common/message/src/main/proto/tbmsg.proto
+++ b/common/message/src/main/proto/tbmsg.proto
@@ -19,6 +19,9 @@ package msgqueue;
option java_package = "org.thingsboard.server.common.msg.gen";
option java_outer_classname = "MsgProtos";
+message TbMsgMetaDataProto {
+ map<string, string> data = 1;
+}
message TbMsgProto {
string id = 1;
@@ -26,11 +29,8 @@ message TbMsgProto {
string entityType = 3;
string entityId = 4;
- message TbMsgMetaDataProto {
- map<string, string> data = 1;
- }
+ TbMsgMetaDataProto metaData = 5;
- repeated TbMsgMetaDataProto metaData = 5;
-
- bytes data = 6;
+ int32 dataType = 6;
+ bytes data = 7;
}
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index a159b9e..8c34cd3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -332,6 +332,8 @@ public class ModelConstants {
public static final String EVENT_BY_TYPE_AND_ID_VIEW_NAME = "event_by_type_and_id";
public static final String EVENT_BY_ID_VIEW_NAME = "event_by_id";
+ public static final String DEBUG_MODE = "debug_mode";
+
/**
* Cassandra rule chain constants.
*/
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java
index 34659a8..251a689 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java
@@ -22,6 +22,8 @@ import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -54,6 +56,10 @@ public class RuleChainEntity implements SearchTextEntity<RuleChain> {
private UUID firstRuleNodeId;
@Column(name = RULE_CHAIN_ROOT_PROPERTY)
private boolean root;
+ @Getter
+ @Setter
+ @Column(name = DEBUG_MODE)
+ private boolean debugMode;
@Column(name = RULE_CHAIN_CONFIGURATION_PROPERTY, codec = JsonCodec.class)
private JsonNode configuration;
@Column(name = ADDITIONAL_INFO_PROPERTY, codec = JsonCodec.class)
@@ -71,6 +77,7 @@ public class RuleChainEntity implements SearchTextEntity<RuleChain> {
this.searchText = ruleChain.getName();
this.firstRuleNodeId = DaoUtil.getId(ruleChain.getFirstRuleNodeId());
this.root = ruleChain.isRoot();
+ this.debugMode = ruleChain.isDebugMode();
this.configuration = ruleChain.getConfiguration();
this.additionalInfo = ruleChain.getAdditionalInfo();
}
@@ -157,6 +164,7 @@ public class RuleChainEntity implements SearchTextEntity<RuleChain> {
ruleChain.setFirstRuleNodeId(new RuleNodeId(this.firstRuleNodeId));
}
ruleChain.setRoot(this.root);
+ ruleChain.setDebugMode(this.debugMode);
ruleChain.setConfiguration(this.configuration);
ruleChain.setAdditionalInfo(this.additionalInfo);
return ruleChain;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
index ba96e4b..8d3f3c3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
@@ -21,6 +21,8 @@ import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.rule.RuleNode;
@@ -49,6 +51,11 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
private JsonNode configuration;
@Column(name = ADDITIONAL_INFO_PROPERTY, codec = JsonCodec.class)
private JsonNode additionalInfo;
+ @Getter
+ @Setter
+ @Column(name = DEBUG_MODE)
+ private boolean debugMode;
+
public RuleNodeEntity() {
}
@@ -59,6 +66,7 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
}
this.type = ruleNode.getType();
this.name = ruleNode.getName();
+ this.debugMode = ruleNode.isDebugMode();
this.searchText = ruleNode.getName();
this.configuration = ruleNode.getConfiguration();
this.additionalInfo = ruleNode.getAdditionalInfo();
@@ -126,6 +134,7 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
ruleNode.setCreatedTime(UUIDs.unixTimestamp(id));
ruleNode.setType(this.type);
ruleNode.setName(this.name);
+ ruleNode.setDebugMode(this.debugMode);
ruleNode.setConfiguration(this.configuration);
ruleNode.setAdditionalInfo(this.additionalInfo);
return ruleNode;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleChainEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleChainEntity.java
index 471ec7b..a48421a 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleChainEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleChainEntity.java
@@ -58,6 +58,9 @@ public class RuleChainEntity extends BaseSqlEntity<RuleChain> implements SearchT
@Column(name = ModelConstants.RULE_CHAIN_ROOT_PROPERTY)
private boolean root;
+ @Column(name = ModelConstants.DEBUG_MODE)
+ private boolean debugMode;
+
@Type(type = "json")
@Column(name = ModelConstants.RULE_CHAIN_CONFIGURATION_PROPERTY)
private JsonNode configuration;
@@ -80,6 +83,7 @@ public class RuleChainEntity extends BaseSqlEntity<RuleChain> implements SearchT
this.firstRuleNodeId = UUIDConverter.fromTimeUUID(ruleChain.getFirstRuleNodeId().getId());
}
this.root = ruleChain.isRoot();
+ this.debugMode = ruleChain.isDebugMode();
this.configuration = ruleChain.getConfiguration();
this.additionalInfo = ruleChain.getAdditionalInfo();
}
@@ -104,6 +108,7 @@ public class RuleChainEntity extends BaseSqlEntity<RuleChain> implements SearchT
ruleChain.setFirstRuleNodeId(new RuleNodeId(UUIDConverter.fromString(firstRuleNodeId)));
}
ruleChain.setRoot(root);
+ ruleChain.setDebugMode(debugMode);
ruleChain.setConfiguration(configuration);
ruleChain.setAdditionalInfo(additionalInfo);
return ruleChain;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
index d960487..6a888c2 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
@@ -56,6 +56,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
@Column(name = ModelConstants.ADDITIONAL_INFO_PROPERTY)
private JsonNode additionalInfo;
+ @Column(name = ModelConstants.DEBUG_MODE)
+ private boolean debugMode;
+
public RuleNodeEntity() {
}
@@ -65,6 +68,7 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
}
this.type = ruleNode.getType();
this.name = ruleNode.getName();
+ this.debugMode = ruleNode.isDebugMode();
this.searchText = ruleNode.getName();
this.configuration = ruleNode.getConfiguration();
this.additionalInfo = ruleNode.getAdditionalInfo();
@@ -86,6 +90,7 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
ruleNode.setCreatedTime(UUIDs.unixTimestamp(getId()));
ruleNode.setType(type);
ruleNode.setName(name);
+ ruleNode.setDebugMode(debugMode);
ruleNode.setConfiguration(configuration);
ruleNode.setAdditionalInfo(additionalInfo);
return ruleNode;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
index da991fa..bb76b97 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
@@ -32,6 +32,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.annotation.Nullable;
@@ -125,7 +126,7 @@ public class QueueBenchmark implements CommandLineRunner {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
- return new TbMsg(UUIDs.timeBased(), "type", null, metaData, dataStr.getBytes());
+ return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr.getBytes());
}
@Bean
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
index 04207ec..7d6cd00 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
@@ -31,7 +31,9 @@ import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
+import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.entity.AbstractEntityService;
@@ -148,7 +150,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
ruleChainDao.save(ruleChain);
}
if (ruleChainMetaData.getConnections() != null) {
- for (RuleChainMetaData.NodeConnectionInfo nodeConnection : ruleChainMetaData.getConnections()) {
+ for (NodeConnectionInfo nodeConnection : ruleChainMetaData.getConnections()) {
EntityId from = nodes.get(nodeConnection.getFromIndex()).getId();
EntityId to = nodes.get(nodeConnection.getToIndex()).getId();
String type = nodeConnection.getType();
@@ -161,7 +163,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
}
if (ruleChainMetaData.getRuleChainConnections() != null) {
- for (RuleChainMetaData.RuleChainConnectionInfo nodeToRuleChainConnection : ruleChainMetaData.getRuleChainConnections()) {
+ for (RuleChainConnectionInfo nodeToRuleChainConnection : ruleChainMetaData.getRuleChainConnections()) {
EntityId from = nodes.get(nodeToRuleChainConnection.getFromIndex()).getId();
EntityId to = nodeToRuleChainConnection.getTargetRuleChainId();
String type = nodeToRuleChainConnection.getType();
@@ -220,6 +222,12 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
@Override
+ public RuleNode findRuleNodeById(RuleNodeId ruleNodeId) {
+ Validator.validateId(ruleNodeId, "Incorrect rule node id for search request.");
+ return ruleNodeDao.findById(ruleNodeId.getId());
+ }
+
+ @Override
public ListenableFuture<RuleChain> findRuleChainByIdAsync(RuleChainId ruleChainId) {
Validator.validateId(ruleChainId, "Incorrect rule chain id for search request.");
return ruleChainDao.findByIdAsync(ruleChainId.getId());
@@ -308,7 +316,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
private void createRelation(EntityRelation relation) throws ExecutionException, InterruptedException {
log.debug("Creating relation: {}", relation);
- relationService.saveRelationAsync(relation).get();
+ relationService.saveRelation(relation);
}
private DataValidator<RuleChain> ruleChainValidator =
@@ -325,7 +333,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
if (ruleChain.isRoot()) {
RuleChain rootRuleChain = getRootTenantRuleChain(ruleChain.getTenantId());
- if (ruleChain.getId() == null || !ruleChain.getId().equals(rootRuleChain.getId())) {
+ if (rootRuleChain != null && !rootRuleChain.getId().equals(ruleChain.getId())) {
throw new DataValidationException("Another root rule chain is present in scope of current tenant!");
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
index f1df09e..fff3f6d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
@@ -16,7 +16,6 @@
package org.thingsboard.server.dao.rule;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -67,67 +66,7 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
@Override
public RuleMetaData saveRule(RuleMetaData rule) {
- ruleValidator.validate(rule);
- if (rule.getTenantId() == null) {
- log.trace("Save system rule metadata with predefined id {}", systemTenantId);
- rule.setTenantId(systemTenantId);
- }
- if (rule.getId() != null) {
- RuleMetaData oldVersion = ruleDao.findById(rule.getId());
- if (rule.getState() == null) {
- rule.setState(oldVersion.getState());
- } else if (rule.getState() != oldVersion.getState()) {
- throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
- }
- } else {
- if (rule.getState() == null) {
- rule.setState(ComponentLifecycleState.SUSPENDED);
- } else if (rule.getState() != ComponentLifecycleState.SUSPENDED) {
- throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
- }
- }
-
- validateFilters(rule.getFilters());
- if (rule.getProcessor() != null && !rule.getProcessor().isNull()) {
- validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR);
- }
- if (rule.getAction() != null && !rule.getAction().isNull()) {
- validateComponentJson(rule.getAction(), ComponentType.ACTION);
- }
- validateRuleAndPluginState(rule);
- return ruleDao.save(rule);
- }
-
- private void validateFilters(JsonNode filtersJson) {
- if (filtersJson == null || filtersJson.isNull()) {
- throw new IncorrectParameterException("Rule filters are required!");
- }
- if (!filtersJson.isArray()) {
- throw new IncorrectParameterException("Filters json is not an array!");
- }
- ArrayNode filtersArray = (ArrayNode) filtersJson;
- for (int i = 0; i < filtersArray.size(); i++) {
- validateComponentJson(filtersArray.get(i), ComponentType.FILTER);
- }
- }
-
- private void validateComponentJson(JsonNode json, ComponentType type) {
- if (json == null || json.isNull()) {
- throw new IncorrectParameterException(type.name() + " is required!");
- }
- String clazz = getIfValid(type.name(), json, "clazz", JsonNode::isTextual, JsonNode::asText);
- String name = getIfValid(type.name(), json, "name", JsonNode::isTextual, JsonNode::asText);
- JsonNode configuration = getIfValid(type.name(), json, "configuration", JsonNode::isObject, node -> node);
- ComponentDescriptor descriptor = componentDescriptorService.findByClazz(clazz);
- if (descriptor == null) {
- throw new IncorrectParameterException(type.name() + " clazz " + clazz + " is not a valid component!");
- }
- if (descriptor.getType() != type) {
- throw new IncorrectParameterException("Clazz " + clazz + " is not a valid " + type.name() + " component!");
- }
- if (!componentDescriptorService.validate(descriptor, configuration)) {
- throw new IncorrectParameterException(type.name() + " configuration is not valid!");
- }
+ throw new RuntimeException("Not supported since v1.5!");
}
private void validateRuleAndPluginState(RuleMetaData rule) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
index e5f2840..da7833d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
@@ -42,6 +42,8 @@ public interface RuleChainService {
RuleChain findRuleChainById(RuleChainId ruleChainId);
+ RuleNode findRuleNodeById(RuleNodeId ruleNodeId);
+
ListenableFuture<RuleChain> findRuleChainByIdAsync(RuleChainId ruleChainId);
RuleChain getRootTenantRuleChain(TenantId tenantId);
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index 42c13f3..d0e62b2 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -669,6 +669,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_chain (
search_text text,
first_rule_node_id uuid,
root boolean,
+ debug_mode boolean,
configuration text,
additional_info text,
PRIMARY KEY (id, tenant_id)
@@ -685,6 +686,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
id uuid,
type text,
name text,
+ debug_mode boolean,
search_text text,
configuration text,
additional_info text,
diff --git a/dao/src/main/resources/sql/schema.sql b/dao/src/main/resources/sql/schema.sql
index 106204a..d7a0978 100644
--- a/dao/src/main/resources/sql/schema.sql
+++ b/dao/src/main/resources/sql/schema.sql
@@ -263,6 +263,7 @@ CREATE TABLE IF NOT EXISTS rule_chain (
name varchar(255),
first_rule_node_id varchar(31),
root boolean,
+ debug_mode boolean,
search_text varchar(255),
tenant_id varchar(31)
);
@@ -273,5 +274,6 @@ CREATE TABLE IF NOT EXISTS rule_node (
configuration varchar(10000000),
type varchar(255),
name varchar(255),
+ debug_mode boolean,
search_text varchar(255)
);
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java
index d083a90..44a1a09 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java
@@ -217,10 +217,10 @@ public abstract class AbstractServiceTest {
ruleMetaData.setWeight(weight);
ruleMetaData.setPluginToken(pluginToken);
- ruleMetaData.setAction(createNode(ComponentScope.TENANT, ComponentType.ACTION,
+ ruleMetaData.setAction(createNode(ComponentScope.TENANT, ComponentType.OLD_ACTION,
"org.thingsboard.component.ActionTest", "TestJsonDescriptor.json", "TestJsonData.json"));
- ruleMetaData.setProcessor(createNode(ComponentScope.TENANT, ComponentType.PROCESSOR,
- "org.thingsboard.component.ProcessorTest", "TestJsonDescriptor.json", "TestJsonData.json"));
+// ruleMetaData.setProcessor(createNode(ComponentScope.TENANT, ComponentType.PROCESSOR,
+// "org.thingsboard.component.ProcessorTest", "TestJsonDescriptor.json", "TestJsonData.json"));
ruleMetaData.setFilters(mapper.createArrayNode().add(
createNode(ComponentScope.TENANT, ComponentType.FILTER,
"org.thingsboard.component.FilterTest", "TestJsonDescriptor.json", "TestJsonData.json")
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
index d17e1f2..99c6a91 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
@@ -44,7 +45,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, new byte[4]);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
@@ -54,7 +55,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, new byte[4]);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
future.get();
@@ -67,7 +68,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, dataStr.getBytes());
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr.getBytes());
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
index 6302e63..3935c9b 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
@@ -33,8 +33,8 @@ public class UnprocessedMsgFilterTest {
public void acknowledgedMsgsAreFilteredOut() {
UUID id1 = UUID.randomUUID();
UUID id2 = UUID.randomUUID();
- TbMsg msg1 = new TbMsg(id1, "T", null, null, null);
- TbMsg msg2 = new TbMsg(id2, "T", null, null, null);
+ TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null);
+ TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null);
List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);
List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));
Collection<TbMsg> actual = msgFilter.filter(msgs, acks);
pom.xml 5(+5 -0)
diff --git a/pom.xml b/pom.xml
index f331e32..f0c915a 100755
--- a/pom.xml
+++ b/pom.xml
@@ -379,6 +379,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.thingsboard.rule-engine</groupId>
+ <artifactId>rule-engine-components</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>message</artifactId>
<version>${project.version}</version>
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ActionNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ActionNode.java
new file mode 100644
index 0000000..64e28f1
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ActionNode.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.data.plugin.ComponentScope;
+import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface ActionNode {
+
+ String name();
+
+ ComponentScope scope() default ComponentScope.TENANT;
+
+ String descriptor() default "EmptyNodeDescriptor.json";
+
+ String[] relationTypes() default {"Success","Failure"};
+
+ boolean customRelations() default false;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/EnrichmentNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/EnrichmentNode.java
new file mode 100644
index 0000000..2267bda
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/EnrichmentNode.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.data.plugin.ComponentScope;
+import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface EnrichmentNode {
+
+ String name();
+
+ ComponentScope scope() default ComponentScope.TENANT;
+
+ String descriptor() default "EmptyNodeDescriptor.json";
+
+ String[] relationTypes() default {"Success","Failure"};
+
+ boolean customRelations() default false;
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/FilterNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/FilterNode.java
new file mode 100644
index 0000000..5247e39
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/FilterNode.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.data.plugin.ComponentScope;
+import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface FilterNode {
+
+ String name();
+
+ ComponentScope scope() default ComponentScope.TENANT;
+
+ String descriptor() default "EmptyNodeDescriptor.json";
+
+ String[] relationTypes() default {"Success","Failure"};
+
+ boolean customRelations() default false;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index fdcf56a..260a51f 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -51,7 +51,7 @@ public interface TbContext {
void spawn(TbMsg msg);
- void ack(UUID msg);
+ void ack(TbMsg msg);
void tellError(TbMsg msg, Throwable th);
@@ -61,8 +61,6 @@ public interface TbContext {
UserService getUserService();
- RuleService getRuleService();
-
PluginService getPluginService();
AssetService getAssetService();
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
index d06c0d2..64053cd 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
@@ -22,8 +22,8 @@ import lombok.Data;
* Created by ashvayka on 19.01.18.
*/
@Data
-public class TbNodeConfiguration {
+public final class TbNodeConfiguration {
- private JsonNode data;
+ private final JsonNode data;
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
index c48b11d..2c77a69 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
@@ -18,5 +18,5 @@ package org.thingsboard.rule.engine.api;
/**
* Created by ashvayka on 19.01.18.
*/
-public class TbNodeState {
+public final class TbNodeState {
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TransformationNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TransformationNode.java
new file mode 100644
index 0000000..bfe1dca
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TransformationNode.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.data.plugin.ComponentScope;
+import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TransformationNode {
+
+ String name();
+
+ ComponentScope scope() default ComponentScope.TENANT;
+
+ String descriptor() default "EmptyNodeDescriptor.json";
+
+ String[] relationTypes() default {"Success","Failure"};
+
+ boolean customRelations() default false;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json b/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json
new file mode 100644
index 0000000..7a73a41
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json
@@ -0,0 +1,2 @@
+{
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index 9b903b1..a97493b 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -88,11 +88,6 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>RELEASE</version>
- </dependency>
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
index 887311c..90eadcb 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -21,9 +21,13 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
-import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.TbNodeState;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.EnrichmentNode;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
@@ -36,6 +40,7 @@ import static org.thingsboard.server.common.data.DataConstants.*;
* Created by ashvayka on 19.01.18.
*/
@Slf4j
+@EnrichmentNode(name = "Get Attributes Node")
public class TbGetAttributesNode implements TbNode {
private TbGetAttributesNodeConfiguration config;
@@ -61,21 +66,22 @@ public class TbGetAttributesNode implements TbNode {
}
}
- private ListenableFuture<Void> putAttr(TbMsg msg, List<KvEntry> attributes, String prefix) {
- attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
- return Futures.immediateFuture(null);
- }
-
- private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> attributes, String prefix) {
- ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, attributes);
+ private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> keys, String prefix) {
+ if (keys == null) {
+ return Futures.immediateFuture(null);
+ }
+ ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, keys);
return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, Void>) l -> {
l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
return null;
});
}
- private ListenableFuture<Void> getLatestTelemetry(TbContext ctx, TbMsg msg, List<String> attributes) {
- ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), attributes);
+ private ListenableFuture<Void> getLatestTelemetry(TbContext ctx, TbMsg msg, List<String> keys) {
+ if (keys == null) {
+ return Futures.immediateFuture(null);
+ }
+ ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), keys);
return Futures.transform(latest, (Function<? super List<TsKvEntry>, Void>) l -> {
l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString()));
return null;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
index b7b1fd7..18ddfcf 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
@@ -16,11 +16,13 @@
package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.api.EnrichmentNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
+@EnrichmentNode(name="Get Customer Attributes Node")
public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> {
@Override
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
index 474fb5d..3a0dce8 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
@@ -21,9 +21,12 @@ import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TbNodeState;
+import org.thingsboard.rule.engine.api.EnrichmentNode;
import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
+
import org.thingsboard.server.common.data.id.EntityId;
+@EnrichmentNode(name="Get Related Entity Attributes Node")
public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> {
private TbGetRelatedAttrNodeConfiguration config;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
index b97e220..e51c053 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
@@ -17,12 +17,14 @@ package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.EnrichmentNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@Slf4j
+@EnrichmentNode(name="Get Tenant Attributes Node")
public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> {
@Override
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
index 388881b..5d2aaa8 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
@@ -35,8 +35,6 @@ public class EntitiesTenantIdAsyncLoader {
return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) original));
case USER:
return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) original));
- case RULE:
- return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) original));
case PLUGIN:
return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) original));
case ASSET:
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
index 2fd9f4e..96f7032 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
@@ -141,8 +141,7 @@ public class TbJsFilterNodeTest {
TbJsFilterNodeConfiguration config = new TbJsFilterNodeConfiguration();
config.setJsScript(script);
ObjectMapper mapper = new ObjectMapper();
- TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
- nodeConfiguration.setData(mapper.valueToTree(config));
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbJsFilterNode();
node.init(nodeConfiguration, null);
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
index a2f5f7d..e70d4e1 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
@@ -138,8 +138,7 @@ public class TbJsSwitchNodeTest {
config.setAllowedRelations(relations);
config.setRouteToAllWithNoCheck(routeToAll);
ObjectMapper mapper = new ObjectMapper();
- TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
- nodeConfiguration.setData(mapper.valueToTree(config));
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbJsSwitchNode();
node.init(nodeConfiguration, null);
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
index 8e5ddb8..ad40f03 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -85,8 +85,7 @@ public class TbGetCustomerAttributeNodeTest {
config.setAttrMapping(attrMapping);
config.setTelemetry(false);
ObjectMapper mapper = new ObjectMapper();
- TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
- nodeConfiguration.setData(mapper.valueToTree(config));
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbGetCustomerAttributeNode();
node.init(nodeConfiguration, null);
@@ -224,8 +223,7 @@ public class TbGetCustomerAttributeNodeTest {
config.setAttrMapping(attrMapping);
config.setTelemetry(true);
ObjectMapper mapper = new ObjectMapper();
- TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
- nodeConfiguration.setData(mapper.valueToTree(config));
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbGetCustomerAttributeNode();
node.init(nodeConfiguration, null);
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
index 77b00fb..190692c 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
@@ -116,8 +116,7 @@ public class TbChangeOriginatorNodeTest {
config.setOriginatorSource(TbChangeOriginatorNode.CUSTOMER_SOURCE);
config.setStartNewChain(startNewChain);
ObjectMapper mapper = new ObjectMapper();
- TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
- nodeConfiguration.setData(mapper.valueToTree(config));
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbChangeOriginatorNode();
node.init(nodeConfiguration, null);
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
index 876e70f..d69bad8 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
@@ -111,8 +111,7 @@ public class TbTransformMsgNodeTest {
TbTransformMsgNodeConfiguration config = new TbTransformMsgNodeConfiguration();
config.setJsScript(script);
ObjectMapper mapper = new ObjectMapper();
- TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
- nodeConfiguration.setData(mapper.valueToTree(config));
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbTransformMsgNode();
node.init(nodeConfiguration, null);