thingsboard-developers

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
index 55525c6..4812002 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java
@@ -51,6 +51,9 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
             case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
                 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
                 break;
+            case RULE_CHAIN_TO_RULE_CHAIN_MSG:
+                processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
+                break;
             default:
                 return false;
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index fb7bc49..4628636 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -180,6 +180,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         });
     }
 
+    void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
+        checkActive();
+        if(envelope.isEnqueue()) {
+            putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg));
+        } else {
+            pushMsgToNode(firstNode, envelope.getMsg());
+        }
+    }
+
     void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
         checkActive();
         RuleNodeId originator = envelope.getOriginator();
@@ -190,8 +199,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
         TbMsg msg = envelope.getMsg();
         int relationsCount = relations.size();
+        EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
         if (relationsCount == 0) {
-            queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
+            queue.ack(msg, ackId.getId(), msg.getClusterPartition());
         } else if (relationsCount == 1) {
             for (RuleNodeRelation relation : relations) {
                 pushToTarget(msg, relation.getOut());
@@ -201,22 +211,31 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
                 EntityId target = relation.getOut();
                 switch (target.getEntityType()) {
                     case RULE_NODE:
-                        RuleNodeId targetId = new RuleNodeId(target.getId());
-                        RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
-                        TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
-                        putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
+                        enqueueAndForwardMsgCopyToNode(msg, target);
                         break;
                     case RULE_CHAIN:
-                        parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, true), self);
+                        enqueueAndForwardMsgCopyToChain(msg, target);
                         break;
                 }
             }
             //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
-            EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
             queue.ack(msg, ackId.getId(), msg.getClusterPartition());
         }
     }
 
+    private void enqueueAndForwardMsgCopyToChain(TbMsg msg, EntityId target) {
+        RuleChainId targetRCId = new RuleChainId(target.getId());
+        TbMsg copyMsg = msg.copy(UUIDs.timeBased(), targetRCId, null, DEFAULT_CLUSTER_PARTITION);
+        parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, copyMsg, true), self);
+    }
+
+    private void enqueueAndForwardMsgCopyToNode(TbMsg msg, EntityId target) {
+        RuleNodeId targetId = new RuleNodeId(target.getId());
+        RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
+        TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
+        putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
+    }
+
     private void pushToTarget(TbMsg msg, EntityId target) {
         switch (target.getEntityType()) {
             case RULE_NODE:
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
index f0fca21..2b2623b 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainToRuleChainMsg.java
@@ -26,7 +26,7 @@ import org.thingsboard.server.common.msg.TbMsg;
  * Created by ashvayka on 19.03.18.
  */
 @Data
-final class RuleChainToRuleChainMsg implements TbActorMsg {
+public final class RuleChainToRuleChainMsg implements TbActorMsg {
 
     private final RuleChainId target;
     private final RuleChainId source;
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 0963a6d..1325b6f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -22,6 +22,7 @@ import org.thingsboard.server.actors.device.DeviceActor;
 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
 import org.thingsboard.server.actors.plugin.PluginTerminationMsg;
 import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
+import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg;
 import org.thingsboard.server.actors.service.ContextBasedCreator;
 import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.actors.shared.plugin.TenantPluginManager;
@@ -83,6 +84,9 @@ public class TenantActor extends RuleChainManagerActor {
             case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
                 onToDeviceActorMsg((DeviceAwareMsg) msg);
                 break;
+            case RULE_CHAIN_TO_RULE_CHAIN_MSG:
+                onRuleChainMsg((RuleChainToRuleChainMsg) msg);
+                break;
             default:
                 return false;
         }
@@ -103,6 +107,11 @@ public class TenantActor extends RuleChainManagerActor {
         ruleChainManager.getRootChainActor().tell(msg, self());
     }
 
+    private void onRuleChainMsg(RuleChainToRuleChainMsg msg) {
+        ruleChainManager.getOrCreateActor(context(), msg.getTarget()).tell(msg, self());
+    }
+
+
     private void onToDeviceActorMsg(DeviceAwareMsg msg) {
         getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
     }
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 30c7dc3..5b895b1 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.page.TimePageData;
 import org.thingsboard.server.common.data.page.TimePageLink;
 import org.thingsboard.server.common.data.rule.RuleChain;
 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
+import org.thingsboard.server.dao.queue.MsgQueue;
 import org.thingsboard.server.dao.rule.RuleChainService;
 
 import java.io.IOException;
@@ -39,6 +40,9 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
     @Autowired
     protected RuleChainService ruleChainService;
 
+    @Autowired
+    protected MsgQueue msgQueue;
+
     protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
         return doPost("/api/ruleChain", ruleChain, RuleChain.class);
     }
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index 2d56772..a294816 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow;
 
 import com.datastax.driver.core.utils.UUIDs;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.After;
@@ -45,6 +46,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
@@ -186,6 +188,129 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
 
         Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
         Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
+
+        List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(ruleChain.getId().getId(), 0L));
+        Assert.assertEquals(0, unAckMsgList.size());
+    }
+
+    @Test
+    public void testTwoRuleChainsWithTwoRules() throws Exception {
+        // Creating Rule Chain
+        RuleChain rootRuleChain = new RuleChain();
+        rootRuleChain.setName("Root Rule Chain");
+        rootRuleChain.setTenantId(savedTenant.getId());
+        rootRuleChain.setRoot(true);
+        rootRuleChain.setDebugMode(true);
+        rootRuleChain = saveRuleChain(rootRuleChain);
+        Assert.assertNull(rootRuleChain.getFirstRuleNodeId());
+
+        // Creating Rule Chain
+        RuleChain secondaryRuleChain = new RuleChain();
+        secondaryRuleChain.setName("Secondary Rule Chain");
+        secondaryRuleChain.setTenantId(savedTenant.getId());
+        secondaryRuleChain.setRoot(false);
+        secondaryRuleChain.setDebugMode(true);
+        secondaryRuleChain = saveRuleChain(secondaryRuleChain);
+        Assert.assertNull(secondaryRuleChain.getFirstRuleNodeId());
+
+        RuleChainMetaData rootMetaData = new RuleChainMetaData();
+        rootMetaData.setRuleChainId(rootRuleChain.getId());
+
+        RuleNode ruleNode1 = new RuleNode();
+        ruleNode1.setName("Simple Rule Node 1");
+        ruleNode1.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+        ruleNode1.setDebugMode(true);
+        TbGetAttributesNodeConfiguration configuration1 = new TbGetAttributesNodeConfiguration();
+        configuration1.setServerAttributeNames(Collections.singletonList("serverAttributeKey1"));
+        ruleNode1.setConfiguration(mapper.valueToTree(configuration1));
+
+        rootMetaData.setNodes(Collections.singletonList(ruleNode1));
+        rootMetaData.setFirstNodeIndex(0);
+        rootMetaData.addRuleChainConnectionInfo(0, secondaryRuleChain.getId(), "Success", mapper.createObjectNode());
+        rootMetaData = saveRuleChainMetaData(rootMetaData);
+        Assert.assertNotNull(rootMetaData);
+
+        rootRuleChain = getRuleChain(rootRuleChain.getId());
+        Assert.assertNotNull(rootRuleChain.getFirstRuleNodeId());
+
+
+        RuleChainMetaData secondaryMetaData = new RuleChainMetaData();
+        secondaryMetaData.setRuleChainId(secondaryRuleChain.getId());
+
+        RuleNode ruleNode2 = new RuleNode();
+        ruleNode2.setName("Simple Rule Node 2");
+        ruleNode2.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+        ruleNode2.setDebugMode(true);
+        TbGetAttributesNodeConfiguration configuration2 = new TbGetAttributesNodeConfiguration();
+        configuration2.setServerAttributeNames(Collections.singletonList("serverAttributeKey2"));
+        ruleNode2.setConfiguration(mapper.valueToTree(configuration2));
+
+        secondaryMetaData.setNodes(Collections.singletonList(ruleNode2));
+        secondaryMetaData.setFirstNodeIndex(0);
+        secondaryMetaData = saveRuleChainMetaData(secondaryMetaData);
+        Assert.assertNotNull(secondaryMetaData);
+
+        // Saving the device
+        Device device = new Device();
+        device.setName("My device");
+        device.setType("default");
+        device = doPost("/api/device", device, Device.class);
+
+        attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+                Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey1", "serverAttributeValue1"), System.currentTimeMillis())));
+        attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+                Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey2", "serverAttributeValue2"), System.currentTimeMillis())));
+
+
+        Thread.sleep(1000);
+
+        // Pushing Message to the system
+        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
+                "CUSTOM",
+                device.getId(),
+                new TbMsgMetaData(),
+                "{}", null, null, 0L);
+        actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
+
+        Thread.sleep(3000);
+
+        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000);
+
+        Assert.assertEquals(2, events.getData().size());
+
+        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), inEvent.getEntityId());
+        Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Assert.assertEquals(rootRuleChain.getFirstRuleNodeId(), outEvent.getEntityId());
+        Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+        Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
+
+        RuleChain finalRuleChain = rootRuleChain;
+        RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
+
+        events = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
+
+        Assert.assertEquals(2, events.getData().size());
+
+        inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Assert.assertEquals(lastRuleNode.getId(), inEvent.getEntityId());
+        Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+        outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Assert.assertEquals(lastRuleNode.getId(), outEvent.getEntityId());
+        Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+        Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
+        Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
+
+        List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(rootRuleChain.getId().getId(), 0L));
+        Assert.assertEquals(0, unAckMsgList.size());
+
+        unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(secondaryRuleChain.getId().getId(), 0L));
+        Assert.assertEquals(0, unAckMsgList.size());
     }
 
 }
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/nosql/RuleEngineFlowNoSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/nosql/RuleEngineFlowNoSqlIntegrationTest.java
new file mode 100644
index 0000000..d073a1d
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/nosql/RuleEngineFlowNoSqlIntegrationTest.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules.flow.nosql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.rules.flow.AbstractRuleEngineFlowIntegrationTest;
+
+/**
+ * Created by Valerii Sosliuk on 8/22/2017.
+ */
+@DaoNoSqlTest
+public class RuleEngineFlowNoSqlIntegrationTest extends AbstractRuleEngineFlowIntegrationTest {
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/nosql/RuleEngineLifecycleNoSqlIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/nosql/RuleEngineLifecycleNoSqlIntegrationTest.java
new file mode 100644
index 0000000..6f66359
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/nosql/RuleEngineLifecycleNoSqlIntegrationTest.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules.lifecycle.nosql;
+
+import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.rules.lifecycle.AbstractRuleEngineLifecycleIntegrationTest;
+
+/**
+ * Created by Valerii Sosliuk on 8/22/2017.
+ */
+@DaoNoSqlTest
+public class RuleEngineLifecycleNoSqlIntegrationTest extends AbstractRuleEngineLifecycleIntegrationTest {
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java
new file mode 100644
index 0000000..bffe491
--- /dev/null
+++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineNoSqlTestSuite.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.rules;
+
+import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
+import org.junit.ClassRule;
+import org.junit.extensions.cpsuite.ClasspathSuite;
+import org.junit.runner.RunWith;
+import org.thingsboard.server.dao.CustomCassandraCQLUnit;
+import org.thingsboard.server.dao.CustomSqlUnit;
+
+import java.util.Arrays;
+
+@RunWith(ClasspathSuite.class)
+@ClasspathSuite.ClassnameFilters({
+        "org.thingsboard.server.rules.flow.nosql.*Test",
+        "org.thingsboard.server.rules.lifecycle.nosql.*Test"
+})
+public class RuleEngineNoSqlTestSuite {
+
+    @ClassRule
+    public static CustomCassandraCQLUnit cassandraUnit =
+            new CustomCassandraCQLUnit(
+                    Arrays.asList(
+                            new ClassPathCQLDataSet("cassandra/schema.cql", false, false),
+                            new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)),
+                    "cassandra-test.yaml", 30000l);
+
+}
diff --git a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
index c438826..7b13e2f 100644
--- a/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
+++ b/application/src/test/java/org/thingsboard/server/rules/RuleEngineSqlTestSuite.java
@@ -24,8 +24,8 @@ import java.util.Arrays;
 
 @RunWith(ClasspathSuite.class)
 @ClasspathSuite.ClassnameFilters({
-        "org.thingsboard.server.rules.flow.*Test",
-        "org.thingsboard.server.rules.lifecycle.*Test"})
+        "org.thingsboard.server.rules.flow.sql.*Test",
+        "org.thingsboard.server.rules.lifecycle.sql.*Test"})
 public class RuleEngineSqlTestSuite {
 
     @ClassRule
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgKey.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgKey.java
new file mode 100644
index 0000000..2090edf
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgKey.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.sql.queue;
+
+import lombok.Data;
+
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 30.04.18.
+ */
+@Data
+public final class InMemoryMsgKey {
+    final UUID nodeId;
+    final long clusterPartition;
+}