thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 53(+28 -25)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 84(+54 -30)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java 40(+40 -0)
application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java 26(+25 -1)
common/message/src/main/proto/tbmsg.proto 17(+13 -4)
dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/CassandraMsgQueue.java 4(+2 -2)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java 11(+8 -3)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java 4(+2 -2)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java 10(+5 -5)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java 8(+4 -4)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java 6(+3 -3)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java 3(+0 -3)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java 2(+1 -1)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java 17(+11 -6)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java 11(+8 -3)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java 7(+6 -1)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java 7(+6 -1)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java 22(+14 -8)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index c1fd878..fad7a1b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -49,6 +49,7 @@ import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.plugin.PluginService;
+import org.thingsboard.server.dao.queue.MsgQueue;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleService;
@@ -187,6 +188,10 @@ public class ActorSystemContext {
@Getter
private MailService mailService;
+ @Autowired
+ @Getter
+ private MsgQueue msgQueue;
+
@Value("${actors.session.sync.timeout}")
@Getter
private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 728fef1..cd043bf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -204,9 +204,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
if (data != null && data.isReplyOnQueueAck()) {
- logger.debug("[{}] Queue put [{}] ack detected!", deviceId, msg.getId());
- ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
- sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+ int remainingAcks = data.getAckMsgCount() - 1;
+ data.setAckMsgCount(remainingAcks);
+ logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
+ if (remainingAcks == 0) {
+ ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
+ sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+ }
}
}
@@ -320,8 +324,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
}
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json));
- pushToRuleEngineWithTimeout(context, tbMsg, src, request);
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+ PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
+ SessionMsgType.POST_ATTRIBUTES_REQUEST, request.getRequestId(), true, 1);
+ pushToRuleEngineWithTimeout(context, tbMsg, msgData);
}
private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) {
@@ -329,10 +335,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
Map<Long, List<KvEntry>> tsData = request.getData();
- JsonArray json = new JsonArray();
+ PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
+ SessionMsgType.POST_TELEMETRY_REQUEST, request.getRequestId(), true, tsData.size());
+
for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
- JsonObject ts = new JsonObject();
- ts.addProperty("ts", entry.getKey());
+ JsonObject json = new JsonObject();
+ json.addProperty("ts", entry.getKey());
JsonObject values = new JsonObject();
for (KvEntry kv : entry.getValue()) {
kv.getBooleanValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
@@ -340,12 +348,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
kv.getDoubleValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
kv.getStrValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
}
- ts.add("values", values);
- json.add(ts);
+ json.add("values", values);
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+ pushToRuleEngineWithTimeout(context, tbMsg, msgData);
}
-
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json));
- pushToRuleEngineWithTimeout(context, tbMsg, src, request);
}
private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
@@ -357,8 +363,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
TbMsgMetaData requestMetaData = defaultMetaData.copy();
requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json));
- pushToRuleEngineWithTimeout(context, tbMsg, src, request);
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+ PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
+ pushToRuleEngineWithTimeout(context, tbMsg, msgData);
scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
@@ -380,19 +387,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg) {
- pushToRuleEngineWithTimeout(context, tbMsg, src, fromDeviceRequestMsg, true);
- }
-
- private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg, boolean replyOnAck) {
- SessionMsgType sessionMsgType = fromDeviceRequestMsg.getMsgType();
- int requestId = fromDeviceRequestMsg.getRequestId();
+ private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, PendingSessionMsgData pendingMsgData) {
+ SessionMsgType sessionMsgType = pendingMsgData.getSessionMsgType();
+ int requestId = pendingMsgData.getRequestId();
if (systemContext.isQueuePersistenceEnabled()) {
- pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId, replyOnAck));
+ pendingMsgs.put(tbMsg.getId(), pendingMsgData);
scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
} else {
- ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), src.getSessionId());
- sendMsgToSessionActor(response, src.getServerAddress());
+ ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
+ sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
}
context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
index 2ce05b2..23ad966 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.actors.device;
+import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
@@ -26,6 +27,7 @@ import java.util.Optional;
* Created by ashvayka on 17.04.18.
*/
@Data
+@AllArgsConstructor
public final class PendingSessionMsgData {
private final SessionId sessionId;
@@ -33,5 +35,6 @@ public final class PendingSessionMsgData {
private final SessionMsgType sessionMsgType;
private final int requestId;
private final boolean replyOnQueueAck;
+ private int ackMsgCount;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index bf6132a..16e79b6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -239,7 +239,6 @@ class DefaultTbContext implements TbContext {
.build());
});
}
-
};
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index f05b9a8..847c494 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -19,6 +19,7 @@ import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.LoggingAdapter;
+import com.datastax.driver.core.utils.UUIDs;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
@@ -41,6 +42,7 @@ import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.rule.RuleChainService;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -52,6 +54,7 @@ import java.util.stream.Collectors;
*/
public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
+ private static final long DEFAULT_CLUSTER_PARTITION = 0L;
private final ActorRef parent;
private final ActorRef self;
private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
@@ -83,6 +86,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList);
+ //TODO: read all messages from queues of the actors and push then to the corresponding node actors;
started = true;
} else {
onUpdate(context);
@@ -142,15 +146,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
// Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) {
List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
- for (EntityRelation relation : relations) {
- if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
- RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
- if (ruleNodeCtx == null) {
- throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
+ if (relations.size() == 0) {
+ nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
+ } else {
+ for (EntityRelation relation : relations) {
+ if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
+ RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
+ if (ruleNodeCtx == null) {
+ throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
+ }
}
+ nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
+ .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
}
- nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
- .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
}
}
@@ -161,45 +169,61 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
checkActive();
- TbMsg tbMsg = envelope.getTbMsg();
- //TODO: push to queue and act on ack in async way
- pushMsgToNode(firstNode, tbMsg);
+ putToQueue(envelope.getTbMsg(), msg -> pushMsgToNode(firstNode, msg));
}
- public void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
+ void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
checkActive();
- TbMsg tbMsg = envelope.getTbMsg();
- //TODO: push to queue and act on ack in async way
- pushMsgToNode(firstNode, tbMsg);
- envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(tbMsg.getId()), self);
+ putToQueue(envelope.getTbMsg(), msg -> {
+ pushMsgToNode(firstNode, msg);
+ envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
+ });
}
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
checkActive();
RuleNodeId originator = envelope.getOriginator();
String targetRelationType = envelope.getRelationType();
- List<RuleNodeRelation> relations = nodeRoutes.get(originator);
- if (relations == null) {
- return;
- }
- boolean copy = relations.size() > 1;
- for (RuleNodeRelation relation : relations) {
- TbMsg msg = envelope.getMsg();
- if (copy) {
- msg = msg.copy();
+ List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
+ .filter(r -> targetRelationType == null || targetRelationType.equals(r.getType()))
+ .collect(Collectors.toList());
+
+ TbMsg msg = envelope.getMsg();
+ int relationsCount = relations.size();
+ if (relationsCount == 0) {
+ queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
+ } else if (relationsCount == 1) {
+ for (RuleNodeRelation relation : relations) {
+ pushToTarget(msg, relation.getOut());
}
- if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {
- switch (relation.getOut().getEntityType()) {
+ } else {
+ for (RuleNodeRelation relation : relations) {
+ EntityId target = relation.getOut();
+ switch (target.getEntityType()) {
case RULE_NODE:
- RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
- RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
- pushMsgToNode(targetRuleNode, msg);
+ RuleNodeId targetId = new RuleNodeId(target.getId());
+ RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
+ TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
+ putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
break;
case RULE_CHAIN:
-// TODO: implement
+ parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, true), self);
break;
}
}
+ //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
+ queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
+ }
+ }
+
+ private void pushToTarget(TbMsg msg, EntityId target) {
+ switch (target.getEntityType()) {
+ case RULE_NODE:
+ pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg);
+ break;
+ case RULE_CHAIN:
+ parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, false), self);
+ break;
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
new file mode 100644
index 0000000..f0fca21
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleChainToRuleChainMsg implements TbActorMsg {
+
+ private final RuleChainId target;
+ private final RuleChainId source;
+ private final TbMsg msg;
+ private final boolean enqueue;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index e25d3a7..a9cf795 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -17,22 +17,32 @@ package org.thingsboard.server.actors.shared;
import akka.actor.ActorContext;
import akka.event.LoggingAdapter;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.stats.StatsPersistTick;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
+import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.dao.queue.MsgQueue;
-public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgProcessor {
+import javax.annotation.Nullable;
+import java.util.function.Consumer;
+
+public abstract class ComponentMsgProcessor<T extends EntityId> extends AbstractContextAwareMsgProcessor {
protected final TenantId tenantId;
protected final T entityId;
+ protected final MsgQueue queue;
protected ComponentLifecycleState state;
protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
super(systemContext, logger);
this.tenantId = tenantId;
this.entityId = id;
+ this.queue = systemContext.getMsgQueue();
}
public abstract void start(ActorContext context) throws Exception;
@@ -75,4 +85,18 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr
throw new IllegalStateException("Rule chain is not active!");
}
}
+
+ protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
+ Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ onSuccess.accept(tbMsg);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.debug("Failed to push message [{}] to queue due to [{}]", tbMsg, t);
+ }
+ });
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
index 48c9cd5..05e7836 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
@@ -237,7 +237,7 @@ public class RuleChainController extends BaseController {
ScriptEngine engine = null;
try {
engine = new NashornJsEngine(script, functionName, argNames);
- TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data);
+ TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L);
switch (scriptType) {
case "update":
output = msgToOutput(engine.executeUpdate(inMsg));
diff --git a/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java b/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
index d68f6fe..8e689ec 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
@@ -118,7 +118,7 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn
String newData = data != null ? data : msg.getData();
TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData();
String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType();
- return new TbMsg(msg.getId(), newMessageType, msg.getOriginator(), newMetadata, newData);
+ return new TbMsg(msg.getId(), newMessageType, msg.getOriginator(), newMetadata, newData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
} catch (Throwable th) {
th.printStackTrace();
throw new RuntimeException("Failed to unbind message data from javascript result", th);
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 74c08da..d63456e 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -48,6 +48,11 @@ public enum MsgType {
RULE_CHAIN_TO_RULE_MSG,
/**
+ * Message that is sent by RuleChainActor to other RuleChainActor with command to process TbMsg.
+ */
+ RULE_CHAIN_TO_RULE_CHAIN_MSG,
+
+ /**
* Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain.
*/
RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 1c7de3b..dd80bf7 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -15,12 +15,13 @@
*/
package org.thingsboard.server.common.msg;
-import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.gen.MsgProtos;
import java.io.Serializable;
@@ -41,22 +42,40 @@ public final class TbMsg implements Serializable {
private final TbMsgDataType dataType;
private final String data;
- public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, String data) {
+ //The following fields are not persisted to DB, because they can always be recovered from the context;
+ private final RuleChainId ruleChainId;
+ private final RuleNodeId ruleNodeId;
+ private final long clusterPartition;
+
+ public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, String data,
+ RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
this.id = id;
this.type = type;
this.originator = originator;
this.metaData = metaData;
- this.dataType = TbMsgDataType.JSON;
this.data = data;
+ this.dataType = TbMsgDataType.JSON;
+ this.ruleChainId = ruleChainId;
+ this.ruleNodeId = ruleNodeId;
+ this.clusterPartition = clusterPartition;
}
public static ByteBuffer toBytes(TbMsg msg) {
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
builder.setId(msg.getId().toString());
builder.setType(msg.getType());
- if (msg.getOriginator() != null) {
- builder.setEntityType(msg.getOriginator().getEntityType().name());
- builder.setEntityId(msg.getOriginator().getId().toString());
+ builder.setEntityType(msg.getOriginator().getEntityType().name());
+ builder.setEntityIdMSB(msg.getOriginator().getId().getMostSignificantBits());
+ builder.setEntityIdLSB(msg.getOriginator().getId().getLeastSignificantBits());
+
+ if (msg.getRuleChainId() != null) {
+ builder.setRuleChainIdMSB(msg.getRuleChainId().getId().getMostSignificantBits());
+ builder.setRuleChainIdLSB(msg.getRuleChainId().getId().getLeastSignificantBits());
+ }
+
+ if (msg.getRuleNodeId() != null) {
+ builder.setRuleNodeIdMSB(msg.getRuleNodeId().getId().getMostSignificantBits());
+ builder.setRuleNodeIdLSB(msg.getRuleNodeId().getId().getLeastSignificantBits());
}
if (msg.getMetaData() != null) {
@@ -73,15 +92,18 @@ public final class TbMsg implements Serializable {
try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
- EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
+ EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
+ RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
+ RuleNodeId ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleChainIdLSB()));
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
- return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData());
+ return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getClusterPartition());
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
}
}
- public TbMsg copy() {
- return new TbMsg(id, type, originator, metaData.copy(), dataType, data);
+ public TbMsg copy(UUID newId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
+ return new TbMsg(newId, type, originator, metaData, dataType, data, ruleChainId, ruleNodeId, clusterPartition);
}
+
}
common/message/src/main/proto/tbmsg.proto 17(+13 -4)
diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto
index 4ce1fb6..60003dc 100644
--- a/common/message/src/main/proto/tbmsg.proto
+++ b/common/message/src/main/proto/tbmsg.proto
@@ -27,10 +27,19 @@ message TbMsgProto {
string id = 1;
string type = 2;
string entityType = 3;
- string entityId = 4;
+ int64 entityIdMSB = 4;
+ int64 entityIdLSB = 5;
- TbMsgMetaDataProto metaData = 5;
+ int64 ruleChainIdMSB = 6;
+ int64 ruleChainIdLSB = 7;
+
+ int64 ruleNodeIdMSB = 8;
+ int64 ruleNodeIdLSB = 9;
+ int64 clusterPartition = 10;
+
+ TbMsgMetaDataProto metaData = 11;
+
+ int32 dataType = 12;
+ string data = 13;
- int32 dataType = 6;
- string data = 7;
}
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
index 85f42ae..ef55bcb 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
@@ -27,10 +27,9 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -61,11 +60,11 @@ public class QueueBenchmark implements CommandLineRunner {
}
@Autowired
- private MsqQueue msqQueue;
+ private MsgQueue msgQueue;
@Override
public void run(String... strings) throws Exception {
- System.out.println("It works + " + msqQueue);
+ System.out.println("It works + " + msgQueue);
long start = System.currentTimeMillis();
@@ -81,8 +80,8 @@ public class QueueBenchmark implements CommandLineRunner {
try {
TbMsg msg = randomMsg();
UUID nodeId = UUIDs.timeBased();
- ListenableFuture<Void> put = msqQueue.put(msg, nodeId, 100L);
-// ListenableFuture<Void> put = msqQueue.ack(msg, nodeId, 100L);
+ ListenableFuture<Void> put = msgQueue.put(msg, nodeId, 100L);
+// ListenableFuture<Void> put = msgQueue.ack(msg, nodeId, 100L);
Futures.addCallback(put, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
@@ -126,7 +125,7 @@ public class QueueBenchmark implements CommandLineRunner {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
- return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr);
+ return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr, new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
}
@Bean
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
index a766aa4..ae252ae 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -23,6 +23,8 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -45,7 +47,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
+ new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
@@ -55,7 +58,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
+ new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
future.get();
@@ -68,7 +72,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("key", "value");
String dataStr = "someContent";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr,
+ new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
future.get();
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
index 3935c9b..caccc85 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
@@ -33,8 +33,8 @@ public class UnprocessedMsgFilterTest {
public void acknowledgedMsgsAreFilteredOut() {
UUID id1 = UUID.randomUUID();
UUID id2 = UUID.randomUUID();
- TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null);
- TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null);
+ TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null, null, null, 0L);
+ TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null, null, null, 0L);
List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);
List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));
Collection<TbMsg> actual = msgFilter.filter(msgs, acks);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
index a242093..42ad237 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
@@ -91,11 +91,11 @@ public class TbAlarmNode implements TbNode {
if (alarmResult.alarm == null) {
ctx.tellNext(msg, "False");
} else if (alarmResult.isCreated) {
- ctx.tellNext(toAlarmMsg(alarmResult, msg), "Created");
+ ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Created");
} else if (alarmResult.isUpdated) {
- ctx.tellNext(toAlarmMsg(alarmResult, msg), "Updated");
+ ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated");
} else if (alarmResult.isCleared) {
- ctx.tellNext(toAlarmMsg(alarmResult, msg), "Cleared");
+ ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared");
}
},
t -> ctx.tellError(msg, t));
@@ -176,7 +176,7 @@ public class TbAlarmNode implements TbNode {
return ctx.getJsExecutor().executeAsync(() -> buildDetailsJsEngine.executeJson(msg));
}
- private TbMsg toAlarmMsg(AlarmResult alarmResult, TbMsg originalMsg) {
+ private TbMsg toAlarmMsg(TbContext ctx, AlarmResult alarmResult, TbMsg originalMsg) {
JsonNode jsonNodes = mapper.valueToTree(alarmResult.alarm);
String data = jsonNodes.toString();
TbMsgMetaData metaData = originalMsg.getMetaData().copy();
@@ -187,7 +187,7 @@ public class TbAlarmNode implements TbNode {
} else if (alarmResult.isCleared) {
metaData.putValue(IS_CLEARED_ALARM, Boolean.TRUE.toString());
}
- return new TbMsg(UUIDs.timeBased(), "ALARM", originalMsg.getOriginator(), metaData, data);
+ return ctx.newMsg("ALARM", originalMsg.getOriginator(), metaData, data);
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
index 65b1371..2e95f4a 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
@@ -78,18 +78,18 @@ public class TbMsgGeneratorNode implements TbNode {
}
private void sentTickMsg(TbContext ctx) {
- TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
+ TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
nextTickId = tickMsg.getId();
ctx.tellSelf(tickMsg, delay);
}
- protected ListenableFuture<TbMsg> generate(TbContext ctx) {
+ private ListenableFuture<TbMsg> generate(TbContext ctx) {
return ctx.getJsExecutor().executeAsync(() -> {
if (prevMsg == null) {
- prevMsg = new TbMsg(UUIDs.timeBased(), "", originatorId, new TbMsgMetaData(), "{}");
+ prevMsg = ctx.newMsg( "", originatorId, new TbMsgMetaData(), "{}");
}
TbMsg generated = jsEngine.executeGenerate(prevMsg);
- prevMsg = new TbMsg(UUIDs.timeBased(), generated.getType(), originatorId, generated.getMetaData(), generated.getData());
+ prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());
return prevMsg;
});
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
index cae2058..7c7e685 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
@@ -76,7 +76,7 @@ public class TbMsgToEmailNode implements TbNode {
public void onMsg(TbContext ctx, TbMsg msg) {
try {
EmailPojo email = convert(msg);
- TbMsg emailMsg = buildEmailMsg(msg, email);
+ TbMsg emailMsg = buildEmailMsg(ctx, msg, email);
ctx.tellNext(emailMsg);
} catch (Exception ex) {
log.warn("Can not convert message to email " + ex.getMessage());
@@ -84,9 +84,9 @@ public class TbMsgToEmailNode implements TbNode {
}
}
- private TbMsg buildEmailMsg(TbMsg msg, EmailPojo email) throws JsonProcessingException {
+ private TbMsg buildEmailMsg(TbContext ctx, TbMsg msg, EmailPojo email) throws JsonProcessingException {
String emailJson = MAPPER.writeValueAsString(email);
- return new TbMsg(UUIDs.timeBased(), SEND_EMAIL_TYPE, msg.getOriginator(), msg.getMetaData().copy(), emailJson);
+ return ctx.newMsg(SEND_EMAIL_TYPE, msg.getOriginator(), msg.getMetaData().copy(), emailJson);
}
private EmailPojo convert(TbMsg msg) throws IOException {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
index 407476b..0b74d6e 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
@@ -63,8 +63,6 @@ public class TbSendEmailNode implements TbNode {
} catch (Exception ex) {
ctx.tellError(msg, ex);
}
-
-
}
private EmailPojo getEmail(TbMsg msg) throws IOException {
@@ -84,6 +82,5 @@ public class TbSendEmailNode implements TbNode {
@Override
public void destroy() {
-
}
}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
index 05ad49c..36287fe 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
@@ -60,7 +60,7 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
@Override
protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
- return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> new TbMsg(msg.getId(), msg.getType(), n, msg.getMetaData(), msg.getData()));
+ return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> ctx.newMsg(msg.getType(), n, msg.getMetaData(), msg.getData()));
}
private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
index c0aec73..2871303 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
@@ -35,6 +35,8 @@ import org.thingsboard.rule.engine.api.*;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -73,6 +75,9 @@ public class TbAlarmNodeTest {
@Mock
private ScriptEngine detailsJs;
+ private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
private ListeningExecutor dbExecutor;
private EntityId originator = new DeviceId(UUIDs.timeBased());
@@ -103,7 +108,7 @@ public class TbAlarmNodeTest {
public void newAlarmCanBeCreated() throws ScriptException, IOException {
initWithScript();
metaData.putValue("key", "value");
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
when(createJs.executeFilter(msg)).thenReturn(true);
when(detailsJs.executeJson(msg)).thenReturn(null);
@@ -143,7 +148,7 @@ public class TbAlarmNodeTest {
public void shouldCreateScriptThrowsException() throws ScriptException {
initWithScript();
metaData.putValue("key", "value");
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
when(createJs.executeFilter(msg)).thenThrow(new NotImplementedException("message"));
@@ -165,7 +170,7 @@ public class TbAlarmNodeTest {
public void buildDetailsThrowsException() throws ScriptException, IOException {
initWithScript();
metaData.putValue("key", "value");
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
when(createJs.executeFilter(msg)).thenReturn(true);
when(detailsJs.executeJson(msg)).thenThrow(new NotImplementedException("message"));
@@ -191,7 +196,7 @@ public class TbAlarmNodeTest {
public void ifAlarmClearedCreateNew() throws ScriptException, IOException {
initWithScript();
metaData.putValue("key", "value");
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
Alarm clearedAlarm = Alarm.builder().status(CLEARED_ACK).build();
@@ -233,7 +238,7 @@ public class TbAlarmNodeTest {
public void alarmCanBeUpdated() throws ScriptException, IOException {
initWithScript();
metaData.putValue("key", "value");
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
long oldEndDate = System.currentTimeMillis();
Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
@@ -279,7 +284,7 @@ public class TbAlarmNodeTest {
public void alarmCanBeCleared() throws ScriptException, IOException {
initWithScript();
metaData.putValue("key", "value");
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
long oldEndDate = System.currentTimeMillis();
Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
index 08a22f0..d0b13eb 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
@@ -27,6 +27,8 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -48,10 +50,13 @@ public class TbJsFilterNodeTest {
@Mock
private ScriptEngine scriptEngine;
+ private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
@Test
public void falseEvaluationDoNotSendMsg() throws TbNodeException, ScriptException {
initWithScript();
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
mockJsExecutor();
when(scriptEngine.executeFilter(msg)).thenReturn(false);
@@ -64,7 +69,7 @@ public class TbJsFilterNodeTest {
public void exceptionInJsThrowsException() throws TbNodeException, ScriptException {
initWithScript();
TbMsgMetaData metaData = new TbMsgMetaData();
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}", ruleChainId, ruleNodeId, 0L);
mockJsExecutor();
when(scriptEngine.executeFilter(msg)).thenThrow(new ScriptException("error"));
@@ -77,7 +82,7 @@ public class TbJsFilterNodeTest {
public void metadataConditionCanBeTrue() throws TbNodeException, ScriptException {
initWithScript();
TbMsgMetaData metaData = new TbMsgMetaData();
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}");
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}", ruleChainId, ruleNodeId, 0L);
mockJsExecutor();
when(scriptEngine.executeFilter(msg)).thenReturn(true);
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
index a495124..cc4c297 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
@@ -28,6 +28,8 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -51,6 +53,9 @@ public class TbJsSwitchNodeTest {
@Mock
private ScriptEngine scriptEngine;
+ private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
@Test
public void multipleRoutesAreAllowed() throws TbNodeException, ScriptException {
initWithScript();
@@ -59,7 +64,7 @@ public class TbJsSwitchNodeTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
mockJsExecutor();
when(scriptEngine.executeSwitch(msg)).thenReturn(Sets.newHashSet("one", "three"));
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
index 877047c..2e5d947 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
@@ -27,6 +27,8 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -48,12 +50,15 @@ public class TbMsgToEmailNodeTest {
private TbMsgMetaData metaData = new TbMsgMetaData();
private String rawJson = "{\"name\": \"temp\", \"passed\": 5 , \"complex\": {\"val\":12, \"count\":100}}";
+ private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
@Test
public void msgCanBeConverted() throws IOException {
initWithScript();
metaData.putValue("username", "oreo");
metaData.putValue("userEmail", "user@email.io");
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
emailNode.onMsg(ctx, msg);
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
index e26312b..158b7cd 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.TbMsg;
@@ -77,6 +79,9 @@ public class TbGetCustomerAttributeNodeTest {
private TbMsg msg;
+ private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
@Before
public void init() throws TbNodeException {
TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
@@ -98,7 +103,8 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
- msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
+
+ msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -123,7 +129,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
- msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
+ msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -148,7 +154,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
- msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
+ msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
@@ -166,7 +172,7 @@ public class TbGetCustomerAttributeNodeTest {
@Test
public void customerAttributeAddedInMetadata() {
CustomerId customerId = new CustomerId(UUIDs.timeBased());
- msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), "{}");
+ msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
entityAttributeFetched(customerId);
}
@@ -177,7 +183,7 @@ public class TbGetCustomerAttributeNodeTest {
User user = new User();
user.setCustomerId(customerId);
- msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
+ msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getUserService()).thenReturn(userService);
when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -192,7 +198,7 @@ public class TbGetCustomerAttributeNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
- msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), "{}");
+ msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
@@ -207,7 +213,7 @@ public class TbGetCustomerAttributeNodeTest {
Device device = new Device();
device.setCustomerId(customerId);
- msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}");
+ msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
@@ -234,7 +240,7 @@ public class TbGetCustomerAttributeNodeTest {
Device device = new Device();
device.setCustomerId(customerId);
- msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}");
+ msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getDeviceService()).thenReturn(deviceService);
when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
index 1b66433..7f864ba 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
@@ -29,6 +29,8 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.asset.AssetService;
@@ -57,7 +59,10 @@ public class TbChangeOriginatorNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
+ RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
@@ -78,7 +83,10 @@ public class TbChangeOriginatorNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
+ RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
@@ -99,7 +107,10 @@ public class TbChangeOriginatorNodeTest {
Asset asset = new Asset();
asset.setCustomerId(customerId);
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
+ RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
when(ctx.getAssetService()).thenReturn(assetService);
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong")));
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
index b904d7e..a29bc90 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
@@ -27,6 +27,8 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -56,8 +58,10 @@ public class TbTransformMsgNodeTest {
metaData.putValue("temp", "7");
String rawJson = "{\"passed\": 5}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
- TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}");
+ RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
+ TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}", ruleChainId, ruleNodeId, 0L);
mockJsExecutor();
when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg);
@@ -77,8 +81,10 @@ public class TbTransformMsgNodeTest {
metaData.putValue("temp", "7");
String rawJson = "{\"passed\": 5";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
- TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}");
+ RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
+ TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}", ruleChainId, ruleNodeId, 0L);
mockJsExecutor();
when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg);
@@ -97,7 +103,9 @@ public class TbTransformMsgNodeTest {
metaData.putValue("temp", "7");
String rawJson = "{\"passed\": 5";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+ RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
mockJsExecutor();
when(scriptEngine.executeUpdate(msg)).thenThrow(new IllegalStateException("error"));