thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java 16(+2 -14)
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java 21(+21 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java 34(+24 -10)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java 4(+4 -0)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java 87(+87 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 7dca2c8..ae60308 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,8 +16,10 @@
package org.thingsboard.server.actors.ruleChain;
import akka.actor.ActorRef;
+import akka.actor.Cancellable;
import com.google.common.base.Function;
import org.thingsboard.rule.engine.api.ListeningExecutor;
+import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -165,6 +167,11 @@ class DefaultTbContext implements TbContext {
}
@Override
+ public RuleEngineTelemetryService getTelemetryService() {
+ return mainCtx.getTsSubService();
+ }
+
+ @Override
public RelationService getRelationService() {
return mainCtx.getRelationService();
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
index ba5f9f9..42d6a20 100644
--- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
+++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java
@@ -155,7 +155,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements PluginWe
if (internalId != null) {
SessionMetaData sessionMd = internalSessionMap.get(internalId);
if (sessionMd != null) {
- sessionMd.session.sendMessage(new TextMessage(msg));
+ synchronized (sessionMd) {
+ sessionMd.session.sendMessage(new TextMessage(msg));
+ }
} else {
log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId);
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index c1338e4..25f09ca 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -115,7 +115,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES", method = RequestMethod.GET)
+ @RequestMapping(value = "/{entityType}/{entityId}/keys/attributes", method = RequestMethod.GET)
@ResponseBody
public DeferredResult<ResponseEntity> getAttributeKeys(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
@@ -123,7 +123,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES/{scope}", method = RequestMethod.GET)
+ @RequestMapping(value = "/{entityType}/{entityId}/keys/attributes/{scope}", method = RequestMethod.GET)
@ResponseBody
public DeferredResult<ResponseEntity> getAttributeKeysByScope(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr
@@ -133,7 +133,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES", method = RequestMethod.GET)
+ @RequestMapping(value = "/{entityType}/{entityId}/values/attributes", method = RequestMethod.GET)
@ResponseBody
public DeferredResult<ResponseEntity> getAttributes(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@@ -144,7 +144,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES/{scope}", method = RequestMethod.GET)
+ @RequestMapping(value = "/{entityType}/{entityId}/values/attributes/{scope}", method = RequestMethod.GET)
@ResponseBody
public DeferredResult<ResponseEntity> getAttributesByScope(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@@ -156,7 +156,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/keys/TIMESERIES", method = RequestMethod.GET)
+ @RequestMapping(value = "/{entityType}/{entityId}/keys/timeseries", method = RequestMethod.GET)
@ResponseBody
public DeferredResult<ResponseEntity> getTimeseriesKeys(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
@@ -167,7 +167,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
+ @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET)
@ResponseBody
public DeferredResult<ResponseEntity> getLatestTimeseries(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@@ -180,7 +180,7 @@ public class TelemetryController extends BaseController {
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET)
+ @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET, params = {"keys", "startTs", "endTs"})
@ResponseBody
public DeferredResult<ResponseEntity> getTimeseries(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@@ -222,7 +222,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/ATTRIBUTES/{scope}", method = RequestMethod.POST)
+ @RequestMapping(value = "/{entityType}/{entityId}/attributes/{scope}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> saveEntityAttributesV2(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@PathVariable("scope") String scope,
@@ -232,7 +232,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}", method = RequestMethod.POST)
+ @RequestMapping(value = "/{entityType}/{entityId}/timeseries/{scope}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> saveEntityTelemetry(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@PathVariable("scope") String scope,
@@ -242,7 +242,7 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
- @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}/{ttl}", method = RequestMethod.POST)
+ @RequestMapping(value = "/{entityType}/{entityId}/timeseries/{scope}/{ttl}", method = RequestMethod.POST)
@ResponseBody
public DeferredResult<ResponseEntity> saveEntityTelemetryWithTTL(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
@PathVariable("scope") String scope, @PathVariable("ttl") Long ttl,
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
index 7bf223f..923d06b 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
@@ -15,21 +15,14 @@
*/
package org.thingsboard.server.service.telemetry;
-import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* Created by ashvayka on 27.03.18.
*/
-public interface TelemetrySubscriptionService {
+public interface TelemetrySubscriptionService extends RuleEngineTelemetryService {
void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub);
@@ -37,9 +30,4 @@ public interface TelemetrySubscriptionService {
void removeSubscription(String sessionId, int cmdId);
- void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
-
- void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
-
- void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
new file mode 100644
index 0000000..253a2af
--- /dev/null
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
@@ -0,0 +1,21 @@
+package org.thingsboard.rule.engine.api;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+
+import java.util.List;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+public interface RuleEngineTelemetryService {
+
+ void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
+
+ void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback);
+
+ void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
+
+}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
index 44bd3f5..c4514a8 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
@@ -81,6 +81,8 @@ public interface TbContext {
RuleChainService getRuleChainService();
+ RuleEngineTelemetryService getTelemetryService();
+
TimeseriesService getTimeseriesService();
RelationService getRelationService();
diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml
index a97493b..3deea9b 100644
--- a/rule-engine/rule-engine-components/pom.xml
+++ b/rule-engine/rule-engine-components/pom.xml
@@ -44,6 +44,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>transport</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>provided</scope>
@@ -88,6 +93,10 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>transport</artifactId>
+ </dependency>
<!--<dependency>-->
<!--<groupId>org.springframework.boot</groupId>-->
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
index 43851a3..5294203 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.debug;
import com.datastax.driver.core.utils.UUIDs;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import org.thingsboard.rule.engine.api.RuleNode;
@@ -26,6 +27,8 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.filter.TbJsFilterNodeConfiguration;
import org.thingsboard.rule.engine.js.NashornJsEngine;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -33,6 +36,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.script.Bindings;
import java.nio.charset.StandardCharsets;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@@ -53,30 +57,40 @@ public class TbMsgGeneratorNode implements TbNode {
private TbMsgGeneratorNodeConfiguration config;
private long delay;
+ private EntityId originatorId;
+ private UUID nextTickId;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class);
this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds());
- ctx.tellSelf(newTickMsg(ctx), delay);
+ if (!StringUtils.isEmpty(config.getOriginatorId())) {
+ originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId());
+ } else {
+ originatorId = ctx.getSelfId();
+ }
+ sentTickMsg(ctx);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
- if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG)) {
+ if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
TbMsgMetaData metaData = new TbMsgMetaData();
if (config.getMsgMetaData() != null) {
config.getMsgMetaData().forEach(metaData::putValue);
}
- ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), ctx.getSelfId(), metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8)));
- ctx.tellSelf(newTickMsg(ctx), delay);
+ ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), originatorId, metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8)));
+ sentTickMsg(ctx);
}
}
- private TbMsg newTickMsg(TbContext ctx) {
- return new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{});
+ private void sentTickMsg(TbContext ctx) {
+ TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{});
+ nextTickId = tickMsg.getId();
+ ctx.tellSelf(tickMsg, delay);
}
+
@Override
public void destroy() {
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java
index e15b13f..8eb37a2 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java
@@ -17,6 +17,8 @@ package org.thingsboard.rule.engine.debug;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
+import org.thingsboard.server.common.data.EntityType;
+
import java.util.Map;
@Data
@@ -24,6 +26,8 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration<TbMsgG
private int msgCount;
private int periodInSeconds;
+ private String originatorId;
+ private EntityType originatorType;
private String msgType;
private String msgBody;
private Map<String, String> msgMetaData;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java
new file mode 100644
index 0000000..9314921
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java
@@ -0,0 +1,87 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.telemetry;
+
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
+import org.thingsboard.rule.engine.TbNodeUtils;
+import org.thingsboard.rule.engine.api.RuleNode;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.rule.engine.api.TbNode;
+import org.thingsboard.rule.engine.api.TbNodeConfiguration;
+import org.thingsboard.rule.engine.api.TbNodeException;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.KvEntry;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.plugin.ComponentType;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@RuleNode(
+ type = ComponentType.ACTION,
+ name = "save timeseries data",
+ configClazz = TbMsgTelemetryNodeConfiguration.class,
+ nodeDescription = "Saves timeseries data",
+ nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY' message type"
+)
+
+public class TbMsgTelemetryNode implements TbNode {
+
+ private TbMsgTelemetryNodeConfiguration config;
+
+ @Override
+ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
+ this.config = TbNodeUtils.convert(configuration, TbMsgTelemetryNodeConfiguration.class);
+ }
+
+ @Override
+ public void onMsg(TbContext ctx, TbMsg msg) {
+ if (!msg.getType().equals("POST_TELEMETRY")) {
+ ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
+ return;
+ }
+
+ String src = new String(msg.getData(), StandardCharsets.UTF_8);
+ TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src));
+ Map<Long, List<KvEntry>> tsKvMap = telemetryUploadRequest.getData();
+ if (tsKvMap == null) {
+ ctx.tellError(msg, new IllegalArgumentException("Msg body us empty: " + src));
+ return;
+ }
+ List<TsKvEntry> tsKvEntryList = new ArrayList<>();
+ for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
+ for (KvEntry kvEntry : tsKvEntry.getValue()) {
+ tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry));
+ }
+ }
+ String ttlValue = msg.getMetaData().getValue("TTL");
+ long ttl = !StringUtils.isEmpty(ttlValue) ? Long.valueOf(ttlValue) : config.getDefaultTTL();
+ ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java
new file mode 100644
index 0000000..8d6b7dc
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.rule.engine.telemetry;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.NodeConfiguration;
+
+import java.util.Map;
+
+@Data
+public class TbMsgTelemetryNodeConfiguration implements NodeConfiguration<TbMsgTelemetryNodeConfiguration> {
+
+ private long defaultTTL;
+
+ @Override
+ public TbMsgTelemetryNodeConfiguration defaultConfiguration() {
+ TbMsgTelemetryNodeConfiguration configuration = new TbMsgTelemetryNodeConfiguration();
+ configuration.setDefaultTTL(0L);
+ return configuration;
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java
new file mode 100644
index 0000000..a789326
--- /dev/null
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java
@@ -0,0 +1,27 @@
+package org.thingsboard.rule.engine.telemetry;
+
+import com.google.common.util.concurrent.FutureCallback;
+import lombok.Data;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import javax.annotation.Nullable;
+
+/**
+ * Created by ashvayka on 02.04.18.
+ */
+@Data
+class TelemetryNodeCallback implements FutureCallback<Void> {
+ private final TbContext ctx;
+ private final TbMsg msg;
+
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ ctx.tellNext(msg);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ ctx.tellError(msg, t);
+ }
+}