thingsboard-aplcache

use FAILURE chain in Customer/Tenant/Related entity not found fetch

5/11/2018 5:09:03 AM

Details

diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
index ac2ac8d..480e9eb 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,8 +18,12 @@ package org.thingsboard.rule.engine.metadata;
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.rule.engine.TbNodeUtils;
-import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.KvEntry;
@@ -30,9 +34,11 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
 
+@Slf4j
 public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode {
 
     private TbGetEntityAttrNodeConfiguration config;
@@ -47,17 +53,24 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode 
         try {
             withCallback(
                     findEntityAsync(ctx, msg.getOriginator()),
-                    entityId -> withCallback(
-                            config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
-                            attributes -> putAttributesAndTell(ctx, msg, attributes),
-                            t -> ctx.tellError(msg, t)
-                    ),
+                    entityId -> safeGetAttributes(ctx, msg, entityId),
                     t -> ctx.tellError(msg, t));
         } catch (Throwable th) {
             ctx.tellError(msg, th);
         }
     }
 
+    private void safeGetAttributes(TbContext ctx, TbMsg msg, T entityId) {
+        if(entityId == null || entityId.isNullUid()) {
+            ctx.tellNext(msg, FAILURE);
+            return;
+        }
+
+        withCallback(config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
+                attributes -> putAttributesAndTell(ctx, msg, attributes),
+                t -> ctx.tellError(msg, t));
+    }
+
     private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
         ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
         return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
index 436d124..06a0f44 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -58,23 +58,17 @@ public class TbGetAttributesNode implements TbNode {
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
-        // todo-vp: both telemetry and attributes should be processes
-        if (CollectionUtils.isNotEmpty(config.getLatestTsKeyNames())) {
-            withCallback(getLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()),
-                    i -> ctx.tellNext(msg, SUCCESS),
-                    t -> ctx.tellError(msg, t));
-        } else {
-            ListenableFuture<List<Void>> future = Futures.allAsList(
-                    putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs_"),
-                    putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),
-                    putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_"));
-
-            withCallback(future, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellError(msg, t));
-        }
+        ListenableFuture<List<Void>> allFutures = Futures.allAsList(
+                putLatestTelemetry(ctx, msg, config.getLatestTsKeyNames()),
+                putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs_"),
+                putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared_"),
+                putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss_")
+        );
+        withCallback(allFutures, i -> ctx.tellNext(msg, SUCCESS), t -> ctx.tellError(msg, t));
     }
 
     private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> keys, String prefix) {
-        if (keys == null) {
+        if (CollectionUtils.isEmpty(keys)) {
             return Futures.immediateFuture(null);
         }
         ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(msg.getOriginator(), scope, keys);
@@ -84,8 +78,8 @@ public class TbGetAttributesNode implements TbNode {
         });
     }
 
-    private ListenableFuture<Void> getLatestTelemetry(TbContext ctx, TbMsg msg, List<String> keys) {
-        if (keys == null) {
+    private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, TbMsg msg, List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
             return Futures.immediateFuture(null);
         }
         ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(msg.getOriginator(), keys);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
index 67eb808..73e1945 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java
@@ -45,6 +45,6 @@ public class EntitiesCustomerIdAsyncLoader {
     private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
         return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
             return in != null ? Futures.immediateFuture(in.getCustomerId())
-                    : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
+                    : Futures.immediateFuture(null);});
     }
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
index 08ce38e..55be558 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java
@@ -40,11 +40,11 @@ public class EntitiesRelatedEntityIdAsyncLoader {
         if (relationsQuery.getDirection() == EntitySearchDirection.FROM) {
             return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
                     r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
-                            : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
+                            : Futures.immediateFuture(null));
         } else if (relationsQuery.getDirection() == EntitySearchDirection.TO) {
             return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
                     r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
-                            : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
+                            : Futures.immediateFuture(null));
         }
         return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
index 5d2aaa8..3d5c64e 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java
@@ -53,6 +53,6 @@ public class EntitiesTenantIdAsyncLoader {
     private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
         return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
             return in != null ? Futures.immediateFuture(in.getTenantId())
-                    : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
+                    : Futures.immediateFuture(null);});
     }
 }
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
index 7a2d457..555a3c3 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -31,12 +31,7 @@ import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.User;
 import org.thingsboard.server.common.data.asset.Asset;
-import org.thingsboard.server.common.data.id.AssetId;
-import org.thingsboard.server.common.data.id.CustomerId;
-import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.id.RuleChainId;
-import org.thingsboard.server.common.data.id.RuleNodeId;
-import org.thingsboard.server.common.data.id.UserId;
+import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -56,6 +51,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
 
@@ -149,7 +145,7 @@ public class TbGetCustomerAttributeNodeTest {
     }
 
     @Test
-    public void errorThrownIfCustomerCannotBeFound() {
+    public void failedChainUsedIfCustomerCannotBeFound() {
         UserId userId = new UserId(UUIDs.timeBased());
         CustomerId customerId = new CustomerId(UUIDs.timeBased());
         User user = new User();
@@ -160,13 +156,9 @@ public class TbGetCustomerAttributeNodeTest {
         when(ctx.getUserService()).thenReturn(userService);
         when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
 
-        node.onMsg(ctx, msg);
-        final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
-        verify(ctx).tellError(same(msg), captor.capture());
 
-        Throwable value = captor.getValue();
-        assertEquals(IllegalStateException.class, value.getClass());
-        assertEquals("Customer not found", value.getMessage());
+        node.onMsg(ctx, msg);
+        verify(ctx).tellNext(msg, FAILURE);
         assertTrue(msg.getMetaData().getData().isEmpty());
     }