thingsboard-memoizeit

Rule Engine draft

2/7/2018 10:17:33 AM

Changes

dao/pom.xml 1(+0 -1)

pom.xml 1(+1 -0)

rule-engine/pom.xml 42(+42 -0)

Details

diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 18e35c6..6bb3917 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -247,7 +247,7 @@ spring:
     database-platform: "${SPRING_JPA_DATABASE_PLATFORM:org.hibernate.dialect.HSQLDialect}"
   datasource:
     driverClassName: "${SPRING_DRIVER_CLASS_NAME:org.hsqldb.jdbc.JDBCDriver}"
-    url: "${SPRING_DATASOURCE_URL:jdbc:hsqldb:file:${SQL_DATA_FOLDER:/tmp}/thingsboardDb;sql.enforce_size=false}"
+    url: "${SPRING_DATASOURCE_URL:jdbc:hsqldb:file:${SQL_DATA_FOLDER:/tmp}/thingsboardDb;sql.enforce_size=false;hsqldb.log_size=5}"
     username: "${SPRING_DATASOURCE_USERNAME:sa}"
     password: "${SPRING_DATASOURCE_PASSWORD:}"
 

dao/pom.xml 1(+0 -1)

diff --git a/dao/pom.xml b/dao/pom.xml
index d9463e4..8d43822 100644
--- a/dao/pom.xml
+++ b/dao/pom.xml
@@ -23,7 +23,6 @@
         <version>1.4.0-SNAPSHOT</version>
         <artifactId>thingsboard</artifactId>
     </parent>
-    <groupId>org.thingsboard</groupId>
     <artifactId>dao</artifactId>
     <packaging>jar</packaging>
 

pom.xml 1(+1 -0)

diff --git a/pom.xml b/pom.xml
index 4b77abb..27e5216 100755
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
         <module>extensions-api</module>
         <module>extensions-core</module>
         <module>extensions</module>
+        <module>rule-engine</module>
         <module>transport</module>
         <module>ui</module>
         <module>tools</module>

rule-engine/pom.xml 42(+42 -0)

diff --git a/rule-engine/pom.xml b/rule-engine/pom.xml
new file mode 100644
index 0000000..e23f871
--- /dev/null
+++ b/rule-engine/pom.xml
@@ -0,0 +1,42 @@
+<!--
+
+    Copyright © 2016-2017 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.thingsboard</groupId>
+        <version>1.4.0-SNAPSHOT</version>
+        <artifactId>thingsboard</artifactId>
+    </parent>
+    <groupId>org.thingsboard</groupId>
+    <artifactId>rule-engine</artifactId>
+    <packaging>pom</packaging>
+
+    <name>Thingsboard Extensions</name>
+    <url>https://thingsboard.io</url>
+
+    <properties>
+        <main.dir>${basedir}/..</main.dir>
+    </properties>
+
+    <modules>
+        <module>rule-engine-api</module>
+        <module>rule-engine-components</module>
+    </modules>
+
+</project>
diff --git a/rule-engine/rule-engine-api/pom.xml b/rule-engine/rule-engine-api/pom.xml
new file mode 100644
index 0000000..69cbc65
--- /dev/null
+++ b/rule-engine/rule-engine-api/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright © 2016-2017 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.thingsboard</groupId>
+        <version>1.4.0-SNAPSHOT</version>
+        <artifactId>rule-engine</artifactId>
+    </parent>
+    <groupId>org.thingsboard.rule-engine</groupId>
+    <artifactId>rule-engine-api</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Thingsboard Rule Engine API</name>
+    <url>https://thingsboard.io</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <main.dir>${basedir}/../..</main.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.thingsboard.common</groupId>
+            <artifactId>message</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.thingsboard</groupId>
+            <artifactId>extensions-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.thingsboard</groupId>
+            <artifactId>dao</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
new file mode 100644
index 0000000..b833a97
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -0,0 +1,29 @@
+package org.thingsboard.rule.engine.api;
+
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 13.01.18.
+ */
+public interface TbContext {
+
+    void tellNext(TbMsg msg);
+
+    void tellNext(TbMsg msg, String relationType);
+
+    void tellSelf(TbMsg msg, long delayMs);
+
+    void tellOthers(TbMsg msg);
+
+    void tellSibling(TbMsg msg, ServerAddress address);
+
+    void spawn(TbMsg msg);
+
+    void ack(UUID msg);
+
+    AttributesService getAttributesService();
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsg.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsg.java
new file mode 100644
index 0000000..9917c6a
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsg.java
@@ -0,0 +1,22 @@
+package org.thingsboard.rule.engine.api;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 13.01.18.
+ */
+@Data
+public final class TbMsg implements Serializable {
+
+    private final UUID id;
+    private final String type;
+    private final EntityId originator;
+    private final TbMsgMetaData metaData;
+
+    private final byte[] data;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
new file mode 100644
index 0000000..56abf2a
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbMsgMetaData.java
@@ -0,0 +1,24 @@
+package org.thingsboard.rule.engine.api;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Created by ashvayka on 13.01.18.
+ */
+@Data
+public final class TbMsgMetaData implements Serializable {
+
+    private Map<String, String> data;
+
+    public String getValue(String key) {
+        return data.get(key);
+    }
+
+    public void putValue(String key, String value) {
+        data.put(key, value);
+    }
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java
new file mode 100644
index 0000000..70993df
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNode.java
@@ -0,0 +1,16 @@
+package org.thingsboard.rule.engine.api;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+public interface TbNode {
+
+    void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException;
+
+    void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;
+
+    void destroy();
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
new file mode 100644
index 0000000..4ab84aa
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java
@@ -0,0 +1,14 @@
+package org.thingsboard.rule.engine.api;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.Data;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+@Data
+public class TbNodeConfiguration {
+
+    private JsonNode data;
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java
new file mode 100644
index 0000000..5a3744b
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java
@@ -0,0 +1,14 @@
+package org.thingsboard.rule.engine.api;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+public class TbNodeException extends Exception {
+
+    public TbNodeException(Exception e) {
+        super(e);
+    }
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
new file mode 100644
index 0000000..15f4c30
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeState.java
@@ -0,0 +1,7 @@
+package org.thingsboard.rule.engine.api;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+public class TbNodeState {
+}
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
new file mode 100644
index 0000000..ee0d83f
--- /dev/null
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright © 2016-2017 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.thingsboard</groupId>
+        <version>1.4.0-SNAPSHOT</version>
+        <artifactId>rule-engine</artifactId>
+    </parent>
+    <groupId>org.thingsboard.rule-engine</groupId>
+    <artifactId>rule-engine-components</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Thingsboard Rule Engine Components</name>
+    <url>https://thingsboard.io</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <main.dir>${basedir}/../..</main.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.thingsboard</groupId>
+            <artifactId>dao</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.thingsboard</groupId>
+            <artifactId>extensions-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.thingsboard.rule-engine</groupId>
+            <artifactId>rule-engine-api</artifactId>
+            <version>1.4.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
new file mode 100644
index 0000000..034ee48
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
@@ -0,0 +1,35 @@
+package org.thingsboard.rule.engine.filter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+@Slf4j
+public class TbMsgTypeFilterNode implements TbNode {
+
+    TbMsgTypeFilterNodeConfiguration config;
+
+    @Override
+    public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbMsgTypeFilterNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
+        ctx.tellNext(msg, Boolean.toString(config.getMessageTypes().contains(msg.getType())));
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java
new file mode 100644
index 0000000..e766207
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNodeConfiguration.java
@@ -0,0 +1,15 @@
+package org.thingsboard.rule.engine.filter;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+@Data
+public class TbMsgTypeFilterNodeConfiguration {
+
+    private List<String> messageTypes;
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
new file mode 100644
index 0000000..b623af7
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNode.java
@@ -0,0 +1,51 @@
+package org.thingsboard.rule.engine.metadata;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+@Slf4j
+public class TbGetAttributesNode implements TbNode {
+
+    TbGetAttributesNodeConfiguration config;
+
+    @Override
+    public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbGetAttributesNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
+        try {
+            //TODO: refactor this to work async and fetch attributes from cache.
+            AttributesService service = ctx.getAttributesService();
+            fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
+            fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
+            fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");
+            ctx.tellNext(msg);
+        } catch (Exception e) {
+            log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e);
+            throw new TbNodeException(e);
+        }
+    }
+
+    private void fetchAttributes(TbMsg msg, AttributesService service, List<String> attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException {
+        if (attributeNames != null && attributeNames.isEmpty()) {
+            List<AttributeKvEntry> attributes = service.find(msg.getOriginator(), scope, attributeNames).get();
+            attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString()));
+        }
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
new file mode 100644
index 0000000..076500c
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetAttributesNodeConfiguration.java
@@ -0,0 +1,17 @@
+package org.thingsboard.rule.engine.metadata;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+@Data
+public class TbGetAttributesNodeConfiguration {
+
+    private List<String> clientAttributeNames;
+    private List<String> sharedAttributeNames;
+    private List<String> serverAttributeNames;
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/TbNodeUtils.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/TbNodeUtils.java
new file mode 100644
index 0000000..962ea10
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/TbNodeUtils.java
@@ -0,0 +1,23 @@
+package org.thingsboard.rule.engine;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+public class TbNodeUtils {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    public static <T> T convert(TbNodeConfiguration configuration, Class<T> clazz) throws TbNodeException {
+        try {
+            return mapper.treeToValue(configuration.getData(), clazz);
+        } catch (JsonProcessingException e) {
+            throw new TbNodeException(e);
+        }
+    }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
new file mode 100644
index 0000000..837f333
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformNode.java
@@ -0,0 +1,52 @@
+package org.thingsboard.rule.engine.transform;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.dao.attributes.AttributesService;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 19.01.18.
+ */
+@Slf4j
+public class TbTransformNode implements TbNode {
+
+    TbGetAttributesNodeConfiguration config;
+
+    @Override
+    public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbGetAttributesNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
+        try {
+            //TODO: refactor this to work async and fetch attributes from cache.
+            AttributesService service = ctx.getAttributesService();
+            fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
+            fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
+            fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");
+            ctx.tellNext(msg);
+        } catch (Exception e) {
+            log.warn("[{}][{}] Failed to fetch attributes", msg.getOriginator(), msg.getId(), e);
+            throw new TbNodeException(e);
+        }
+    }
+
+    private void fetchAttributes(TbMsg msg, AttributesService service, List<String> attributeNames, String scope, String prefix) throws InterruptedException, java.util.concurrent.ExecutionException {
+        if (attributeNames != null && attributeNames.isEmpty()) {
+            List<AttributeKvEntry> attributes = service.find(msg.getOriginator(), scope, attributeNames).get();
+            attributes.forEach(attr -> msg.getMetaData().putValue(prefix + attr.getKey(), attr.getValueAsString()));
+        }
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}