thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 15(+7 -8)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 36(+18 -18)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java 1(+1 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java 1(+1 -0)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java 4(+2 -2)
common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java 9(+8 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java 3(+1 -2)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java 2(+1 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java 16(+13 -3)
ui/src/app/locale/locale.constant.js 1(+1 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 54737f8..665d2d9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -312,23 +312,23 @@ public class ActorSystemContext {
return discoveryService.getCurrentServer().getServerAddress().toString();
}
- public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
- persistDebug(tenantId, entityId, "IN", tbMsg, null);
+ public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {
+ persistDebug(tenantId, entityId, "IN", tbMsg, relationType, null);
}
- public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, Throwable error) {
- persistDebug(tenantId, entityId, "IN", tbMsg, error);
+ public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) {
+ persistDebug(tenantId, entityId, "IN", tbMsg, relationType, error);
}
- public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, Throwable error) {
- persistDebug(tenantId, entityId, "OUT", tbMsg, error);
+ public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) {
+ persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, error);
}
- public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
- persistDebug(tenantId, entityId, "OUT", tbMsg, null);
+ public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {
+ persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, null);
}
- private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, Throwable error) {
+ private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) {
try {
Event event = new Event();
event.setTenantId(tenantId);
@@ -345,6 +345,7 @@ public class ActorSystemContext {
.put("msgId", tbMsg.getId().toString())
.put("msgType", tbMsg.getType())
.put("dataType", tbMsg.getDataType().name())
+ .put("relationType", relationType)
.put("data", tbMsg.getData())
.put("metadata", metadata);
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index dd270e2..f5f5848 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -355,16 +355,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
JsonObject json = new JsonObject();
- json.addProperty("ts", entry.getKey());
- JsonObject values = new JsonObject();
for (KvEntry kv : entry.getValue()) {
- kv.getBooleanValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
- kv.getLongValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
- kv.getDoubleValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
- kv.getStrValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
+ kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
+ kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
+ kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
+ kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
}
- json.add("values", values);
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData.copy(), TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+ TbMsgMetaData metaData = defaultMetaData.copy();
+ metaData.putValue("ts", entry.getKey()+"");
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
pushToRuleEngineWithTimeout(context, tbMsg, msgData);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 039fdf6..2e8955b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -69,7 +69,7 @@ class DefaultTbContext implements TbContext {
@Override
public void tellNext(TbMsg msg, String relationType, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
- mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, th);
+ mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th);
}
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationType, msg), nodeCtx.getSelfActor());
}
@@ -102,7 +102,7 @@ class DefaultTbContext implements TbContext {
@Override
public void tellError(TbMsg msg, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
- mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, th);
+ mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, "", th);
}
nodeCtx.getSelfActor().tell(new RuleNodeToSelfErrorMsg(msg, th), nodeCtx.getSelfActor());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 9c60327..d069cb0 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -96,12 +96,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void reprocess(List<RuleNode> ruleNodeList) {
for (RuleNode ruleNode : ruleNodeList) {
for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
- pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg);
+ pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");
}
}
if (firstNode != null) {
for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
- pushMsgToNode(firstNode, tbMsg);
+ pushMsgToNode(firstNode, tbMsg, "");
}
}
}
@@ -183,13 +183,13 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
checkActive();
- putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg));
+ putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg, ""));
}
void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
checkActive();
putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
- pushMsgToNode(firstNode, msg);
+ pushMsgToNode(firstNode, msg, "");
envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
});
}
@@ -197,9 +197,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
checkActive();
if (envelope.isEnqueue()) {
- putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg));
+ putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg, envelope.getFromRelationType()));
} else {
- pushMsgToNode(firstNode, envelope.getMsg());
+ pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
}
}
@@ -218,17 +218,17 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
queue.ack(msg, ackId.getId(), msg.getClusterPartition());
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) {
- pushToTarget(msg, relation.getOut());
+ pushToTarget(msg, relation.getOut(), relation.getType());
}
} else {
for (RuleNodeRelation relation : relations) {
EntityId target = relation.getOut();
switch (target.getEntityType()) {
case RULE_NODE:
- enqueueAndForwardMsgCopyToNode(msg, target);
+ enqueueAndForwardMsgCopyToNode(msg, target, relation.getType());
break;
case RULE_CHAIN:
- enqueueAndForwardMsgCopyToChain(msg, target);
+ enqueueAndForwardMsgCopyToChain(msg, target, relation.getType());
break;
}
}
@@ -237,33 +237,33 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
}
- private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target) {
+ private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target, String fromRelationType) {
RuleChainId targetRCId = new RuleChainId(target.getId());
TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
- parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, copyMsg, true), self);
+ parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, copyMsg, fromRelationType, true), self);
}
- private void enqueueAndForwardMsgCopyToNode(TbMsg msg, EntityId target) {
+ private void enqueueAndForwardMsgCopyToNode(TbMsg msg, EntityId target, String fromRelationType) {
RuleNodeId targetId = new RuleNodeId(target.getId());
RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
- putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
+ putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg, fromRelationType));
}
- private void pushToTarget(TbMsg msg, EntityId target) {
+ private void pushToTarget(TbMsg msg, EntityId target, String fromRelationType) {
switch (target.getEntityType()) {
case RULE_NODE:
- pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg);
+ pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
break;
case RULE_CHAIN:
- parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, false), self);
+ parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType, false), self);
break;
}
}
- private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
+ private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
if (nodeCtx != null) {
- nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
+ nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType), self);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
index 2b2623b..0861646 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
@@ -31,6 +31,7 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg {
private final RuleChainId target;
private final RuleChainId source;
private final TbMsg msg;
+ private final String fromRelationType;
private final boolean enqueue;
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
index e7d866c..abcddc9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleNodeMsg.java
@@ -29,6 +29,7 @@ final class RuleChainToRuleNodeMsg implements TbActorMsg {
private final TbContext ctx;
private final TbMsg msg;
+ private final String fromRelationType;
@Override
public MsgType getMsgType() {
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
index ea857db..f23ba6b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
@@ -93,7 +93,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception {
checkActive();
if (ruleNode.isDebugMode()) {
- systemContext.persistDebugInput(tenantId, entityId, msg.getMsg());
+ systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self");
}
tbNode.onMsg(defaultCtx, msg.getMsg());
}
@@ -101,7 +101,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
checkActive();
if (ruleNode.isDebugMode()) {
- systemContext.persistDebugInput(tenantId, entityId, msg.getMsg());
+ systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
}
tbNode.onMsg(msg.getCtx(), msg.getMsg());
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index b99c450..e7c1734 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -36,9 +36,16 @@ public class JsonConverter {
return convertToTelemetry(jsonObject, BasicRequest.DEFAULT_REQUEST_ID);
}
+ public static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long ts) throws JsonSyntaxException {
+ return convertToTelemetry(jsonObject, ts, BasicRequest.DEFAULT_REQUEST_ID);
+ }
+
public static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, int requestId) throws JsonSyntaxException {
+ return convertToTelemetry(jsonObject, System.currentTimeMillis(), requestId);
+ }
+
+ private static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject, long systemTs, int requestId) throws JsonSyntaxException {
BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
- long systemTs = System.currentTimeMillis();
if (jsonObject.isJsonObject()) {
parseObject(request, systemTs, jsonObject);
} else if (jsonObject.isJsonArray()) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
index 8ad344f..c77d122 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java
@@ -52,7 +52,7 @@ public class TbJsFilterNode implements TbNode {
public void onMsg(TbContext ctx, TbMsg msg) {
ListeningExecutor jsExecutor = ctx.getJsExecutor();
withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)),
- filterResult -> ctx.tellNext(msg, Boolean.toString(filterResult)),
+ filterResult -> ctx.tellNext(msg, filterResult.booleanValue() ? "True" : "False"),
t -> ctx.tellError(msg, t));
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
index 4c5808b..671132d 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeConfiguration.java
@@ -31,7 +31,7 @@ public class TbJsSwitchNodeConfiguration implements NodeConfiguration<TbJsSwitch
TbJsSwitchNodeConfiguration configuration = new TbJsSwitchNodeConfiguration();
configuration.setJsScript("function nextRelation(metadata, msg) {\n" +
" return ['one','nine'];\n" +
- "};\n" +
+ "}\n" +
"if(msgType === 'POST_TELEMETRY') {\n" +
" return ['two'];\n" +
"}\n" +
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
index df826e6..a183549 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeFilterNode.java
@@ -45,7 +45,7 @@ public class TbMsgTypeFilterNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws TbNodeException {
- ctx.tellNext(msg, Boolean.toString(config.getMessageTypes().contains(msg.getType())));
+ ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
}
@Override
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
index f2632d7..9694b04 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
@@ -80,8 +80,7 @@ public class TbMqttNode implements TbNode {
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
.addListener(future -> {
if (future.isSuccess()) {
- TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData());
- ctx.tellNext(next, TbRelationTypes.SUCCESS);
+ ctx.tellNext(msg, TbRelationTypes.SUCCESS);
} else {
TbMsg next = processException(ctx, msg, future.cause());
ctx.tellNext(next, TbRelationTypes.FAILURE, future.cause());
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java
index 69e7c1d..993b172 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java
@@ -106,7 +106,7 @@ public class TbRabbitMqNode implements TbNode {
routingKey,
properties,
msg.getData().getBytes(UTF8));
- return ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData());
+ return msg;
}
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable t) {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
index 114ca27..d91d5d4 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
@@ -63,12 +63,22 @@ public class TbMsgTimeseriesNode implements TbNode {
ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
return;
}
-
+ long ts = -1;
+ String tsStr = msg.getMetaData().getValue("ts");
+ if (!StringUtils.isEmpty(tsStr)) {
+ try {
+ ts = Long.parseLong(tsStr);
+ } catch (NumberFormatException e) {}
+ }
+ if (ts == -1) {
+ ctx.tellError(msg, new IllegalArgumentException("Msg metadata doesn't contain valid ts value: " + msg.getMetaData()));
+ return;
+ }
String src = msg.getData();
- TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src));
+ TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src), ts);
Map<Long, List<KvEntry>> tsKvMap = telemetryUploadRequest.getData();
if (tsKvMap == null) {
- ctx.tellError(msg, new IllegalArgumentException("Msg body us empty: " + src));
+ ctx.tellError(msg, new IllegalArgumentException("Msg body is empty: " + src));
return;
}
List<TsKvEntry> tsKvEntryList = new ArrayList<>();
diff --git a/ui/src/app/event/event-header-debug-rulenode.tpl.html b/ui/src/app/event/event-header-debug-rulenode.tpl.html
index 34f4513..bbf6ae0 100644
--- a/ui/src/app/event/event-header-debug-rulenode.tpl.html
+++ b/ui/src/app/event/event-header-debug-rulenode.tpl.html
@@ -21,7 +21,7 @@
<div translate class="tb-cell" flex="15">event.entity</div>
<div translate class="tb-cell" flex="20">event.message-id</div>
<div translate class="tb-cell" flex="20">event.message-type</div>
-<div translate class="tb-cell" flex="15">event.data-type</div>
+<div translate class="tb-cell" flex="15">event.relation-type</div>
<div translate class="tb-cell" flex="10">event.data</div>
<div translate class="tb-cell" flex="10">event.metadata</div>
<div translate class="tb-cell" flex="10">event.error</div>
diff --git a/ui/src/app/event/event-row-debug-rulenode.tpl.html b/ui/src/app/event/event-row-debug-rulenode.tpl.html
index bb832b1..857cd8c 100644
--- a/ui/src/app/event/event-row-debug-rulenode.tpl.html
+++ b/ui/src/app/event/event-row-debug-rulenode.tpl.html
@@ -21,7 +21,7 @@
<div class="tb-cell" flex="15">{{event.body.entityName}}</div>
<div class="tb-cell tb-nowrap" flex="20" ng-mouseenter="checkTooltip($event)">{{event.body.msgId}}</div>
<div class="tb-cell" flex="20" ng-mouseenter="checkTooltip($event)">{{event.body.msgType}}</div>
-<div class="tb-cell" flex="15">{{event.body.dataType}}</div>
+<div class="tb-cell" flex="15">{{event.body.relationType}}</div>
<div class="tb-cell" flex="10">
<md-button ng-if="event.body.data" class="md-icon-button md-primary"
ng-click="showContent($event, event.body.data, 'event.data', event.body.dataType)"
ui/src/app/locale/locale.constant.js 1(+1 -0)
diff --git a/ui/src/app/locale/locale.constant.js b/ui/src/app/locale/locale.constant.js
index 141ac67..fc161e7 100644
--- a/ui/src/app/locale/locale.constant.js
+++ b/ui/src/app/locale/locale.constant.js
@@ -782,6 +782,7 @@ export default angular.module('thingsboard.locale', [])
"message-id": "Message Id",
"message-type": "Message Type",
"data-type": "Data Type",
+ "relation-type": "Relation Type",
"metadata": "Metadata",
"data": "Data",
"event": "Event",