thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index 4628636..2cd1755 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -86,13 +86,26 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
                 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
             }
             initRoutes(ruleChain, ruleNodeList);
-            //TODO: read all messages from queues of the actors and push then to the corresponding node actors;
+            reprocess(ruleNodeList);
             started = true;
         } else {
             onUpdate(context);
         }
     }
 
+    private void reprocess(List<RuleNode> ruleNodeList) {
+        for (RuleNode ruleNode : ruleNodeList) {
+            for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
+                pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg);
+            }
+        }
+        if (firstNode != null) {
+            for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
+                pushMsgToNode(firstNode, tbMsg);
+            }
+        }
+    }
+
     @Override
     public void onUpdate(ActorContext context) throws Exception {
         RuleChain ruleChain = service.findRuleChainById(entityId);
@@ -117,6 +130,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         });
 
         initRoutes(ruleChain, ruleNodeList);
+        reprocess(ruleNodeList);
     }
 
     @Override
@@ -182,7 +196,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
         checkActive();
-        if(envelope.isEnqueue()) {
+        if (envelope.isEnqueue()) {
             putToQueue(enrichWithRuleChainId(envelope.getMsg()), msg -> pushMsgToNode(firstNode, msg));
         } else {
             pushMsgToNode(firstNode, envelope.getMsg());
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index d9e9012..744cc5b 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -302,7 +302,9 @@ spring:
 
 rule:
   queue:
-    msg_partitioning: "${QUEUE_MSG_PARTITIONING:HOURS}"
+    type: "memory"
+    max_size: 10000
+
 
 # PostgreSQL DAO Configuration
 #spring:
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 18583fd..0ea6ff4 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -159,4 +159,72 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
         Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
     }
 
+    @Test
+    public void testRuleChainWithOneRuleAndMsgFromQueue() throws Exception {
+        // Creating Rule Chain
+        RuleChain ruleChain = new RuleChain();
+        ruleChain.setName("Simple Rule Chain");
+        ruleChain.setTenantId(savedTenant.getId());
+        ruleChain.setRoot(true);
+        ruleChain.setDebugMode(true);
+        ruleChain = saveRuleChain(ruleChain);
+        Assert.assertNull(ruleChain.getFirstRuleNodeId());
+
+        // Saving the device
+        Device device = new Device();
+        device.setName("My device");
+        device.setType("default");
+        device = doPost("/api/device", device, Device.class);
+
+        attributesService.save(device.getId(), DataConstants.SERVER_SCOPE,
+                Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry("serverAttributeKey", "serverAttributeValue"), System.currentTimeMillis())));
+
+        // Pushing Message to the system
+        TbMsg tbMsg = new TbMsg(UUIDs.timeBased(),
+                "CUSTOM",
+                device.getId(),
+                new TbMsgMetaData(),
+                "{}",
+                ruleChain.getId(), null, 0L);
+        msgQueue.put(tbMsg, ruleChain.getId().getId(), 0L);
+
+        Thread.sleep(1000);
+
+        RuleChainMetaData metaData = new RuleChainMetaData();
+        metaData.setRuleChainId(ruleChain.getId());
+
+        RuleNode ruleNode = new RuleNode();
+        ruleNode.setName("Simple Rule Node");
+        ruleNode.setType(org.thingsboard.rule.engine.metadata.TbGetAttributesNode.class.getName());
+        ruleNode.setDebugMode(true);
+        TbGetAttributesNodeConfiguration configuration = new TbGetAttributesNodeConfiguration();
+        configuration.setServerAttributeNames(Collections.singletonList("serverAttributeKey"));
+        ruleNode.setConfiguration(mapper.valueToTree(configuration));
+
+        metaData.setNodes(Collections.singletonList(ruleNode));
+        metaData.setFirstNodeIndex(0);
+
+        metaData = saveRuleChainMetaData(metaData);
+        Assert.assertNotNull(metaData);
+
+        ruleChain = getRuleChain(ruleChain.getId());
+        Assert.assertNotNull(ruleChain.getFirstRuleNodeId());
+
+        Thread.sleep(3000);
+
+        TimePageData<Event> events = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000);
+
+        Assert.assertEquals(2, events.getData().size());
+
+        Event inEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.IN)).findFirst().get();
+        Assert.assertEquals(ruleChain.getFirstRuleNodeId(), inEvent.getEntityId());
+        Assert.assertEquals(device.getId().getId().toString(), inEvent.getBody().get("entityId").asText());
+
+        Event outEvent = events.getData().stream().filter(e -> e.getBody().get("type").asText().equals(DataConstants.OUT)).findFirst().get();
+        Assert.assertEquals(ruleChain.getFirstRuleNodeId(), outEvent.getEntityId());
+        Assert.assertEquals(device.getId().getId().toString(), outEvent.getBody().get("entityId").asText());
+
+        Assert.assertEquals("serverAttributeValue", getMetadata(outEvent).get("ss_serverAttributeKey").asText());
+    }
+
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
index faad701..a60f685 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitioner.java
@@ -30,6 +30,7 @@ import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 @Component
 @Slf4j
@@ -60,7 +61,7 @@ public class QueuePartitioner {
 
     public List<Long> findUnprocessedPartitions(UUID nodeId, long clusteredHash) {
         Optional<Long> lastPartitionOption = processedPartitionRepository.findLastProcessedPartition(nodeId, clusteredHash);
-        long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 100);
+        long lastPartition = lastPartitionOption.orElse(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7));
         List<Long> unprocessedPartitions = Lists.newArrayList();
 
         LocalDateTime current = LocalDateTime.ofInstant(Instant.ofEpochMilli(lastPartition), ZoneOffset.UTC);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
index dd23168..ce579cd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/queue/InMemoryMsgQueue.java
@@ -15,10 +15,14 @@
  */
 package org.thingsboard.server.dao.sql.queue;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.dao.queue.MsgQueue;
@@ -40,13 +44,17 @@ import java.util.concurrent.atomic.AtomicLong;
  * Created by ashvayka on 27.04.18.
  */
 @Component
+//@ConditionalOnProperty(prefix = "rule.queue", value = "type", havingValue = "memory", matchIfMissing = true)
 @Slf4j
 @SqlDao
 public class InMemoryMsgQueue implements MsgQueue {
 
+    @Value("${rule.queue.max_size}")
+    @Getter
+    private long maxSize;
+
     private ListeningExecutorService queueExecutor;
-    //TODO:
-    private AtomicLong pendingMsgCount;
+    private AtomicLong pendingMsgCount = new AtomicLong();
     private Map<InMemoryMsgKey, Map<UUID, TbMsg>> data = new HashMap<>();
 
     @PostConstruct
@@ -64,10 +72,15 @@ public class InMemoryMsgQueue implements MsgQueue {
 
     @Override
     public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
-        return queueExecutor.submit(() -> {
-            data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
-            return null;
-        });
+        if (pendingMsgCount.get() < maxSize) {
+            return queueExecutor.submit(() -> {
+                data.computeIfAbsent(new InMemoryMsgKey(nodeId, clusterPartition), key -> new HashMap<>()).put(msg.getId(), msg);
+                pendingMsgCount.incrementAndGet();
+                return null;
+            });
+        } else {
+            return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!"));
+        }
     }
 
     @Override
@@ -76,14 +89,15 @@ public class InMemoryMsgQueue implements MsgQueue {
             InMemoryMsgKey key = new InMemoryMsgKey(nodeId, clusterPartition);
             Map<UUID, TbMsg> map = data.get(key);
             if (map != null) {
-                map.remove(msg.getId());
+                if (map.remove(msg.getId()) != null) {
+                    pendingMsgCount.decrementAndGet();
+                }
                 if (map.isEmpty()) {
                     data.remove(key);
                 }
             }
             return null;
         });
-
     }
 
     @Override
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
index a6ec464..2d3b61f 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/QueuePartitionerTest.java
@@ -75,7 +75,7 @@ public class QueuePartitionerTest {
         long clusteredHash = 101L;
         when(partitionRepo.findLastProcessedPartition(nodeId, clusteredHash)).thenReturn(Optional.empty());
         List<Long> actual = queuePartitioner.findUnprocessedPartitions(nodeId, clusteredHash);
-        assertEquals(1011, actual.size());
+        assertEquals(10083, actual.size());
     }
 
 }
\ No newline at end of file
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
index a86238e..cff4dc9 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraAckRepositoryTest.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.util.ReflectionTestUtils;
 import org.thingsboard.server.dao.service.AbstractServiceTest;
 import org.thingsboard.server.dao.service.DaoNoSqlTest;
 import org.thingsboard.server.dao.service.queue.cassandra.MsgAck;
@@ -66,6 +67,7 @@ public class CassandraAckRepositoryTest extends AbstractServiceTest {
 
     @Test
     public void expiredAcksAreNotReturned() throws ExecutionException, InterruptedException {
+        ReflectionTestUtils.setField(ackRepository, "ackQueueTtl", 1);
         UUID msgId = UUIDs.timeBased();
         UUID nodeId = UUIDs.timeBased();
         MsgAck ack = new MsgAck(msgId, nodeId, 30L, 40L);
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
index ae252ae..fa286aa 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraMsgRepositoryTest.java
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.util.ReflectionTestUtils;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -58,6 +59,7 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
 
     @Test
     public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
+        ReflectionTestUtils.setField(msgRepository, "msqQueueTtl", 1);
         TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
                 new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
         UUID nodeId = UUIDs.timeBased();
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
index 1b48790..2ae810a 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/queue/cassandra/repository/impl/CassandraProcessedPartitionRepositoryTest.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Before;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.util.ReflectionTestUtils;
 import org.thingsboard.server.dao.service.AbstractServiceTest;
 import org.thingsboard.server.dao.service.DaoNoSqlTest;
 
@@ -61,6 +62,7 @@ public class CassandraProcessedPartitionRepositoryTest extends AbstractServiceTe
 
     @Test
     public void expiredPartitionsAreNotReturned() throws ExecutionException, InterruptedException {
+        ReflectionTestUtils.setField(partitionRepository, "partitionsTtl", 1);
         UUID nodeId = UUIDs.timeBased();
         ListenableFuture<Void> future = partitionRepository.partitionProcessed(nodeId, 404L, 10L);
         future.get();
diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties
index 42b71f8..dbd8b84 100644
--- a/dao/src/test/resources/application-test.properties
+++ b/dao/src/test/resources/application-test.properties
@@ -28,3 +28,6 @@ redis.connection.host=localhost
 redis.connection.port=6379
 redis.connection.db=0
 redis.connection.password=
+
+rule.queue.type=memory
+rule.queue.max_size=10000
\ No newline at end of file
diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties
index 7c02666..e37e228 100644
--- a/dao/src/test/resources/nosql-test.properties
+++ b/dao/src/test/resources/nosql-test.properties
@@ -1,6 +1,6 @@
 database.type=cassandra
 
 cassandra.queue.partitioning=HOURS
-cassandra.queue.ack.ttl=1
-cassandra.queue.msg.ttl=1
-cassandra.queue.partitions.ttl=1
\ No newline at end of file
+cassandra.queue.ack.ttl=3600
+cassandra.queue.msg.ttl=3600
+cassandra.queue.partitions.ttl=3600
\ No newline at end of file