thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 33(+26 -7)
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java 2(+1 -1)
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java 4(+4 -0)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 125(+125 -0)
application/src/test/java/org/thingsboard/server/rules/flow/nosql/RuleEngineFlowNoSqlIntegrationTest.java 26(+26 -0)
application/src/test/java/org/thingsboard/server/rules/flow/sql/RuleEngineFlowSqlIntegrationTest.java 3(+2 -1)
application/src/test/java/org/thingsboard/server/rules/lifecycle/nosql/RuleEngineLifecycleNoSqlIntegrationTest.java 26(+26 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 55525c6..4812002 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -51,6 +51,9 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
break;
+ case RULE_CHAIN_TO_RULE_CHAIN_MSG:
+ processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
+ break;
default:
return false;
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index fb7bc49..4628636 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -180,6 +180,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
});
}
+ void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
+ checkActive();
+ if(envelope.isEnqueue()) {
+ putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg));
+ } else {
+ pushMsgToNode(firstNode, envelope.getMsg());
+ }
+ }
+
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
checkActive();
RuleNodeId originator = envelope.getOriginator();
@@ -190,8 +199,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
TbMsg msg = envelope.getMsg();
int relationsCount = relations.size();
+ EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
if (relationsCount == 0) {
- queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
+ queue.ack(msg, ackId.getId(), msg.getClusterPartition());
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) {
pushToTarget(msg, relation.getOut());
@@ -201,22 +211,31 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
EntityId target = relation.getOut();
switch (target.getEntityType()) {
case RULE_NODE:
- RuleNodeId targetId = new RuleNodeId(target.getId());
- RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
- TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
- putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
+ enqueueAndForwardMsgCopyToNode(msg, target);
break;
case RULE_CHAIN:
- parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, true), self);
+ enqueueAndForwardMsgCopyToChain(msg, target);
break;
}
}
//TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
- EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
queue.ack(msg, ackId.getId(), msg.getClusterPartition());
}
}
+ private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target) {
+ RuleChainId targetRCId = new RuleChainId(target.getId());
+ TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
+ parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, copyMsg, true), self);
+ }
+
+ private void enqueueAndForwardMsgCopyToNode(TbMsg msg, EntityId target) {
+ RuleNodeId targetId = new RuleNodeId(target.getId());
+ RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
+ TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
+ putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
+ }
+
private void pushToTarget(TbMsg msg, EntityId target) {
switch (target.getEntityType()) {
case RULE_NODE:
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
index f0fca21..2b2623b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
@@ -26,7 +26,7 @@ import org.thingsboard.server.common.msg.TbMsg;
* Created by ashvayka on 19.03.18.
*/
@Data
-final class RuleChainToRuleChainMsg implements TbActorMsg {
+public final class RuleChainToRuleChainMsg implements TbActorMsg {
private final RuleChainId target;
private final RuleChainId source;
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 0963a6d..1325b6f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -22,6 +22,7 @@ import org.thingsboard.server.actors.device.DeviceActor;
import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
import org.thingsboard.server.actors.plugin.PluginTerminationMsg;
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
+import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.plugin.TenantPluginManager;
@@ -83,6 +84,9 @@ public class TenantActor extends RuleChainManagerActor {
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((DeviceAwareMsg) msg);
break;
+ case RULE_CHAIN_TO_RULE_CHAIN_MSG:
+ onRuleChainMsg((RuleChainToRuleChainMsg) msg);
+ break;
default:
return false;
}
@@ -103,6 +107,11 @@ public class TenantActor extends RuleChainManagerActor {
ruleChainManager.getRootChainActor().tell(msg, self());
}
+ private void onRuleChainMsg(RuleChainToRuleChainMsg msg) {
+ ruleChainManager.getOrCreateActor(context(), msg.getTarget()).tell(msg, self());
+ }
+
+
private void onToDeviceActorMsg(DeviceAwareMsg msg) {
getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
}
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 30c7dc3..5b895b1 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.dao.queue.MsgQueue;
import org.thingsboard.server.dao.rule.RuleChainService;
import java.io.IOException;
@@ -39,6 +40,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
@Autowired
protected RuleChainService ruleChainService;
+ @Autowired
+ protected MsgQueue msgQueue;
+
protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
return doPost("/api/ruleChain", ruleChain, RuleChain.class);
}
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index 2d56772..a294816 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
@@ -45,6 +46,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -186,6 +188,129 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
+
+ List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(ruleChain.getId().getId(), 0L));
+ Assert.assertEquals(0, unAckMsgList.size());
+ }
+
+ @Test
+ public void testTwoRuleChainsWithTwoRules() throws Exception {
+ // Creating Rule Chain
+ RuleChain rootRuleChain = new RuleChain();
+ rootRuleChain.setName("Root Rule Chain");
+ rootRuleChain.setTenantId(savedTenant.getId());
+ rootRuleChain.setRoot(true);
+ rootRuleChain.setDebugMode(true);
+ rootRuleChain = saveRuleChain(rootRuleChain);
+ Assert.assertNull(rootRuleChain.getFirstRuleNodeId());
+
+ // Creating Rule Chain
+ RuleChain secondaryRuleChain = new RuleChain();
+ secondaryRuleChain.setName("Secondary Rule Chain");
+ secondaryRuleChain.setTenantId(savedTenant.getId());
+ secondaryRuleChain.setRoot(false);
+ secondaryRuleChain.setDebugMode(true);
+ secondaryRuleChain = saveRuleChain(secondaryRuleChain);
+ Assert.assertNull(secondaryRuleChain.getFirstRuleNodeId());
+
+ RuleChainMetaData rootMetaData = new RuleChainMetaData();
+ rootMetaData.setRuleChainId(rootRuleChain.getId());
+
+ RuleNode ruleNode1 = new RuleNode();
+ ruleNode1.setName("Simple Rule Node 1");
+ ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode1.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
+ configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
+ ruleNode1.setConfiguration(mapper.valueToTree(configuration1));
+
+ rootMetaData.setNodes(Collections.singletonList(ruleNode1));
+ rootMetaData.setFirstNodeIndex(0);
+ rootMetaData.addRuleChainConnectionInfo(0, secondaryRuleChain.getId(), "Success", mapper.createObjectNode());
+ rootMetaData = saveRuleChainMetaData(rootMetaData);
+ Assert.assertNotNull(rootMetaData);
+
+ rootRuleChain = getRuleChain(rootRuleChain.getId());
+ Assert.assertNotNull(rootRuleChain.getFirstRuleNodeId());
+
+
+ RuleChainMetaData secondaryMetaData = new RuleChainMetaData();
+ secondaryMetaData.setRuleChainId(secondaryRuleChain.getId());
+
+ RuleNode ruleNode2 = new RuleNode();
+ ruleNode2.setName("Simple Rule Node 2");
+ ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode2.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
+ configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
+ ruleNode2.setConfiguration(mapper.valueToTree(configuration2));
+
+ secondaryMetaData.setNodes(Collections.singletonList(ruleNode2));
+ secondaryMetaData.setFirstNodeIndex(0);
+ secondaryMetaData = saveRuleChainMetaData(secondaryMetaData);
+ Assert.assertNotNull(secondaryMetaData);
+
+ // Saving the device
+ Device device = new Device();
+ device.setName("My device");
+ device.setType("default");
+ device = doPost("/api/device", device, Device.class);
+
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey1", "serverAttributeValue1"), System.currentTimeMillis())));
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey2", "serverAttributeValue2"), System.currentTimeMillis())));
+
+
+ Thread.sleep(1000);
+
+ // Pushing Message to the system
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
+ "CUSTOM",
+ device.getId(),
+ new TbMsgMetaData(),
+ "{}", null, null, 0L);
+ actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+
+ Thread.sleep(3000);
+
+ TimePageData<Event> events = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
+
+ RuleChain finalRuleChain = rootRuleChain;
+ RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
+
+ events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
+ Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
+
+ List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(rootRuleChain.getId().getId(), 0L));
+ Assert.assertEquals(0, unAckMsgList.size());
+
+ unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(secondaryRuleChain.getId().getId(), 0L));
+ Assert.assertEquals(0, unAckMsgList.size());
}
}
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/nosql/RuleEngineFlowNoSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/nosql/RuleEngineFlowNoSqlIntegrationTest.java
new file mode 100644
index 0000000..d073a1d
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/nosql/RuleEngineFlowNoSqlIntegrationTest.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules.flow.nosql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.rules.flow.AbstractRuleEngineFlowIntegrationTest;
+
+/**
+ * Created by Valerii Sosliuk on 8/22/2017.
+ */
+@DaoNoSqlTest
+public class RuleEngineFlowNoSqlIntegrationTest extends AbstractRuleEngineFlowIntegrationTest {
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/nosql/RuleEngineLifecycleNoSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/nosql/RuleEngineLifecycleNoSqlIntegrationTest.java
new file mode 100644
index 0000000..6f66359
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/nosql/RuleEngineLifecycleNoSqlIntegrationTest.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules.lifecycle.nosql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.rules.lifecycle.AbstractRuleEngineLifecycleIntegrationTest;
+
+/**
+ * Created by Valerii Sosliuk on 8/22/2017.
+ */
+@DaoNoSqlTest
+public class RuleEngineLifecycleNoSqlIntegrationTest extends AbstractRuleEngineLifecycleIntegrationTest {
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java
new file mode 100644
index 0000000..bffe491
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules;
+
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.junit.ClassRule;
+import org.junit.extensions.cpsuite.ClasspathSuite;
+import org.junit.runner.RunWith;
+import org.thingsboard.server.dao.CustomCassandraCQLUnit;
+import org.thingsboard.server.dao.CustomSqlUnit;
+
+import java.util.Arrays;
+
+@RunWith(ClasspathSuite.class)
+@ClasspathSuite.ClassnameFilters({
+ "org.thingsboard.server.rules.flow.nosql.*Test",
+ "org.thingsboard.server.rules.lifecycle.nosql.*Test"
+})
+public class RuleEngineNoSqlTestSuite {
+
+ @ClassRule
+ public static CustomCassandraCQLUnit cassandraUnit =
+ new CustomCassandraCQLUnit(
+ Arrays.asList(
+ new ClassPathCQLDataSet("cassandra/schema.cql", false, false),
+ new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)),
+ "cassandra-test.yaml", 30000l);
+
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
index c438826..7b13e2f 100644
--- a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
+++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
@@ -24,8 +24,8 @@ import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({
- "org.thingsboard.server.rules.flow.*Test",
- "org.thingsboard.server.rules.lifecycle.*Test"})
+ "org.thingsboard.server.rules.flow.sql.*Test",
+ "org.thingsboard.server.rules.lifecycle.sql.*Test"})
public class RuleEngineSqlTestSuite {
@ClassRule
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgKey.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgKey.java
new file mode 100644
index 0000000..2090edf
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgKey.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.sql.queue;
+
+import lombok.Data;
+
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 30.04.18.
+ */
+@Data
+public final class InMemoryMsgKey {
+ final UUID nodeId;
+ final long clusterPartition;
+}