thingsboard-aplcache

TB-73: Implementation

7/24/2017 9:14:01 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
index 7a2b0e4..890ef25 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rule;
 import java.util.*;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.springframework.util.StringUtils;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper;
 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
@@ -113,8 +114,9 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
     }
 
     private void initAction() throws Exception {
-        JsonNode actionMd = ruleMd.getAction();
-        action = initComponent(actionMd);
+        if (ruleMd.getAction() != null && !ruleMd.getAction().isNull()) {
+            action = initComponent(ruleMd.getAction());
+        }
     }
 
     private void initProcessor() throws Exception {
@@ -131,9 +133,11 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
     }
 
     private void fetchPluginInfo() {
-        PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken());
-        pluginTenantId = pluginMd.getTenantId();
-        pluginId = pluginMd.getId();
+        if (!StringUtils.isEmpty(ruleMd.getPluginToken())) {
+            PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken());
+            pluginTenantId = pluginMd.getTenantId();
+            pluginId = pluginMd.getId();
+        }
     }
 
     protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException {
@@ -162,25 +166,26 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
             inMsgMd = new RuleProcessingMetaData();
         }
         logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg);
-        Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd);
-        if (ruleToPluginMsgOptional.isPresent()) {
-            RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get();
-            logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg);
-            context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());
-            if (action.isOneWayAction()) {
-                pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
-            } else {
-                pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);
-                scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout());
+        if (action != null) {
+            Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd);
+            if (ruleToPluginMsgOptional.isPresent()) {
+                RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get();
+                logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg);
+                context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());
+                if (action.isOneWayAction()) {
+                    pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
+                } else {
+                    pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);
+                    scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout());
+                }
             }
         } else {
             logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId);
-            pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_REQUEST_FROM_ACTIONS);
-            return;
+            pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
         }
     }
 
-    public void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) {
+    void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) {
         RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid());
         if (pendingMsg != null) {
             ChainProcessingContext ctx = pendingMsg.getCtx();
@@ -196,7 +201,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
         }
     }
 
-    public void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) {
+    void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) {
         RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId());
         if (pendingMsg != null) {
             logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg);
@@ -269,18 +274,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
     public void onActivate(ActorContext context) throws Exception {
         logger.info("[{}] Going to process onActivate rule.", entityId);
         this.state = ComponentLifecycleState.ACTIVE;
-        if (action != null) {
-            if (filters != null) {
-                filters.forEach(f -> f.resume());
-            } else {
-                initFilters();
-            }
+        if (filters != null) {
+            filters.forEach(RuleLifecycleComponent::resume);
             if (processor != null) {
                 processor.resume();
             } else {
                 initProcessor();
             }
-            action.resume();
+            if (action != null) {
+                action.resume();
+            }
             logger.info("[{}] Rule resumed.", entityId);
         } else {
             start();
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
index b356c90..77d38b9 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
@@ -91,7 +91,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
         if (rule.getProcessor() != null && !rule.getProcessor().isNull()) {
             validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR);
         }
-        validateComponentJson(rule.getAction(), ComponentType.ACTION);
+        if (rule.getAction() != null && !rule.getAction().isNull()) {
+            validateComponentJson(rule.getAction(), ComponentType.ACTION);
+        }
         validateRuleAndPluginState(rule);
         return ruleDao.save(rule);
     }
@@ -129,6 +131,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
     }
 
     private void validateRuleAndPluginState(RuleMetaData rule) {
+        if (org.springframework.util.StringUtils.isEmpty(rule.getPluginToken())) {
+            return;
+        }
         PluginMetaData pluginMd = pluginService.findPluginByApiToken(rule.getPluginToken());
         if (pluginMd == null) {
             throw new IncorrectParameterException("Rule points to non-existent plugin!");