thingsboard-aplcache

Fixed integration tests

5/23/2018 8:30:32 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index ac902a7..cc73eae 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -183,23 +183,35 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
         checkActive();
-        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+        if (firstNode != null) {
+            putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
+        }
     }
 
     void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
         checkActive();
-        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
-            pushMsgToNode(firstNode, msg, "");
-            envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
-        });
+        if (firstNode != null) {
+            putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
+                pushMsgToNode(firstNode, msg, "");
+                envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
+            });
+        }
     }
 
     void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
         checkActive();
         if (envelope.isEnqueue()) {
-            putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+            if (firstNode != null) {
+                putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
+            }
         } else {
-            pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
+            if (firstNode != null) {
+                pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
+            } else {
+                TbMsg msg = envelope.getMsg();
+                EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+                queue.ack(tenantId, envelope.getMsg(), ackId.getId(), msg.getClusterPartition());
+            }
         }
     }
 
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 6c2d3db..1b042a8 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -32,6 +32,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
 import org.thingsboard.server.service.queue.MsgQueueService;
 
 import java.io.IOException;
+import java.util.function.Predicate;
 
 /**
  * Created by ashvayka on 20.03.18.
@@ -75,4 +76,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
             throw new RuntimeException(e);
         }
     }
+
+    protected Predicate<Event> filterByCustomEvent() {
+        return event -> event.getBody().get("msgType").textValue().equals("CUSTOM");
+    }
+
 }
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index 356dfee..c86d496 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -47,6 +47,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
@@ -157,15 +159,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
 
         Thread.sleep(3000);
 
-        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
+        Assert.assertEquals(2, events.size());
 
-        Assert.assertEquals(2, events.getData().size());
-
-        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
@@ -174,15 +176,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
         RuleChain finalRuleChain = ruleChain;
         RuleNode lastRuleNode = metaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
 
-        events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+        eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+        events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
@@ -274,15 +277,16 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
 
         Thread.sleep(3000);
 
-        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
@@ -291,15 +295,17 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
         RuleChain finalRuleChain = rootRuleChain;
         RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
 
-        events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+        eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+        events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
+
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 2f25b97..7ac0789 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -45,6 +45,8 @@ import org.thingsboard.server.dao.attributes.AttributesService;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
@@ -144,15 +146,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
 
         Thread.sleep(3000);
 
-        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
@@ -212,15 +215,16 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
 
         Thread.sleep(3000);
 
-        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
 
-        Assert.assertEquals(2, events.getData().size());
+        Assert.assertEquals(2, events.size());
 
-        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
 
-        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
         Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
         Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
index c6212cc..65b2f11 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java
@@ -15,7 +15,6 @@
  */
 package org.thingsboard.rule.engine.metadata;
 
-import com.google.common.base.Function;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.collections.CollectionUtils;
@@ -33,9 +32,11 @@ import java.util.List;
 import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
 import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
-import static org.thingsboard.server.common.data.DataConstants.*;
+import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
+import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
+import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
 
-public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId>  implements TbNode {
+public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
 
     protected C config;
 
@@ -59,7 +60,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
     }
 
     private void safePutAttributes(TbContext ctx, TbMsg msg, T entityId) {
-        if(entityId == null || entityId.isNullUid()) {
+        if (entityId == null || entityId.isNullUid()) {
             ctx.tellNext(msg, FAILURE);
             return;
         }
@@ -78,7 +79,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
         }
         ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, scope, keys);
         return Futures.transform(latest, l -> {
-            l.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
+            l.forEach(r -> {
+                if (r.getValue() != null) {
+                    msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString());
+                } else {
+                    throw new RuntimeException("[" + scope + "][" + r.getKey() + "] attribute value is not present in the DB!");
+                }
+            });
             return null;
         });
     }
@@ -89,7 +96,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
         }
         ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, keys);
         return Futures.transform(latest, l -> {
-            l.forEach(r -> msg.getMetaData().putValue(r.getKey(), r.getValueAsString()));
+            l.forEach(r -> {
+                if (r.getValue() != null) {
+                    msg.getMetaData().putValue(r.getKey(), r.getValueAsString());
+                } else {
+                    throw new RuntimeException("[" + r.getKey() + "] telemetry value is not present in the DB!");
+                }
+            });
             return null;
         });
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
index ea4446d..dc0fcdd 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
@@ -69,10 +69,8 @@ public class TbMsgTimeseriesNode implements TbNode {
             try {
                 ts = Long.parseLong(tsStr);
             } catch (NumberFormatException e) {}
-        }
-        if (ts == -1) {
-            ctx.tellFailure(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData()));
-            return;
+        } else {
+            ts = System.currentTimeMillis();
         }
         String src = msg.getData();
         TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);