thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java 66(+35 -31)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java 4(+2 -2)
ui/src/app/rule/rule.directive.js 10(+6 -4)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
index 7a2b0e4..5704b12 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rule;
import java.util.*;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.springframework.util.StringUtils;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
@@ -113,8 +114,9 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
}
private void initAction() throws Exception {
- JsonNode actionMd = ruleMd.getAction();
- action = initComponent(actionMd);
+ if (ruleMd.getAction() != null && !ruleMd.getAction().isNull()) {
+ action = initComponent(ruleMd.getAction());
+ }
}
private void initProcessor() throws Exception {
@@ -131,9 +133,11 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
}
private void fetchPluginInfo() {
- PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken());
- pluginTenantId = pluginMd.getTenantId();
- pluginId = pluginMd.getId();
+ if (!StringUtils.isEmpty(ruleMd.getPluginToken())) {
+ PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken());
+ pluginTenantId = pluginMd.getTenantId();
+ pluginId = pluginMd.getId();
+ }
}
protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException {
@@ -162,25 +166,27 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
inMsgMd = new RuleProcessingMetaData();
}
logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg);
- Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd);
- if (ruleToPluginMsgOptional.isPresent()) {
- RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get();
- logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg);
- context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());
- if (action.isOneWayAction()) {
- pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
- } else {
- pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);
- scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout());
+ if (action != null) {
+ Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd);
+ if (ruleToPluginMsgOptional.isPresent()) {
+ RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get();
+ logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg);
+ context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());
+ if (action.isOneWayAction()) {
+ pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
+ return;
+ } else {
+ pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);
+ scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout());
+ return;
+ }
}
- } else {
- logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId);
- pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_REQUEST_FROM_ACTIONS);
- return;
}
+ logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId);
+ pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
}
- public void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) {
+ void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) {
RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid());
if (pendingMsg != null) {
ChainProcessingContext ctx = pendingMsg.getCtx();
@@ -196,7 +202,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
}
}
- public void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) {
+ void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) {
RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId());
if (pendingMsg != null) {
logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg);
@@ -210,13 +216,13 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
ctx = ctx.withError(error);
}
if (ctx.isFailure()) {
- logger.debug("[{}] Forwarding processing chain to device actor due to failure.", ctx.getInMsg().getDeviceId());
+ logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
} else if (!ctx.hasNext()) {
- logger.debug("[{}] Forwarding processing chain to device actor due to end of chain.", ctx.getInMsg().getDeviceId());
+ logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
} else {
- logger.debug("[{}] Forwarding processing chain to next rule actor.", ctx.getInMsg().getDeviceId());
+ logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
ChainProcessingContext nextTask = ctx.getNext();
nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self());
}
@@ -269,18 +275,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
public void onActivate(ActorContext context) throws Exception {
logger.info("[{}] Going to process onActivate rule.", entityId);
this.state = ComponentLifecycleState.ACTIVE;
- if (action != null) {
- if (filters != null) {
- filters.forEach(f -> f.resume());
- } else {
- initFilters();
- }
+ if (filters != null) {
+ filters.forEach(RuleLifecycleComponent::resume);
if (processor != null) {
processor.resume();
} else {
initProcessor();
}
- action.resume();
+ if (action != null) {
+ action.resume();
+ }
logger.info("[{}] Rule resumed.", entityId);
} else {
start();
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
index db61b81..1e00a6d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/rule/RuleManager.java
@@ -72,16 +72,19 @@ public abstract class RuleManager {
}
public Optional<ActorRef> update(ActorContext context, RuleId ruleId, ComponentLifecycleEvent event) {
- RuleMetaData rule = null;
+ RuleMetaData rule;
if (event != ComponentLifecycleEvent.DELETED) {
rule = systemContext.getRuleService().findRuleById(ruleId);
- }
- if (rule == null) {
+ } else {
rule = ruleMap.keySet().stream()
.filter(r -> r.getId().equals(ruleId))
.peek(r -> r.setState(ComponentLifecycleState.SUSPENDED))
.findFirst()
.orElse(null);
+ if (rule != null) {
+ ruleMap.remove(rule);
+ ruleActors.remove(ruleId);
+ }
}
if (rule != null) {
RuleActorMetaData actorMd = ruleMap.get(rule);
diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml
index 93ed505..7169894 100644
--- a/application/src/main/resources/logback.xml
+++ b/application/src/main/resources/logback.xml
@@ -25,7 +25,7 @@
</encoder>
</appender>
- <logger name="org.thingsboard.server" level="INFO" />
+ <logger name="org.thingsboard.server" level="TRACE" />
<logger name="akka" level="INFO" />
<root level="INFO">
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index b133f03..7e2a773 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -176,7 +176,7 @@ actors:
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
- persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:60000}"
+ persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}"
# Cache parameters
cache:
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java
index 1451d2c..0ed44d9 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.common.data.rule;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.SearchTextBased;
import org.thingsboard.server.common.data.id.RuleId;
@@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
@Data
+@EqualsAndHashCode(callSuper = true)
public class RuleMetaData extends SearchTextBased<RuleId> implements HasName {
private static final long serialVersionUID = -5656679015122935465L;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
index b356c90..77d38b9 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleService.java
@@ -91,7 +91,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
if (rule.getProcessor() != null && !rule.getProcessor().isNull()) {
validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR);
}
- validateComponentJson(rule.getAction(), ComponentType.ACTION);
+ if (rule.getAction() != null && !rule.getAction().isNull()) {
+ validateComponentJson(rule.getAction(), ComponentType.ACTION);
+ }
validateRuleAndPluginState(rule);
return ruleDao.save(rule);
}
@@ -129,6 +131,9 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
}
private void validateRuleAndPluginState(RuleMetaData rule) {
+ if (org.springframework.util.StringUtils.isEmpty(rule.getPluginToken())) {
+ return;
+ }
PluginMetaData pluginMd = pluginService.findPluginByApiToken(rule.getPluginToken());
if (pluginMd == null) {
throw new IncorrectParameterException("Rule points to non-existent plugin!");
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java
index 3dec45e..e88247a 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/processor/AlarmProcessor.java
@@ -125,10 +125,10 @@ public class AlarmProcessor implements RuleProcessor<AlarmProcessorConfiguration
Alarm alarm = buildAlarm(ctx, msg);
existing = ctx.createOrUpdateAlarm(alarm);
if (existing.getStartTs() == alarm.getStartTs()) {
- log.debug("[{}][{}] New Active Alarm detected");
+ log.debug("[{}][{}] New Active Alarm detected", ctx.getRuleId(), existing.getId());
md.put(IS_NEW_ALARM, Boolean.TRUE);
} else {
- log.debug("[{}][{}] Existing Active Alarm detected");
+ log.debug("[{}][{}] Existing Active Alarm detected", ctx.getRuleId(), existing.getId());
md.put(IS_EXISTING_ALARM, Boolean.TRUE);
}
} else if (isClearedAlarm) {
ui/src/app/rule/rule.directive.js 10(+6 -4)
diff --git a/ui/src/app/rule/rule.directive.js b/ui/src/app/rule/rule.directive.js
index bfd6b4c..64502f9 100644
--- a/ui/src/app/rule/rule.directive.js
+++ b/ui/src/app/rule/rule.directive.js
@@ -85,10 +85,11 @@ export default function RuleDirective($compile, $templateCache, $mdDialog, $docu
if (scope.rule) {
var valid = scope.rule.filters && scope.rule.filters.length > 0;
scope.theForm.$setValidity('filters', valid);
- valid = angular.isDefined(scope.rule.pluginToken) && scope.rule.pluginToken != null;
- scope.theForm.$setValidity('plugin', valid);
- valid = angular.isDefined(scope.rule.action) && scope.rule.action != null;
- scope.theForm.$setValidity('action', valid);
+ var processorDefined = angular.isDefined(scope.rule.processor) && scope.rule.processor != null;
+ var pluginDefined = angular.isDefined(scope.rule.pluginToken) && scope.rule.pluginToken != null;
+ var pluginActionDefined = angular.isDefined(scope.rule.action) && scope.rule.action != null;
+ valid = processorDefined && !pluginDefined || (pluginDefined && pluginActionDefined);
+ scope.theForm.$setValidity('processorOrPlugin', valid);
}
};
@@ -160,6 +161,7 @@ export default function RuleDirective($compile, $templateCache, $mdDialog, $docu
scope.$watch('rule.processor', function(newVal, prevVal) {
if (scope.rule && scope.isEdit && !angular.equals(newVal, prevVal)) {
scope.theForm.$setDirty();
+ scope.updateValidity();
}
}, true);
diff --git a/ui/src/app/rule/rule-fieldset.tpl.html b/ui/src/app/rule/rule-fieldset.tpl.html
index 0646b3f..835b783 100644
--- a/ui/src/app/rule/rule-fieldset.tpl.html
+++ b/ui/src/app/rule/rule-fieldset.tpl.html
@@ -165,11 +165,11 @@
<fieldset ng-disabled="loading || !isEdit || isReadOnly">
<md-input-container ng-if="!isEdit || isReadOnly" flex class="md-block">
<label translate>plugin.plugin</label>
- <input required name="name" ng-model="plugin.name">
+ <input name="name" ng-model="plugin.name">
</md-input-container>
<tb-plugin-select ng-show="isEdit && !isReadOnly" flex
ng-model="plugin"
- tb-required="true"
+ tb-required="false"
the-form="theForm"
plugins-scope="action">
</tb-plugin-select>