thingsboard-aplcache

Update post telemetry message body format. Add relationType

5/11/2018 2:04:02 PM

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 54737f8..665d2d9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -312,23 +312,23 @@ public class ActorSystemContext {
         return discoveryService.getCurrentServer().getServerAddress().toString();
     }
 
-    public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
-        persistDebug(tenantId, entityId, "IN", tbMsg, null);
+    public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {
+        persistDebug(tenantId, entityId, "IN", tbMsg, relationType, null);
     }
 
-    public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, Throwable error) {
-        persistDebug(tenantId, entityId, "IN", tbMsg, error);
+    public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) {
+        persistDebug(tenantId, entityId, "IN", tbMsg, relationType, error);
     }
 
-    public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, Throwable error) {
-        persistDebug(tenantId, entityId, "OUT", tbMsg, error);
+    public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) {
+        persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, error);
     }
 
-    public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
-        persistDebug(tenantId, entityId, "OUT", tbMsg, null);
+    public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {
+        persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, null);
     }
 
-    private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, Throwable error) {
+    private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) {
         try {
             Event event = new Event();
             event.setTenantId(tenantId);
@@ -345,6 +345,7 @@ public class ActorSystemContext {
                     .put("msgId", tbMsg.getId().toString())
                     .put("msgType", tbMsg.getType())
                     .put("dataType", tbMsg.getDataType().name())
+                    .put("relationType", relationType)
                     .put("data", tbMsg.getData())
                     .put("metadata", metadata);
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index dd270e2..f5f5848 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -355,16 +355,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
         for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
             JsonObject json = new JsonObject();
-            json.addProperty("ts", entry.getKey());
-            JsonObject values = new JsonObject();
             for (KvEntry kv : entry.getValue()) {
-                kv.getBooleanValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
-                kv.getLongValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
-                kv.getDoubleValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
-                kv.getStrValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
+                kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
+                kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
+                kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
+                kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
             }
-            json.add("values", values);
-            TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData.copy(), TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+            TbMsgMetaData metaData = defaultMetaData.copy();
+            metaData.putValue("ts", entry.getKey()+"");
+            TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
             pushToRuleEngineWithTimeout(context, tbMsg, msgData);
         }
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 039fdf6..2e8955b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -69,7 +69,7 @@ class DefaultTbContext implements TbContext {
     @Override
     public void tellNext(TbMsg msg, String relationType, Throwable th) {
         if (nodeCtx.getSelf().isDebugMode()) {
-            mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, th);
+            mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th);
         }
         nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
     }
@@ -102,7 +102,7 @@ class DefaultTbContext implements TbContext {
     @Override
     public void tellError(TbMsg msg, Throwable th) {
         if (nodeCtx.getSelf().isDebugMode()) {
-            mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, th);
+            mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, "", th);
         }
         nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor());
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 9c60327..d069cb0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -96,12 +96,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     private void reprocess(List<RuleNode> ruleNodeList) {
         for (RuleNode ruleNode : ruleNodeList) {
             for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
-                pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg);
+                pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");
             }
         }
         if (firstNode != null) {
             for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
-                pushMsgToNode(firstNode, tbMsg);
+                pushMsgToNode(firstNode, tbMsg, "");
             }
         }
     }
@@ -183,13 +183,13 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
         checkActive();
-        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg));
+        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
     }
 
     void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
         checkActive();
         putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
-            pushMsgToNode(firstNode, msg);
+            pushMsgToNode(firstNode, msg, "");
             envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
         });
     }
@@ -197,9 +197,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
     void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
         checkActive();
         if (envelope.isEnqueue()) {
-            putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg));
+            putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
         } else {
-            pushMsgToNode(firstNode, envelope.getMsg());
+            pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
         }
     }
 
@@ -218,17 +218,17 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
             queue.ack(msg, ackId.getId(), msg.getClusterPartition());
         } else if (relationsCount == 1) {
             for (RuleNodeRelation relation : relations) {
-                pushToTarget(msg, relation.getOut());
+                pushToTarget(msg, relation.getOut(), relation.getType());
             }
         } else {
             for (RuleNodeRelation relation : relations) {
                 EntityId target = relation.getOut();
                 switch (target.getEntityType()) {
                     case RULE_NODE:
-                        enqueueAndForwardMsgCopyToNode(msg, target);
+                        enqueueAndForwardMsgCopyToNode(msg, target, relation.getType());
                         break;
                     case RULE_CHAIN:
-                        enqueueAndForwardMsgCopyToChain(msg, target);
+                        enqueueAndForwardMsgCopyToChain(msg, target, relation.getType());
                         break;
                 }
             }
@@ -237,33 +237,33 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         }
     }
 
-    private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target) {
+    private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target, String fromRelationType) {
         RuleChainId targetRCId = new RuleChainId(target.getId());
         TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
-        parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, copyMsg, true), self);
+        parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, copyMsg, fromRelationType, true), self);
     }
 
-    private void enqueueAndForwardMsgCopyToNode(TbMsg msg, EntityId target) {
+    private void enqueueAndForwardMsgCopyToNode(TbMsg msg, EntityId target, String fromRelationType) {
         RuleNodeId targetId = new RuleNodeId(target.getId());
         RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
         TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
-        putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
+        putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg, fromRelationType));
     }
 
-    private void pushToTarget(TbMsg msg, EntityId target) {
+    private void pushToTarget(TbMsg msg, EntityId target, String fromRelationType) {
         switch (target.getEntityType()) {
             case RULE_NODE:
-                pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg);
+                pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
                 break;
             case RULE_CHAIN:
-                parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, false), self);
+                parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType, false), self);
                 break;
         }
     }
 
-    private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
+    private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
         if (nodeCtx != null) {
-            nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
+            nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType), self);
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
index 2b2623b..0861646 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
@@ -31,6 +31,7 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg {
     private final RuleChainId target;
     private final RuleChainId source;
     private final TbMsg msg;
+    private final String fromRelationType;
     private final boolean enqueue;
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
index e7d866c..abcddc9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
@@ -29,6 +29,7 @@ final class RuleChainToRuleNodeMsg implements TbActorMsg {
 
     private final TbContext ctx;
     private final TbMsg msg;
+    private final String fromRelationType;
 
     @Override
     public MsgType getMsgType() {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index ea857db..f23ba6b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -93,7 +93,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
     public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception {
         checkActive();
         if (ruleNode.isDebugMode()) {
-            systemContext.persistDebugInput(tenantId, entityId, msg.getMsg());
+            systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self");
         }
         tbNode.onMsg(defaultCtx, msg.getMsg());
     }
@@ -101,7 +101,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
     void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
         checkActive();
         if (ruleNode.isDebugMode()) {
-            systemContext.persistDebugInput(tenantId, entityId, msg.getMsg());
+            systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
         }
         tbNode.onMsg(msg.getCtx(), msg.getMsg());
     }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index b99c450..e7c1734 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -36,9 +36,16 @@ public class JsonConverter {
         return convertToTelemetry(jsonObject, BasicRequest.DEFAULT_REQUEST_ID);
     }
 
+    public static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long ts) throws JsonSyntaxException {
+        return convertToTelemetry(jsonObject, ts, BasicRequest.DEFAULT_REQUEST_ID);
+    }
+
     public static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, int requestId) throws JsonSyntaxException {
+        return convertToTelemetry(jsonObject, System.currentTimeMillis(), requestId);
+    }
+
+    private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException {
         BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
-        long systemTs = System.currentTimeMillis();
         if (jsonObject.isJsonObject()) {
             parseObject(request, systemTs, jsonObject);
         } else if (jsonObject.isJsonArray()) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
index 8ad344f..c77d122 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
@@ -52,7 +52,7 @@ public class TbJsFilterNode implements TbNode {
     public void onMsg(TbContext ctx, TbMsg msg) {
         ListeningExecutor jsExecutor = ctx.getJsExecutor();
         withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)),
-                filterResult -> ctx.tellNext(msg, Boolean.toString(filterResult)),
+                filterResult -> ctx.tellNext(msg, filterResult.booleanValue() ? "True" : "False"),
                 t -> ctx.tellError(msg, t));
     }
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
index 4c5808b..671132d 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
@@ -31,7 +31,7 @@ public class TbJsSwitchNodeConfiguration implements NodeConfiguration<TbJsSwitch
         TbJsSwitchNodeConfiguration configuration = new TbJsSwitchNodeConfiguration();
         configuration.setJsScript("function nextRelation(metadata, msg) {\n" +
                 "    return ['one','nine'];\n" +
-                "};\n" +
+                "}\n" +
                 "if(msgType === 'POST_TELEMETRY') {\n" +
                 "    return ['two'];\n" +
                 "}\n" +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
index df826e6..a183549 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
@@ -45,7 +45,7 @@ public class TbMsgTypeFilterNode implements TbNode {
 
     @Override
     public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
-        ctx.tellNext(msg, Boolean.toString(config.getMessageTypes().contains(msg.getType())));
+        ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
     }
 
     @Override
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
index f2632d7..9694b04 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
@@ -80,8 +80,7 @@ public class TbMqttNode implements TbNode {
         this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
                 .addListener(future -> {
                     if (future.isSuccess()) {
-                        TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData());
-                        ctx.tellNext(next, TbRelationTypes.SUCCESS);
+                        ctx.tellNext(msg, TbRelationTypes.SUCCESS);
                     } else {
                         TbMsg next = processException(ctx, msg, future.cause());
                         ctx.tellNext(next, TbRelationTypes.FAILURE, future.cause());
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java
index 69e7c1d..993b172 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java
@@ -106,7 +106,7 @@ public class TbRabbitMqNode implements TbNode {
                 routingKey,
                 properties,
                 msg.getData().getBytes(UTF8));
-        return ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData());
+        return msg;
     }
 
     private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable t) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
index 114ca27..d91d5d4 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
@@ -63,12 +63,22 @@ public class TbMsgTimeseriesNode implements TbNode {
             ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
             return;
         }
-
+        long ts = -1;
+        String tsStr = msg.getMetaData().getValue("ts");
+        if (!StringUtils.isEmpty(tsStr)) {
+            try {
+                ts = Long.parseLong(tsStr);
+            } catch (NumberFormatException e) {}
+        }
+        if (ts == -1) {
+            ctx.tellError(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData()));
+            return;
+        }
         String src = msg.getData();
-        TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src));
+        TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
         Map<Long, List<KvEntry>> tsKvMap = telemetryUploadRequest.getData();
         if (tsKvMap == null) {
-            ctx.tellError(msg, new IllegalArgumentException("Msg body us empty: " + src));
+            ctx.tellError(msg, new IllegalArgumentException("Msg body is empty: " + src));
             return;
         }
         List<TsKvEntry> tsKvEntryList = new ArrayList<>();
diff --git a/ui/src/app/event/event-header-debug-rulenode.tpl.html b/ui/src/app/event/event-header-debug-rulenode.tpl.html
index 34f4513..bbf6ae0 100644
--- a/ui/src/app/event/event-header-debug-rulenode.tpl.html
+++ b/ui/src/app/event/event-header-debug-rulenode.tpl.html
@@ -21,7 +21,7 @@
 <div translate class="tb-cell" flex="15">event.entity</div>
 <div translate class="tb-cell" flex="20">event.message-id</div>
 <div translate class="tb-cell" flex="20">event.message-type</div>
-<div translate class="tb-cell" flex="15">event.data-type</div>
+<div translate class="tb-cell" flex="15">event.relation-type</div>
 <div translate class="tb-cell" flex="10">event.data</div>
 <div translate class="tb-cell" flex="10">event.metadata</div>
 <div translate class="tb-cell" flex="10">event.error</div>
diff --git a/ui/src/app/event/event-row-debug-rulenode.tpl.html b/ui/src/app/event/event-row-debug-rulenode.tpl.html
index bb832b1..857cd8c 100644
--- a/ui/src/app/event/event-row-debug-rulenode.tpl.html
+++ b/ui/src/app/event/event-row-debug-rulenode.tpl.html
@@ -21,7 +21,7 @@
 <div class="tb-cell" flex="15">{{event.body.entityName}}</div>
 <div class="tb-cell tb-nowrap" flex="20" ng-mouseenter="checkTooltip($event)">{{event.body.msgId}}</div>
 <div class="tb-cell" flex="20" ng-mouseenter="checkTooltip($event)">{{event.body.msgType}}</div>
-<div class="tb-cell" flex="15">{{event.body.dataType}}</div>
+<div class="tb-cell" flex="15">{{event.body.relationType}}</div>
 <div class="tb-cell" flex="10">
     <md-button ng-if="event.body.data" class="md-icon-button md-primary"
                ng-click="showContent($event, event.body.data, 'event.data', event.body.dataType)"
diff --git a/ui/src/app/locale/locale.constant.js b/ui/src/app/locale/locale.constant.js
index 141ac67..fc161e7 100644
--- a/ui/src/app/locale/locale.constant.js
+++ b/ui/src/app/locale/locale.constant.js
@@ -782,6 +782,7 @@ export default angular.module('thingsboard.locale', [])
                     "message-id": "Message Id",
                     "message-type": "Message Type",
                     "data-type": "Data Type",
+                    "relation-type": "Relation Type",
                     "metadata": "Metadata",
                     "data": "Data",
                     "event": "Event",