thingsboard-developers

Telmetry plugin removed

4/2/2018 3:09:34 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 7dca2c8..ae60308 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,8 +16,10 @@
 package org.thingsboard.server.actors.ruleChain;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import com.google.common.base.Function;
 import org.thingsboard.rule.engine.api.ListeningExecutor;
+import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
 import org.thingsboard.rule.engine.api.TbContext;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -165,6 +167,11 @@ class DefaultTbContext implements TbContext {
     }
 
     @Override
+    public RuleEngineTelemetryService getTelemetryService() {
+        return mainCtx.getTsSubService();
+    }
+
+    @Override
     public RelationService getRelationService() {
         return mainCtx.getRelationService();
     }
diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
index ba5f9f9..42d6a20 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
@@ -155,7 +155,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements PluginWe
         if (internalId != null) {
             SessionMetaData sessionMd = internalSessionMap.get(internalId);
             if (sessionMd != null) {
-                sessionMd.session.sendMessage(new TextMessage(msg));
+                synchronized (sessionMd) {
+                    sessionMd.session.sendMessage(new TextMessage(msg));
+                }
             } else {
                 log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId);
             }
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index c1338e4..25f09ca 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -115,7 +115,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES", method = RequestMethod.GET)
+    @RequestMapping(value = "/{entityType}/{entityId}/keys/attributes", method = RequestMethod.GET)
     @ResponseBody
     public DeferredResult<ResponseEntity> getAttributeKeys(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
@@ -123,7 +123,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES/{scope}", method = RequestMethod.GET)
+    @RequestMapping(value = "/{entityType}/{entityId}/keys/attributes/{scope}", method = RequestMethod.GET)
     @ResponseBody
     public DeferredResult<ResponseEntity> getAttributeKeysByScope(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr
@@ -133,7 +133,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES", method = RequestMethod.GET)
+    @RequestMapping(value = "/{entityType}/{entityId}/values/attributes", method = RequestMethod.GET)
     @ResponseBody
     public DeferredResult<ResponseEntity> getAttributes(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@@ -144,7 +144,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES/{scope}", method = RequestMethod.GET)
+    @RequestMapping(value = "/{entityType}/{entityId}/values/attributes/{scope}", method = RequestMethod.GET)
     @ResponseBody
     public DeferredResult<ResponseEntity> getAttributesByScope(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@@ -156,7 +156,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/keys/TIMESERIES", method = RequestMethod.GET)
+    @RequestMapping(value = "/{entityType}/{entityId}/keys/timeseries", method = RequestMethod.GET)
     @ResponseBody
     public DeferredResult<ResponseEntity> getTimeseriesKeys(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
@@ -167,7 +167,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
+    @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET)
     @ResponseBody
     public DeferredResult<ResponseEntity> getLatestTimeseries(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@@ -180,7 +180,7 @@ public class TelemetryController extends BaseController {
 
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
+    @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET, params = {"keys", "startTs", "endTs"})
     @ResponseBody
     public DeferredResult<ResponseEntity> getTimeseries(
             @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@@ -222,7 +222,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/ATTRIBUTES/{scope}", method = RequestMethod.POST)
+    @RequestMapping(value = "/{entityType}/{entityId}/attributes/{scope}", method = RequestMethod.POST)
     @ResponseBody
     public DeferredResult<ResponseEntity> saveEntityAttributesV2(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
                                                                  @PathVariable("scope") String scope,
@@ -232,7 +232,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}", method = RequestMethod.POST)
+    @RequestMapping(value = "/{entityType}/{entityId}/timeseries/{scope}", method = RequestMethod.POST)
     @ResponseBody
     public DeferredResult<ResponseEntity> saveEntityTelemetry(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
                                                               @PathVariable("scope") String scope,
@@ -242,7 +242,7 @@ public class TelemetryController extends BaseController {
     }
 
     @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
-    @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}/{ttl}", method = RequestMethod.POST)
+    @RequestMapping(value = "/{entityType}/{entityId}/timeseries/{scope}/{ttl}", method = RequestMethod.POST)
     @ResponseBody
     public DeferredResult<ResponseEntity> saveEntityTelemetryWithTTL(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
                                                                      @PathVariable("scope") String scope, @PathVariable("ttl") Long ttl,
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
index 7bf223f..923d06b 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -15,21 +15,14 @@
  */
 package org.thingsboard.server.service.telemetry;
 
-import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
 import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Created by ashvayka on 27.03.18.
  */
-public interface TelemetrySubscriptionService {
+public interface TelemetrySubscriptionService extends RuleEngineTelemetryService {
 
     void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
 
@@ -37,9 +30,4 @@ public interface TelemetrySubscriptionService {
 
     void removeSubscription(String sessionId, int cmdId);
 
-    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
-
-    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
-
-    void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
 }
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
new file mode 100644
index 0000000..253a2af
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
@@ -0,0 +1,21 @@
+package org.thingsboard.rule.engine.api;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+public interface RuleEngineTelemetryService {
+
+    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
+
+    void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
+
+    void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 44bd3f5..c4514a8 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -81,6 +81,8 @@ public interface TbContext {
 
     RuleChainService getRuleChainService();
 
+    RuleEngineTelemetryService getTelemetryService();
+
     TimeseriesService getTimeseriesService();
 
     RelationService getRelationService();
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index a97493b..3deea9b 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -44,6 +44,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.thingsboard.common</groupId>
+            <artifactId>transport</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-core</artifactId>
             <scope>provided</scope>
@@ -88,6 +93,10 @@
             <artifactId>mockito-all</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.thingsboard.common</groupId>
+            <artifactId>transport</artifactId>
+        </dependency>
 
         <!--<dependency>-->
             <!--<groupId>org.springframework.boot</groupId>-->
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
index 43851a3..5294203 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.debug;
 
 import com.datastax.driver.core.utils.UUIDs;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
 import org.thingsboard.rule.engine.TbNodeUtils;
 import org.thingsboard.rule.engine.api.ListeningExecutor;
 import org.thingsboard.rule.engine.api.RuleNode;
@@ -26,6 +27,8 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.rule.engine.filter.TbJsFilterNodeConfiguration;
 import org.thingsboard.rule.engine.js.NashornJsEngine;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.plugin.ComponentType;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -33,6 +36,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
 import javax.script.Bindings;
 
 import java.nio.charset.StandardCharsets;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@@ -53,30 +57,40 @@ public class TbMsgGeneratorNode implements TbNode {
 
     private TbMsgGeneratorNodeConfiguration config;
     private long delay;
+    private EntityId originatorId;
+    private UUID nextTickId;
 
     @Override
     public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
         this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class);
         this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds());
-        ctx.tellSelf(newTickMsg(ctx), delay);
+        if (!StringUtils.isEmpty(config.getOriginatorId())) {
+            originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId());
+        } else {
+            originatorId = ctx.getSelfId();
+        }
+        sentTickMsg(ctx);
     }
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) {
-        if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG)) {
+        if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
             TbMsgMetaData metaData = new TbMsgMetaData();
             if (config.getMsgMetaData() != null) {
                 config.getMsgMetaData().forEach(metaData::putValue);
             }
-            ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), ctx.getSelfId(), metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8)));
-            ctx.tellSelf(newTickMsg(ctx), delay);
+            ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), originatorId, metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8)));
+            sentTickMsg(ctx);
         }
     }
 
-    private TbMsg newTickMsg(TbContext ctx) {
-        return new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{});
+    private void sentTickMsg(TbContext ctx) {
+        TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{});
+        nextTickId = tickMsg.getId();
+        ctx.tellSelf(tickMsg, delay);
     }
 
+
     @Override
     public void destroy() {
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java
index e15b13f..8eb37a2 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java
@@ -17,6 +17,8 @@ package org.thingsboard.rule.engine.debug;
 
 import lombok.Data;
 import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.server.common.data.EntityType;
+
 import java.util.Map;
 
 @Data
@@ -24,6 +26,8 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration<TbMsgG
 
     private int msgCount;
     private int periodInSeconds;
+    private String originatorId;
+    private EntityType originatorType;
     private String msgType;
     private String msgBody;
     private Map<String, String> msgMetaData;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java
new file mode 100644
index 0000000..9314921
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java
@@ -0,0 +1,87 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.telemetry;
+
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@RuleNode(
+        type = ComponentType.ACTION,
+        name = "save timeseries data",
+        configClazz = TbMsgTelemetryNodeConfiguration.class,
+        nodeDescription = "Saves timeseries data",
+        nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY' message type"
+)
+
+public class TbMsgTelemetryNode implements TbNode {
+
+    private TbMsgTelemetryNodeConfiguration config;
+
+    @Override
+    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+        this.config = TbNodeUtils.convert(configuration, TbMsgTelemetryNodeConfiguration.class);
+    }
+
+    @Override
+    public void onMsg(TbContext ctx, TbMsg msg) {
+        if (!msg.getType().equals("POST_TELEMETRY")) {
+            ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
+            return;
+        }
+
+        String src = new String(msg.getData(), StandardCharsets.UTF_8);
+        TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src));
+        Map<Long, List<KvEntry>> tsKvMap = telemetryUploadRequest.getData();
+        if (tsKvMap == null) {
+            ctx.tellError(msg, new IllegalArgumentException("Msg body us empty: " + src));
+            return;
+        }
+        List<TsKvEntry> tsKvEntryList = new ArrayList<>();
+        for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
+            for (KvEntry kvEntry : tsKvEntry.getValue()) {
+                tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry));
+            }
+        }
+        String ttlValue = msg.getMetaData().getValue("TTL");
+        long ttl = !StringUtils.isEmpty(ttlValue) ? Long.valueOf(ttlValue) : config.getDefaultTTL();
+        ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java
new file mode 100644
index 0000000..8d6b7dc
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.telemetry;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+
+import java.util.Map;
+
+@Data
+public class TbMsgTelemetryNodeConfiguration implements NodeConfiguration<TbMsgTelemetryNodeConfiguration> {
+
+    private long defaultTTL;
+
+    @Override
+    public TbMsgTelemetryNodeConfiguration defaultConfiguration() {
+        TbMsgTelemetryNodeConfiguration configuration = new TbMsgTelemetryNodeConfiguration();
+        configuration.setDefaultTTL(0L);
+        return configuration;
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java
new file mode 100644
index 0000000..a789326
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java
@@ -0,0 +1,27 @@
+package org.thingsboard.rule.engine.telemetry;
+
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.Data;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import javax.annotation.Nullable;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+@Data
+class TelemetryNodeCallback implements FutureCallback<Void> {
+    private final TbContext ctx;
+    private final TbMsg msg;
+
+    @Override
+    public void onSuccess(@Nullable Void result) {
+        ctx.tellNext(msg);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+        ctx.tellError(msg, t);
+    }
+}