thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 34(+23 -11)
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java 6(+6 -0)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 40(+23 -17)
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java 20(+12 -8)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index ac902a7..cc73eae 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -183,23 +183,35 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
checkActive();
- putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+ if (firstNode != null) {
+ putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+ }
}
void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
checkActive();
- putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
- pushMsgToNode(firstNode, msg, "");
- envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
- });
+ if (firstNode != null) {
+ putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
+ pushMsgToNode(firstNode, msg, "");
+ envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
+ });
+ }
}
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
checkActive();
if (envelope.isEnqueue()) {
- putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+ if (firstNode != null) {
+ putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+ }
} else {
- pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
+ if (firstNode != null) {
+ pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
+ } else {
+ TbMsg msg = envelope.getMsg();
+ EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+ queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
+ }
}
}
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 6c2d3db..1b042a8 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -32,6 +32,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.service.queue.MsgQueueService;
import java.io.IOException;
+import java.util.function.Predicate;
/**
* Created by ashvayka on 20.03.18.
@@ -75,4 +76,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
throw new RuntimeException(e);
}
}
+
+ protected Predicate<Event> filterByCustomEvent() {
+ return event -> event.getBody().get("msgType").textValue().equals("CUSTOM");
+ }
+
}
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index 356dfee..c86d496 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -47,6 +47,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -157,15 +159,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Thread.sleep(3000);
- TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
+ Assert.assertEquals(2, events.size());
- Assert.assertEquals(2, events.getData().size());
-
- Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
@@ -174,15 +176,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
RuleChain finalRuleChain = ruleChain;
RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
- events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+ eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+ events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
@@ -274,15 +277,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Thread.sleep(3000);
- TimePageData<Event> events = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+ TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+ List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
@@ -291,15 +295,17 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
RuleChain finalRuleChain = rootRuleChain;
RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
- events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+ eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+ events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
+
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 2f25b97..7ac0789 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -45,6 +45,8 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -144,15 +146,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Thread.sleep(3000);
- TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
@@ -212,15 +215,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Thread.sleep(3000);
- TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+ List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
- Assert.assertEquals(2, events.getData().size());
+ Assert.assertEquals(2, events.size());
- Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
- Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
index c6212cc..65b2f11 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
@@ -15,7 +15,6 @@
*/
package org.thingsboard.rule.engine.metadata;
-import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.collections.CollectionUtils;
@@ -33,9 +32,11 @@ import java.util.List;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
-import static org.thingsboard.server.common.data.DataConstants.*;
+import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
+import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
+import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
-public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
+public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
protected C config;
@@ -59,7 +60,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
- if(entityId == null || entityId.isNullUid()) {
+ if (entityId == null || entityId.isNullUid()) {
ctx.tellNext(msg, FAILURE);
return;
}
@@ -78,7 +79,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, scope, keys);
return Futures.transform(latest, l -> {
- l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
+ l.forEach(r -> {
+ if (r.getValue() != null) {
+ msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString());
+ } else {
+ throw new RuntimeException("[" + scope + "][" + r.getKey() + "] attribute value is not present in the DB!");
+ }
+ });
return null;
});
}
@@ -89,7 +96,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, keys);
return Futures.transform(latest, l -> {
- l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString()));
+ l.forEach(r -> {
+ if (r.getValue() != null) {
+ msg.getMetaData().putValue(r.getKey(), r.getValueAsString());
+ } else {
+ throw new RuntimeException("[" + r.getKey() + "] telemetry value is not present in the DB!");
+ }
+ });
return null;
});
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
index ea4446d..dc0fcdd 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
@@ -69,10 +69,8 @@ public class TbMsgTimeseriesNode implements TbNode {
try {
ts = Long.parseLong(tsStr);
} catch (NumberFormatException e) {}
- }
- if (ts == -1) {
- ctx.tellFailure(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData()));
- return;
+ } else {
+ ts = System.currentTimeMillis();
}
String src = msg.getData();
TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);