thingsboard-developers
Changes
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java 22(+20 -2)
Details
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index e852500..2908a4a 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -25,6 +25,7 @@ import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.plugin.PluginService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import java.util.UUID;
@@ -68,4 +69,6 @@ public interface TbContext {
RuleChainService getRuleChainService();
+ TimeseriesService getTimeseriesService();
+
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
index 7ba7dbc..2d66414 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
@@ -15,14 +15,19 @@
*/
package org.thingsboard.rule.engine.metadata;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.List;
+import java.util.stream.Collectors;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
@@ -42,7 +47,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
withCallback(
findEntityAsync(ctx, msg.getOriginator()),
entityId -> withCallback(
- ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet()),
+ config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
attributes -> putAttributesAndTell(ctx, msg, attributes),
t -> ctx.tellError(msg, t)
),
@@ -52,7 +57,20 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
}
}
- private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<AttributeKvEntry> attributes) {
+ private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
+ ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
+ return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
+ l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
+ }
+
+ private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) {
+ ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, config.getAttrMapping().keySet());
+ return Futures.transform(latest, (Function<? super List<TsKvEntry>, ? extends List<KvEntry>>) l ->
+ l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
+ }
+
+
+ private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<KvEntry> attributes) {
attributes.forEach(r -> {
String attrName = config.getAttrMapping().get(r.getKey());
msg.getMetaData().putValue(attrName, r.getValueAsString());
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
index bd53e83..a5e85c5 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
@@ -24,4 +24,5 @@ import java.util.Optional;
public class TbGetEntityAttrNodeConfiguration {
private Map<String, String> attrMapping;
+ private boolean isTelemetry = false;
}
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
index ae41c00..23c5d4b 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -35,14 +35,13 @@ import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.UserId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
-import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import java.util.Collections;
@@ -68,6 +67,8 @@ public class TbGetCustomerAttributeNodeTest {
@Mock
private AttributesService attributesService;
@Mock
+ private TimeseriesService timeseriesService;
+ @Mock
private UserService userService;
@Mock
private AssetService assetService;
@@ -82,6 +83,7 @@ public class TbGetCustomerAttributeNodeTest {
Map<String, String> attrMapping = new HashMap<>();
attrMapping.putIfAbsent("temperature", "tempo");
config.setAttrMapping(attrMapping);
+ config.setTelemetry(false);
ObjectMapper mapper = new ObjectMapper();
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
nodeConfiguration.setData(mapper.valueToTree(config));
@@ -214,6 +216,42 @@ public class TbGetCustomerAttributeNodeTest {
entityAttributeFetched(customerId);
}
+ @Test
+ public void deviceCustomerTelemetryFetched() throws TbNodeException {
+ TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
+ Map<String, String> attrMapping = new HashMap<>();
+ attrMapping.putIfAbsent("temperature", "tempo");
+ config.setAttrMapping(attrMapping);
+ config.setTelemetry(true);
+ ObjectMapper mapper = new ObjectMapper();
+ TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
+ nodeConfiguration.setData(mapper.valueToTree(config));
+
+ node = new TbGetCustomerAttributeNode();
+ node.init(nodeConfiguration, null);
+
+
+ DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+ CustomerId customerId = new CustomerId(UUIDs.timeBased());
+ Device device = new Device();
+ device.setCustomerId(customerId);
+
+ msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
+
+ when(ctx.getDeviceService()).thenReturn(deviceService);
+ when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
+
+ List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
+
+ when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
+ when(timeseriesService.findLatest(customerId, Collections.singleton("temperature")))
+ .thenReturn(Futures.immediateFuture(timeseries));
+
+ node.onMsg(ctx, msg);
+ verify(ctx).tellNext(msg);
+ assertEquals(msg.getMetaData().getValue("tempo"), "highest");
+ }
+
private void entityAttributeFetched(CustomerId customerId) {
List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));