thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java 8(+4 -4)
application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java 102(+102 -0)
application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java 3(+2 -1)
application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java 6(+3 -3)
application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java 2(+1 -1)
dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepository.java 6(+3 -3)
dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraMsgRepository.java 4(+2 -2)
dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraProcessedPartitionRepository.java 4(+2 -2)
dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/ProcessedPartitionRepository.java 2(+1 -1)
dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepositoryTest.java 5(+2 -3)
dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraMsgRepositoryTest.java 3(+1 -2)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 665d2d9..8640598 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -63,6 +63,7 @@ import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.mail.MailExecutorService;
+import org.thingsboard.server.service.queue.MsgQueueService;
import org.thingsboard.server.service.rpc.DeviceRpcService;
import org.thingsboard.server.service.script.JsExecutorService;
import org.thingsboard.server.service.state.DeviceStateService;
@@ -196,7 +197,7 @@ public class ActorSystemContext {
@Autowired
@Getter
- private MsgQueue msgQueue;
+ private MsgQueueService msgQueueService;
@Autowired
@Getter
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
index d069cb0..dda12e5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
@@ -95,12 +95,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void reprocess(List<RuleNode> ruleNodeList) {
for (RuleNode ruleNode : ruleNodeList) {
- for (TbMsg tbMsg : queue.findUnprocessed(ruleNode.getId().getId(), 0L)) {
+ for (TbMsg tbMsg : queue.findUnprocessed(tenantId, ruleNode.getId().getId(), 0L)) {
pushMsgToNode(nodeActors.get(ruleNode.getId()), tbMsg, "");
}
}
if (firstNode != null) {
- for (TbMsg tbMsg : queue.findUnprocessed(entityId.getId(), 0L)) {
+ for (TbMsg tbMsg : queue.findUnprocessed(tenantId, entityId.getId(), 0L)) {
pushMsgToNode(firstNode, tbMsg, "");
}
}
@@ -215,7 +215,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
int relationsCount = relations.size();
EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
if (relationsCount == 0) {
- queue.ack(msg, ackId.getId(), msg.getClusterPartition());
+ queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) {
pushToTarget(msg, relation.getOut(), relation.getType());
@@ -233,7 +233,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
}
//TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
- queue.ack(msg, ackId.getId(), msg.getClusterPartition());
+ queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
index 4a7e072..4e9b8db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.dao.queue.MsgQueue;
+import org.thingsboard.server.service.queue.MsgQueueService;
import javax.annotation.Nullable;
import java.util.function.Consumer;
@@ -35,14 +36,14 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
protected final TenantId tenantId;
protected final T entityId;
- protected final MsgQueue queue;
+ protected final MsgQueueService queue;
protected ComponentLifecycleState state;
protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
super(systemContext, logger);
this.tenantId = tenantId;
this.entityId = id;
- this.queue = systemContext.getMsgQueue();
+ this.queue = systemContext.getMsgQueueService();
}
public abstract void start(ActorContext context) throws Exception;
@@ -88,7 +89,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
- Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
+ Futures.addCallback(queue.put(this.tenantId, tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
onSuccess.accept(tbMsg);
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
new file mode 100644
index 0000000..a67278c
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.queue;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.dao.queue.MsgQueue;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Service
+@Slf4j
+public class DefaultMsgQueueService implements MsgQueueService {
+
+ @Value("${rule.queue.max_size}")
+ private long queueMaxSize;
+
+ @Value("${rule.queue.cleanup_period}")
+ private long queueCleanUpPeriod;
+
+ @Autowired
+ private MsgQueue msgQueue;
+
+ private ScheduledExecutorService cleanupExecutor;
+
+ private Map<TenantId, AtomicLong> pendingCountPerTenant = new ConcurrentHashMap<>();
+
+ @PostConstruct
+ public void init() {
+ if (queueCleanUpPeriod > 0) {
+ cleanupExecutor = Executors.newSingleThreadScheduledExecutor();
+ cleanupExecutor.scheduleAtFixedRate(() -> cleanup(),
+ queueCleanUpPeriod, queueCleanUpPeriod, TimeUnit.SECONDS);
+ }
+ }
+
+ @PreDestroy
+ public void stop() {
+ if (cleanupExecutor != null) {
+ cleanupExecutor.shutdownNow();
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
+ AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong());
+ if (pendingMsgCount.incrementAndGet() < queueMaxSize) {
+ return msgQueue.put(tenantId, msg, nodeId, clusterPartition);
+ } else {
+ pendingMsgCount.decrementAndGet();
+ return Futures.immediateFailedFuture(new RuntimeException("Message queue is full!"));
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
+ ListenableFuture<Void> result = msgQueue.ack(tenantId, msg, nodeId, clusterPartition);
+ AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong());
+ pendingMsgCount.decrementAndGet();
+ return result;
+ }
+
+ @Override
+ public Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition) {
+ return msgQueue.findUnprocessed(tenantId, nodeId, clusterPartition);
+ }
+
+ private void cleanup() {
+ pendingCountPerTenant.forEach((tenantId, pendingMsgCount) -> {
+ pendingMsgCount.set(0);
+ msgQueue.cleanUp(tenantId);
+ });
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java
new file mode 100644
index 0000000..2cf001b
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/queue/MsgQueueService.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.queue;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbMsg;
+
+import java.util.UUID;
+
+public interface MsgQueueService {
+
+ ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
+
+ ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
+
+ Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition);
+
+}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index ac7a259..dda6e15 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -305,17 +305,20 @@ spring:
rule:
queue:
- type: "memory"
- max_size: 10000
-
+ #Message queue type (memory or db)
+ type: "${RULE_QUEUE_TYPE:memory}"
+ #Message queue maximum size (per tenant)
+ max_size: "${RULE_QUEUE_MAX_SIZE:100}"
+ #Message queue cleanup period in seconds
+ cleanup_period: "${RULE_QUEUE_CLEANUP_PERIOD:3600}"
# PostgreSQL DAO Configuration
#spring:
# data:
-# jpa:
+# sql:
# repositories:
# enabled: "true"
-# jpa:
+# sql:
# hibernate:
# ddl-auto: "validate"
# database-platform: "${SPRING_JPA_DATABASE_PLATFORM:org.hibernate.dialect.PostgreSQLDialect}"
diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
index 5b895b1..6c2d3db 100644
--- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java
@@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.dao.queue.MsgQueue;
import org.thingsboard.server.dao.rule.RuleChainService;
+import org.thingsboard.server.service.queue.MsgQueueService;
import java.io.IOException;
@@ -41,7 +42,7 @@ public class AbstractRuleEngineControllerTest extends AbstractControllerTest {
protected RuleChainService ruleChainService;
@Autowired
- protected MsgQueue msgQueue;
+ protected MsgQueueService msgQueueService;
protected RuleChain saveRuleChain(RuleChain ruleChain) throws Exception {
return doPost("/api/ruleChain", ruleChain, RuleChain.class);
diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
index a294816..356dfee 100644
--- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java
@@ -189,7 +189,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
- List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(ruleChain.getId().getId(), 0L));
+ List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), ruleChain.getId().getId(), 0L));
Assert.assertEquals(0, unAckMsgList.size());
}
@@ -306,10 +306,10 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
Assert.assertEquals("serverAttributeValue1", getMetadata(outEvent).get("ss_serverAttributeKey1").asText());
Assert.assertEquals("serverAttributeValue2", getMetadata(outEvent).get("ss_serverAttributeKey2").asText());
- List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(rootRuleChain.getId().getId(), 0L));
+ List<TbMsg> unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), rootRuleChain.getId().getId(), 0L));
Assert.assertEquals(0, unAckMsgList.size());
- unAckMsgList = Lists.newArrayList(msgQueue.findUnprocessed(secondaryRuleChain.getId().getId(), 0L));
+ unAckMsgList = Lists.newArrayList(msgQueueService.findUnprocessed(savedTenant.getId(), secondaryRuleChain.getId().getId(), 0L));
Assert.assertEquals(0, unAckMsgList.size());
}
diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
index 0ea6ff4..2f25b97 100644
--- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java
@@ -186,7 +186,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
new TbMsgMetaData(),
"{}",
ruleChain.getId(), null, 0L);
- msgQueue.put(tbMsg, ruleChain.getId().getId(), 0L);
+ msgQueueService.put(device.getTenantId(), tbMsg, ruleChain.getId().getId(), 0L);
Thread.sleep(1000);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java
index e49b89e..19021eb 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java
@@ -16,15 +16,19 @@
package org.thingsboard.server.dao.queue;
import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.UUID;
public interface MsgQueue {
- ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition);
+ ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
- ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition);
+ ListenableFuture<Void> ack(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition);
+
+ Iterable<TbMsg> findUnprocessed(TenantId tenantId, UUID nodeId, long clusterPartition);
+
+ ListenableFuture<Void> cleanUp(TenantId tenantId);
- Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
index ef55bcb..ca61a63 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/QueueBenchmark.java
@@ -28,8 +28,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.Bean;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
+import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -80,7 +82,7 @@ public class QueueBenchmark implements CommandLineRunner {
try {
TbMsg msg = randomMsg();
UUID nodeId = UUIDs.timeBased();
- ListenableFuture<Void> put = msgQueue.put(msg, nodeId, 100L);
+ ListenableFuture<Void> put = msgQueue.put(new TenantId(EntityId.NULL_UUID), msg, nodeId, 100L);
// ListenableFuture<Void> put = msgQueue.ack(msg, nodeId, 100L);
Futures.addCallback(put, new FutureCallback<Void>() {
@Override