thingsboard-aplcache

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index c1fd878..fad7a1b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -49,6 +49,7 @@ import org.thingsboard.server.dao.customer.CustomerService;
 import org.thingsboard.server.dao.device.DeviceService;
 import org.thingsboard.server.dao.event.EventService;
 import org.thingsboard.server.dao.plugin.PluginService;
+import org.thingsboard.server.dao.queue.MsgQueue;
 import org.thingsboard.server.dao.relation.RelationService;
 import org.thingsboard.server.dao.rule.RuleChainService;
 import org.thingsboard.server.dao.rule.RuleService;
@@ -187,6 +188,10 @@ public class ActorSystemContext {
     @Getter
     private MailService mailService;
 
+    @Autowired
+    @Getter
+    private MsgQueue msgQueue;
+
     @Value("${actors.session.sync.timeout}")
     @Getter
     private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 728fef1..cd043bf 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -204,9 +204,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
         PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
         if (data != null && data.isReplyOnQueueAck()) {
-            logger.debug("[{}] Queue put [{}] ack detected!", deviceId, msg.getId());
-            ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
-            sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+            int remainingAcks = data.getAckMsgCount() - 1;
+            data.setAckMsgCount(remainingAcks);
+            logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
+            if (remainingAcks == 0) {
+                ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
+                sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
+            }
         }
     }
 
@@ -320,8 +324,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
         }
 
-        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json));
-        pushToRuleEngineWithTimeout(context, tbMsg, src, request);
+        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+        PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
+                SessionMsgType.POST_ATTRIBUTES_REQUEST, request.getRequestId(), true, 1);
+        pushToRuleEngineWithTimeout(context, tbMsg, msgData);
     }
 
     private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) {
@@ -329,10 +335,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
         Map<Long, List<KvEntry>> tsData = request.getData();
 
-        JsonArray json = new JsonArray();
+        PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
+                SessionMsgType.POST_TELEMETRY_REQUEST, request.getRequestId(), true, tsData.size());
+
         for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
-            JsonObject ts = new JsonObject();
-            ts.addProperty("ts", entry.getKey());
+            JsonObject json = new JsonObject();
+            json.addProperty("ts", entry.getKey());
             JsonObject values = new JsonObject();
             for (KvEntry kv : entry.getValue()) {
                 kv.getBooleanValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
@@ -340,12 +348,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                 kv.getDoubleValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
                 kv.getStrValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
             }
-            ts.add("values", values);
-            json.add(ts);
+            json.add("values", values);
+            TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+            pushToRuleEngineWithTimeout(context, tbMsg, msgData);
         }
-
-        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json));
-        pushToRuleEngineWithTimeout(context, tbMsg, src, request);
     }
 
     private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
@@ -357,8 +363,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
         TbMsgMetaData requestMetaData = defaultMetaData.copy();
         requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
-        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json));
-        pushToRuleEngineWithTimeout(context, tbMsg, src, request);
+        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
+        PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
+        pushToRuleEngineWithTimeout(context, tbMsg, msgData);
 
         scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
         toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
@@ -380,19 +387,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg) {
-        pushToRuleEngineWithTimeout(context, tbMsg, src, fromDeviceRequestMsg, true);
-    }
-
-    private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg, boolean replyOnAck) {
-        SessionMsgType sessionMsgType = fromDeviceRequestMsg.getMsgType();
-        int requestId = fromDeviceRequestMsg.getRequestId();
+    private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, PendingSessionMsgData pendingMsgData) {
+        SessionMsgType sessionMsgType = pendingMsgData.getSessionMsgType();
+        int requestId = pendingMsgData.getRequestId();
         if (systemContext.isQueuePersistenceEnabled()) {
-            pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId, replyOnAck));
+            pendingMsgs.put(tbMsg.getId(), pendingMsgData);
             scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
         } else {
-            ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), src.getSessionId());
-            sendMsgToSessionActor(response, src.getServerAddress());
+            ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
+            sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
         }
         context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
     }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
index 2ce05b2..23ad966 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.actors.device;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
@@ -26,6 +27,7 @@ import java.util.Optional;
  * Created by ashvayka on 17.04.18.
  */
 @Data
+@AllArgsConstructor
 public final class PendingSessionMsgData {
 
     private final SessionId sessionId;
@@ -33,5 +35,6 @@ public final class PendingSessionMsgData {
     private final SessionMsgType sessionMsgType;
     private final int requestId;
     private final boolean replyOnQueueAck;
+    private int ackMsgCount;
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index bf6132a..40b32d3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -53,7 +53,6 @@ import java.util.function.Consumer;
  */
 class DefaultTbContext implements TbContext {
 
-    private static final Function<? super List<Void>, ? extends Void> LIST_VOID_FUNCTION = v -> null;
     private final ActorSystemContext mainCtx;
     private final RuleNodeCtx nodeCtx;
 
@@ -120,7 +119,7 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
-        return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data);
+        return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
     }
 
     @Override
@@ -239,7 +238,6 @@ class DefaultTbContext implements TbContext {
                             .build());
                 });
             }
-
         };
     }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index f05b9a8..fb7bc49 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -19,6 +19,7 @@ import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.event.LoggingAdapter;
+import com.datastax.driver.core.utils.UUIDs;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
@@ -41,6 +42,7 @@ import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
 import org.thingsboard.server.dao.rule.RuleChainService;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +54,7 @@ import java.util.stream.Collectors;
  */
 public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
 
+    private static final long DEFAULT_CLUSTER_PARTITION = 0L;
     private final ActorRef parent;
     private final ActorRef self;
     private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
@@ -83,6 +86,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
                 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
             }
             initRoutes(ruleChain, ruleNodeList);
+            //TODO: read all messages from queues of the actors and push then to the corresponding node actors;
             started = true;
         } else {
             onUpdate(context);
@@ -142,15 +146,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         // Populating the routes map;
         for (RuleNode ruleNode : ruleNodeList) {
             List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
-            for (EntityRelation relation : relations) {
-                if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
-                    RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
-                    if (ruleNodeCtx == null) {
-                        throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
+            if (relations.size() == 0) {
+                nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
+            } else {
+                for (EntityRelation relation : relations) {
+                    if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
+                        RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
+                        if (ruleNodeCtx == null) {
+                            throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
+                        }
                     }
+                    nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
+                            .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
                 }
-                nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
-                        .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
             }
         }
 
@@ -161,45 +169,62 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
         checkActive();
-        TbMsg tbMsg = envelope.getTbMsg();
-        //TODO: push to queue and act on ack in async way
-        pushMsgToNode(firstNode, tbMsg);
+        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg));
     }
 
-    public void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
+    void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
         checkActive();
-        TbMsg tbMsg = envelope.getTbMsg();
-        //TODO: push to queue and act on ack in async way
-        pushMsgToNode(firstNode, tbMsg);
-        envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(tbMsg.getId()), self);
+        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
+            pushMsgToNode(firstNode, msg);
+            envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
+        });
     }
 
     void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
         checkActive();
         RuleNodeId originator = envelope.getOriginator();
         String targetRelationType = envelope.getRelationType();
-        List<RuleNodeRelation> relations = nodeRoutes.get(originator);
-        if (relations == null) {
-            return;
-        }
-        boolean copy = relations.size() > 1;
-        for (RuleNodeRelation relation : relations) {
-            TbMsg msg = envelope.getMsg();
-            if (copy) {
-                msg = msg.copy();
+        List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
+                .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
+                .collect(Collectors.toList());
+
+        TbMsg msg = envelope.getMsg();
+        int relationsCount = relations.size();
+        if (relationsCount == 0) {
+            queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
+        } else if (relationsCount == 1) {
+            for (RuleNodeRelation relation : relations) {
+                pushToTarget(msg, relation.getOut());
             }
-            if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {
-                switch (relation.getOut().getEntityType()) {
+        } else {
+            for (RuleNodeRelation relation : relations) {
+                EntityId target = relation.getOut();
+                switch (target.getEntityType()) {
                     case RULE_NODE:
-                        RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
-                        RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
-                        pushMsgToNode(targetRuleNode, msg);
+                        RuleNodeId targetId = new RuleNodeId(target.getId());
+                        RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
+                        TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
+                        putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
                         break;
                     case RULE_CHAIN:
-//                        TODO: implement
+                        parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, true), self);
                         break;
                 }
             }
+            //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
+            EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+            queue.ack(msg, ackId.getId(), msg.getClusterPartition());
+        }
+    }
+
+    private void pushToTarget(TbMsg msg, EntityId target) {
+        switch (target.getEntityType()) {
+            case RULE_NODE:
+                pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg);
+                break;
+            case RULE_CHAIN:
+                parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, false), self);
+                break;
         }
     }
 
@@ -208,4 +233,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
             nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
         }
     }
+
+    private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
+        // We don't put firstNodeId because it may change over time;
+        return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData(), tbMsg.getData(), entityId, null, 0L);
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
new file mode 100644
index 0000000..f0fca21
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.ruleChain;
+
+import lombok.Data;
+import org.thingsboard.rule.engine.api.TbContext;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.TbMsg;
+
+/**
+ * Created by ashvayka on 19.03.18.
+ */
+@Data
+final class RuleChainToRuleChainMsg implements TbActorMsg {
+
+    private final RuleChainId target;
+    private final RuleChainId source;
+    private final TbMsg msg;
+    private final boolean enqueue;
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index e25d3a7..4a7e072 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -17,22 +17,32 @@ package org.thingsboard.server.actors.shared;
 
 import akka.actor.ActorContext;
 import akka.event.LoggingAdapter;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import org.thingsboard.server.actors.ActorSystemContext;
 import org.thingsboard.server.actors.stats.StatsPersistTick;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
+import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
+import org.thingsboard.server.dao.queue.MsgQueue;
 
-public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgProcessor {
+import javax.annotation.Nullable;
+import java.util.function.Consumer;
+
+public abstract class ComponentMsgProcessor<T extends EntityId> extends AbstractContextAwareMsgProcessor {
 
     protected final TenantId tenantId;
     protected final T entityId;
+    protected final MsgQueue queue;
     protected ComponentLifecycleState state;
 
     protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
         super(systemContext, logger);
         this.tenantId = tenantId;
         this.entityId = id;
+        this.queue = systemContext.getMsgQueue();
     }
 
     public abstract void start(ActorContext context) throws Exception;
@@ -75,4 +85,19 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr
             throw new IllegalStateException("Rule chain is not active!");
         }
     }
+
+    protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
+        EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
+        Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(@Nullable Void result) {
+                onSuccess.accept(tbMsg);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                logger.debug("Failed to push message [{}] to queue due to [{}]", tbMsg, t);
+            }
+        });
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
index 48c9cd5..05e7836 100644
--- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java
@@ -237,7 +237,7 @@ public class RuleChainController extends BaseController {
             ScriptEngine engine = null;
             try {
                 engine = new NashornJsEngine(script, functionName, argNames);
-                TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data);
+                TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L);
                 switch (scriptType) {
                     case "update":
                         output = msgToOutput(engine.executeUpdate(inMsg));
diff --git a/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java b/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
index d68f6fe..8e689ec 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/NashornJsEngine.java
@@ -118,7 +118,7 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn
             String newData = data != null ? data : msg.getData();
             TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData();
             String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType();
-            return new TbMsg(msg.getId(), newMessageType, msg.getOriginator(), newMetadata, newData);
+            return new TbMsg(msg.getId(), newMessageType, msg.getOriginator(), newMetadata, newData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
         } catch (Throwable th) {
             th.printStackTrace();
             throw new RuntimeException("Failed to unbind message data from javascript result", th);
diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseComponentDescriptorControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseComponentDescriptorControllerTest.java
index d04fb64..3a07d29 100644
--- a/application/src/test/java/org/thingsboard/server/controller/BaseComponentDescriptorControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/BaseComponentDescriptorControllerTest.java
@@ -36,7 +36,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 
 public abstract class BaseComponentDescriptorControllerTest extends AbstractControllerTest {
 
-    private static final int AMOUNT_OF_DEFAULT_FILTER_NODES = 3;
+    private static final int AMOUNT_OF_DEFAULT_FILTER_NODES = 4;
     private Tenant savedTenant;
     private User tenantAdmin;
 
@@ -87,7 +87,7 @@ public abstract class BaseComponentDescriptorControllerTest extends AbstractCont
                 });
 
         Assert.assertNotNull(descriptors);
-        Assert.assertEquals(AMOUNT_OF_DEFAULT_FILTER_NODES, descriptors.size());
+        Assert.assertTrue(descriptors.size() >= AMOUNT_OF_DEFAULT_FILTER_NODES);
 
         for (ComponentType type : ComponentType.values()) {
             doGet("/api/components/" + type).andExpect(status().isOk());
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
index af47764..b732e66 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
@@ -80,7 +80,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
         }
     }
 
-    @Ignore
     @Test
     public void testServerMqttOneWayRpc() throws Exception {
         Device device = new Device();
@@ -107,7 +106,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
         Assert.assertTrue(StringUtils.isEmpty(result));
     }
 
-    @Ignore
     @Test
     public void testServerMqttOneWayRpcDeviceOffline() throws Exception {
         Device device = new Device();
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
index 7c9c058..f3d69ba 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
@@ -15,7 +15,6 @@
  */
 package org.thingsboard.server.mqtt.rpc.sql;
 
-import org.thingsboard.server.dao.service.DaoNoSqlTest;
 import org.thingsboard.server.dao.service.DaoSqlTest;
 import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest;
 
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index f45e303..2d56772 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -150,7 +150,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
                 "CUSTOM",
                 device.getId(),
                 new TbMsgMetaData(),
-                "{}");
+                "{}", null, null, 0L);
         actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
 
         Thread.sleep(3000);
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 29e8b73..18583fd 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -138,7 +138,8 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
                 "CUSTOM",
                 device.getId(),
                 new TbMsgMetaData(),
-                "{}");
+                "{}",
+                null, null, 0L);
         actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
 
         Thread.sleep(3000);
diff --git a/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
index e6a48e2..1981287 100644
--- a/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
@@ -42,7 +42,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
 
         TbMsg actual = scriptEngine.executeUpdate(msg);
         assertEquals("70", actual.getMetaData().getValue("temp"));
@@ -57,7 +57,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
 
         TbMsg actual = scriptEngine.executeUpdate(msg);
         assertEquals("94", actual.getMetaData().getValue("newAttr"));
@@ -72,7 +72,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
 
         TbMsg actual = scriptEngine.executeUpdate(msg);
 
@@ -89,7 +89,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
         assertFalse(scriptEngine.executeFilter(msg));
     }
 
@@ -102,7 +102,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
         assertTrue(scriptEngine.executeFilter(msg));
     }
 
@@ -122,7 +122,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
         Set<String> actual = scriptEngine.executeSwitch(msg);
         assertEquals(Sets.newHashSet("one"), actual);
     }
@@ -143,7 +143,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
         Set<String> actual = scriptEngine.executeSwitch(msg);
         assertEquals(Sets.newHashSet("one", "three"), actual);
     }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 74c08da..d63456e 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -48,6 +48,11 @@ public enum MsgType {
     RULE_CHAIN_TO_RULE_MSG,
 
     /**
+     * Message that is sent by RuleChainActor to other RuleChainActor with command to process TbMsg.
+     */
+    RULE_CHAIN_TO_RULE_CHAIN_MSG,
+
+    /**
      * Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain.
      */
     RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
index 1c7de3b..dd80bf7 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java
@@ -15,12 +15,13 @@
  */
 package org.thingsboard.server.common.msg;
 
-import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.gen.MsgProtos;
 
 import java.io.Serializable;
@@ -41,22 +42,40 @@ public final class TbMsg implements Serializable {
     private final TbMsgDataType dataType;
     private final String data;
 
-    public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, String data) {
+    //The following fields are not persisted to DB, because they can always be recovered from the context;
+    private final RuleChainId ruleChainId;
+    private final RuleNodeId ruleNodeId;
+    private final long clusterPartition;
+
+    public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, String data,
+                 RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
         this.id = id;
         this.type = type;
         this.originator = originator;
         this.metaData = metaData;
-        this.dataType = TbMsgDataType.JSON;
         this.data = data;
+        this.dataType = TbMsgDataType.JSON;
+        this.ruleChainId = ruleChainId;
+        this.ruleNodeId = ruleNodeId;
+        this.clusterPartition = clusterPartition;
     }
 
     public static ByteBuffer toBytes(TbMsg msg) {
         MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
         builder.setId(msg.getId().toString());
         builder.setType(msg.getType());
-        if (msg.getOriginator() != null) {
-            builder.setEntityType(msg.getOriginator().getEntityType().name());
-            builder.setEntityId(msg.getOriginator().getId().toString());
+        builder.setEntityType(msg.getOriginator().getEntityType().name());
+        builder.setEntityIdMSB(msg.getOriginator().getId().getMostSignificantBits());
+        builder.setEntityIdLSB(msg.getOriginator().getId().getLeastSignificantBits());
+
+        if (msg.getRuleChainId() != null) {
+            builder.setRuleChainIdMSB(msg.getRuleChainId().getId().getMostSignificantBits());
+            builder.setRuleChainIdLSB(msg.getRuleChainId().getId().getLeastSignificantBits());
+        }
+
+        if (msg.getRuleNodeId() != null) {
+            builder.setRuleNodeIdMSB(msg.getRuleNodeId().getId().getMostSignificantBits());
+            builder.setRuleNodeIdLSB(msg.getRuleNodeId().getId().getLeastSignificantBits());
         }
 
         if (msg.getMetaData() != null) {
@@ -73,15 +92,18 @@ public final class TbMsg implements Serializable {
         try {
             MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
             TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
-            EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
+            EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
+            RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
+            RuleNodeId ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleChainIdLSB()));
             TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
-            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData());
+            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getClusterPartition());
         } catch (InvalidProtocolBufferException e) {
             throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
         }
     }
 
-    public TbMsg copy() {
-        return new TbMsg(id, type, originator, metaData.copy(), dataType, data);
+    public TbMsg copy(UUID newId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
+        return new TbMsg(newId, type, originator, metaData, dataType, data, ruleChainId, ruleNodeId, clusterPartition);
     }
+
 }
diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto
index 4ce1fb6..60003dc 100644
--- a/common/message/src/main/proto/tbmsg.proto
+++ b/common/message/src/main/proto/tbmsg.proto
@@ -27,10 +27,19 @@ message TbMsgProto {
     string id = 1;
     string type = 2;
     string entityType = 3;
-    string entityId = 4;
+    int64 entityIdMSB = 4;
+    int64 entityIdLSB = 5;
 
-    TbMsgMetaDataProto metaData = 5;
+    int64 ruleChainIdMSB = 6;
+    int64 ruleChainIdLSB = 7;
+
+    int64 ruleNodeIdMSB = 8;
+    int64 ruleNodeIdLSB = 9;
+    int64 clusterPartition = 10;
+
+    TbMsgMetaDataProto metaData = 11;
+
+    int32 dataType = 12;
+    string data = 13;
 
-    int32 dataType = 6;
-    string data = 7;
 }
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
index 85f42ae..ef55bcb 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
@@ -27,10 +27,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgDataType;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -61,11 +60,11 @@ public class QueueBenchmark implements CommandLineRunner {
     }
 
     @Autowired
-    private MsqQueue msqQueue;
+    private MsgQueue msgQueue;
 
     @Override
     public void run(String... strings) throws Exception {
-        System.out.println("It works + " + msqQueue);
+        System.out.println("It works + " + msgQueue);
 
 
         long start = System.currentTimeMillis();
@@ -81,8 +80,8 @@ public class QueueBenchmark implements CommandLineRunner {
                     try {
                         TbMsg msg = randomMsg();
                         UUID nodeId = UUIDs.timeBased();
-                        ListenableFuture<Void> put = msqQueue.put(msg, nodeId, 100L);
-//                    ListenableFuture<Void> put = msqQueue.ack(msg, nodeId, 100L);
+                        ListenableFuture<Void> put = msgQueue.put(msg, nodeId, 100L);
+//                    ListenableFuture<Void> put = msgQueue.ack(msg, nodeId, 100L);
                         Futures.addCallback(put, new FutureCallback<Void>() {
                             @Override
                             public void onSuccess(@Nullable Void result) {
@@ -126,7 +125,7 @@ public class QueueBenchmark implements CommandLineRunner {
         TbMsgMetaData metaData = new TbMsgMetaData();
         metaData.putValue("key", "value");
         String dataStr = "someContent";
-        return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr);
+        return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr, new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
     }
 
     @Bean
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java
new file mode 100644
index 0000000..0281d40
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.sql.queue;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.dao.queue.MsgQueue;
+import org.thingsboard.server.dao.util.SqlDao;
+
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 27.04.18.
+ */
+@Component
+@Slf4j
+@SqlDao
+public class DummySqlMsgQueue implements MsgQueue {
+    @Override
+    public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
+        return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition) {
+        return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition) {
+        return Collections.emptyList();
+    }
+}
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
index a766aa4..ae252ae 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -23,6 +23,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgDataType;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -45,7 +47,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
 
     @Test
     public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000");
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
+                new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
         UUID nodeId = UUIDs.timeBased();
         ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
         future.get();
@@ -55,7 +58,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
 
     @Test
     public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000");
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
+                new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
         UUID nodeId = UUIDs.timeBased();
         ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
         future.get();
@@ -68,7 +72,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
         TbMsgMetaData metaData = new TbMsgMetaData();
         metaData.putValue("key", "value");
         String dataStr = "someContent";
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr,
+                new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
         UUID nodeId = UUIDs.timeBased();
         ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
         future.get();
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
index 3935c9b..caccc85 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/UnprocessedMsgFilterTest.java
@@ -33,8 +33,8 @@ public class UnprocessedMsgFilterTest {
     public void acknowledgedMsgsAreFilteredOut() {
         UUID id1 = UUID.randomUUID();
         UUID id2 = UUID.randomUUID();
-        TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null);
-        TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null);
+        TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null, null, null, 0L);
+        TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null, null, null, 0L);
         List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);
         List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));
         Collection<TbMsg> actual = msgFilter.filter(msgs, acks);
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
index a242093..42ad237 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAlarmNode.java
@@ -91,11 +91,11 @@ public class TbAlarmNode implements TbNode {
                     if (alarmResult.alarm == null) {
                         ctx.tellNext(msg, "False");
                     } else if (alarmResult.isCreated) {
-                        ctx.tellNext(toAlarmMsg(alarmResult, msg), "Created");
+                        ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Created");
                     } else if (alarmResult.isUpdated) {
-                        ctx.tellNext(toAlarmMsg(alarmResult, msg), "Updated");
+                        ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated");
                     } else if (alarmResult.isCleared) {
-                        ctx.tellNext(toAlarmMsg(alarmResult, msg), "Cleared");
+                        ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared");
                     }
                 },
                 t -> ctx.tellError(msg, t));
@@ -176,7 +176,7 @@ public class TbAlarmNode implements TbNode {
         return ctx.getJsExecutor().executeAsync(() -> buildDetailsJsEngine.executeJson(msg));
     }
 
-    private TbMsg toAlarmMsg(AlarmResult alarmResult, TbMsg originalMsg) {
+    private TbMsg toAlarmMsg(TbContext ctx, AlarmResult alarmResult, TbMsg originalMsg) {
         JsonNode jsonNodes = mapper.valueToTree(alarmResult.alarm);
         String data = jsonNodes.toString();
         TbMsgMetaData metaData = originalMsg.getMetaData().copy();
@@ -187,7 +187,7 @@ public class TbAlarmNode implements TbNode {
         } else if (alarmResult.isCleared) {
             metaData.putValue(IS_CLEARED_ALARM, Boolean.TRUE.toString());
         }
-        return new TbMsg(UUIDs.timeBased(), "ALARM", originalMsg.getOriginator(), metaData, data);
+        return ctx.newMsg("ALARM", originalMsg.getOriginator(), metaData, data);
     }
 
 
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
index 65b1371..2e95f4a 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java
@@ -78,18 +78,18 @@ public class TbMsgGeneratorNode implements TbNode {
     }
 
     private void sentTickMsg(TbContext ctx) {
-        TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
+        TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
         nextTickId = tickMsg.getId();
         ctx.tellSelf(tickMsg, delay);
     }
 
-    protected ListenableFuture<TbMsg> generate(TbContext ctx) {
+    private ListenableFuture<TbMsg> generate(TbContext ctx) {
         return ctx.getJsExecutor().executeAsync(() -> {
             if (prevMsg == null) {
-                prevMsg = new TbMsg(UUIDs.timeBased(), "", originatorId, new TbMsgMetaData(), "{}");
+                prevMsg = ctx.newMsg( "", originatorId, new TbMsgMetaData(), "{}");
             }
             TbMsg generated = jsEngine.executeGenerate(prevMsg);
-            prevMsg = new TbMsg(UUIDs.timeBased(), generated.getType(), originatorId, generated.getMetaData(), generated.getData());
+            prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());
             return prevMsg;
         });
     }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
index 3c6704b..4ee8bd5 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
@@ -61,7 +61,7 @@ public class TbJsSwitchNode implements TbNode {
         ctx.tellNext(msg, nextRelations);
     }
 
-   @Override
+    @Override
     public void destroy() {
         if (jsEngine != null) {
             jsEngine.destroy();
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
index cae2058..7c7e685 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNode.java
@@ -76,7 +76,7 @@ public class TbMsgToEmailNode implements TbNode {
     public void onMsg(TbContext ctx, TbMsg msg) {
         try {
             EmailPojo email = convert(msg);
-            TbMsg emailMsg = buildEmailMsg(msg, email);
+            TbMsg emailMsg = buildEmailMsg(ctx, msg, email);
             ctx.tellNext(emailMsg);
         } catch (Exception ex) {
             log.warn("Can not convert message to email " + ex.getMessage());
@@ -84,9 +84,9 @@ public class TbMsgToEmailNode implements TbNode {
         }
     }
 
-    private TbMsg buildEmailMsg(TbMsg msg, EmailPojo email) throws JsonProcessingException {
+    private TbMsg buildEmailMsg(TbContext ctx, TbMsg msg, EmailPojo email) throws JsonProcessingException {
         String emailJson = MAPPER.writeValueAsString(email);
-        return new TbMsg(UUIDs.timeBased(), SEND_EMAIL_TYPE, msg.getOriginator(), msg.getMetaData().copy(), emailJson);
+        return ctx.newMsg(SEND_EMAIL_TYPE, msg.getOriginator(), msg.getMetaData().copy(), emailJson);
     }
 
     private EmailPojo convert(TbMsg msg) throws IOException {
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
index 407476b..0b74d6e 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java
@@ -63,8 +63,6 @@ public class TbSendEmailNode implements TbNode {
         } catch (Exception ex) {
             ctx.tellError(msg, ex);
         }
-
-
     }
 
     private EmailPojo getEmail(TbMsg msg) throws IOException {
@@ -84,6 +82,5 @@ public class TbSendEmailNode implements TbNode {
 
     @Override
     public void destroy() {
-
     }
 }
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
index 05ad49c..36287fe 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNode.java
@@ -60,7 +60,7 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
     @Override
     protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
         ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
-        return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> new TbMsg(msg.getId(), msg.getType(), n, msg.getMetaData(), msg.getData()));
+        return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> ctx.newMsg(msg.getType(), n, msg.getMetaData(), msg.getData()));
     }
 
     private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
index c0aec73..8213195 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
@@ -35,6 +35,8 @@ import org.thingsboard.rule.engine.api.*;
 import org.thingsboard.server.common.data.alarm.Alarm;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -73,6 +75,9 @@ public class TbAlarmNodeTest {
     @Mock
     private ScriptEngine detailsJs;
 
+    private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+    private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
     private ListeningExecutor dbExecutor;
 
     private EntityId originator = new DeviceId(UUIDs.timeBased());
@@ -103,7 +108,7 @@ public class TbAlarmNodeTest {
     public void newAlarmCanBeCreated() throws ScriptException, IOException {
         initWithScript();
         metaData.putValue("key", "value");
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
 
         when(createJs.executeFilter(msg)).thenReturn(true);
         when(detailsJs.executeJson(msg)).thenReturn(null);
@@ -113,17 +118,21 @@ public class TbAlarmNodeTest {
 
         node.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture(), eq("Created"));
-        TbMsg actualMsg = captor.getValue();
+        verify(ctx).tellNext(any(), eq("Created"));
+
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        assertEquals("ALARM", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("value", actualMsg.getMetaData().getValue("key"));
-        assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));
-        assertNotSame(metaData, actualMsg.getMetaData());
+        assertEquals("ALARM", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("value", metadataCaptor.getValue().getValue("key"));
+        assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
+        assertNotSame(metaData, metadataCaptor.getValue());
 
-        Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+        Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
         Alarm expectedAlarm = Alarm.builder()
                 .tenantId(tenantId)
                 .originator(originator)
@@ -143,7 +152,7 @@ public class TbAlarmNodeTest {
     public void shouldCreateScriptThrowsException() throws ScriptException {
         initWithScript();
         metaData.putValue("key", "value");
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
 
         when(createJs.executeFilter(msg)).thenThrow(new NotImplementedException("message"));
 
@@ -165,7 +174,7 @@ public class TbAlarmNodeTest {
     public void buildDetailsThrowsException() throws ScriptException, IOException {
         initWithScript();
         metaData.putValue("key", "value");
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
 
         when(createJs.executeFilter(msg)).thenReturn(true);
         when(detailsJs.executeJson(msg)).thenThrow(new NotImplementedException("message"));
@@ -191,7 +200,7 @@ public class TbAlarmNodeTest {
     public void ifAlarmClearedCreateNew() throws ScriptException, IOException {
         initWithScript();
         metaData.putValue("key", "value");
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
 
         Alarm clearedAlarm = Alarm.builder().status(CLEARED_ACK).build();
 
@@ -203,17 +212,22 @@ public class TbAlarmNodeTest {
 
         node.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture(), eq("Created"));
-        TbMsg actualMsg = captor.getValue();
+        verify(ctx).tellNext(any(), eq("Created"));
+
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        assertEquals("ALARM", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("value", actualMsg.getMetaData().getValue("key"));
-        assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));
-        assertNotSame(metaData, actualMsg.getMetaData());
+        assertEquals("ALARM", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("value", metadataCaptor.getValue().getValue("key"));
+        assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
+        assertNotSame(metaData, metadataCaptor.getValue());
 
-        Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+
+        Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
         Alarm expectedAlarm = Alarm.builder()
                 .tenantId(tenantId)
                 .originator(originator)
@@ -233,7 +247,7 @@ public class TbAlarmNodeTest {
     public void alarmCanBeUpdated() throws ScriptException, IOException {
         initWithScript();
         metaData.putValue("key", "value");
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
 
         long oldEndDate = System.currentTimeMillis();
         Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
@@ -247,17 +261,21 @@ public class TbAlarmNodeTest {
 
         node.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture(), eq("Updated"));
-        TbMsg actualMsg = captor.getValue();
+        verify(ctx).tellNext(any(), eq("Updated"));
+
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        assertEquals("ALARM", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("value", actualMsg.getMetaData().getValue("key"));
-        assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_EXISTING_ALARM));
-        assertNotSame(metaData, actualMsg.getMetaData());
+        assertEquals("ALARM", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("value", metadataCaptor.getValue().getValue("key"));
+        assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_EXISTING_ALARM));
+        assertNotSame(metaData, metadataCaptor.getValue());
 
-        Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+        Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
         assertTrue(activeAlarm.getEndTs() > oldEndDate);
         Alarm expectedAlarm = Alarm.builder()
                 .tenantId(tenantId)
@@ -279,7 +297,7 @@ public class TbAlarmNodeTest {
     public void alarmCanBeCleared() throws ScriptException, IOException {
         initWithScript();
         metaData.putValue("key", "value");
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
 
         long oldEndDate = System.currentTimeMillis();
         Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
@@ -293,17 +311,21 @@ public class TbAlarmNodeTest {
 
         node.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture(), eq("Cleared"));
-        TbMsg actualMsg = captor.getValue();
+        verify(ctx).tellNext(any(), eq("Cleared"));
+
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        assertEquals("ALARM", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("value", actualMsg.getMetaData().getValue("key"));
-        assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_CLEARED_ALARM));
-        assertNotSame(metaData, actualMsg.getMetaData());
+        assertEquals("ALARM", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("value", metadataCaptor.getValue().getValue("key"));
+        assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_CLEARED_ALARM));
+        assertNotSame(metaData, metadataCaptor.getValue());
 
-        Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+        Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
         Alarm expectedAlarm = Alarm.builder()
                 .tenantId(tenantId)
                 .originator(originator)
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
index 08a22f0..d0b13eb 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsFilterNodeTest.java
@@ -27,6 +27,8 @@ import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 
@@ -48,10 +50,13 @@ public class TbJsFilterNodeTest {
     @Mock
     private ScriptEngine scriptEngine;
 
+    private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+    private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
     @Test
     public void falseEvaluationDoNotSendMsg() throws TbNodeException, ScriptException {
         initWithScript();
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}");
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
         mockJsExecutor();
         when(scriptEngine.executeFilter(msg)).thenReturn(false);
 
@@ -64,7 +69,7 @@ public class TbJsFilterNodeTest {
     public void exceptionInJsThrowsException() throws TbNodeException, ScriptException {
         initWithScript();
         TbMsgMetaData metaData = new TbMsgMetaData();
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}");
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}", ruleChainId, ruleNodeId, 0L);
         mockJsExecutor();
         when(scriptEngine.executeFilter(msg)).thenThrow(new ScriptException("error"));
 
@@ -77,7 +82,7 @@ public class TbJsFilterNodeTest {
     public void metadataConditionCanBeTrue() throws TbNodeException, ScriptException {
         initWithScript();
         TbMsgMetaData metaData = new TbMsgMetaData();
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}");
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}", ruleChainId, ruleNodeId, 0L);
         mockJsExecutor();
         when(scriptEngine.executeFilter(msg)).thenReturn(true);
 
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
index a495124..cc4c297 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/filter/TbJsSwitchNodeTest.java
@@ -28,6 +28,8 @@ import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 
@@ -51,6 +53,9 @@ public class TbJsSwitchNodeTest {
     @Mock
     private ScriptEngine scriptEngine;
 
+    private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+    private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
     @Test
     public void multipleRoutesAreAllowed() throws TbNodeException, ScriptException {
         initWithScript();
@@ -59,7 +64,7 @@ public class TbJsSwitchNodeTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
         mockJsExecutor();
         when(scriptEngine.executeSwitch(msg)).thenReturn(Sets.newHashSet("one", "three"));
 
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
index 877047c..fe19a6d 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
@@ -27,6 +27,8 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 
@@ -34,7 +36,9 @@ import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TbMsgToEmailNodeTest {
@@ -48,26 +52,31 @@ public class TbMsgToEmailNodeTest {
     private TbMsgMetaData metaData = new TbMsgMetaData();
     private String rawJson = "{\"name\": \"temp\", \"passed\": 5 , \"complex\": {\"val\":12, \"count\":100}}";
 
+    private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+    private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
     @Test
     public void msgCanBeConverted() throws IOException {
         initWithScript();
         metaData.putValue("username", "oreo");
         metaData.putValue("userEmail", "user@email.io");
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
 
         emailNode.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture());
-        TbMsg actualMsg = captor.getValue();
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        assertEquals("SEND_EMAIL", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("oreo", actualMsg.getMetaData().getValue("username"));
-        assertNotSame(metaData, actualMsg.getMetaData());
 
+        assertEquals("SEND_EMAIL", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("oreo", metadataCaptor.getValue().getValue("username"));
+        assertNotSame(metaData, metadataCaptor.getValue());
 
-        EmailPojo actual = new ObjectMapper().readValue(actualMsg.getData().getBytes(), EmailPojo.class);
+        EmailPojo actual = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), EmailPojo.class);
 
         EmailPojo expected = new EmailPojo.EmailPojoBuilder()
                 .from("test@mail.org")
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
index e26312b..158b7cd 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/metadata/TbGetCustomerAttributeNodeTest.java
@@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.asset.Asset;
 import org.thingsboard.server.common.data.id.AssetId;
 import org.thingsboard.server.common.data.id.CustomerId;
 import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.data.id.UserId;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.common.msg.TbMsg;
@@ -77,6 +79,9 @@ public class TbGetCustomerAttributeNodeTest {
 
     private TbMsg msg;
 
+    private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+    private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
     @Before
     public void init() throws TbNodeException {
         TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
@@ -98,7 +103,8 @@ public class TbGetCustomerAttributeNodeTest {
         User user = new User();
         user.setCustomerId(customerId);
 
-        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
+
+        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getUserService()).thenReturn(userService);
         when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -123,7 +129,7 @@ public class TbGetCustomerAttributeNodeTest {
         User user = new User();
         user.setCustomerId(customerId);
 
-        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
+        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getUserService()).thenReturn(userService);
         when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -148,7 +154,7 @@ public class TbGetCustomerAttributeNodeTest {
         User user = new User();
         user.setCustomerId(customerId);
 
-        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
+        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getUserService()).thenReturn(userService);
         when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
@@ -166,7 +172,7 @@ public class TbGetCustomerAttributeNodeTest {
     @Test
     public void customerAttributeAddedInMetadata() {
         CustomerId customerId = new CustomerId(UUIDs.timeBased());
-        msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), "{}");
+        msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
         entityAttributeFetched(customerId);
     }
 
@@ -177,7 +183,7 @@ public class TbGetCustomerAttributeNodeTest {
         User user = new User();
         user.setCustomerId(customerId);
 
-        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}");
+        msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getUserService()).thenReturn(userService);
         when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -192,7 +198,7 @@ public class TbGetCustomerAttributeNodeTest {
         Asset asset = new Asset();
         asset.setCustomerId(customerId);
 
-        msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), "{}");
+        msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getAssetService()).thenReturn(assetService);
         when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
@@ -207,7 +213,7 @@ public class TbGetCustomerAttributeNodeTest {
         Device device = new Device();
         device.setCustomerId(customerId);
 
-        msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}");
+        msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getDeviceService()).thenReturn(deviceService);
         when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
@@ -234,7 +240,7 @@ public class TbGetCustomerAttributeNodeTest {
         Device device = new Device();
         device.setCustomerId(customerId);
 
-        msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}");
+        msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getDeviceService()).thenReturn(deviceService);
         when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
index 1b66433..1f75ed3 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
@@ -29,6 +29,9 @@ import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.server.common.data.asset.Asset;
 import org.thingsboard.server.common.data.id.AssetId;
 import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 import org.thingsboard.server.dao.asset.AssetService;
@@ -57,17 +60,23 @@ public class TbChangeOriginatorNodeTest {
         Asset asset = new Asset();
         asset.setCustomerId(customerId);
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
+        RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+        RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getAssetService()).thenReturn(assetService);
         when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
 
         node.onMsg(ctx, msg);
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture());
-        TbMsg actualMsg = captor.getValue();
-        assertEquals(customerId, actualMsg.getOriginator());
-        assertEquals(msg.getId(), actualMsg.getId());
+
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
+
+        assertEquals(customerId, originatorCaptor.getValue());
     }
 
     @Test
@@ -78,17 +87,22 @@ public class TbChangeOriginatorNodeTest {
         Asset asset = new Asset();
         asset.setCustomerId(customerId);
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
+        RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+        RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getAssetService()).thenReturn(assetService);
         when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
 
         node.onMsg(ctx, msg);
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).spawn(captor.capture());
-        TbMsg actualMsg = captor.getValue();
-        assertEquals(customerId, actualMsg.getOriginator());
-        assertEquals(msg.getId(), actualMsg.getId());
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
+
+        assertEquals(customerId, originatorCaptor.getValue());
     }
 
     @Test
@@ -99,7 +113,10 @@ public class TbChangeOriginatorNodeTest {
         Asset asset = new Asset();
         asset.setCustomerId(customerId);
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}");
+        RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+        RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
 
         when(ctx.getAssetService()).thenReturn(assetService);
         when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong")));
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
index b904d7e..a29bc90 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbTransformMsgNodeTest.java
@@ -27,6 +27,8 @@ import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 import org.thingsboard.rule.engine.api.*;
+import org.thingsboard.server.common.data.id.RuleChainId;
+import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 
@@ -56,8 +58,10 @@ public class TbTransformMsgNodeTest {
         metaData.putValue("temp", "7");
         String rawJson = "{\"passed\": 5}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
-        TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}");
+        RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+        RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
+        TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}", ruleChainId, ruleNodeId, 0L);
         mockJsExecutor();
         when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg);
 
@@ -77,8 +81,10 @@ public class TbTransformMsgNodeTest {
         metaData.putValue("temp", "7");
         String rawJson = "{\"passed\": 5";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
-        TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}");
+        RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+        RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
+        TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}", ruleChainId, ruleNodeId, 0L);
         mockJsExecutor();
         when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg);
 
@@ -97,7 +103,9 @@ public class TbTransformMsgNodeTest {
         metaData.putValue("temp", "7");
         String rawJson = "{\"passed\": 5";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
+        RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
         mockJsExecutor();
         when(scriptEngine.executeUpdate(msg)).thenThrow(new IllegalStateException("error"));