thingsboard-memoizeit
Changes
application/pom.xml 4(+4 -0)
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 63(+28 -35)
application/src/main/java/org/thingsboard/server/actors/rule/ChainProcessingContext.java 117(+0 -117)
application/src/main/java/org/thingsboard/server/actors/rule/ChainProcessingMetaData.java 41(+0 -41)
application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java 345(+0 -345)
application/src/main/java/org/thingsboard/server/actors/rule/RuleProcessingContext.java 115(+0 -115)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 33(+28 -5)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java 15(+15 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java 15(+15 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java 40(+27 -13)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java 15(+15 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToSelfErrorMsg.java 24(+16 -8)
application/src/main/java/org/thingsboard/server/actors/shared/rule/SystemRuleManager.java 40(+0 -40)
application/src/main/java/org/thingsboard/server/actors/shared/rule/TenantRuleManager.java 48(+0 -48)
application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java 15(+15 -0)
application/src/main/java/org/thingsboard/server/actors/shared/rulechain/TenantRuleChainManager.java 11(+4 -7)
application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java 49(+34 -15)
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java 41(+41 -0)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 156(+156 -0)
application/src/test/java/org/thingsboard/server/rules/flow/RuleEngineFlowSqlIntegrationTest.java 10(+8 -2)
common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java 8(+4 -4)
common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java 15(+15 -0)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java 7(+4 -3)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java 4(+2 -2)
pom.xml 5(+5 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ActionNode.java 43(+43 -0)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/EnrichmentNode.java 31(+21 -10)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/FilterNode.java 34(+22 -12)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java 4(+2 -2)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TransformationNode.java 33(+20 -13)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java 26(+17 -9)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java 2(+2 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java 6(+2 -4)
Details
application/pom.xml 4(+4 -0)
diff --git a/application/pom.xml b/application/pom.xml
index 95322fa..6da40a9 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -58,6 +58,10 @@
<artifactId>rule-engine-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.thingsboard.rule-engine</groupId>
+ <artifactId>rule-engine-components</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.thingsboard</groupId>
<artifactId>extensions-core</artifactId>
</dependency>
diff --git a/application/src/main/data/upgrade/1.5.0/schema_update.cql b/application/src/main/data/upgrade/1.5.0/schema_update.cql
index aa8b10b..ab68846 100644
--- a/application/src/main/data/upgrade/1.5.0/schema_update.cql
+++ b/application/src/main/data/upgrade/1.5.0/schema_update.cql
@@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_chain (
search_text text,
first_rule_node_id uuid,
root boolean,
+ debug_mode boolean,
configuration text,
additional_info text,
PRIMARY KEY (id, tenant_id)
@@ -85,6 +86,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
id uuid,
type text,
name text,
+ debug_mode boolean,
search_text text,
configuration text,
additional_info text,
diff --git a/application/src/main/data/upgrade/1.5.0/schema_update.sql b/application/src/main/data/upgrade/1.5.0/schema_update.sql
index 0043ed5..2bed6ad 100644
--- a/application/src/main/data/upgrade/1.5.0/schema_update.sql
+++ b/application/src/main/data/upgrade/1.5.0/schema_update.sql
@@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS rule_chain (
name varchar(255),
first_rule_node_id varchar(31),
root boolean,
+ debug_mode boolean,
search_text varchar(255),
tenant_id varchar(31)
);
@@ -31,5 +32,6 @@ CREATE TABLE IF NOT EXISTS rule_node (
configuration varchar(10000000),
type varchar(255),
name varchar(255),
+ debug_mode boolean,
search_text varchar(255)
);
\ No newline at end of file
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 68d5547..bbe7771 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,12 +28,14 @@ import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
+import org.springframework.util.Base64Utils;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
@@ -50,6 +52,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
@@ -57,6 +60,7 @@ import org.thingsboard.server.service.component.ComponentDiscoveryService;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
import java.util.Optional;
@Component
@@ -65,107 +69,154 @@ public class ActorSystemContext {
protected final ObjectMapper mapper = new ObjectMapper();
- @Getter @Setter private ActorService actorService;
+ @Getter
+ @Setter
+ private ActorService actorService;
@Autowired
- @Getter private DiscoveryService discoveryService;
+ @Getter
+ private DiscoveryService discoveryService;
@Autowired
- @Getter @Setter private ComponentDiscoveryService componentService;
+ @Getter
+ @Setter
+ private ComponentDiscoveryService componentService;
@Autowired
- @Getter private ClusterRoutingService routingService;
+ @Getter
+ private ClusterRoutingService routingService;
@Autowired
- @Getter private ClusterRpcService rpcService;
+ @Getter
+ private ClusterRpcService rpcService;
@Autowired
- @Getter private DeviceAuthService deviceAuthService;
+ @Getter
+ private DeviceAuthService deviceAuthService;
@Autowired
- @Getter private DeviceService deviceService;
+ @Getter
+ private DeviceService deviceService;
@Autowired
- @Getter private AssetService assetService;
+ @Getter
+ private AssetService assetService;
@Autowired
- @Getter private TenantService tenantService;
+ @Getter
+ private TenantService tenantService;
@Autowired
- @Getter private CustomerService customerService;
+ @Getter
+ private CustomerService customerService;
@Autowired
- @Getter private RuleService ruleService;
+ @Getter
+ private UserService userService;
@Autowired
- @Getter private RuleChainService ruleChainService;
+ @Getter
+ private RuleService ruleService;
@Autowired
- @Getter private PluginService pluginService;
+ @Getter
+ private RuleChainService ruleChainService;
@Autowired
- @Getter private TimeseriesService tsService;
+ @Getter
+ private PluginService pluginService;
@Autowired
- @Getter private AttributesService attributesService;
+ @Getter
+ private TimeseriesService tsService;
@Autowired
- @Getter private EventService eventService;
+ @Getter
+ private AttributesService attributesService;
@Autowired
- @Getter private AlarmService alarmService;
+ @Getter
+ private EventService eventService;
@Autowired
- @Getter private RelationService relationService;
+ @Getter
+ private AlarmService alarmService;
@Autowired
- @Getter private AuditLogService auditLogService;
+ @Getter
+ private RelationService relationService;
@Autowired
- @Getter @Setter private PluginWebSocketMsgEndpoint wsMsgEndpoint;
+ @Getter
+ private AuditLogService auditLogService;
+
+ @Autowired
+ @Getter
+ @Setter
+ private PluginWebSocketMsgEndpoint wsMsgEndpoint;
@Value("${actors.session.sync.timeout}")
- @Getter private long syncSessionTimeout;
+ @Getter
+ private long syncSessionTimeout;
@Value("${actors.plugin.termination.delay}")
- @Getter private long pluginActorTerminationDelay;
+ @Getter
+ private long pluginActorTerminationDelay;
@Value("${actors.plugin.processing.timeout}")
- @Getter private long pluginProcessingTimeout;
+ @Getter
+ private long pluginProcessingTimeout;
@Value("${actors.plugin.error_persist_frequency}")
- @Getter private long pluginErrorPersistFrequency;
+ @Getter
+ private long pluginErrorPersistFrequency;
@Value("${actors.rule.chain.error_persist_frequency}")
- @Getter private long ruleChainErrorPersistFrequency;
+ @Getter
+ private long ruleChainErrorPersistFrequency;
@Value("${actors.rule.node.error_persist_frequency}")
- @Getter private long ruleNodeErrorPersistFrequency;
+ @Getter
+ private long ruleNodeErrorPersistFrequency;
@Value("${actors.rule.termination.delay}")
- @Getter private long ruleActorTerminationDelay;
+ @Getter
+ private long ruleActorTerminationDelay;
@Value("${actors.rule.error_persist_frequency}")
- @Getter private long ruleErrorPersistFrequency;
+ @Getter
+ private long ruleErrorPersistFrequency;
@Value("${actors.statistics.enabled}")
- @Getter private boolean statisticsEnabled;
+ @Getter
+ private boolean statisticsEnabled;
@Value("${actors.statistics.persist_frequency}")
- @Getter private long statisticsPersistFrequency;
+ @Getter
+ private long statisticsPersistFrequency;
@Value("${actors.tenant.create_components_on_init}")
- @Getter private boolean tenantComponentsInitEnabled;
+ @Getter
+ private boolean tenantComponentsInitEnabled;
- @Getter @Setter private ActorSystem actorSystem;
+ @Getter
+ @Setter
+ private ActorSystem actorSystem;
- @Getter @Setter private ActorRef appActor;
+ @Getter
+ @Setter
+ private ActorRef appActor;
- @Getter @Setter private ActorRef sessionManagerActor;
+ @Getter
+ @Setter
+ private ActorRef sessionManagerActor;
- @Getter @Setter private ActorRef statsActor;
+ @Getter
+ @Setter
+ private ActorRef statsActor;
- @Getter private final Config config;
+ @Getter
+ private final Config config;
public ActorSystemContext() {
config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load());
@@ -197,7 +248,7 @@ public class ActorSystemContext {
eventService.save(event);
}
- private String toString(Exception e) {
+ private String toString(Throwable e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
return sw.toString();
@@ -217,4 +268,64 @@ public class ActorSystemContext {
private JsonNode toBodyJson(ServerAddress server, String method, String body) {
return mapper.createObjectNode().put("server", server.toString()).put("method", method).put("error", body);
}
+
+ public String getServerAddress() {
+ return discoveryService.getCurrentServer().getServerAddress().toString();
+ }
+
+ public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
+ persistDebug(tenantId, entityId, "IN", tbMsg, null);
+ }
+
+ public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, Throwable error) {
+ persistDebug(tenantId, entityId, "IN", tbMsg, error);
+ }
+
+ public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, Throwable error) {
+ persistDebug(tenantId, entityId, "OUT", tbMsg, error);
+ }
+
+ public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
+ persistDebug(tenantId, entityId, "OUT", tbMsg, null);
+ }
+
+ private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, Throwable error) {
+ Event event = new Event();
+ event.setTenantId(tenantId);
+ event.setEntityId(entityId);
+ event.setType(DataConstants.DEBUG);
+
+ ObjectNode node = mapper.createObjectNode()
+ .put("type", type)
+ .put("server", getServerAddress())
+ .put("entityId", tbMsg.getOriginator().getId().toString())
+ .put("entityName", tbMsg.getOriginator().getEntityType().name())
+ .put("msgId", tbMsg.getId().toString())
+ .put("msgType", tbMsg.getType())
+ .put("dataType", tbMsg.getDataType().name());
+
+ ObjectNode mdNode = node.putObject("metadata");
+ tbMsg.getMetaData().getData().forEach(mdNode::put);
+
+ switch (tbMsg.getDataType()) {
+ case BINARY:
+ node.put("data", Base64Utils.encodeUrlSafe(tbMsg.getData()));
+ break;
+ default:
+ node.put("data", new String(tbMsg.getData(), StandardCharsets.UTF_8));
+ break;
+ }
+
+ if (error != null) {
+ node = node.put("error", toString(error));
+ }
+
+ event.setBody(node);
+ eventService.save(event);
+ }
+
+ public static Exception toException(Throwable error) {
+ return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
+ }
+
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index 5e05a60..a75158f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -88,7 +88,7 @@ public class AppActor extends RuleChainManagerActor {
}
@Override
- protected void process(TbActorMsg msg) {
+ protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
@@ -96,7 +96,10 @@ public class AppActor extends RuleChainManagerActor {
case SERVICE_TO_RULE_ENGINE_MSG:
onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
break;
+ default:
+ return false;
}
+ return true;
}
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index 861c405..87bc992 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -18,19 +18,19 @@ package org.thingsboard.server.actors.device;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.rule.RulesProcessedMsg;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
-import org.thingsboard.server.actors.tenant.RuleChainDeviceMsg;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.*;
+import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
public class DeviceActor extends ContextAwareActor {
@@ -48,12 +48,17 @@ public class DeviceActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
- if (msg instanceof RuleChainDeviceMsg) {
- processor.process(context(), (RuleChainDeviceMsg) msg);
- } else if (msg instanceof RulesProcessedMsg) {
- processor.onRulesProcessedMsg(context(), (RulesProcessedMsg) msg);
- } else if (msg instanceof ToDeviceActorMsg) {
+// if (msg instanceof RuleChainDeviceMsg) {
+// processor.process(context(), (RuleChainDeviceMsg) msg);
+// } else if (msg instanceof RulesProcessedMsg) {
+// processor.onRulesProcessedMsg(context(), (RulesProcessedMsg) msg);
+ if (msg instanceof ToDeviceActorMsg) {
processor.process(context(), (ToDeviceActorMsg) msg);
} else if (msg instanceof ToDeviceActorNotificationMsg) {
if (msg instanceof DeviceAttributesEventNotificationMsg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 21112bf..3644a49 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -19,9 +19,7 @@ import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.event.LoggingAdapter;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.rule.*;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
-import org.thingsboard.server.actors.tenant.RuleChainDeviceMsg;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
@@ -37,15 +35,10 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.common.msg.session.SessionType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
-import org.thingsboard.server.extensions.api.device.*;
-import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
-import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
-import org.thingsboard.server.extensions.api.plugins.msg.TimeoutIntMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestBody;
-import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
-import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
+import org.thingsboard.server.extensions.api.device.DeviceAttributes;
+import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
+import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -230,18 +223,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- void process(ActorContext context, RuleChainDeviceMsg srcMsg) {
- ChainProcessingMetaData md = new ChainProcessingMetaData(srcMsg.getRuleChain(),
- srcMsg.getToDeviceActorMsg(), new DeviceMetaData(deviceId, deviceName, deviceType, deviceAttributes), context.self());
- ChainProcessingContext ctx = new ChainProcessingContext(md);
- if (ctx.getChainLength() > 0) {
- RuleProcessingMsg msg = new RuleProcessingMsg(ctx);
- ActorRef ruleActorRef = ctx.getCurrentActor();
- ruleActorRef.tell(msg, ActorRef.noSender());
- } else {
- context.self().tell(new RulesProcessedMsg(ctx), context.self());
- }
- }
+// void process(ActorContext context, RuleChainDeviceMsg srcMsg) {
+// ChainProcessingMetaData md = new ChainProcessingMetaData(srcMsg.getRuleChain(),
+// srcMsg.getToDeviceActorMsg(), new DeviceMetaData(deviceId, deviceName, deviceType, deviceAttributes), context.self());
+// ChainProcessingContext ctx = new ChainProcessingContext(md);
+// if (ctx.getChainLength() > 0) {
+// RuleProcessingMsg msg = new RuleProcessingMsg(ctx);
+// ActorRef ruleActorRef = ctx.getCurrentActor();
+// ruleActorRef.tell(msg, ActorRef.noSender());
+// } else {
+// context.self().tell(new RulesProcessedMsg(ctx), context.self());
+// }
+// }
void processRpcResponses(ActorContext context, ToDeviceActorMsg msg) {
SessionId sessionId = msg.getSessionId();
@@ -302,18 +295,18 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
);
}
- void onRulesProcessedMsg(ActorContext context, RulesProcessedMsg msg) {
- ChainProcessingContext ctx = msg.getCtx();
- ToDeviceActorMsg inMsg = ctx.getInMsg();
- SessionId sid = inMsg.getSessionId();
- ToDeviceSessionActorMsg response;
- if (ctx.getResponse() != null) {
- response = new BasicToDeviceSessionActorMsg(ctx.getResponse(), sid);
- } else {
- response = new BasicToDeviceSessionActorMsg(ctx.getError(), sid);
- }
- sendMsgToSessionActor(response, inMsg.getServerAddress());
- }
+// void onRulesProcessedMsg(ActorContext context, RulesProcessedMsg msg) {
+// ChainProcessingContext ctx = msg.getCtx();
+// ToDeviceActorMsg inMsg = ctx.getInMsg();
+// SessionId sid = inMsg.getSessionId();
+// ToDeviceSessionActorMsg response;
+// if (ctx.getResponse() != null) {
+// response = new BasicToDeviceSessionActorMsg(ctx.getResponse(), sid);
+// } else {
+// response = new BasicToDeviceSessionActorMsg(ctx.getError(), sid);
+// }
+// sendMsgToSessionActor(response, inMsg.getServerAddress());
+// }
private void processSubscriptionCommands(ActorContext context, ToDeviceActorMsg msg) {
SessionId sessionId = msg.getSessionId();
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActor.java
index 265da38..88278f3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActor.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.stats.StatsPersistTick;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
@@ -41,6 +42,12 @@ public class PluginActor extends ComponentActor<PluginId, PluginActorMessageProc
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof PluginWebsocketMsg) {
onWebsocketMsg((PluginWebsocketMsg<?>) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
index 9290a8f..ba20013 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
@@ -57,6 +58,12 @@ public class RpcManagerActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof RpcSessionTellMsg) {
onMsg((RpcSessionTellMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
index db029fa..a187444 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java
@@ -23,6 +23,7 @@ import io.grpc.stub.StreamObserver;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
@@ -48,6 +49,12 @@ public class RpcSessionActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof RpcSessionTellMsg) {
tell((RpcSessionTellMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 0cd7601..a1344aa 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,10 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.actors.ruleChain;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.dao.alarm.AlarmService;
+import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.customer.CustomerService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.plugin.PluginService;
+import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.dao.rule.RuleChainService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.dao.user.UserService;
/**
* Created by ashvayka on 19.03.18.
@@ -26,27 +50,30 @@ class DefaultTbContext implements TbContext {
@Override
public void tellNext(TbMsg msg, String relationType) {
- nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelfId(), relationType, msg), nodeCtx.getSelf());
+ if (nodeCtx.getSelf().isDebugMode()) {
+ mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg);
+ }
+ nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
}
@Override
public void tellSelf(TbMsg msg, long delayMs) {
-
+ throw new RuntimeException("Not Implemented!");
}
@Override
public void tellOthers(TbMsg msg) {
-
+ throw new RuntimeException("Not Implemented!");
}
@Override
public void tellSibling(TbMsg msg, ServerAddress address) {
-
+ throw new RuntimeException("Not Implemented!");
}
@Override
public void spawn(TbMsg msg) {
-
+ throw new RuntimeException("Not Implemented!");
}
@Override
@@ -55,7 +82,60 @@ class DefaultTbContext implements TbContext {
}
@Override
+ public void tellError(TbMsg msg, Throwable th) {
+ if (nodeCtx.getSelf().isDebugMode()) {
+ mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, th);
+ }
+ nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor());
+ }
+
+ @Override
public AttributesService getAttributesService() {
return mainCtx.getAttributesService();
}
+
+ @Override
+ public CustomerService getCustomerService() {
+ return mainCtx.getCustomerService();
+ }
+
+ @Override
+ public UserService getUserService() {
+ return mainCtx.getUserService();
+ }
+
+ @Override
+ public PluginService getPluginService() {
+ return mainCtx.getPluginService();
+ }
+
+ @Override
+ public AssetService getAssetService() {
+ return mainCtx.getAssetService();
+ }
+
+ @Override
+ public DeviceService getDeviceService() {
+ return mainCtx.getDeviceService();
+ }
+
+ @Override
+ public AlarmService getAlarmService() {
+ return mainCtx.getAlarmService();
+ }
+
+ @Override
+ public RuleChainService getRuleChainService() {
+ return mainCtx.getRuleChainService();
+ }
+
+ @Override
+ public TimeseriesService getTimeseriesService() {
+ return mainCtx.getTsService();
+ }
+
+ @Override
+ public RelationService getRelationService() {
+ return mainCtx.getRelationService();
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 20ea05d..9c8de22 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,7 +33,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
}
@Override
- protected void process(TbActorMsg msg) {
+ protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
@@ -44,7 +44,10 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
break;
+ default:
+ return false;
}
+ return true;
}
public static class ActorCreator extends ContextBasedCreator<RuleChainActor> {
@@ -69,4 +72,6 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
protected long getErrorPersistFrequency() {
return systemContext.getRuleChainErrorPersistFrequency();
}
+
+ //TODO: failover strategy
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 5c946bd..3f6bed7 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode;
@@ -53,6 +54,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private RuleNodeId firstId;
private RuleNodeCtx firstNode;
+ private ComponentLifecycleState state;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
, LoggingAdapter logger, ActorRef parent, ActorRef self) {
@@ -66,6 +68,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
@Override
public void start(ActorContext context) throws Exception {
+ if (state == ComponentLifecycleState.ACTIVE) {
+ return;
+ }
RuleChain ruleChain = service.findRuleChainById(entityId);
List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
// Creating and starting the actors;
@@ -74,8 +79,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
ActorRef ruleNodeActor = context.actorOf(
Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
- .withDispatcher(dispatcherName), ruleNode.toString());
- nodeActors.put(ruleNode.getId(), new RuleNodeCtx(self, ruleNodeActor, ruleNode.getId()));
+ .withDispatcher(dispatcherName), ruleNode.getId().toString());
+ nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
// Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) {
@@ -94,13 +99,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
firstId = ruleChain.getFirstRuleNodeId();
firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId());
+ state = ComponentLifecycleState.ACTIVE;
}
@Override
public void stop(ActorContext context) throws Exception {
- nodeActors.values().stream().map(RuleNodeCtx::getSelf).forEach(context::stop);
+ nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(context::stop);
nodeActors.clear();
nodeRoutes.clear();
+ state = ComponentLifecycleState.SUSPENDED;
}
@Override
@@ -109,23 +116,33 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
+ checkActive();
TbMsg tbMsg = envelope.getTbMsg();
//TODO: push to queue and act on ack in async way
pushMstToNode(firstNode, tbMsg);
}
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
+ checkActive();
RuleNodeId originator = envelope.getOriginator();
String targetRelationType = envelope.getRelationType();
//TODO: log debug output
List<RuleNodeRelation> relations = nodeRoutes.get(originator);
+ if (relations == null) {
+ return;
+ }
+ boolean copy = relations.size() > 1;
for (RuleNodeRelation relation : relations) {
+ TbMsg msg = envelope.getMsg();
+ if (copy) {
+ msg = msg.copy();
+ }
if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {
switch (relation.getOut().getEntityType()) {
case RULE_NODE:
RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
- pushMstToNode(targetRuleNode, envelope.getMsg());
+ pushMstToNode(targetRuleNode, msg);
break;
case RULE_CHAIN:
// TODO: implement
@@ -137,7 +154,13 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
//TODO: log debug input
- firstNode.getSelf().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
+ firstNode.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
+ }
+
+ private void checkActive() {
+ if (state != ComponentLifecycleState.ACTIVE) {
+ throw new IllegalStateException("Rule chain is not active!");
+ }
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
index cb76f04..940bd5b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorRef;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
index d6e8262..7640210 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
index cba19d1..f77735a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
@@ -35,14 +35,34 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
}
@Override
- protected void process(TbActorMsg msg) {
+ protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
case RULE_CHAIN_TO_RULE_MSG:
- processor.onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
+ onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
break;
+ case RULE_TO_SELF_ERROR_MSG:
+ onRuleNodeToSelfErrorMsg((RuleNodeToSelfErrorMsg) msg);
+ break;
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
+ logger.debug("[{}] Going to process rule msg: {}", id, msg.getMsg());
+ try {
+ processor.onRuleChainToRuleNodeMsg(msg);
+ increaseMessagesProcessedCount();
+ } catch (Exception e) {
+ logAndPersist("onRuleMsg", e);
}
}
+ private void onRuleNodeToSelfErrorMsg(RuleNodeToSelfErrorMsg msg) {
+ logAndPersist("onRuleMsg", ActorSystemContext.toException(msg.getError()));
+ }
+
public static class ActorCreator extends ContextBasedCreator<RuleNodeActor> {
private static final long serialVersionUID = 1L;
@@ -69,4 +89,6 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa
return systemContext.getRuleNodeErrorPersistFrequency();
}
+ //TODO: failover strategy
+
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index 99e48d8..536437a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -17,28 +17,28 @@ package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
-import akka.actor.Props;
import akka.event.LoggingAdapter;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.springframework.util.Base64Utils;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeState;
import org.thingsboard.server.actors.ActorSystemContext;
-import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
-import org.thingsboard.server.common.data.EntityType;
-import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.relation.EntityRelation;
-import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.rule.RuleChainService;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
/**
* @author Andrew Shvayka
@@ -48,6 +48,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
private final ActorRef parent;
private final ActorRef self;
private final RuleChainService service;
+ private RuleNode ruleNode;
+ private TbNode tbNode;
RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
, LoggingAdapter logger, ActorRef parent, ActorRef self) {
@@ -55,15 +57,17 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
this.parent = parent;
this.self = self;
this.service = systemContext.getRuleChainService();
+ this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(entityId);
}
@Override
public void start(ActorContext context) throws Exception {
-
+ tbNode = initComponent(ruleNode);
}
@Override
public void stop(ActorContext context) throws Exception {
+ tbNode.destroy();
}
@Override
@@ -71,8 +75,18 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
}
- void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
+ void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
+ if (ruleNode.isDebugMode()) {
+ systemContext.persistDebugInput(tenantId, entityId, msg.getMsg());
+ }
+ tbNode.onMsg(msg.getCtx(), msg.getMsg());
+ }
+ private TbNode initComponent(RuleNode ruleNode) throws Exception {
+ Class<?> componentClazz = Class.forName(ruleNode.getType());
+ TbNode tbNode = (TbNode) (componentClazz.newInstance());
+ tbNode.init(new TbNodeConfiguration(ruleNode.getConfiguration()), new TbNodeState());
+ return tbNode;
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
index dd25f8b..90c67c2 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeCtx.java
@@ -1,15 +1,33 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorRef;
import lombok.Data;
import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.rule.RuleNode;
/**
* Created by ashvayka on 19.03.18.
*/
@Data
final class RuleNodeCtx {
+ private final TenantId tenantId;
private final ActorRef chainActor;
- private final ActorRef self;
- private final RuleNodeId selfId;
+ private final ActorRef selfActor;
+ private final RuleNode self;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
index bd2d544..7861e54 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeRelation.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
index 95b3625..054284d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeToRuleChainTellNextMsg.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.actors.ruleChain;
import lombok.Data;
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
index d0260dd..6aa68d3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java
@@ -141,7 +141,6 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
messagesProcessed++;
}
-
protected void logAndPersist(String method, Exception e) {
logAndPersist(method, e, false);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
index 9cb0fc4..1d9c671 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java
@@ -39,12 +39,13 @@ public abstract class ContextAwareActor extends UntypedActor {
logger.debug("Processing msg: {}", msg);
}
if (msg instanceof TbActorMsg) {
- process((TbActorMsg) msg);
- }
- else {
+ if(!process((TbActorMsg) msg)){
+ logger.warning("Unknown message: {}!", msg);
+ }
+ } else {
logger.warning("Unknown message: {}!", msg);
}
}
- protected abstract void process(TbActorMsg msg);
+ protected abstract boolean process(TbActorMsg msg);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 166f37b..fbb5a14 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
index 37827d6..9d324c5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ToDeviceActorSessionMsg;
@@ -61,6 +62,12 @@ public class SessionActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
logger.debug("[{}] Processing: {}.", sessionId, msg);
if (msg instanceof ToDeviceActorSessionMsg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
index 9d67dab..b5b1791 100644
--- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java
@@ -26,6 +26,7 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import akka.event.Logging;
@@ -49,6 +50,12 @@ public class SessionManagerActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof SessionCtrlMsg) {
onSessionCtrlMsg((SessionCtrlMsg) msg);
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
index 917a645..d4a1f34 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.actors.shared;
import akka.actor.ActorContext;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
index 5142de2..3345e5f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/plugin/PluginManager.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
index 97acb6c..ff0c52e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/RuleChainManager.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.actors.shared.rulechain;
import akka.actor.ActorRef;
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/TenantRuleChainManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/TenantRuleChainManager.java
index aa2a631..731d8d8 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/TenantRuleChainManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rulechain/TenantRuleChainManager.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,12 +18,9 @@ package org.thingsboard.server.actors.shared.rulechain;
import akka.actor.ActorContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.DefaultActorService;
-import org.thingsboard.server.actors.shared.rule.RuleManager;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
import org.thingsboard.server.common.data.rule.RuleChain;
-import org.thingsboard.server.common.data.rule.RuleMetaData;
public class TenantRuleChainManager extends RuleChainManager {
diff --git a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
index ccc31cc..8623370 100644
--- a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java
@@ -24,6 +24,7 @@ import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event;
+import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
public class StatsActor extends ContextAwareActor {
@@ -36,6 +37,12 @@ public class StatsActor extends ContextAwareActor {
}
@Override
+ protected boolean process(TbActorMsg msg) {
+ //TODO Move everything here, to work with TbActorMsg\
+ return false;
+ }
+
+ @Override
public void onReceive(Object msg) throws Exception {
logger.debug("Received message: {}", msg);
if (msg instanceof StatsPersistMsg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index a51e7a2..d53c054 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -62,7 +62,7 @@ public class TenantActor extends RuleChainManagerActor {
}
@Override
- protected void process(TbActorMsg msg) {
+ protected boolean process(TbActorMsg msg) {
switch (msg.getMsgType()) {
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
@@ -70,7 +70,10 @@ public class TenantActor extends RuleChainManagerActor {
case SERVICE_TO_RULE_ENGINE_MSG:
onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
break;
+ default:
+ return false;
}
+ return true;
}
private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
index 93891ac..c6befdd 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
@@ -78,7 +78,7 @@ public class RuleChainController extends BaseController {
ruleChain.setTenantId(getCurrentUser().getTenantId());
RuleChain savedRuleChain = checkNotNull(ruleChainService.saveRuleChain(ruleChain));
- actorService.onEntityStateChange(ruleChain.getTenantId(), ruleChain.getId(),
+ actorService.onEntityStateChange(ruleChain.getTenantId(), savedRuleChain.getId(),
created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
logEntityAction(savedRuleChain.getId(), savedRuleChain,
diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
index 0a6081d..910b459 100644
--- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
@@ -26,6 +26,10 @@ import org.springframework.context.annotation.ClassPathScanningCandidateComponen
import org.springframework.core.env.Environment;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.ActionNode;
+import org.thingsboard.rule.engine.api.EnrichmentNode;
+import org.thingsboard.rule.engine.api.FilterNode;
+import org.thingsboard.rule.engine.api.TransformationNode;
import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.dao.component.ComponentDescriptorService;
@@ -79,8 +83,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
private List<ComponentDescriptor> persist(Set<BeanDefinition> filterDefs, ComponentType type) {
List<ComponentDescriptor> result = new ArrayList<>();
for (BeanDefinition def : filterDefs) {
- ComponentDescriptor scannedComponent = scanAndPersistComponent(def, type);
- result.add(scannedComponent);
+ result.add(scanAndPersistComponent(def, type));
}
return result;
}
@@ -93,24 +96,36 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
Class<?> clazz = Class.forName(clazzName);
String descriptorResourceName;
switch (type) {
+ case ENRICHMENT:
+ EnrichmentNode enrichmentAnnotation = clazz.getAnnotation(EnrichmentNode.class);
+ scannedComponent.setName(enrichmentAnnotation.name());
+ scannedComponent.setScope(enrichmentAnnotation.scope());
+ descriptorResourceName = enrichmentAnnotation.descriptor();
+ break;
case FILTER:
- Filter filterAnnotation = clazz.getAnnotation(Filter.class);
+ FilterNode filterAnnotation = clazz.getAnnotation(FilterNode.class);
scannedComponent.setName(filterAnnotation.name());
scannedComponent.setScope(filterAnnotation.scope());
descriptorResourceName = filterAnnotation.descriptor();
break;
- case PROCESSOR:
- Processor processorAnnotation = clazz.getAnnotation(Processor.class);
- scannedComponent.setName(processorAnnotation.name());
- scannedComponent.setScope(processorAnnotation.scope());
- descriptorResourceName = processorAnnotation.descriptor();
+ case TRANSFORMATION:
+ TransformationNode trAnnotation = clazz.getAnnotation(TransformationNode.class);
+ scannedComponent.setName(trAnnotation.name());
+ scannedComponent.setScope(trAnnotation.scope());
+ descriptorResourceName = trAnnotation.descriptor();
break;
case ACTION:
- Action actionAnnotation = clazz.getAnnotation(Action.class);
+ ActionNode actionAnnotation = clazz.getAnnotation(ActionNode.class);
scannedComponent.setName(actionAnnotation.name());
scannedComponent.setScope(actionAnnotation.scope());
descriptorResourceName = actionAnnotation.descriptor();
break;
+ case OLD_ACTION:
+ Action oldActionAnnotation = clazz.getAnnotation(Action.class);
+ scannedComponent.setName(oldActionAnnotation.name());
+ scannedComponent.setScope(oldActionAnnotation.scope());
+ descriptorResourceName = oldActionAnnotation.descriptor();
+ break;
case PLUGIN:
Plugin pluginAnnotation = clazz.getAnnotation(Plugin.class);
scannedComponent.setName(pluginAnnotation.name());
@@ -122,12 +137,12 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
log.error("Can't initialize plugin {}, due to missing action {}!", def.getBeanClassName(), actionClazz.getName());
return new ClassNotFoundException("Action: " + actionClazz.getName() + "is missing!");
});
- if (actionComponent.getType() != ComponentType.ACTION) {
+ if (actionComponent.getType() != ComponentType.OLD_ACTION) {
log.error("Plugin {} action {} has wrong component type!", def.getBeanClassName(), actionClazz.getName(), actionComponent.getType());
throw new RuntimeException("Plugin " + def.getBeanClassName() + "action " + actionClazz.getName() + " has wrong component type!");
}
}
- scannedComponent.setActions(Arrays.stream(pluginAnnotation.actions()).map(action -> action.getName()).collect(Collectors.joining(",")));
+ scannedComponent.setActions(Arrays.stream(pluginAnnotation.actions()).map(Class::getName).collect(Collectors.joining(",")));
break;
default:
throw new RuntimeException(type + " is not supported yet!");
@@ -168,11 +183,15 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
@Override
public void discoverComponents() {
- registerComponents(ComponentType.FILTER, Filter.class);
+ registerComponents(ComponentType.ENRICHMENT, EnrichmentNode.class);
+
+ registerComponents(ComponentType.FILTER, FilterNode.class);
+
+ registerComponents(ComponentType.TRANSFORMATION, TransformationNode.class);
- registerComponents(ComponentType.PROCESSOR, Processor.class);
+ registerComponents(ComponentType.ACTION, ActionNode.class);
- registerComponents(ComponentType.ACTION, Action.class);
+ registerComponents(ComponentType.OLD_ACTION, Action.class);
registerComponents(ComponentType.PLUGIN, Plugin.class);
@@ -199,7 +218,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
}
List<ComponentDescriptor> result = new ArrayList<>();
for (String action : plugin.getActions().split(",")) {
- getComponent(action).ifPresent(v -> result.add(v));
+ getComponent(action).ifPresent(result::add);
}
return result;
} else {
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 778c2a3..2758521 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -62,7 +62,7 @@ cluster:
# Plugins configuration parameters
plugins:
# Comma seperated package list used during classpath scanning for plugins
- scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions}"
+ scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine}"
# JWT Token parameters
security.jwt:
@@ -215,12 +215,12 @@ actors:
termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
- chain:
- # Errors for particular actor are persisted once per specified amount of milliseconds
- error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
- node:
- # Errors for particular actor are persisted once per specified amount of milliseconds
- error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
+ chain:
+ # Errors for particular actor are persisted once per specified amount of milliseconds
+ error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
+ node:
+ # Errors for particular actor are persisted once per specified amount of milliseconds
+ error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
index b92e464..3ec4dc8 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java
@@ -96,6 +96,8 @@ import static org.springframework.test.web.servlet.setup.MockMvcBuilders.webAppC
@Slf4j
public abstract class AbstractControllerTest {
+ protected ObjectMapper mapper = new ObjectMapper();
+
protected static final String TEST_TENANT_NAME = "TEST TENANT";
protected static final String SYS_ADMIN_EMAIL = "sysadmin@thingsboard.org";
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
new file mode 100644
index 0000000..9aa7574
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -0,0 +1,41 @@
+package org.thingsboard.server.controller;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.Event;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.page.TimePageData;
+import org.thingsboard.server.common.data.page.TimePageLink;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+
+/**
+ * Created by ashvayka on 20.03.18.
+ */
+public class AbstractRuleEngineControllerTest extends AbstractControllerTest{
+
+ protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
+ return doPost("/api/ruleChain", ruleChain, RuleChain.class);
+ }
+
+ protected RuleChain getRuleChain(RuleChainId ruleChainId) throws Exception {
+ return doGet("/api/ruleChain/" + ruleChainId.getId().toString(), RuleChain.class);
+ }
+
+ protected RuleChainMetaData saveRuleChainMetaData(RuleChainMetaData ruleChainMD) throws Exception {
+ return doPost("/api/ruleChain/metadata", ruleChainMD, RuleChainMetaData.class);
+ }
+
+ protected RuleChainMetaData getRuleChainMetaData(RuleChainId ruleChainId) throws Exception {
+ return doGet("/api/ruleChain/metadata/" + ruleChainId.getId().toString(), RuleChainMetaData.class);
+ }
+
+ protected TimePageData<Event> getDebugEvents(TenantId tenantId, EntityId entityId, int limit) throws Exception {
+ TimePageLink pageLink = new TimePageLink(limit);
+ return doGetTypedWithTimePageLink("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&",
+ new TypeReference<TimePageData<Event>>() {
+ }, pageLink, entityId.getEntityType(), entityId.getId(), DataConstants.DEBUG, tenantId.getId());
+ }
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
new file mode 100644
index 0000000..1f2f709
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -0,0 +1,156 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules.flow;
+
+import com.datastax.driver.core.utils.UUIDs;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.*;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.data.page.TimePageData;
+import org.thingsboard.server.common.data.rule.RuleChain;
+import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.common.data.rule.RuleNode;
+import org.thingsboard.server.common.data.security.Authority;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+import java.util.Collections;
+
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+/**
+ * @author Valerii Sosliuk
+ */
+@Slf4j
+public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRuleEngineControllerTest {
+
+ private static final String MQTT_URL = "tcp://localhost:1883";
+ private static final Long TIME_TO_HANDLE_REQUEST = 500L;
+
+ private Tenant savedTenant;
+ private User tenantAdmin;
+
+ @Autowired
+ private ActorService actorService;
+
+ @Autowired
+ private AttributesService attributesService;
+
+ @Before
+ public void beforeTest() throws Exception {
+ loginSysAdmin();
+
+ Tenant tenant = new Tenant();
+ tenant.setTitle("My tenant");
+ savedTenant = doPost("/api/tenant", tenant, Tenant.class);
+ Assert.assertNotNull(savedTenant);
+
+ tenantAdmin = new User();
+ tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
+ tenantAdmin.setTenantId(savedTenant.getId());
+ tenantAdmin.setEmail("tenant2@thingsboard.org");
+ tenantAdmin.setFirstName("Joe");
+ tenantAdmin.setLastName("Downs");
+
+ createUserAndLogin(tenantAdmin, "testPassword1");
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ loginSysAdmin();
+ if (savedTenant != null) {
+ doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk());
+ }
+ }
+
+ @Test
+ public void testSimpleRuleChainCreation() throws Exception {
+ // Creating Rule Chain
+ RuleChain ruleChain = new RuleChain();
+ ruleChain.setName("Simple Rule Chain");
+ ruleChain.setTenantId(savedTenant.getId());
+ ruleChain.setRoot(true);
+ ruleChain.setDebugMode(true);
+ ruleChain = saveRuleChain(ruleChain);
+ Assert.assertNull(ruleChain.getFirstRuleNodeId());
+
+ RuleChainMetaData metaData = new RuleChainMetaData();
+ metaData.setRuleChainId(ruleChain.getId());
+
+ RuleNode ruleNode = new RuleNode();
+ ruleNode.setName("Simple Rule Node");
+ ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
+ configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
+ ruleNode.setConfiguration(mapper.valueToTree(configuration));
+
+ metaData.setNodes(Collections.singletonList(ruleNode));
+ metaData.setFirstNodeIndex(0);
+
+ metaData = saveRuleChainMetaData(metaData);
+ Assert.assertNotNull(metaData);
+
+ ruleChain = getRuleChain(ruleChain.getId());
+ Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
+
+ // Saving the device
+ Device device = new Device();
+ device.setName("My device");
+ device.setType("default");
+ device = doPost("/api/device", device, Device.class);
+
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
+
+ // Pushing Message to the system
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
+ "CUSTOM",
+ device.getId(),
+ new TbMsgMetaData(),
+ new byte[]{});
+ actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+
+ Thread.sleep(3000);
+
+ TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue", outEvent.getBody().get("metadata").get("ss.serverAttributeKey").asText());
+ }
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index 5624ed8..659a242 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -37,9 +37,12 @@ public class DataConstants {
public static final String ERROR = "ERROR";
public static final String LC_EVENT = "LC_EVENT";
public static final String STATS = "STATS";
- public static final String RULE_CHAIN_DEBUG = "DEBUG_RULE_CHAIN";
- public static final String RULE_NODE_DEBUG = "DEBUG_RULE_NODE";
+ public static final String DEBUG = "DEBUG";
public static final String ONEWAY = "ONEWAY";
public static final String TWOWAY = "TWOWAY";
+
+ public static final String IN = "IN";
+ public static final String OUT = "OUT";
+
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
index ab6acca..a103064 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentType.java
@@ -20,6 +20,6 @@ package org.thingsboard.server.common.data.plugin;
*/
public enum ComponentType {
- ENRICHMENT, FILTER, PROCESSOR, TRANSFORMATION, ACTION, PLUGIN
+ ENRICHMENT, FILTER, TRANSFORMATION, ACTION, OLD_ACTION, PLUGIN
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
index f2ba0cc..218061a 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
@@ -38,6 +38,7 @@ public class RuleChain extends SearchTextBasedWithAdditionalInfo<RuleChainId> im
private String name;
private RuleNodeId firstRuleNodeId;
private boolean root;
+ private boolean debugMode;
private transient JsonNode configuration;
@JsonIgnore
private byte[] configurationBytes;
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
index d044000..fbc1103 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNode.java
@@ -34,6 +34,7 @@ public class RuleNode extends SearchTextBasedWithAdditionalInfo<RuleNodeId> impl
private String type;
private String name;
+ private boolean debugMode;
private transient JsonNode configuration;
@JsonIgnore
private byte[] configurationBytes;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 22fc428..7c00ee6 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.common.msg;
/**
@@ -34,4 +49,9 @@ public enum MsgType {
*/
RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
+ /**
+ * Message that is sent by RuleActor implementation to RuleActor itself to log the error.
+ */
+ RULE_TO_SELF_ERROR_MSG,
+
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
index 315eb86..c104281 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleMsg.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
index 3d82033..0792b63 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/system/ServiceToRuleEngineMsg.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.common.msg.system;
import lombok.Data;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorMsg.java
index e7668ff..c361c11 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorMsg.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.common.msg;
/**
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index c689b24..524cc5f 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@ package org.thingsboard.server.common.msg;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
@@ -30,6 +31,7 @@ import java.util.UUID;
* Created by ashvayka on 13.01.18.
*/
@Data
+@AllArgsConstructor
public final class TbMsg implements Serializable {
private final UUID id;
@@ -39,6 +41,15 @@ public final class TbMsg implements Serializable {
private final TbMsgDataType dataType;
private final byte[] data;
+ public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, byte[] data) {
+ this.id = id;
+ this.type = type;
+ this.originator = originator;
+ this.metaData = metaData;
+ this.dataType = TbMsgDataType.JSON;
+ this.data = data;
+ }
+
public static ByteBuffer toBytes(TbMsg msg) {
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
builder.setId(msg.getId().toString());
@@ -61,9 +72,7 @@ public final class TbMsg implements Serializable {
public static TbMsg fromBytes(ByteBuffer buffer) {
try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
- TbMsgMetaData metaData = new TbMsgMetaData();
- metaData.setData(proto.getMetaData().getDataMap());
-
+ TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData().toByteArray());
@@ -71,4 +80,11 @@ public final class TbMsg implements Serializable {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
}
+
+ public TbMsg copy() {
+ int dataSize = data.length;
+ byte[] dataCopy = new byte[dataSize];
+ System.arraycopy( data, 0, dataCopy, 0, data.length );
+ return new TbMsg(id, type, originator, metaData.copy(), dataType, dataCopy);
+ }
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
index b6e2d5a..2e367e9 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgDataType.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.thingsboard.server.common.msg;
/**
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java
index 1bbc792..74a2d3f 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,9 +15,12 @@
*/
package org.thingsboard.server.common.msg;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -25,10 +28,15 @@ import java.util.concurrent.ConcurrentHashMap;
* Created by ashvayka on 13.01.18.
*/
@Data
+@NoArgsConstructor
public final class TbMsgMetaData implements Serializable {
private Map<String, String> data = new ConcurrentHashMap<>();
+ TbMsgMetaData(Map<String, String> data) {
+ this.data = data;
+ }
+
public String getValue(String key) {
return data.get(key);
}
@@ -37,4 +45,7 @@ public final class TbMsgMetaData implements Serializable {
data.put(key, value);
}
+ public TbMsgMetaData copy() {
+ return new TbMsgMetaData(new ConcurrentHashMap<>(data));
+ }
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index a159b9e..8c34cd3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -332,6 +332,8 @@ public class ModelConstants {
public static final String EVENT_BY_TYPE_AND_ID_VIEW_NAME = "event_by_type_and_id";
public static final String EVENT_BY_ID_VIEW_NAME = "event_by_id";
+ public static final String DEBUG_MODE = "debug_mode";
+
/**
* Cassandra rule chain constants.
*/
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java
index 34659a8..bfec390 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleChainEntity.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,8 @@ import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -54,6 +56,10 @@ public class RuleChainEntity implements SearchTextEntity<RuleChain> {
private UUID firstRuleNodeId;
@Column(name = RULE_CHAIN_ROOT_PROPERTY)
private boolean root;
+ @Getter
+ @Setter
+ @Column(name = DEBUG_MODE)
+ private boolean debugMode;
@Column(name = RULE_CHAIN_CONFIGURATION_PROPERTY, codec = JsonCodec.class)
private JsonNode configuration;
@Column(name = ADDITIONAL_INFO_PROPERTY, codec = JsonCodec.class)
@@ -71,6 +77,7 @@ public class RuleChainEntity implements SearchTextEntity<RuleChain> {
this.searchText = ruleChain.getName();
this.firstRuleNodeId = DaoUtil.getId(ruleChain.getFirstRuleNodeId());
this.root = ruleChain.isRoot();
+ this.debugMode = ruleChain.isDebugMode();
this.configuration = ruleChain.getConfiguration();
this.additionalInfo = ruleChain.getAdditionalInfo();
}
@@ -157,6 +164,7 @@ public class RuleChainEntity implements SearchTextEntity<RuleChain> {
ruleChain.setFirstRuleNodeId(new RuleNodeId(this.firstRuleNodeId));
}
ruleChain.setRoot(this.root);
+ ruleChain.setDebugMode(this.debugMode);
ruleChain.setConfiguration(this.configuration);
ruleChain.setAdditionalInfo(this.additionalInfo);
return ruleChain;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
index ba96e4b..8d3f3c3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/nosql/RuleNodeEntity.java
@@ -21,6 +21,8 @@ import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.rule.RuleNode;
@@ -49,6 +51,11 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
private JsonNode configuration;
@Column(name = ADDITIONAL_INFO_PROPERTY, codec = JsonCodec.class)
private JsonNode additionalInfo;
+ @Getter
+ @Setter
+ @Column(name = DEBUG_MODE)
+ private boolean debugMode;
+
public RuleNodeEntity() {
}
@@ -59,6 +66,7 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
}
this.type = ruleNode.getType();
this.name = ruleNode.getName();
+ this.debugMode = ruleNode.isDebugMode();
this.searchText = ruleNode.getName();
this.configuration = ruleNode.getConfiguration();
this.additionalInfo = ruleNode.getAdditionalInfo();
@@ -126,6 +134,7 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
ruleNode.setCreatedTime(UUIDs.unixTimestamp(id));
ruleNode.setType(this.type);
ruleNode.setName(this.name);
+ ruleNode.setDebugMode(this.debugMode);
ruleNode.setConfiguration(this.configuration);
ruleNode.setAdditionalInfo(this.additionalInfo);
return ruleNode;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleChainEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleChainEntity.java
index 471ec7b..a48421a 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleChainEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleChainEntity.java
@@ -58,6 +58,9 @@ public class RuleChainEntity extends BaseSqlEntity<RuleChain> implements SearchT
@Column(name = ModelConstants.RULE_CHAIN_ROOT_PROPERTY)
private boolean root;
+ @Column(name = ModelConstants.DEBUG_MODE)
+ private boolean debugMode;
+
@Type(type = "json")
@Column(name = ModelConstants.RULE_CHAIN_CONFIGURATION_PROPERTY)
private JsonNode configuration;
@@ -80,6 +83,7 @@ public class RuleChainEntity extends BaseSqlEntity<RuleChain> implements SearchT
this.firstRuleNodeId = UUIDConverter.fromTimeUUID(ruleChain.getFirstRuleNodeId().getId());
}
this.root = ruleChain.isRoot();
+ this.debugMode = ruleChain.isDebugMode();
this.configuration = ruleChain.getConfiguration();
this.additionalInfo = ruleChain.getAdditionalInfo();
}
@@ -104,6 +108,7 @@ public class RuleChainEntity extends BaseSqlEntity<RuleChain> implements SearchT
ruleChain.setFirstRuleNodeId(new RuleNodeId(UUIDConverter.fromString(firstRuleNodeId)));
}
ruleChain.setRoot(root);
+ ruleChain.setDebugMode(debugMode);
ruleChain.setConfiguration(configuration);
ruleChain.setAdditionalInfo(additionalInfo);
return ruleChain;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
index d960487..6a888c2 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeEntity.java
@@ -56,6 +56,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
@Column(name = ModelConstants.ADDITIONAL_INFO_PROPERTY)
private JsonNode additionalInfo;
+ @Column(name = ModelConstants.DEBUG_MODE)
+ private boolean debugMode;
+
public RuleNodeEntity() {
}
@@ -65,6 +68,7 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
}
this.type = ruleNode.getType();
this.name = ruleNode.getName();
+ this.debugMode = ruleNode.isDebugMode();
this.searchText = ruleNode.getName();
this.configuration = ruleNode.getConfiguration();
this.additionalInfo = ruleNode.getAdditionalInfo();
@@ -86,6 +90,7 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
ruleNode.setCreatedTime(UUIDs.unixTimestamp(getId()));
ruleNode.setType(type);
ruleNode.setName(name);
+ ruleNode.setDebugMode(debugMode);
ruleNode.setConfiguration(configuration);
ruleNode.setAdditionalInfo(additionalInfo);
return ruleNode;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
index da991fa..bb76b97 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
@@ -32,6 +32,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.annotation.Nullable;
@@ -125,7 +126,7 @@ public class QueueBenchmark implements CommandLineRunner {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
- return new TbMsg(UUIDs.timeBased(), "type", null, metaData, dataStr.getBytes());
+ return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr.getBytes());
}
@Bean
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
index 04207ec..3daed33 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
@@ -220,6 +220,12 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
@Override
+ public RuleNode findRuleNodeById(RuleNodeId ruleNodeId) {
+ Validator.validateId(ruleNodeId, "Incorrect rule node id for search request.");
+ return ruleNodeDao.findById(ruleNodeId.getId());
+ }
+
+ @Override
public ListenableFuture<RuleChain> findRuleChainByIdAsync(RuleChainId ruleChainId) {
Validator.validateId(ruleChainId, "Incorrect rule chain id for search request.");
return ruleChainDao.findByIdAsync(ruleChainId.getId());
@@ -325,7 +331,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
}
if (ruleChain.isRoot()) {
RuleChain rootRuleChain = getRootTenantRuleChain(ruleChain.getTenantId());
- if (ruleChain.getId() == null || !ruleChain.getId().equals(rootRuleChain.getId())) {
+ if (rootRuleChain != null && !rootRuleChain.getId().equals(ruleChain.getId())) {
throw new DataValidationException("Another root rule chain is present in scope of current tenant!");
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
index e5f2840..da7833d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleChainService.java
@@ -42,6 +42,8 @@ public interface RuleChainService {
RuleChain findRuleChainById(RuleChainId ruleChainId);
+ RuleNode findRuleNodeById(RuleNodeId ruleNodeId);
+
ListenableFuture<RuleChain> findRuleChainByIdAsync(RuleChainId ruleChainId);
RuleChain getRootTenantRuleChain(TenantId tenantId);
diff --git a/dao/src/main/resources/cassandra/schema.cql b/dao/src/main/resources/cassandra/schema.cql
index 42c13f3..d0e62b2 100644
--- a/dao/src/main/resources/cassandra/schema.cql
+++ b/dao/src/main/resources/cassandra/schema.cql
@@ -669,6 +669,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_chain (
search_text text,
first_rule_node_id uuid,
root boolean,
+ debug_mode boolean,
configuration text,
additional_info text,
PRIMARY KEY (id, tenant_id)
@@ -685,6 +686,7 @@ CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
id uuid,
type text,
name text,
+ debug_mode boolean,
search_text text,
configuration text,
additional_info text,
diff --git a/dao/src/main/resources/sql/schema.sql b/dao/src/main/resources/sql/schema.sql
index 106204a..d7a0978 100644
--- a/dao/src/main/resources/sql/schema.sql
+++ b/dao/src/main/resources/sql/schema.sql
@@ -263,6 +263,7 @@ CREATE TABLE IF NOT EXISTS rule_chain (
name varchar(255),
first_rule_node_id varchar(31),
root boolean,
+ debug_mode boolean,
search_text varchar(255),
tenant_id varchar(31)
);
@@ -273,5 +274,6 @@ CREATE TABLE IF NOT EXISTS rule_node (
configuration varchar(10000000),
type varchar(255),
name varchar(255),
+ debug_mode boolean,
search_text varchar(255)
);
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java
index d083a90..44a1a09 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/AbstractServiceTest.java
@@ -217,10 +217,10 @@ public abstract class AbstractServiceTest {
ruleMetaData.setWeight(weight);
ruleMetaData.setPluginToken(pluginToken);
- ruleMetaData.setAction(createNode(ComponentScope.TENANT, ComponentType.ACTION,
+ ruleMetaData.setAction(createNode(ComponentScope.TENANT, ComponentType.OLD_ACTION,
"org.thingsboard.component.ActionTest", "TestJsonDescriptor.json", "TestJsonData.json"));
- ruleMetaData.setProcessor(createNode(ComponentScope.TENANT, ComponentType.PROCESSOR,
- "org.thingsboard.component.ProcessorTest", "TestJsonDescriptor.json", "TestJsonData.json"));
+// ruleMetaData.setProcessor(createNode(ComponentScope.TENANT, ComponentType.PROCESSOR,
+// "org.thingsboard.component.ProcessorTest", "TestJsonDescriptor.json", "TestJsonData.json"));
ruleMetaData.setFilters(mapper.createArrayNode().add(
createNode(ComponentScope.TENANT, ComponentType.FILTER,
"org.thingsboard.component.FilterTest", "TestJsonDescriptor.json", "TestJsonData.json")
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
index d17e1f2..99c6a91 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
@@ -44,7 +45,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, new byte[4]);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
@@ -54,7 +55,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, new byte[4]);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, new byte[4]);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
future.get();
@@ -67,7 +68,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, dataStr.getBytes());
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr.getBytes());
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
index 6302e63..3935c9b 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
@@ -33,8 +33,8 @@ public class UnprocessedMsgFilterTest {
public void acknowledgedMsgsAreFilteredOut() {
UUID id1 = UUID.randomUUID();
UUID id2 = UUID.randomUUID();
- TbMsg msg1 = new TbMsg(id1, "T", null, null, null);
- TbMsg msg2 = new TbMsg(id2, "T", null, null, null);
+ TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null);
+ TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null);
List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);
List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));
Collection<TbMsg> actual = msgFilter.filter(msgs, acks);
pom.xml 5(+5 -0)
diff --git a/pom.xml b/pom.xml
index f331e32..f0c915a 100755
--- a/pom.xml
+++ b/pom.xml
@@ -379,6 +379,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.thingsboard.rule-engine</groupId>
+ <artifactId>rule-engine-components</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>message</artifactId>
<version>${project.version}</version>
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ActionNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ActionNode.java
new file mode 100644
index 0000000..64e28f1
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ActionNode.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.data.plugin.ComponentScope;
+import org.thingsboard.server.extensions.api.component.EmptyComponentConfiguration;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface ActionNode {
+
+ String name();
+
+ ComponentScope scope() default ComponentScope.TENANT;
+
+ String descriptor() default "EmptyNodeDescriptor.json";
+
+ String[] relationTypes() default {"Success","Failure"};
+
+ boolean customRelations() default false;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index ede076b..a105d20 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -58,8 +58,6 @@ public interface TbContext {
UserService getUserService();
- RuleService getRuleService();
-
PluginService getPluginService();
AssetService getAssetService();
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
index d06c0d2..64053cd 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
@@ -22,8 +22,8 @@ import lombok.Data;
* Created by ashvayka on 19.01.18.
*/
@Data
-public class TbNodeConfiguration {
+public final class TbNodeConfiguration {
- private JsonNode data;
+ private final JsonNode data;
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
index c48b11d..2c77a69 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
@@ -18,5 +18,5 @@ package org.thingsboard.rule.engine.api;
/**
* Created by ashvayka on 19.01.18.
*/
-public class TbNodeState {
+public final class TbNodeState {
}
diff --git a/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json b/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json
new file mode 100644
index 0000000..7a73a41
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/resources/EmptyNodeDescriptor.json
@@ -0,0 +1,2 @@
+{
+}
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index 9b903b1..a97493b 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -88,11 +88,6 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <version>RELEASE</version>
- </dependency>
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
index 4d41921..ac12f95 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,7 @@
*/
package org.thingsboard.rule.engine.metadata;
+import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -33,6 +34,7 @@ import static org.thingsboard.server.common.data.DataConstants.*;
* Created by ashvayka on 19.01.18.
*/
@Slf4j
+@EnrichmentNode(name = "Get Attributes Node")
public class TbGetAttributesNode implements TbNode {
private TbGetAttributesNodeConfiguration config;
@@ -52,14 +54,20 @@ public class TbGetAttributesNode implements TbNode {
withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t));
}
- private ListenableFuture<Void> putAttributesAsync(TbMsg msg, List<AttributeKvEntry> attributes, String prefix) {
- attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
- return Futures.immediateFuture(null);
+ private Void putAttributesAsync(TbMsg msg, List<AttributeKvEntry> attributes, String prefix) {
+ if (attributes != null) {
+ attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
+ }
+ return null;
}
private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> attributes, String prefix) {
- return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes),
- (AsyncFunction<List<AttributeKvEntry>, Void>) i -> putAttributesAsync(msg, i, prefix));
+ if (attributes != null) {
+ return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes),
+ (Function<List<AttributeKvEntry>, Void>) i -> putAttributesAsync(msg, i, prefix));
+ } else {
+ return Futures.immediateFuture(null);
+ }
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
index 57f9b79..2e3d617 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
@@ -18,11 +18,13 @@ package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.api.EnrichmentNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.id.*;
+@EnrichmentNode(name="Get Customer Attributes Node")
public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> {
@Override
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
index 5823c18..83aa4e2 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
@@ -20,10 +20,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
-import org.thingsboard.rule.engine.api.TbContext;
-import org.thingsboard.rule.engine.api.TbNodeConfiguration;
-import org.thingsboard.rule.engine.api.TbNodeException;
-import org.thingsboard.rule.engine.api.TbNodeState;
+import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
@@ -33,6 +30,7 @@ import java.util.List;
import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON;
+@EnrichmentNode(name="Get Related Entity Attributes Node")
public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> {
private TbGetRelatedAttrNodeConfiguration config;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
index 2cf9a97..a1b0516 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.EnrichmentNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.HasTenantId;
@@ -26,6 +27,7 @@ import org.thingsboard.server.common.data.alarm.AlarmId;
import org.thingsboard.server.common.data.id.*;
@Slf4j
+@EnrichmentNode(name="Get Tenant Attributes Node")
public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> {
@Override
@@ -38,8 +40,6 @@ public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> {
return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator));
case USER:
return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
- case RULE:
- return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) originator));
case PLUGIN:
return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator));
case ASSET:
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
index 8e5ddb8..ad40f03 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -85,8 +85,7 @@ public class TbGetCustomerAttributeNodeTest {
config.setAttrMapping(attrMapping);
config.setTelemetry(false);
ObjectMapper mapper = new ObjectMapper();
- TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
- nodeConfiguration.setData(mapper.valueToTree(config));
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbGetCustomerAttributeNode();
node.init(nodeConfiguration, null);
@@ -224,8 +223,7 @@ public class TbGetCustomerAttributeNodeTest {
config.setAttrMapping(attrMapping);
config.setTelemetry(true);
ObjectMapper mapper = new ObjectMapper();
- TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
- nodeConfiguration.setData(mapper.valueToTree(config));
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
node = new TbGetCustomerAttributeNode();
node.init(nodeConfiguration, null);