thingsboard-memoizeit

Queue put and ack

4/27/2018 12:49:22 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 16e79b6..40b32d3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -53,7 +53,6 @@ import java.util.function.Consumer;
  */
 class DefaultTbContext implements TbContext {
 
-    private static final Function<? super List<Void>, ? extends Void> LIST_VOID_FUNCTION = v -> null;
     private final ActorSystemContext mainCtx;
     private final RuleNodeCtx nodeCtx;
 
@@ -120,7 +119,7 @@ class DefaultTbContext implements TbContext {
 
     @Override
     public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
-        return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data);
+        return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
     }
 
     @Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 847c494..fb7bc49 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -169,12 +169,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
         checkActive();
-        putToQueue(envelope.getTbMsg(), msg -> pushMsgToNode(firstNode, msg));
+        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg));
     }
 
     void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
         checkActive();
-        putToQueue(envelope.getTbMsg(), msg -> {
+        putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
             pushMsgToNode(firstNode, msg);
             envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
         });
@@ -185,7 +185,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         RuleNodeId originator = envelope.getOriginator();
         String targetRelationType = envelope.getRelationType();
         List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
-                .filter(r -> targetRelationType == null || targetRelationType.equals(r.getType()))
+                .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
                 .collect(Collectors.toList());
 
         TbMsg msg = envelope.getMsg();
@@ -212,7 +212,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
                 }
             }
             //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
-            queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
+            EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+            queue.ack(msg, ackId.getId(), msg.getClusterPartition());
         }
     }
 
@@ -232,4 +233,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
             nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
         }
     }
+
+    private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
+        // We don't put firstNodeId because it may change over time;
+        return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData(), tbMsg.getData(), entityId, null, 0L);
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index a9cf795..4a7e072 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -87,6 +87,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
     }
 
     protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
+        EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
         Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable Void result) {
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
index af47764..b732e66 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
@@ -80,7 +80,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
         }
     }
 
-    @Ignore
     @Test
     public void testServerMqttOneWayRpc() throws Exception {
         Device device = new Device();
@@ -107,7 +106,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
         Assert.assertTrue(StringUtils.isEmpty(result));
     }
 
-    @Ignore
     @Test
     public void testServerMqttOneWayRpcDeviceOffline() throws Exception {
         Device device = new Device();
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
index 7c9c058..d2287f0 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
@@ -15,7 +15,7 @@
  */
 package org.thingsboard.server.mqtt.rpc.sql;
 
-import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.junit.Ignore;
 import org.thingsboard.server.dao.service.DaoSqlTest;
 import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest;
 
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index f45e303..2d56772 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -150,7 +150,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
                 "CUSTOM",
                 device.getId(),
                 new TbMsgMetaData(),
-                "{}");
+                "{}", null, null, 0L);
         actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
 
         Thread.sleep(3000);
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 29e8b73..18583fd 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -138,7 +138,8 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
                 "CUSTOM",
                 device.getId(),
                 new TbMsgMetaData(),
-                "{}");
+                "{}",
+                null, null, 0L);
         actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
 
         Thread.sleep(3000);
diff --git a/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
index e6a48e2..1981287 100644
--- a/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
@@ -42,7 +42,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
 
         TbMsg actual = scriptEngine.executeUpdate(msg);
         assertEquals("70", actual.getMetaData().getValue("temp"));
@@ -57,7 +57,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
 
         TbMsg actual = scriptEngine.executeUpdate(msg);
         assertEquals("94", actual.getMetaData().getValue("newAttr"));
@@ -72,7 +72,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
 
         TbMsg actual = scriptEngine.executeUpdate(msg);
 
@@ -89,7 +89,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
         assertFalse(scriptEngine.executeFilter(msg));
     }
 
@@ -102,7 +102,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
         assertTrue(scriptEngine.executeFilter(msg));
     }
 
@@ -122,7 +122,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
         Set<String> actual = scriptEngine.executeSwitch(msg);
         assertEquals(Sets.newHashSet("one"), actual);
     }
@@ -143,7 +143,7 @@ public class NashornJsEngineTest {
         metaData.putValue("humidity", "99");
         String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
 
-        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+        TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
         Set<String> actual = scriptEngine.executeSwitch(msg);
         assertEquals(Sets.newHashSet("one", "three"), actual);
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java
new file mode 100644
index 0000000..0281d40
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.sql.queue;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.dao.queue.MsgQueue;
+import org.thingsboard.server.dao.util.SqlDao;
+
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 27.04.18.
+ */
+@Component
+@Slf4j
+@SqlDao
+public class DummySqlMsgQueue implements MsgQueue {
+    @Override
+    public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
+        return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition) {
+        return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition) {
+        return Collections.emptyList();
+    }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
index 3c6704b..4ee8bd5 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
@@ -61,7 +61,7 @@ public class TbJsSwitchNode implements TbNode {
         ctx.tellNext(msg, nextRelations);
     }
 
-   @Override
+    @Override
     public void destroy() {
         if (jsEngine != null) {
             jsEngine.destroy();
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
index 2871303..8213195 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
@@ -118,17 +118,21 @@ public class TbAlarmNodeTest {
 
         node.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture(), eq("Created"));
-        TbMsg actualMsg = captor.getValue();
+        verify(ctx).tellNext(any(), eq("Created"));
 
-        assertEquals("ALARM", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("value", actualMsg.getMetaData().getValue("key"));
-        assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));
-        assertNotSame(metaData, actualMsg.getMetaData());
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+        assertEquals("ALARM", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("value", metadataCaptor.getValue().getValue("key"));
+        assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
+        assertNotSame(metaData, metadataCaptor.getValue());
+
+        Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
         Alarm expectedAlarm = Alarm.builder()
                 .tenantId(tenantId)
                 .originator(originator)
@@ -208,17 +212,22 @@ public class TbAlarmNodeTest {
 
         node.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture(), eq("Created"));
-        TbMsg actualMsg = captor.getValue();
+        verify(ctx).tellNext(any(), eq("Created"));
+
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
+
+        assertEquals("ALARM", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("value", metadataCaptor.getValue().getValue("key"));
+        assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
+        assertNotSame(metaData, metadataCaptor.getValue());
 
-        assertEquals("ALARM", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("value", actualMsg.getMetaData().getValue("key"));
-        assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));
-        assertNotSame(metaData, actualMsg.getMetaData());
 
-        Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+        Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
         Alarm expectedAlarm = Alarm.builder()
                 .tenantId(tenantId)
                 .originator(originator)
@@ -252,17 +261,21 @@ public class TbAlarmNodeTest {
 
         node.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture(), eq("Updated"));
-        TbMsg actualMsg = captor.getValue();
+        verify(ctx).tellNext(any(), eq("Updated"));
 
-        assertEquals("ALARM", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("value", actualMsg.getMetaData().getValue("key"));
-        assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_EXISTING_ALARM));
-        assertNotSame(metaData, actualMsg.getMetaData());
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+        assertEquals("ALARM", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("value", metadataCaptor.getValue().getValue("key"));
+        assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_EXISTING_ALARM));
+        assertNotSame(metaData, metadataCaptor.getValue());
+
+        Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
         assertTrue(activeAlarm.getEndTs() > oldEndDate);
         Alarm expectedAlarm = Alarm.builder()
                 .tenantId(tenantId)
@@ -298,17 +311,21 @@ public class TbAlarmNodeTest {
 
         node.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture(), eq("Cleared"));
-        TbMsg actualMsg = captor.getValue();
+        verify(ctx).tellNext(any(), eq("Cleared"));
+
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        assertEquals("ALARM", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("value", actualMsg.getMetaData().getValue("key"));
-        assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_CLEARED_ALARM));
-        assertNotSame(metaData, actualMsg.getMetaData());
+        assertEquals("ALARM", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("value", metadataCaptor.getValue().getValue("key"));
+        assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_CLEARED_ALARM));
+        assertNotSame(metaData, metadataCaptor.getValue());
 
-        Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+        Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
         Alarm expectedAlarm = Alarm.builder()
                 .tenantId(tenantId)
                 .originator(originator)
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
index 2e5d947..fe19a6d 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
@@ -36,7 +36,9 @@ import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TbMsgToEmailNodeTest {
@@ -62,17 +64,19 @@ public class TbMsgToEmailNodeTest {
 
         emailNode.onMsg(ctx, msg);
 
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture());
-        TbMsg actualMsg = captor.getValue();
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
 
-        assertEquals("SEND_EMAIL", actualMsg.getType());
-        assertEquals(originator, actualMsg.getOriginator());
-        assertEquals("oreo", actualMsg.getMetaData().getValue("username"));
-        assertNotSame(metaData, actualMsg.getMetaData());
 
+        assertEquals("SEND_EMAIL", typeCaptor.getValue());
+        assertEquals(originator, originatorCaptor.getValue());
+        assertEquals("oreo", metadataCaptor.getValue().getValue("username"));
+        assertNotSame(metaData, metadataCaptor.getValue());
 
-        EmailPojo actual = new ObjectMapper().readValue(actualMsg.getData().getBytes(), EmailPojo.class);
+        EmailPojo actual = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), EmailPojo.class);
 
         EmailPojo expected = new EmailPojo.EmailPojoBuilder()
                 .from("test@mail.org")
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
index 7f864ba..1f75ed3 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
@@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.server.common.data.asset.Asset;
 import org.thingsboard.server.common.data.id.AssetId;
 import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
 import org.thingsboard.server.common.msg.TbMsg;
@@ -68,11 +69,14 @@ public class TbChangeOriginatorNodeTest {
         when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
 
         node.onMsg(ctx, msg);
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).tellNext(captor.capture());
-        TbMsg actualMsg = captor.getValue();
-        assertEquals(customerId, actualMsg.getOriginator());
-        assertEquals(msg.getId(), actualMsg.getId());
+
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
+
+        assertEquals(customerId, originatorCaptor.getValue());
     }
 
     @Test
@@ -92,11 +96,13 @@ public class TbChangeOriginatorNodeTest {
         when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
 
         node.onMsg(ctx, msg);
-        ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
-        verify(ctx).spawn(captor.capture());
-        TbMsg actualMsg = captor.getValue();
-        assertEquals(customerId, actualMsg.getOriginator());
-        assertEquals(msg.getId(), actualMsg.getId());
+        ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+        ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+        ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+        verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
+
+        assertEquals(customerId, originatorCaptor.getValue());
     }
 
     @Test