thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 18(+16 -2)
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java 68(+68 -0)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java 2(+1 -1)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java 2(+2 -0)
dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java 2(+2 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 4628636..2cd1755 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -86,13 +86,26 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList);
- //TODO: read all messages from queues of the actors and push then to the corresponding node actors;
+ reprocess(ruleNodeList);
started = true;
} else {
onUpdate(context);
}
}
+ private void reprocess(List<RuleNode> ruleNodeList) {
+ for (RuleNode ruleNode : ruleNodeList) {
+ for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
+ pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg);
+ }
+ }
+ if (firstNode != null) {
+ for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
+ pushMsgToNode(firstNode, tbMsg);
+ }
+ }
+ }
+
@Override
public void onUpdate(ActorContext context) throws Exception {
RuleChain ruleChain = service.findRuleChainById(entityId);
@@ -117,6 +130,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
});
initRoutes(ruleChain, ruleNodeList);
+ reprocess(ruleNodeList);
}
@Override
@@ -182,7 +196,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
checkActive();
- if(envelope.isEnqueue()) {
+ if (envelope.isEnqueue()) {
putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg));
} else {
pushMsgToNode(firstNode, envelope.getMsg());
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index d9e9012..744cc5b 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -302,7 +302,9 @@ spring:
rule:
queue:
- msg_partitioning: "${QUEUE_MSG_PARTITIONING:HOURS}"
+ type: "memory"
+ max_size: 10000
+
# PostgreSQL DAO Configuration
#spring:
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 18583fd..0ea6ff4 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -159,4 +159,72 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
}
+ @Test
+ public void testRuleChainWithOneRuleAndMsgFromQueue() throws Exception {
+ // Creating Rule Chain
+ RuleChain ruleChain = new RuleChain();
+ ruleChain.setName("Simple Rule Chain");
+ ruleChain.setTenantId(savedTenant.getId());
+ ruleChain.setRoot(true);
+ ruleChain.setDebugMode(true);
+ ruleChain = saveRuleChain(ruleChain);
+ Assert.assertNull(ruleChain.getFirstRuleNodeId());
+
+ // Saving the device
+ Device device = new Device();
+ device.setName("My device");
+ device.setType("default");
+ device = doPost("/api/device", device, Device.class);
+
+ attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+ Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
+
+ // Pushing Message to the system
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
+ "CUSTOM",
+ device.getId(),
+ new TbMsgMetaData(),
+ "{}",
+ ruleChain.getId(), null, 0L);
+ msgQueue.put(tbMsg, ruleChain.getId().getId(), 0L);
+
+ Thread.sleep(1000);
+
+ RuleChainMetaData metaData = new RuleChainMetaData();
+ metaData.setRuleChainId(ruleChain.getId());
+
+ RuleNode ruleNode = new RuleNode();
+ ruleNode.setName("Simple Rule Node");
+ ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+ ruleNode.setDebugMode(true);
+ TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
+ configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
+ ruleNode.setConfiguration(mapper.valueToTree(configuration));
+
+ metaData.setNodes(Collections.singletonList(ruleNode));
+ metaData.setFirstNodeIndex(0);
+
+ metaData = saveRuleChainMetaData(metaData);
+ Assert.assertNotNull(metaData);
+
+ ruleChain = getRuleChain(ruleChain.getId());
+ Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
+
+ Thread.sleep(3000);
+
+ TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+
+ Assert.assertEquals(2, events.getData().size());
+
+ Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+ Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+ Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
+ Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+ Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
+ }
+
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
index faad701..a60f685 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
@@ -30,6 +30,7 @@ import java.time.ZoneOffset;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
@Component
@Slf4j
@@ -60,7 +61,7 @@ public class QueuePartitioner {
public List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
Optional<Long> lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
- long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 100);
+ long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7));
List<Long> unprocessedPartitions = Lists.newArrayList();
LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
index dd23168..ce579cd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
@@ -15,10 +15,14 @@
*/
package org.thingsboard.server.dao.sql.queue;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.queue.MsgQueue;
@@ -40,13 +44,17 @@ import java.util.concurrent.atomic.AtomicLong;
* Created by ashvayka on 27.04.18.
*/
@Component
+//@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
@Slf4j
@SqlDao
public class InMemoryMsgQueue implements MsgQueue {
+ @Value("${rule.queue.max_size}")
+ @Getter
+ private long maxSize;
+
private ListeningExecutorService queueExecutor;
- //TODO:
- private AtomicLong pendingMsgCount;
+ private AtomicLong pendingMsgCount = new AtomicLong();
private Map<InMemoryMsgKey, Map<UUID, TbMsg>> data = new HashMap<>();
@PostConstruct
@@ -64,10 +72,15 @@ public class InMemoryMsgQueue implements MsgQueue {
@Override
public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
- return queueExecutor.submit(() -> {
- data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
- return null;
- });
+ if (pendingMsgCount.get() < maxSize) {
+ return queueExecutor.submit(() -> {
+ data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
+ pendingMsgCount.incrementAndGet();
+ return null;
+ });
+ } else {
+ return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!"));
+ }
}
@Override
@@ -76,14 +89,15 @@ public class InMemoryMsgQueue implements MsgQueue {
InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition);
Map<UUID, TbMsg> map = data.get(key);
if (map != null) {
- map.remove(msg.getId());
+ if (map.remove(msg.getId()) != null) {
+ pendingMsgCount.decrementAndGet();
+ }
if (map.isEmpty()) {
data.remove(key);
}
}
return null;
});
-
}
@Override
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
index a6ec464..2d3b61f 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
@@ -75,7 +75,7 @@ public class QueuePartitionerTest {
long clusteredHash = 101L;
when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty());
List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
- assertEquals(1011, actual.size());
+ assertEquals(10083, actual.size());
}
}
\ No newline at end of file
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
index a86238e..cff4dc9 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
@@ -66,6 +67,7 @@ public class CassandraAckRepositoryTest extends AbstractServiceTest {
@Test
public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException {
+ ReflectionTestUtils.setField(ackRepository, "ackQueueTtl", 1);
UUID msgId = UUIDs.timeBased();
UUID nodeId = UUIDs.timeBased();
MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L);
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
index ae252ae..fa286aa 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -58,6 +59,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
@Test
public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
+ ReflectionTestUtils.setField(msgRepository, "msqQueueTtl", 1);
TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
UUID nodeId = UUIDs.timeBased();
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
index 1b48790..2ae810a 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
@@ -61,6 +62,7 @@ public class CassandraProcessedPartitionRepositoryTest extends AbstractServiceTe
@Test
public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException {
+ ReflectionTestUtils.setField(partitionRepository, "partitionsTtl", 1);
UUID nodeId = UUIDs.timeBased();
ListenableFuture<Void> future = partitionRepository.partitionProcessed(nodeId, 404L, 10L);
future.get();
diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties
index 42b71f8..dbd8b84 100644
--- a/dao/src/test/resources/application-test.properties
+++ b/dao/src/test/resources/application-test.properties
@@ -28,3 +28,6 @@ redis.connection.host=localhost
redis.connection.port=6379
redis.connection.db=0
redis.connection.password=
+
+rule.queue.type=memory
+rule.queue.max_size=10000
\ No newline at end of file
diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties
index 7c02666..e37e228 100644
--- a/dao/src/test/resources/nosql-test.properties
+++ b/dao/src/test/resources/nosql-test.properties
@@ -1,6 +1,6 @@
database.type=cassandra
cassandra.queue.partitioning=HOURS
-cassandra.queue.ack.ttl=1
-cassandra.queue.msg.ttl=1
-cassandra.queue.partitions.ttl=1
\ No newline at end of file
+cassandra.queue.ack.ttl=3600
+cassandra.queue.msg.ttl=3600
+cassandra.queue.partitions.ttl=3600
\ No newline at end of file