thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java 111(+0 -111)
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java 5(+0 -5)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 9(+0 -9)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 2c40be6..c9dc307 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -26,7 +26,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.service.queue.MsgQueueService;
import javax.annotation.Nullable;
import java.util.function.Consumer;
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 90d1500..984fa28 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -182,12 +182,6 @@ cassandra:
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
- queue:
- msg.ttl: 604800 # 7 days
- ack.ttl: 604800 # 7 days
- partitions.ttl: 604800 # 7 days
- partitioning: "HOURS"
-
# SQL configuration parameters
sql:
# Specify executor service type used to perform timeseries insert tasks: SINGLE FIXED CACHED
@@ -221,13 +215,6 @@ actors:
node:
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
- queue:
- # Message queue type
- type: "${ACTORS_RULE_QUEUE_TYPE:memory}"
- # Message queue maximum size (per tenant)
- max_size: "${ACTORS_RULE_QUEUE_MAX_SIZE:100}"
- # Message queue cleanup period in seconds
- cleanup_period: "${ACTORS_RULE_QUEUE_CLEANUP_PERIOD:3600}"
statistics:
# Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 1b042a8..cb31a4b 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -27,9 +27,7 @@ import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
-import org.thingsboard.server.dao.queue.MsgQueue;
import org.thingsboard.server.dao.rule.RuleChainService;
-import org.thingsboard.server.service.queue.MsgQueueService;
import java.io.IOException;
import java.util.function.Predicate;
@@ -42,9 +40,6 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
@Autowired
protected RuleChainService ruleChainService;
- @Autowired
- protected MsgQueueService msgQueueService;
-
protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
return doPost("/api/ruleChain", ruleChain, RuleChain.class);
}
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index c86d496..f050f3b 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -191,9 +191,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
-
- List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), ruleChain.getId().getId(), 0L));
- Assert.assertEquals(0, unAckMsgList.size());
}
@Test
@@ -311,12 +308,6 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
-
- List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), rootRuleChain.getId().getId(), 0L));
- Assert.assertEquals(0, unAckMsgList.size());
-
- unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), secondaryRuleChain.getId().getId(), 0L));
- Assert.assertEquals(0, unAckMsgList.size());
}
}
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 7ac0789..24db457 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -162,73 +162,4 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
}
- @Test
- public void testRuleChainWithOneRuleAndMsgFromQueue() throws Exception {
- // Creating Rule Chain
- RuleChain ruleChain = new RuleChain();
- ruleChain.setName("Simple Rule Chain");
- ruleChain.setTenantId(savedTenant.getId());
- ruleChain.setRoot(true);
- ruleChain.setDebugMode(true);
- ruleChain = saveRuleChain(ruleChain);
- Assert.assertNull(ruleChain.getFirstRuleNodeId());
-
- // Saving the device
- Device device = new Device();
- device.setName("My device");
- device.setType("default");
- device = doPost("/api/device", device, Device.class);
-
- attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
- Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
-
- // Pushing Message to the system
- TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
- "CUSTOM",
- device.getId(),
- new TbMsgMetaData(),
- "{}",
- ruleChain.getId(), null, 0L);
- msgQueueService.put(device.getTenantId(), tbMsg, ruleChain.getId().getId(), 0L);
-
- Thread.sleep(1000);
-
- RuleChainMetaData metaData = new RuleChainMetaData();
- metaData.setRuleChainId(ruleChain.getId());
-
- RuleNode ruleNode = new RuleNode();
- ruleNode.setName("Simple Rule Node");
- ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
- ruleNode.setDebugMode(true);
- TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
- configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
- ruleNode.setConfiguration(mapper.valueToTree(configuration));
-
- metaData.setNodes(Collections.singletonList(ruleNode));
- metaData.setFirstNodeIndex(0);
-
- metaData = saveRuleChainMetaData(metaData);
- Assert.assertNotNull(metaData);
-
- ruleChain = getRuleChain(ruleChain.getId());
- Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
-
- Thread.sleep(3000);
-
- TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
- List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList());
-
- Assert.assertEquals(2, events.size());
-
- Event inEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
- Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
- Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
-
- Event outEvent = events.stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
- Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
- Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
-
- Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
- }
-
}
diff --git a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
index 5e6896b..1ebccad 100644
--- a/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
+++ b/application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
@@ -57,7 +57,7 @@ public abstract class BaseHttpDeviceApiTest extends AbstractControllerTest {
@Test
public void testGetAttributes() throws Exception {
doGetAsync("/api/v1/" + "WRONG_TOKEN" + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isUnauthorized());
- doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isNotFound());
+ doGetAsync("/api/v1/" + deviceCredentials.getCredentialsId() + "/attributes?clientKeys=keyA,keyB,keyC").andExpect(status().isOk());
Map<String, String> attrMap = new HashMap<>();
attrMap.put("keyA", "valueA");
diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties
index 06a92fa..7c3ec51 100644
--- a/dao/src/test/resources/nosql-test.properties
+++ b/dao/src/test/resources/nosql-test.properties
@@ -1,7 +1,2 @@
database.entities.type=cassandra
database.ts.type=cassandra
-
-cassandra.queue.partitioning=HOURS
-cassandra.queue.ack.ttl=3600
-cassandra.queue.msg.ttl=3600
-cassandra.queue.partitions.ttl=3600
\ No newline at end of file