thingsboard-aplcache

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 665d2d9..8640598 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -63,6 +63,7 @@ import org.thingsboard.server.service.component.ComponentDiscoveryService;
 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 import org.thingsboard.server.service.executors.ExternalCallExecutorService;
 import org.thingsboard.server.service.mail.MailExecutorService;
+import org.thingsboard.server.service.queue.MsgQueueService;
 import org.thingsboard.server.service.rpc.DeviceRpcService;
 import org.thingsboard.server.service.script.JsExecutorService;
 import org.thingsboard.server.service.state.DeviceStateService;
@@ -196,7 +197,7 @@ public class ActorSystemContext {
 
     @Autowired
     @Getter
-    private MsgQueue msgQueue;
+    private MsgQueueService msgQueueService;
 
     @Autowired
     @Getter
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index d069cb0..dda12e5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -95,12 +95,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 
     private void reprocess(List<RuleNode> ruleNodeList) {
         for (RuleNode ruleNode : ruleNodeList) {
-            for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
+            for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), 0L)) {
                 pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");
             }
         }
         if (firstNode != null) {
-            for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
+            for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), 0L)) {
                 pushMsgToNode(firstNode, tbMsg, "");
             }
         }
@@ -215,7 +215,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
         int relationsCount = relations.size();
         EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
         if (relationsCount == 0) {
-            queue.ack(msg, ackId.getId(), msg.getClusterPartition());
+            queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
         } else if (relationsCount == 1) {
             for (RuleNodeRelation relation : relations) {
                 pushToTarget(msg, relation.getOut(), relation.getType());
@@ -233,7 +233,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
                 }
             }
             //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
-            queue.ack(msg, ackId.getId(), msg.getClusterPartition());
+            queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 4a7e072..4e9b8db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.dao.queue.MsgQueue;
+import org.thingsboard.server.service.queue.MsgQueueService;
 
 import javax.annotation.Nullable;
 import java.util.function.Consumer;
@@ -35,14 +36,14 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
 
     protected final TenantId tenantId;
     protected final T entityId;
-    protected final MsgQueue queue;
+    protected final MsgQueueService queue;
     protected ComponentLifecycleState state;
 
     protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
         super(systemContext, logger);
         this.tenantId = tenantId;
         this.entityId = id;
-        this.queue = systemContext.getMsgQueue();
+        this.queue = systemContext.getMsgQueueService();
     }
 
     public abstract void start(ActorContext context) throws Exception;
@@ -88,7 +89,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
 
     protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
         EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
-        Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
+        Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
             @Override
             public void onSuccess(@Nullable Void result) {
                 onSuccess.accept(tbMsg);
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
new file mode 100644
index 0000000..a67278c
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.queue;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.dao.queue.MsgQueue;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Service
+@Slf4j
+public class DefaultMsgQueueService implements MsgQueueService {
+
+    @Value("${rule.queue.max_size}")
+    private long queueMaxSize;
+
+    @Value("${rule.queue.cleanup_period}")
+    private long queueCleanUpPeriod;
+
+    @Autowired
+    private MsgQueue msgQueue;
+
+    private ScheduledExecutorService cleanupExecutor;
+
+    private Map<TenantId, AtomicLong> pendingCountPerTenant = new ConcurrentHashMap<>();
+
+    @PostConstruct
+    public void init() {
+        if (queueCleanUpPeriod > 0) {
+            cleanupExecutor = Executors.newSingleThreadScheduledExecutor();
+            cleanupExecutor.scheduleAtFixedRate(() -> cleanup(),
+                    queueCleanUpPeriod, queueCleanUpPeriod, TimeUnit.SECONDS);
+        }
+    }
+
+    @PreDestroy
+    public void stop() {
+        if (cleanupExecutor != null) {
+            cleanupExecutor.shutdownNow();
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
+        AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong());
+        if (pendingMsgCount.incrementAndGet() < queueMaxSize) {
+            return msgQueue.put(tenantId, msg, nodeId, clusterPartition);
+        } else {
+            pendingMsgCount.decrementAndGet();
+            return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!"));
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
+        ListenableFuture<Void> result = msgQueue.ack(tenantId, msg, nodeId, clusterPartition);
+        AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong());
+        pendingMsgCount.decrementAndGet();
+        return result;
+    }
+
+    @Override
+    public Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition) {
+        return msgQueue.findUnprocessed(tenantId, nodeId, clusterPartition);
+    }
+
+    private void cleanup() {
+        pendingCountPerTenant.forEach((tenantId, pendingMsgCount) -> {
+            pendingMsgCount.set(0);
+            msgQueue.cleanUp(tenantId);
+        });
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java
new file mode 100644
index 0000000..2cf001b
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.queue;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.UUID;
+
+public interface MsgQueueService {
+
+    ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
+
+    ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
+
+    Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition);
+
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index ac7a259..dda6e15 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -305,17 +305,20 @@ spring:
 
 rule:
   queue:
-    type: "memory"
-    max_size: 10000
-
+    #Message queue type (memory or db)
+    type: "${RULE_QUEUE_TYPE:memory}"
+    #Message queue maximum size (per tenant)
+    max_size: "${RULE_QUEUE_MAX_SIZE:100}"
+    #Message queue cleanup period in seconds
+    cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"
 
 # PostgreSQL DAO Configuration
 #spring:
 #  data:
-#    jpa:
+#    sql:
 #      repositories:
 #        enabled: "true"
-#  jpa:
+#  sql:
 #    hibernate:
 #      ddl-auto: "validate"
 #    database-platform: "${SPRING_JPA_DATABASE_PLATFORM:org.hibernate.dialect.PostgreSQLDialect}"
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 5b895b1..6c2d3db 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
 import org.thingsboard.server.dao.queue.MsgQueue;
 import org.thingsboard.server.dao.rule.RuleChainService;
+import org.thingsboard.server.service.queue.MsgQueueService;
 
 import java.io.IOException;
 
@@ -41,7 +42,7 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
     protected RuleChainService ruleChainService;
 
     @Autowired
-    protected MsgQueue msgQueue;
+    protected MsgQueueService msgQueueService;
 
     protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
         return doPost("/api/ruleChain", ruleChain, RuleChain.class);
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index a294816..356dfee 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -189,7 +189,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
         Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
         Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
 
-        List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(ruleChain.getId().getId(), 0L));
+        List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), ruleChain.getId().getId(), 0L));
         Assert.assertEquals(0, unAckMsgList.size());
     }
 
@@ -306,10 +306,10 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
         Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
         Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
 
-        List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(rootRuleChain.getId().getId(), 0L));
+        List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), rootRuleChain.getId().getId(), 0L));
         Assert.assertEquals(0, unAckMsgList.size());
 
-        unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(secondaryRuleChain.getId().getId(), 0L));
+        unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), secondaryRuleChain.getId().getId(), 0L));
         Assert.assertEquals(0, unAckMsgList.size());
     }
 
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 0ea6ff4..2f25b97 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -186,7 +186,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
                 new TbMsgMetaData(),
                 "{}",
                 ruleChain.getId(), null, 0L);
-        msgQueue.put(tbMsg, ruleChain.getId().getId(), 0L);
+        msgQueueService.put(device.getTenantId(), tbMsg, ruleChain.getId().getId(), 0L);
 
         Thread.sleep(1000);
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java
index e49b89e..19021eb 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java
@@ -16,15 +16,19 @@
 package org.thingsboard.server.dao.queue;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbMsg;
 
 import java.util.UUID;
 
 public interface MsgQueue {
 
-    ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition);
+    ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
 
-    ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition);
+    ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
+
+    Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition);
+
+    ListenableFuture<Void> cleanUp(TenantId tenantId);
 
-    Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
index ef55bcb..ca61a63 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
@@ -28,8 +28,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.context.annotation.Bean;
+import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgDataType;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -80,7 +82,7 @@ public class QueueBenchmark implements CommandLineRunner {
                     try {
                         TbMsg msg = randomMsg();
                         UUID nodeId = UUIDs.timeBased();
-                        ListenableFuture<Void> put = msgQueue.put(msg, nodeId, 100L);
+                        ListenableFuture<Void> put = msgQueue.put(new TenantId(EntityId.NULL_UUID), msg, nodeId, 100L);
 //                    ListenableFuture<Void> put = msgQueue.ack(msg, nodeId, 100L);
                         Futures.addCallback(put, new FutureCallback<Void>() {
                             @Override