thingsboard-memoizeit

Remove Msg queue. Fix tests.

10/19/2018 7:39:46 AM

Changes

application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java 111(+0 -111)

application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java 32(+0 -32)

dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgKey.java 29(+0 -29)

dao/src/main/java/org/thingsboard/server/dao/queue/memory/InMemoryMsgQueue.java 123(+0 -123)

dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java 34(+0 -34)

dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java 152(+0 -152)

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 2c40be6..c9dc307 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -26,7 +26,6 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.service.queue.MsgQueueService;
 
 import javax.annotation.Nullable;
 import java.util.function.Consumer;
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 90d1500..984fa28 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -182,12 +182,6 @@ cassandra:
     permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
     rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
 
-  queue:
-    msg.ttl: 604800 # 7 days
-    ack.ttl: 604800 # 7 days
-    partitions.ttl: 604800 # 7 days
-    partitioning: "HOURS"
-
 # SQL configuration parameters
 sql:
     # Specify executor service type used to perform timeseries insert tasks: SINGLE FIXED CACHED
@@ -221,13 +215,6 @@ actors:
     node:
       # Errors for particular actor are persisted once per specified amount of milliseconds
       error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
-    queue:
-      # Message queue type
-      type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
-      # Message queue maximum size (per tenant)
-      max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
-      # Message queue cleanup period in seconds
-      cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}"
   statistics:
     # Enable/disable actor statistics
     enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 1b042a8..cb31a4b 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -27,9 +27,7 @@ import org.thingsboard.server.common.data.page.TimePageData;
 import org.thingsboard.server.common.data.page.TimePageLink;
 import org.thingsboard.server.common.data.rule.RuleChain;
 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
-import org.thingsboard.server.dao.queue.MsgQueue;
 import org.thingsboard.server.dao.rule.RuleChainService;
-import org.thingsboard.server.service.queue.MsgQueueService;
 
 import java.io.IOException;
 import java.util.function.Predicate;
@@ -42,9 +40,6 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
     @Autowired
     protected RuleChainService ruleChainService;
 
-    @Autowired
-    protected MsgQueueService msgQueueService;
-
     protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
         return doPost("/api/ruleChain", ruleChain, RuleChain.class);
     }
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index c86d496..f050f3b 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -191,9 +191,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
 
         Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
         Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
-
-        List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), ruleChain.getId().getId(), 0L));
-        Assert.assertEquals(0, unAckMsgList.size());
     }
 
     @Test
@@ -311,12 +308,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
 
         Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
         Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
-
-        List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), rootRuleChain.getId().getId(), 0L));
-        Assert.assertEquals(0, unAckMsgList.size());
-
-        unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), secondaryRuleChain.getId().getId(), 0L));
-        Assert.assertEquals(0, unAckMsgList.size());
     }
 
 }
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 7ac0789..24db457 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -162,73 +162,4 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
         Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
     }
 
-    @Test
-    public void testRuleChainWithOneRuleAndMsgFromQueue() throws Exception {
-        // Creating Rule Chain
-        RuleChain ruleChain = new RuleChain();
-        ruleChain.setName("Simple Rule Chain");
-        ruleChain.setTenantId(savedTenant.getId());
-        ruleChain.setRoot(true);
-        ruleChain.setDebugMode(true);
-        ruleChain = saveRuleChain(ruleChain);
-        Assert.assertNull(ruleChain.getFirstRuleNodeId());
-
-        // Saving the device
-        Device device = new Device();
-        device.setName("My device");
-        device.setType("default");
-        device = doPost("/api/device", device, Device.class);
-
-        attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
-                Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
-
-        // Pushing Message to the system
-        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
-                "CUSTOM",
-                device.getId(),
-                new TbMsgMetaData(),
-                "{}",
-                ruleChain.getId(), null, 0L);
-        msgQueueService.put(device.getTenantId(), tbMsg, ruleChain.getId().getId(), 0L);
-
-        Thread.sleep(1000);
-
-        RuleChainMetaData metaData = new RuleChainMetaData();
-        metaData.setRuleChainId(ruleChain.getId());
-
-        RuleNode ruleNode = new RuleNode();
-        ruleNode.setName("Simple Rule Node");
-        ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
-        ruleNode.setDebugMode(true);
-        TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
-        configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
-        ruleNode.setConfiguration(mapper.valueToTree(configuration));
-
-        metaData.setNodes(Collections.singletonList(ruleNode));
-        metaData.setFirstNodeIndex(0);
-
-        metaData = saveRuleChainMetaData(metaData);
-        Assert.assertNotNull(metaData);
-
-        ruleChain = getRuleChain(ruleChain.getId());
-        Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
-
-        Thread.sleep(3000);
-
-        TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
-        List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
-
-        Assert.assertEquals(2, events.size());
-
-        Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
-        Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
-        Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
-
-        Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
-        Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
-        Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
-
-        Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
-    }
-
 }
diff --git a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
index 5e6896b..1ebccad 100644
--- a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
+++ b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
@@ -57,7 +57,7 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
     @Test
     public void testGetAttributes() throws Exception {
         doGetAsync("/api/v1/" + "WRONG_TOKEN" + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isUnauthorized());
-        doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isNotFound());
+        doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isOk());
 
         Map<String, String> attrMap = new HashMap<>();
         attrMap.put("keyA", "valueA");
diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties
index 06a92fa..7c3ec51 100644
--- a/dao/src/test/resources/nosql-test.properties
+++ b/dao/src/test/resources/nosql-test.properties
@@ -1,7 +1,2 @@
 database.entities.type=cassandra
 database.ts.type=cassandra
-
-cassandra.queue.partitioning=HOURS
-cassandra.queue.ack.ttl=3600
-cassandra.queue.msg.ttl=3600
-cassandra.queue.partitions.ttl=3600
\ No newline at end of file