thingsboard-memoizeit

Changes

Details

diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/alarm/Alarm.java b/common/data/src/main/java/org/thingsboard/server/common/data/alarm/Alarm.java
index 70f5042..125406c 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/alarm/Alarm.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/alarm/Alarm.java
@@ -22,6 +22,7 @@ import lombok.Builder;
 import lombok.Data;
 import org.thingsboard.server.common.data.BaseData;
 import org.thingsboard.server.common.data.HasName;
+import org.thingsboard.server.common.data.HasTenantId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.TenantId;
 
@@ -31,7 +32,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 @Data
 @Builder
 @AllArgsConstructor
-public class Alarm extends BaseData<AlarmId> implements HasName {
+public class Alarm extends BaseData<AlarmId> implements HasName, HasTenantId {
 
     private TenantId tenantId;
     private String type;
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java b/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java
index cc3c111..c7b246c 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/asset/Asset.java
@@ -17,16 +17,13 @@ package org.thingsboard.server.common.data.asset;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import lombok.EqualsAndHashCode;
-import org.thingsboard.server.common.data.HasAdditionalInfo;
-import org.thingsboard.server.common.data.HasName;
-import org.thingsboard.server.common.data.SearchTextBased;
-import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
+import org.thingsboard.server.common.data.*;
 import org.thingsboard.server.common.data.id.AssetId;
 import org.thingsboard.server.common.data.id.CustomerId;
 import org.thingsboard.server.common.data.id.TenantId;
 
 @EqualsAndHashCode(callSuper = true)
-public class Asset extends SearchTextBasedWithAdditionalInfo<AssetId> implements HasName {
+public class Asset extends SearchTextBasedWithAdditionalInfo<AssetId> implements HasName, HasTenantId, HasCustomerId {
 
     private static final long serialVersionUID = 2807343040519543363L;
 
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java b/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java
index 03115a9..078c97b 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/Customer.java
@@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 
 import com.fasterxml.jackson.databind.JsonNode;
 
-public class Customer extends ContactBased<CustomerId> implements HasName {
+public class Customer extends ContactBased<CustomerId> implements HasName, HasTenantId {
     
     private static final long serialVersionUID = -1599722990298929275L;
     
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
index 13fa011..95662c1 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/Device.java
@@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 import com.fasterxml.jackson.databind.JsonNode;
 
 @EqualsAndHashCode(callSuper = true)
-public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implements HasName {
+public class Device extends SearchTextBasedWithAdditionalInfo<DeviceId> implements HasName, HasTenantId, HasCustomerId {
 
     private static final long serialVersionUID = 2807343040519543363L;
 
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/HasCustomerId.java b/common/data/src/main/java/org/thingsboard/server/common/data/HasCustomerId.java
new file mode 100644
index 0000000..e89eba8
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/HasCustomerId.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data;
+
+import org.thingsboard.server.common.data.id.CustomerId;
+
+public interface HasCustomerId {
+
+    CustomerId getCustomerId();
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/HasTenantId.java b/common/data/src/main/java/org/thingsboard/server/common/data/HasTenantId.java
new file mode 100644
index 0000000..ebba003
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/HasTenantId.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data;
+
+import org.thingsboard.server.common.data.id.TenantId;
+
+public interface HasTenantId {
+
+    TenantId getTenantId();
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java
index 8576264..4c33ffe 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/PluginMetaData.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.HasName;
+import org.thingsboard.server.common.data.HasTenantId;
 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
 import org.thingsboard.server.common.data.id.PluginId;
 import org.thingsboard.server.common.data.id.TenantId;
@@ -32,7 +33,7 @@ import java.io.IOException;
 
 @EqualsAndHashCode(callSuper = true)
 @Slf4j
-public class PluginMetaData extends SearchTextBasedWithAdditionalInfo<PluginId> implements HasName {
+public class PluginMetaData extends SearchTextBasedWithAdditionalInfo<PluginId> implements HasName, HasTenantId {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
index e82c850..f2ba0cc 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java
@@ -21,6 +21,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.HasName;
+import org.thingsboard.server.common.data.HasTenantId;
 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
 import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -29,7 +30,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 @Data
 @EqualsAndHashCode(callSuper = true)
 @Slf4j
-public class RuleChain extends SearchTextBasedWithAdditionalInfo<RuleChainId> implements HasName {
+public class RuleChain extends SearchTextBasedWithAdditionalInfo<RuleChainId> implements HasName, HasTenantId {
 
     private static final long serialVersionUID = -5656679015121935465L;
 
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java
index 98adeb7..953e5eb 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleMetaData.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.HasName;
+import org.thingsboard.server.common.data.HasTenantId;
 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
 import org.thingsboard.server.common.data.id.RuleId;
 import org.thingsboard.server.common.data.id.TenantId;
@@ -31,7 +32,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
 @Data
 @EqualsAndHashCode(callSuper = true)
 @Slf4j
-public class RuleMetaData extends SearchTextBasedWithAdditionalInfo<RuleId> implements HasName {
+public class RuleMetaData extends SearchTextBasedWithAdditionalInfo<RuleId> implements HasName, HasTenantId {
 
     private static final long serialVersionUID = -5656679015122935465L;
 
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/User.java b/common/data/src/main/java/org/thingsboard/server/common/data/User.java
index c893d64..3b95776 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/User.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/User.java
@@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.security.Authority;
 import com.fasterxml.jackson.databind.JsonNode;
 
 @EqualsAndHashCode(callSuper = true)
-public class User extends SearchTextBasedWithAdditionalInfo<UserId> implements HasName {
+public class User extends SearchTextBasedWithAdditionalInfo<UserId> implements HasName, HasTenantId, HasCustomerId {
 
     private static final long serialVersionUID = 8250339805336035966L;
 
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
index 8d254a0..3782ed2 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
@@ -61,13 +61,14 @@ public class HostRequestIntervalRegistry {
     }
 
     public long tick(String clientHostId) {
+        IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
+        long currentCount = intervalCount.resetIfExpiredAndTick();
         if (whiteList.contains(clientHostId)) {
             return 0;
         } else if (blackList.contains(clientHostId)) {
             return Long.MAX_VALUE;
         }
-        IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
-        return intervalCount.resetIfExpiredAndTick();
+        return currentCount;
     }
 
     public void clean() {
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index a1c85e2..ede076b 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -17,7 +17,17 @@ package org.thingsboard.rule.engine.api;
 
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.dao.alarm.AlarmService;
+import org.thingsboard.server.dao.asset.AssetService;
 import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.customer.CustomerService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.plugin.PluginService;
+import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.dao.rule.RuleChainService;
+import org.thingsboard.server.dao.rule.RuleService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.dao.user.UserService;
 
 import java.util.UUID;
 
@@ -40,6 +50,28 @@ public interface TbContext {
 
     void ack(TbMsg msg);
 
+    void tellError(TbMsg msg, Throwable th);
+
     AttributesService getAttributesService();
 
+    CustomerService getCustomerService();
+
+    UserService getUserService();
+
+    RuleService getRuleService();
+
+    PluginService getPluginService();
+
+    AssetService getAssetService();
+
+    DeviceService getDeviceService();
+
+    AlarmService getAlarmService();
+
+    RuleChainService getRuleChainService();
+
+    TimeseriesService getTimeseriesService();
+
+    RelationService getRelationService();
+
 }
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java
index 6766999..b42ec8e 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java
@@ -22,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
  */
 public class TbNodeException extends Exception {
 
+    public TbNodeException(String message) {
+        super(message);
+    }
+
     public TbNodeException(Exception e) {
         super(e);
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/DonAsynchron.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/DonAsynchron.java
new file mode 100644
index 0000000..4fed574
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/DonAsynchron.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import javax.annotation.Nullable;
+import java.util.function.Consumer;
+
+public class DonAsynchron {
+
+    public static  <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess, Consumer<Throwable> onFailure) {
+        Futures.addCallback(future, new FutureCallback<T>() {
+            @Override
+            public void onSuccess(@Nullable T result) {
+                try {
+                    onSuccess.accept(result);
+                } catch (Throwable th) {
+                    onFailure(th);
+                }
+
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                onFailure.accept(t);
+            }
+        });
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
new file mode 100644
index 0000000..52850be
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbEntityGetAttrNode.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.metadata;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
+
+public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode {
+
+    private TbGetEntityAttrNodeConfiguration config;
+
+    @Override
+    public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbGetEntityAttrNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) {
+        try {
+            withCallback(
+                    findEntityAsync(ctx, msg.getOriginator()),
+                    entityId -> withCallback(
+                            config.isTelemetry() ? getLatestTelemetry(ctx, entityId) : getAttributesAsync(ctx, entityId),
+                            attributes -> putAttributesAndTell(ctx, msg, attributes),
+                            t -> ctx.tellError(msg, t)
+                    ),
+                    t -> ctx.tellError(msg, t));
+        } catch (Throwable th) {
+            ctx.tellError(msg, th);
+        }
+    }
+
+    private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
+        ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
+        return Futures.transform(latest, (Function<? super List<AttributeKvEntry>, ? extends List<KvEntry>>) l ->
+                l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
+    }
+
+    private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) {
+        ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(entityId, config.getAttrMapping().keySet());
+        return Futures.transform(latest, (Function<? super List<TsKvEntry>, ? extends List<KvEntry>>) l ->
+                l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
+    }
+
+
+    private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<KvEntry> attributes) {
+        attributes.forEach(r -> {
+            String attrName = config.getAttrMapping().get(r.getKey());
+            msg.getMetaData().putValue(attrName, r.getValueAsString());
+        });
+        ctx.tellNext(msg);
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+
+    protected abstract ListenableFuture<T> findEntityAsync(TbContext ctx, EntityId originator);
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
index 11c644c..4d41921 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -15,23 +15,27 @@
  */
 package org.thingsboard.rule.engine.metadata;
 
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.rule.engine.TbNodeUtils;
 import org.thingsboard.rule.engine.api.*;
-import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.msg.TbMsg;
-import org.thingsboard.server.dao.attributes.AttributesService;
 
 import java.util.List;
 
+import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
+import static org.thingsboard.server.common.data.DataConstants.*;
+
 /**
  * Created by ashvayka on 19.01.18.
  */
 @Slf4j
 public class TbGetAttributesNode implements TbNode {
 
-    TbGetAttributesNodeConfiguration config;
+    private TbGetAttributesNodeConfiguration config;
 
     @Override
     public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
@@ -40,26 +44,25 @@ public class TbGetAttributesNode implements TbNode {
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
-        try {
-            //TODO: refactor this to work async and fetch attributes from cache.
-            AttributesService service = ctx.getAttributesService();
-            fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
-            fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
-            fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");
-            ctx.tellNext(msg);
-        } catch (Exception e) {
-            log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e);
-            throw new TbNodeException(e);
-        }
+        ListenableFuture<List<Void>> future = Futures.allAsList(
+                putAttrAsync(ctx, msg, CLIENT_SCOPE, config.getClientAttributeNames(), "cs."),
+                putAttrAsync(ctx, msg, SHARED_SCOPE, config.getSharedAttributeNames(), "shared."),
+                putAttrAsync(ctx, msg, SERVER_SCOPE, config.getServerAttributeNames(), "ss."));
+
+        withCallback(future, i -> ctx.tellNext(msg), t -> ctx.tellError(msg, t));
     }
 
-    private void fetchAttributes(TbMsg msg, AttributesService service, List<String> attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException {
-        if (attributeNames != null && attributeNames.isEmpty()) {
-            List<AttributeKvEntry> attributes = service.find(msg.getOriginator(), scope, attributeNames).get();
-            attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString()));
-        }
+    private ListenableFuture<Void> putAttributesAsync(TbMsg msg, List<AttributeKvEntry> attributes, String prefix) {
+        attributes.forEach(r -> msg.getMetaData().putValue(prefix + r.getKey(), r.getValueAsString()));
+        return Futures.immediateFuture(null);
     }
 
+    private ListenableFuture<Void> putAttrAsync(TbContext ctx, TbMsg msg, String scope, List<String> attributes, String prefix) {
+        return Futures.transform(ctx.getAttributesService().find(msg.getOriginator(), scope, attributes),
+                (AsyncFunction<List<AttributeKvEntry>, Void>) i -> putAttributesAsync(msg, i, prefix));
+    }
+
+
     @Override
     public void destroy() {
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
new file mode 100644
index 0000000..57f9b79
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNode.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.metadata;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.HasCustomerId;
+import org.thingsboard.server.common.data.id.*;
+
+public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> {
+
+    @Override
+    protected ListenableFuture<CustomerId> findEntityAsync(TbContext ctx, EntityId originator) {
+
+        switch (originator.getEntityType()) {
+            case CUSTOMER:
+                return Futures.immediateFuture((CustomerId) originator);
+            case USER:
+                return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
+            case ASSET:
+                return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator));
+            case DEVICE:
+                return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator));
+            default:
+                return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator));
+        }
+    }
+
+    private <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
+        return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> {
+            return in != null ? Futures.immediateFuture(in.getCustomerId())
+                    : Futures.immediateFailedFuture(new IllegalStateException("Customer not found"));});
+    }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
new file mode 100644
index 0000000..a5e85c5
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetEntityAttrNodeConfiguration.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.metadata;
+
+import lombok.Data;
+
+import java.util.Map;
+import java.util.Optional;
+
+@Data
+public class TbGetEntityAttrNodeConfiguration {
+
+    private Map<String, String> attrMapping;
+    private boolean isTelemetry = false;
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
new file mode 100644
index 0000000..5823c18
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttributeNode.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.metadata;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.collections.CollectionUtils;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.rule.engine.api.TbNodeState;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.relation.EntitySearchDirection;
+import org.thingsboard.server.dao.relation.RelationService;
+
+import java.util.List;
+
+import static org.thingsboard.server.common.data.relation.RelationTypeGroup.COMMON;
+
+public class TbGetRelatedAttributeNode extends TbEntityGetAttrNode<EntityId> {
+
+    private TbGetRelatedAttrNodeConfiguration config;
+
+    @Override
+    public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbGetRelatedAttrNodeConfiguration.class);
+    }
+
+    @Override
+    protected ListenableFuture<EntityId> findEntityAsync(TbContext ctx, EntityId originator) {
+        RelationService relationService = ctx.getRelationService();
+        if (config.getDirection() == EntitySearchDirection.FROM) {
+            ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByFromAndTypeAsync(originator, config.getRelationType(), COMMON);
+            return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
+                    r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
+                            : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
+        } else if (config.getDirection() == EntitySearchDirection.TO) {
+            ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByToAndTypeAsync(originator, config.getRelationType(), COMMON);
+            return Futures.transform(asyncRelation, (AsyncFunction<? super List<EntityRelation>, EntityId>)
+                    r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
+                            : Futures.immediateFailedFuture(new IllegalStateException("Relation not found")));
+        }
+
+        return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java
new file mode 100644
index 0000000..75b0a65
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetRelatedAttrNodeConfiguration.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.metadata;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.relation.EntitySearchDirection;
+
+@Data
+public class TbGetRelatedAttrNodeConfiguration {
+
+    private String relationType;
+    private EntitySearchDirection direction;
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
new file mode 100644
index 0000000..2cf9a97
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTenantAttributeNode.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.metadata;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.HasTenantId;
+import org.thingsboard.server.common.data.alarm.AlarmId;
+import org.thingsboard.server.common.data.id.*;
+
+@Slf4j
+public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> {
+
+    @Override
+    protected ListenableFuture<TenantId> findEntityAsync(TbContext ctx, EntityId originator) {
+
+        switch (originator.getEntityType()) {
+            case TENANT:
+                return Futures.immediateFuture((TenantId) originator);
+            case CUSTOMER:
+                return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator));
+            case USER:
+                return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator));
+            case RULE:
+                return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) originator));
+            case PLUGIN:
+                return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator));
+            case ASSET:
+                return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator));
+            case DEVICE:
+                return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator));
+            case ALARM:
+                return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) originator));
+            case RULE_CHAIN:
+                return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) originator));
+            default:
+                return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator));
+        }
+    }
+
+    private <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
+        return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> {
+            return in != null ? Futures.immediateFuture(in.getTenantId())
+                : Futures.immediateFailedFuture(new IllegalStateException("Tenant not found"));});
+    }
+
+}
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
new file mode 100644
index 0000000..8e5ddb8
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -0,0 +1,266 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.metadata;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.User;
+import org.thingsboard.server.common.data.asset.Asset;
+import org.thingsboard.server.common.data.id.AssetId;
+import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.UserId;
+import org.thingsboard.server.common.data.kv.*;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.dao.asset.AssetService;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.dao.user.UserService;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TbGetCustomerAttributeNodeTest {
+
+    private TbGetCustomerAttributeNode node;
+
+    @Mock
+    private TbContext ctx;
+
+    @Mock
+    private AttributesService attributesService;
+    @Mock
+    private TimeseriesService timeseriesService;
+    @Mock
+    private UserService userService;
+    @Mock
+    private AssetService assetService;
+    @Mock
+    private DeviceService deviceService;
+
+    private TbMsg msg;
+
+    @Before
+    public void init() throws TbNodeException {
+        TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
+        Map<String, String> attrMapping = new HashMap<>();
+        attrMapping.putIfAbsent("temperature", "tempo");
+        config.setAttrMapping(attrMapping);
+        config.setTelemetry(false);
+        ObjectMapper mapper = new ObjectMapper();
+        TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
+        nodeConfiguration.setData(mapper.valueToTree(config));
+
+        node = new TbGetCustomerAttributeNode();
+        node.init(nodeConfiguration, null);
+    }
+
+    @Test
+    public void errorThrownIfCannotLoadAttributes() {
+        UserId userId = new UserId(UUIDs.timeBased());
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        User user = new User();
+        user.setCustomerId(customerId);
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
+
+        when(ctx.getUserService()).thenReturn(userService);
+        when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
+
+        when(ctx.getAttributesService()).thenReturn(attributesService);
+        when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature")))
+                .thenThrow(new IllegalStateException("something wrong"));
+
+        node.onMsg(ctx, msg);
+        final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
+        verify(ctx).tellError(same(msg), captor.capture());
+
+        Throwable value = captor.getValue();
+        assertEquals("something wrong", value.getMessage());
+        assertTrue(msg.getMetaData().getData().isEmpty());
+    }
+
+    @Test
+    public void errorThrownIfCannotLoadAttributesAsync() {
+        UserId userId = new UserId(UUIDs.timeBased());
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        User user = new User();
+        user.setCustomerId(customerId);
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
+
+        when(ctx.getUserService()).thenReturn(userService);
+        when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
+
+        when(ctx.getAttributesService()).thenReturn(attributesService);
+        when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature")))
+                .thenReturn(Futures.immediateFailedFuture(new IllegalStateException("something wrong")));
+
+        node.onMsg(ctx, msg);
+        final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
+        verify(ctx).tellError(same(msg), captor.capture());
+
+        Throwable value = captor.getValue();
+        assertEquals("something wrong", value.getMessage());
+        assertTrue(msg.getMetaData().getData().isEmpty());
+    }
+
+    @Test
+    public void errorThrownIfCustomerCannotBeFound() {
+        UserId userId = new UserId(UUIDs.timeBased());
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        User user = new User();
+        user.setCustomerId(customerId);
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
+
+        when(ctx.getUserService()).thenReturn(userService);
+        when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
+
+        node.onMsg(ctx, msg);
+        final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
+        verify(ctx).tellError(same(msg), captor.capture());
+
+        Throwable value = captor.getValue();
+        assertEquals(IllegalStateException.class, value.getClass());
+        assertEquals("Customer not found", value.getMessage());
+        assertTrue(msg.getMetaData().getData().isEmpty());
+    }
+
+    @Test
+    public void customerAttributeAddedInMetadata() {
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), new byte[4]);
+        entityAttributeFetched(customerId);
+    }
+
+    @Test
+    public void usersCustomerAttributesFetched() {
+        UserId userId = new UserId(UUIDs.timeBased());
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        User user = new User();
+        user.setCustomerId(customerId);
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), new byte[4]);
+
+        when(ctx.getUserService()).thenReturn(userService);
+        when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
+
+        entityAttributeFetched(customerId);
+    }
+
+    @Test
+    public void assetsCustomerAttributesFetched() {
+        AssetId assetId = new AssetId(UUIDs.timeBased());
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        Asset asset = new Asset();
+        asset.setCustomerId(customerId);
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), new byte[4]);
+
+        when(ctx.getAssetService()).thenReturn(assetService);
+        when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
+
+        entityAttributeFetched(customerId);
+    }
+
+    @Test
+    public void deviceCustomerAttributesFetched() {
+        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        Device device = new Device();
+        device.setCustomerId(customerId);
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
+
+        when(ctx.getDeviceService()).thenReturn(deviceService);
+        when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
+
+        entityAttributeFetched(customerId);
+    }
+
+    @Test
+    public void deviceCustomerTelemetryFetched() throws TbNodeException {
+        TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
+        Map<String, String> attrMapping = new HashMap<>();
+        attrMapping.putIfAbsent("temperature", "tempo");
+        config.setAttrMapping(attrMapping);
+        config.setTelemetry(true);
+        ObjectMapper mapper = new ObjectMapper();
+        TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration();
+        nodeConfiguration.setData(mapper.valueToTree(config));
+
+        node = new TbGetCustomerAttributeNode();
+        node.init(nodeConfiguration, null);
+
+
+        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+        CustomerId customerId = new CustomerId(UUIDs.timeBased());
+        Device device = new Device();
+        device.setCustomerId(customerId);
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), new byte[4]);
+
+        when(ctx.getDeviceService()).thenReturn(deviceService);
+        when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
+
+        List<TsKvEntry> timeseries = Lists.newArrayList(new BasicTsKvEntry(1L, new StringDataEntry("temperature", "highest")));
+
+        when(ctx.getTimeseriesService()).thenReturn(timeseriesService);
+        when(timeseriesService.findLatest(customerId, Collections.singleton("temperature")))
+                .thenReturn(Futures.immediateFuture(timeseries));
+
+        node.onMsg(ctx, msg);
+        verify(ctx).tellNext(msg);
+        assertEquals(msg.getMetaData().getValue("tempo"), "highest");
+    }
+
+    private void entityAttributeFetched(CustomerId customerId) {
+        List<AttributeKvEntry> attributes = Lists.newArrayList(new BaseAttributeKvEntry(new StringDataEntry("temperature", "high"), 1L));
+
+        when(ctx.getAttributesService()).thenReturn(attributesService);
+        when(attributesService.find(customerId, SERVER_SCOPE, Collections.singleton("temperature")))
+                .thenReturn(Futures.immediateFuture(attributes));
+
+        node.onMsg(ctx, msg);
+        verify(ctx).tellNext(msg);
+        assertEquals(msg.getMetaData().getValue("tempo"), "high");
+    }
+}
\ No newline at end of file