thingsboard-developers

add Timeseries GetAttribute node

3/19/2018 2:33:04 PM

Details

diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index e852500..2908a4a 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -25,6 +25,7 @@ import org.thingsboard.server.dao.device.DeviceService;
 import org.thingsboard.server.dao.plugin.PluginService;
 import org.thingsboard.server.dao.rule.RuleChainService;
 import org.thingsboard.server.dao.rule.RuleService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.dao.user.UserService;
 
 import java.util.UUID;
@@ -68,4 +69,6 @@ public interface TbContext {
 
     RuleChainService getRuleChainService();
 
+    TimeseriesService getTimeseriesService();
+
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
index 7ba7dbc..2d66414 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
@@ -15,14 +15,19 @@
  */
 package org.thingsboard.rule.engine.metadata;
 
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.rule.engine.TbNodeUtils;
 import org.thingsboard.rule.engine.api.*;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.msg.TbMsg;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
 import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
@@ -42,7 +47,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode 
             withCallback(
                     findEntityAsync(ctx, msg.getOriginator()),
                     entityId -> withCallback(
-                            ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet()),
+                            config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
                             attributes -> putAttributesAndTell(ctx, msg, attributes),
                             t -> ctx.tellError(msg, t)
                     ),
@@ -52,7 +57,20 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode 
         }
     }
 
-    private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<AttributeKvEntry> attributes) {
+    private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
+        ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
+        return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
+                l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
+    }
+
+    private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) {
+        ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, config.getAttrMapping().keySet());
+        return Futures.transform(latest, (Function<? super List<TsKvEntry>, ? extends List<KvEntry>>) l ->
+                l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
+    }
+
+
+    private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<KvEntry> attributes) {
         attributes.forEach(r -> {
             String attrName = config.getAttrMapping().get(r.getKey());
             msg.getMetaData().putValue(attrName, r.getValueAsString());
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
index bd53e83..a5e85c5 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
@@ -24,4 +24,5 @@ import java.util.Optional;
 public class TbGetEntityAttrNodeConfiguration {
 
     private Map<String, String> attrMapping;
+    private boolean isTelemetry = false;
 }
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
index ae41c00..23c5d4b 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -35,14 +35,13 @@ import org.thingsboard.server.common.data.id.AssetId;
 import org.thingsboard.server.common.data.id.CustomerId;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.UserId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
-import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 import org.thingsboard.server.dao.asset.AssetService;
 import org.thingsboard.server.dao.attributes.AttributesService;
 import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.dao.user.UserService;
 
 import java.util.Collections;
@@ -68,6 +67,8 @@ public class TbGetCustomerAttributeNodeTest {
     @Mock
     private AttributesService attributesService;
     @Mock
+    private TimeseriesService timeseriesService;
+    @Mock
     private UserService userService;
     @Mock
     private AssetService assetService;
@@ -82,6 +83,7 @@ public class TbGetCustomerAttributeNodeTest {
         Map<String, String> attrMapping = new HashMap<>();
         attrMapping.putIfAbsent("temperature", "tempo");
         config.setAttrMapping(attrMapping);
+        config.setTelemetry(false);
         ObjectMapper mapper = new ObjectMapper();
         TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
         nodeConfiguration.setData(mapper.valueToTree(config));
@@ -214,6 +216,42 @@ public class TbGetCustomerAttributeNodeTest {
         entityAttributeFetched(customerId);
     }
 
+    @Test
+    public void deviceCustomerTelemetryFetched() throws TbNodeException {
+        TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
+        Map<String, String> attrMapping = new HashMap<>();
+        attrMapping.putIfAbsent("temperature", "tempo");
+        config.setAttrMapping(attrMapping);
+        config.setTelemetry(true);
+        ObjectMapper mapper = new ObjectMapper();
+        TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
+        nodeConfiguration.setData(mapper.valueToTree(config));
+
+        node = new TbGetCustomerAttributeNode();
+        node.init(nodeConfiguration, null);
+
+
+        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        Device device = new Device();
+        device.setCustomerId(customerId);
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
+
+        when(ctx.getDeviceService()).thenReturn(deviceService);
+        when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
+
+        List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
+
+        when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
+        when(timeseriesService.findLatest(customerId, Collections.singleton("temperature")))
+                .thenReturn(Futures.immediateFuture(timeseries));
+
+        node.onMsg(ctx, msg);
+        verify(ctx).tellNext(msg);
+        assertEquals(msg.getMetaData().getValue("tempo"), "highest");
+    }
+
     private void entityAttributeFetched(CustomerId customerId) {
         List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));