thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 14(+10 -4)
application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java 2(+0 -2)
application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java 2(+1 -1)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 2(+1 -1)
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java 3(+2 -1)
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java 2(+1 -1)
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java 89(+53 -36)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 16e79b6..40b32d3 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -53,7 +53,6 @@ import java.util.function.Consumer;
*/
class DefaultTbContext implements TbContext {
- private static final Function<? super List<Void>, ? extends Void> LIST_VOID_FUNCTION = v -> null;
private final ActorSystemContext mainCtx;
private final RuleNodeCtx nodeCtx;
@@ -120,7 +119,7 @@ class DefaultTbContext implements TbContext {
@Override
public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
- return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data);
+ return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 847c494..fb7bc49 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -169,12 +169,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
checkActive();
- putToQueue(envelope.getTbMsg(), msg -> pushMsgToNode(firstNode, msg));
+ putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg));
}
void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
checkActive();
- putToQueue(envelope.getTbMsg(), msg -> {
+ putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
pushMsgToNode(firstNode, msg);
envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
});
@@ -185,7 +185,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
RuleNodeId originator = envelope.getOriginator();
String targetRelationType = envelope.getRelationType();
List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
- .filter(r -> targetRelationType == null || targetRelationType.equals(r.getType()))
+ .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
.collect(Collectors.toList());
TbMsg msg = envelope.getMsg();
@@ -212,7 +212,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
}
//TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
- queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
+ EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
+ queue.ack(msg, ackId.getId(), msg.getClusterPartition());
}
}
@@ -232,4 +233,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
}
}
+
+ private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
+ // We don't put firstNodeId because it may change over time;
+ return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData(), tbMsg.getData(), entityId, null, 0L);
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index a9cf795..4a7e072 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -87,6 +87,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
}
protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
+ EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
index af47764..b732e66 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
@@ -80,7 +80,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
}
}
- @Ignore
@Test
public void testServerMqttOneWayRpc() throws Exception {
Device device = new Device();
@@ -107,7 +106,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
Assert.assertTrue(StringUtils.isEmpty(result));
}
- @Ignore
@Test
public void testServerMqttOneWayRpcDeviceOffline() throws Exception {
Device device = new Device();
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
index 7c9c058..d2287f0 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/sql/MqttServerSideRpcSqlIntegrationTest.java
@@ -15,7 +15,7 @@
*/
package org.thingsboard.server.mqtt.rpc.sql;
-import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.junit.Ignore;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest;
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index f45e303..2d56772 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -150,7 +150,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
"CUSTOM",
device.getId(),
new TbMsgMetaData(),
- "{}");
+ "{}", null, null, 0L);
actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
Thread.sleep(3000);
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 29e8b73..18583fd 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -138,7 +138,8 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
"CUSTOM",
device.getId(),
new TbMsgMetaData(),
- "{}");
+ "{}",
+ null, null, 0L);
actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
Thread.sleep(3000);
diff --git a/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
index e6a48e2..1981287 100644
--- a/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
+++ b/application/src/test/java/org/thingsboard/server/service/script/NashornJsEngineTest.java
@@ -42,7 +42,7 @@ public class NashornJsEngineTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
TbMsg actual = scriptEngine.executeUpdate(msg);
assertEquals("70", actual.getMetaData().getValue("temp"));
@@ -57,7 +57,7 @@ public class NashornJsEngineTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
TbMsg actual = scriptEngine.executeUpdate(msg);
assertEquals("94", actual.getMetaData().getValue("newAttr"));
@@ -72,7 +72,7 @@ public class NashornJsEngineTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
TbMsg actual = scriptEngine.executeUpdate(msg);
@@ -89,7 +89,7 @@ public class NashornJsEngineTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
assertFalse(scriptEngine.executeFilter(msg));
}
@@ -102,7 +102,7 @@ public class NashornJsEngineTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
assertTrue(scriptEngine.executeFilter(msg));
}
@@ -122,7 +122,7 @@ public class NashornJsEngineTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
Set<String> actual = scriptEngine.executeSwitch(msg);
assertEquals(Sets.newHashSet("one"), actual);
}
@@ -143,7 +143,7 @@ public class NashornJsEngineTest {
metaData.putValue("humidity", "99");
String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
- TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);
+ TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
Set<String> actual = scriptEngine.executeSwitch(msg);
assertEquals(Sets.newHashSet("one", "three"), actual);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java
new file mode 100644
index 0000000..0281d40
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/DummySqlMsgQueue.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.sql.queue;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.dao.queue.MsgQueue;
+import org.thingsboard.server.dao.util.SqlDao;
+
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 27.04.18.
+ */
+@Component
+@Slf4j
+@SqlDao
+public class DummySqlMsgQueue implements MsgQueue {
+ @Override
+ public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition) {
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition) {
+ return Collections.emptyList();
+ }
+}
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
index 3c6704b..4ee8bd5 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java
@@ -61,7 +61,7 @@ public class TbJsSwitchNode implements TbNode {
ctx.tellNext(msg, nextRelations);
}
- @Override
+ @Override
public void destroy() {
if (jsEngine != null) {
jsEngine.destroy();
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
index 2871303..8213195 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java
@@ -118,17 +118,21 @@ public class TbAlarmNodeTest {
node.onMsg(ctx, msg);
- ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).tellNext(captor.capture(), eq("Created"));
- TbMsg actualMsg = captor.getValue();
+ verify(ctx).tellNext(any(), eq("Created"));
- assertEquals("ALARM", actualMsg.getType());
- assertEquals(originator, actualMsg.getOriginator());
- assertEquals("value", actualMsg.getMetaData().getValue("key"));
- assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));
- assertNotSame(metaData, actualMsg.getMetaData());
+ ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+ ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+ ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+ verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
- Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+ assertEquals("ALARM", typeCaptor.getValue());
+ assertEquals(originator, originatorCaptor.getValue());
+ assertEquals("value", metadataCaptor.getValue().getValue("key"));
+ assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
+ assertNotSame(metaData, metadataCaptor.getValue());
+
+ Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
Alarm expectedAlarm = Alarm.builder()
.tenantId(tenantId)
.originator(originator)
@@ -208,17 +212,22 @@ public class TbAlarmNodeTest {
node.onMsg(ctx, msg);
- ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).tellNext(captor.capture(), eq("Created"));
- TbMsg actualMsg = captor.getValue();
+ verify(ctx).tellNext(any(), eq("Created"));
+
+ ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+ ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+ ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+ verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
+
+ assertEquals("ALARM", typeCaptor.getValue());
+ assertEquals(originator, originatorCaptor.getValue());
+ assertEquals("value", metadataCaptor.getValue().getValue("key"));
+ assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
+ assertNotSame(metaData, metadataCaptor.getValue());
- assertEquals("ALARM", actualMsg.getType());
- assertEquals(originator, actualMsg.getOriginator());
- assertEquals("value", actualMsg.getMetaData().getValue("key"));
- assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));
- assertNotSame(metaData, actualMsg.getMetaData());
- Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+ Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
Alarm expectedAlarm = Alarm.builder()
.tenantId(tenantId)
.originator(originator)
@@ -252,17 +261,21 @@ public class TbAlarmNodeTest {
node.onMsg(ctx, msg);
- ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).tellNext(captor.capture(), eq("Updated"));
- TbMsg actualMsg = captor.getValue();
+ verify(ctx).tellNext(any(), eq("Updated"));
- assertEquals("ALARM", actualMsg.getType());
- assertEquals(originator, actualMsg.getOriginator());
- assertEquals("value", actualMsg.getMetaData().getValue("key"));
- assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_EXISTING_ALARM));
- assertNotSame(metaData, actualMsg.getMetaData());
+ ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+ ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+ ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+ verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
- Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+ assertEquals("ALARM", typeCaptor.getValue());
+ assertEquals(originator, originatorCaptor.getValue());
+ assertEquals("value", metadataCaptor.getValue().getValue("key"));
+ assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_EXISTING_ALARM));
+ assertNotSame(metaData, metadataCaptor.getValue());
+
+ Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
assertTrue(activeAlarm.getEndTs() > oldEndDate);
Alarm expectedAlarm = Alarm.builder()
.tenantId(tenantId)
@@ -298,17 +311,21 @@ public class TbAlarmNodeTest {
node.onMsg(ctx, msg);
- ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).tellNext(captor.capture(), eq("Cleared"));
- TbMsg actualMsg = captor.getValue();
+ verify(ctx).tellNext(any(), eq("Cleared"));
+
+ ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+ ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+ ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+ verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
- assertEquals("ALARM", actualMsg.getType());
- assertEquals(originator, actualMsg.getOriginator());
- assertEquals("value", actualMsg.getMetaData().getValue("key"));
- assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_CLEARED_ALARM));
- assertNotSame(metaData, actualMsg.getMetaData());
+ assertEquals("ALARM", typeCaptor.getValue());
+ assertEquals(originator, originatorCaptor.getValue());
+ assertEquals("value", metadataCaptor.getValue().getValue("key"));
+ assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_CLEARED_ALARM));
+ assertNotSame(metaData, metadataCaptor.getValue());
- Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class);
+ Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
Alarm expectedAlarm = Alarm.builder()
.tenantId(tenantId)
.originator(originator)
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
index 2e5d947..fe19a6d 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/mail/TbMsgToEmailNodeTest.java
@@ -36,7 +36,9 @@ import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TbMsgToEmailNodeTest {
@@ -62,17 +64,19 @@ public class TbMsgToEmailNodeTest {
emailNode.onMsg(ctx, msg);
- ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).tellNext(captor.capture());
- TbMsg actualMsg = captor.getValue();
+ ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+ ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+ ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+ verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
- assertEquals("SEND_EMAIL", actualMsg.getType());
- assertEquals(originator, actualMsg.getOriginator());
- assertEquals("oreo", actualMsg.getMetaData().getValue("username"));
- assertNotSame(metaData, actualMsg.getMetaData());
+ assertEquals("SEND_EMAIL", typeCaptor.getValue());
+ assertEquals(originator, originatorCaptor.getValue());
+ assertEquals("oreo", metadataCaptor.getValue().getValue("username"));
+ assertNotSame(metaData, metadataCaptor.getValue());
- EmailPojo actual = new ObjectMapper().readValue(actualMsg.getData().getBytes(), EmailPojo.class);
+ EmailPojo actual = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), EmailPojo.class);
EmailPojo expected = new EmailPojo.EmailPojoBuilder()
.from("test@mail.org")
diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
index 7f864ba..1f75ed3 100644
--- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
+++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbChangeOriginatorNodeTest.java
@@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg;
@@ -68,11 +69,14 @@ public class TbChangeOriginatorNodeTest {
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
node.onMsg(ctx, msg);
- ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).tellNext(captor.capture());
- TbMsg actualMsg = captor.getValue();
- assertEquals(customerId, actualMsg.getOriginator());
- assertEquals(msg.getId(), actualMsg.getId());
+
+ ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+ ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+ ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+ verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
+
+ assertEquals(customerId, originatorCaptor.getValue());
}
@Test
@@ -92,11 +96,13 @@ public class TbChangeOriginatorNodeTest {
when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
node.onMsg(ctx, msg);
- ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);
- verify(ctx).spawn(captor.capture());
- TbMsg actualMsg = captor.getValue();
- assertEquals(customerId, actualMsg.getOriginator());
- assertEquals(msg.getId(), actualMsg.getId());
+ ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
+ ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
+ ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
+ verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
+
+ assertEquals(customerId, originatorCaptor.getValue());
}
@Test