thingsboard-aplcache
Changes
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/AbstractQuotaService.java 21(+6 -15)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryCleaner.java 29(+29 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryLogger.java 52(+52 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestIntervalRegistry.java 37(+37 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestLimitPolicy.java 13(+4 -9)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestsQuotaService.java 37(+37 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java 10(+3 -7)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java 24(+4 -20)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/KeyBasedIntervalRegistry.java 25(+7 -18)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/RequestLimitPolicy.java 30(+30 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java 29(+29 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java 52(+52 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java 31(+31 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java 30(+30 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java 28(+28 -0)
common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java 1(+1 -0)
common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java 8(+3 -5)
common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java 4(+1 -3)
common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java 4(+3 -1)
dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepository.java 2(+1 -1)
dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepositoryTest.java 2(+1 -1)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java 3(+2 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
index a67278c..a4558eb 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.transport.quota.tenant.TenantQuotaService;
import org.thingsboard.server.dao.queue.MsgQueue;
import javax.annotation.PostConstruct;
@@ -48,6 +49,9 @@ public class DefaultMsgQueueService implements MsgQueueService {
@Autowired
private MsgQueue msgQueue;
+ @Autowired
+ private TenantQuotaService quotaService;
+
private ScheduledExecutorService cleanupExecutor;
private Map<TenantId, AtomicLong> pendingCountPerTenant = new ConcurrentHashMap<>();
@@ -70,6 +74,11 @@ public class DefaultMsgQueueService implements MsgQueueService {
@Override
public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
+ if(quotaService.isQuotaExceeded(tenantId.getId().toString())) {
+ log.warn("Tenant TbMsg Quota exceeded for [{}:{}] . Reject", tenantId.getId());
+ return Futures.immediateFailedFuture(new RuntimeException("Tenant TbMsg Quota exceeded"));
+ }
+
AtomicLong pendingMsgCount = pendingCountPerTenant.computeIfAbsent(tenantId, key -> new AtomicLong());
if (pendingMsgCount.incrementAndGet() < queueMaxSize) {
return msgQueue.put(tenantId, msg, nodeId, clusterPartition);
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index e479562..3d50258 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -131,9 +131,28 @@ quota:
whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
# Array of blacklist hosts
blacklist: "${QUOTA_HOST_BLACKLIST:}"
- log:
- topSize: 10
- intervalMin: 2
+ log:
+ topSize: 10
+ intervalMin: 2
+ rule:
+ tenant:
+ # Max allowed number of API requests in interval for single tenant
+ limit: "${QUOTA_TENANT_LIMIT:100000}"
+ # Interval duration
+ intervalMs: "${QUOTA_TENANT_INTERVAL_MS:60000}"
+ # Maximum silence duration for tenant after which Tenant removed from QuotaService. Must be bigger than intervalMs
+ ttlMs: "${QUOTA_TENANT_TTL_MS:60000}"
+ # Interval for scheduled task that cleans expired records. TTL is used for expiring
+ cleanPeriodMs: "${QUOTA_TENANT_CLEAN_PERIOD_MS:300000}"
+ # Enable Host API Limits
+ enabled: "${QUOTA_TENANT_ENABLED:false}"
+ # Array of whitelist tenants
+ whitelist: "${QUOTA_TENANT_WHITELIST:}"
+ # Array of blacklist tenants
+ blacklist: "${QUOTA_HOST_BLACKLIST:}"
+ log:
+ topSize: 10
+ intervalMin: 2
database:
type: "${DATABASE_TYPE:sql}" # cassandra OR sql
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryCleaner.java
new file mode 100644
index 0000000..ea75f5d
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryCleaner.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.host;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
+
+@Component
+public class HostIntervalRegistryCleaner extends IntervalRegistryCleaner {
+
+ public HostIntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry,
+ @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) {
+ super(intervalRegistry, cleanPeriodMs);
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryLogger.java
new file mode 100644
index 0000000..65767f1
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostIntervalRegistryLogger.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.host;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Component
+@Slf4j
+public class HostIntervalRegistryLogger extends IntervalRegistryLogger {
+
+ private final long logIntervalMin;
+
+ public HostIntervalRegistryLogger(@Value("${quota.host.log.topSize}") int topSize,
+ @Value("${quota.host.log.intervalMin}") long logIntervalMin,
+ HostRequestIntervalRegistry intervalRegistry) {
+ super(topSize, logIntervalMin, intervalRegistry);
+ this.logIntervalMin = logIntervalMin;
+ }
+
+ protected void log(Map<String, Long> top, int uniqHosts, long requestsCount) {
+ long rps = requestsCount / TimeUnit.MINUTES.toSeconds(logIntervalMin);
+ StringBuilder builder = new StringBuilder("Quota Statistic : ");
+ builder.append("uniqHosts : ").append(uniqHosts).append("; ");
+ builder.append("requestsCount : ").append(requestsCount).append("; ");
+ builder.append("RPS : ").append(rps).append(" ");
+ builder.append("top -> ");
+ for (Map.Entry<String, Long> host : top.entrySet()) {
+ builder.append(host.getKey()).append(" : ").append(host.getValue()).append("; ");
+ }
+
+ log.info(builder.toString());
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestIntervalRegistry.java
new file mode 100644
index 0000000..9b3b461
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestIntervalRegistry.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.host;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+@Component
+@Slf4j
+public class HostRequestIntervalRegistry extends KeyBasedIntervalRegistry {
+
+ public HostRequestIntervalRegistry(@Value("${quota.host.intervalMs}") long intervalDurationMs,
+ @Value("${quota.host.ttlMs}") long ttlMs,
+ @Value("${quota.host.whitelist}") String whiteList,
+ @Value("${quota.host.blacklist}") String blackList) {
+ super(intervalDurationMs, ttlMs, whiteList, blackList, "host");
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestsQuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestsQuotaService.java
new file mode 100644
index 0000000..69342b5
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/host/HostRequestsQuotaService.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.host;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.transport.quota.AbstractQuotaService;
+
+/**
+ * @author Vitaliy Paromskiy
+ * @version 1.0
+ */
+@Service
+@Slf4j
+public class HostRequestsQuotaService extends AbstractQuotaService {
+
+ public HostRequestsQuotaService(HostRequestIntervalRegistry requestRegistry, HostRequestLimitPolicy requestsPolicy,
+ HostIntervalRegistryCleaner registryCleaner, HostIntervalRegistryLogger registryLogger,
+ @Value("${quota.host.enabled}") boolean enabled) {
+ super(requestRegistry, requestsPolicy, registryCleaner, registryLogger, enabled);
+ }
+
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java
index a227d2a..0c510ff 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryCleaner.java
@@ -16,10 +16,7 @@
package org.thingsboard.server.common.transport.quota.inmemory;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -28,15 +25,14 @@ import java.util.concurrent.TimeUnit;
* @author Vitaliy Paromskiy
* @version 1.0
*/
-@Component
@Slf4j
-public class IntervalRegistryCleaner {
+public abstract class IntervalRegistryCleaner {
- private final HostRequestIntervalRegistry intervalRegistry;
+ private final KeyBasedIntervalRegistry intervalRegistry;
private final long cleanPeriodMs;
private ScheduledExecutorService executor;
- public IntervalRegistryCleaner(HostRequestIntervalRegistry intervalRegistry, @Value("${quota.host.cleanPeriodMs}") long cleanPeriodMs) {
+ public IntervalRegistryCleaner(KeyBasedIntervalRegistry intervalRegistry, long cleanPeriodMs) {
this.intervalRegistry = intervalRegistry;
this.cleanPeriodMs = cleanPeriodMs;
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java
index 8b34a6b..30399a1 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLogger.java
@@ -17,8 +17,6 @@ package org.thingsboard.server.common.transport.quota.inmemory;
import com.google.common.collect.MinMaxPriorityQueue;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
import java.util.Comparator;
import java.util.Map;
@@ -32,17 +30,15 @@ import java.util.stream.Collectors;
* @author Vitaliy Paromskiy
* @version 1.0
*/
-@Component
@Slf4j
-public class IntervalRegistryLogger {
+public abstract class IntervalRegistryLogger {
private final int topSize;
- private final HostRequestIntervalRegistry intervalRegistry;
+ private final KeyBasedIntervalRegistry intervalRegistry;
private final long logIntervalMin;
private ScheduledExecutorService executor;
- public IntervalRegistryLogger(@Value("${quota.log.topSize}") int topSize, @Value("${quota.log.intervalMin}") long logIntervalMin,
- HostRequestIntervalRegistry intervalRegistry) {
+ public IntervalRegistryLogger(int topSize, long logIntervalMin, KeyBasedIntervalRegistry intervalRegistry) {
this.topSize = topSize;
this.logIntervalMin = logIntervalMin;
this.intervalRegistry = intervalRegistry;
@@ -79,17 +75,5 @@ public class IntervalRegistryLogger {
return topQueue.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
- private void log(Map<String, Long> top, int uniqHosts, long requestsCount) {
- long rps = requestsCount / TimeUnit.MINUTES.toSeconds(logIntervalMin);
- StringBuilder builder = new StringBuilder("Quota Statistic : ");
- builder.append("uniqHosts : ").append(uniqHosts).append("; ");
- builder.append("requestsCount : ").append(requestsCount).append("; ");
- builder.append("RPS : ").append(rps).append(" ");
- builder.append("top -> ");
- for (Map.Entry<String, Long> host : top.entrySet()) {
- builder.append(host.getKey()).append(" : ").append(host.getValue()).append("; ");
- }
-
- log.info(builder.toString());
- }
+ protected abstract void log(Map<String, Long> top, int uniqHosts, long requestsCount);
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/RequestLimitPolicy.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/RequestLimitPolicy.java
new file mode 100644
index 0000000..0ff1230
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/RequestLimitPolicy.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota;
+
+
+public abstract class RequestLimitPolicy {
+
+ private final long limit;
+
+ public RequestLimitPolicy(long limit) {
+ this.limit = limit;
+ }
+
+ public boolean isValid(long currentValue) {
+ return currentValue <= limit;
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java
new file mode 100644
index 0000000..c481170
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.tenant;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
+
+@Component
+public class TenantIntervalRegistryCleaner extends IntervalRegistryCleaner {
+
+ public TenantIntervalRegistryCleaner(TenantMsgsIntervalRegistry intervalRegistry,
+ @Value("${quota.rule.tenant.cleanPeriodMs}") long cleanPeriodMs) {
+ super(intervalRegistry, cleanPeriodMs);
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java
new file mode 100644
index 0000000..c56f457
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.tenant;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Component
+public class TenantIntervalRegistryLogger extends IntervalRegistryLogger {
+
+ private final long logIntervalMin;
+
+ public TenantIntervalRegistryLogger(@Value("${quota.rule.tenant.log.topSize}") int topSize,
+ @Value("${quota.rule.tenant.log.intervalMin}") long logIntervalMin,
+ TenantMsgsIntervalRegistry intervalRegistry) {
+ super(topSize, logIntervalMin, intervalRegistry);
+ this.logIntervalMin = logIntervalMin;
+ }
+
+ protected void log(Map<String, Long> top, int uniqHosts, long requestsCount) {
+ long rps = requestsCount / TimeUnit.MINUTES.toSeconds(logIntervalMin);
+ StringBuilder builder = new StringBuilder("Tenant Quota Statistic : ");
+ builder.append("uniqTenants : ").append(uniqHosts).append("; ");
+ builder.append("requestsCount : ").append(requestsCount).append("; ");
+ builder.append("RPS : ").append(rps).append(" ");
+ builder.append("top -> ");
+ for (Map.Entry<String, Long> host : top.entrySet()) {
+ builder.append(host.getKey()).append(" : ").append(host.getValue()).append("; ");
+ }
+
+ log.info(builder.toString());
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java
new file mode 100644
index 0000000..6e8402c
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.tenant;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry;
+
+@Component
+public class TenantMsgsIntervalRegistry extends KeyBasedIntervalRegistry {
+
+ public TenantMsgsIntervalRegistry(@Value("${quota.rule.tenant.intervalMs}") long intervalDurationMs,
+ @Value("${quota.rule.tenant.ttlMs}") long ttlMs,
+ @Value("${quota.rule.tenant.whitelist}") String whiteList,
+ @Value("${quota.rule.tenant.blacklist}") String blackList) {
+ super(intervalDurationMs, ttlMs, whiteList, blackList, "Rule Tenant");
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java
new file mode 100644
index 0000000..a68860a
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.tenant;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.quota.AbstractQuotaService;
+
+@Component
+public class TenantQuotaService extends AbstractQuotaService {
+
+ public TenantQuotaService(TenantMsgsIntervalRegistry requestRegistry, TenantRequestLimitPolicy requestsPolicy,
+ TenantIntervalRegistryCleaner registryCleaner, TenantIntervalRegistryLogger registryLogger,
+ @Value("${quota.rule.tenant.enabled}") boolean enabled) {
+ super(requestRegistry, requestsPolicy, registryCleaner, registryLogger, enabled);
+ }
+}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java
new file mode 100644
index 0000000..cc32c81
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.transport.quota.tenant;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.quota.RequestLimitPolicy;
+
+@Component
+public class TenantRequestLimitPolicy extends RequestLimitPolicy {
+
+ public TenantRequestLimitPolicy(@Value("${quota.rule.tenant.limit}") long limit) {
+ super(limit);
+ }
+}
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java
index 174d182..07e03ef 100644
--- a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestLimitPolicyTest.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.common.transport.quota;
import org.junit.Test;
+import org.thingsboard.server.common.transport.quota.host.HostRequestLimitPolicy;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
index 547f0cf..20f8a55 100644
--- a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
@@ -17,9 +17,7 @@ package org.thingsboard.server.common.transport.quota;
import org.junit.Before;
import org.junit.Test;
-import org.thingsboard.server.common.transport.quota.inmemory.HostRequestIntervalRegistry;
-import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
-import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
+import org.thingsboard.server.common.transport.quota.host.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -35,8 +33,8 @@ public class HostRequestsQuotaServiceTest {
private HostRequestIntervalRegistry requestRegistry = mock(HostRequestIntervalRegistry.class);
private HostRequestLimitPolicy requestsPolicy = mock(HostRequestLimitPolicy.class);
- private IntervalRegistryCleaner registryCleaner = mock(IntervalRegistryCleaner.class);
- private IntervalRegistryLogger registryLogger = mock(IntervalRegistryLogger.class);
+ private HostIntervalRegistryCleaner registryCleaner = mock(HostIntervalRegistryCleaner.class);
+ private HostIntervalRegistryLogger registryLogger = mock(HostIntervalRegistryLogger.class);
@Before
public void init() {
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java
index 78b82ee..b49dd00 100644
--- a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistryTest.java
@@ -15,11 +15,9 @@
*/
package org.thingsboard.server.common.transport.quota.inmemory;
-import com.google.common.collect.Sets;
import org.junit.Before;
import org.junit.Test;
-
-import java.util.Collections;
+import org.thingsboard.server.common.transport.quota.host.HostRequestIntervalRegistry;
import static org.junit.Assert.assertEquals;
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java
index c9139ae..6e51420 100644
--- a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/inmemory/IntervalRegistryLoggerTest.java
@@ -18,6 +18,8 @@ package org.thingsboard.server.common.transport.quota.inmemory;
import com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Test;
+import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryLogger;
+import org.thingsboard.server.common.transport.quota.host.HostRequestIntervalRegistry;
import java.util.Collections;
import java.util.Map;
@@ -37,7 +39,7 @@ public class IntervalRegistryLoggerTest {
@Before
public void init() {
- logger = new IntervalRegistryLogger(3, 10, requestRegistry);
+ logger = new HostIntervalRegistryLogger(3, 10, requestRegistry);
}
@Test
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java b/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java
index ceaab58..ce481b7 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/CassandraMsgQueue.java
@@ -26,6 +26,8 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.queue.MsgQueue;
+import org.thingsboard.server.dao.queue.db.MsgAck;
+import org.thingsboard.server.dao.queue.db.UnprocessedMsgFilter;
import org.thingsboard.server.dao.queue.db.repository.AckRepository;
import org.thingsboard.server.dao.queue.db.repository.MsgRepository;
import org.thingsboard.server.dao.util.NoSqlDao;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepository.java b/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepository.java
index 31ab074..6c59c59 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepository.java
@@ -26,7 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.server.dao.nosql.CassandraAbstractDao;
-import org.thingsboard.server.dao.queue.db.nosql.MsgAck;
+import org.thingsboard.server.dao.queue.db.MsgAck;
import org.thingsboard.server.dao.queue.db.repository.AckRepository;
import org.thingsboard.server.dao.util.NoSqlDao;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/AckRepository.java b/dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/AckRepository.java
index 458dba8..6fbd2da 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/AckRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/queue/db/repository/AckRepository.java
@@ -16,7 +16,7 @@
package org.thingsboard.server.dao.queue.db.repository;
import com.google.common.util.concurrent.ListenableFuture;
-import org.thingsboard.server.dao.queue.db.nosql.MsgAck;
+import org.thingsboard.server.dao.queue.db.MsgAck;
import java.util.List;
import java.util.UUID;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
index 7b2b391..ea29f6f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.dao.relation;
import com.google.common.base.Function;
+import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
@@ -176,97 +177,65 @@ public class BaseRelationService implements RelationService {
}
@Override
- public boolean deleteEntityRelations(EntityId entity) {
- Cache cache = cacheManager.getCache(RELATIONS_CACHE);
- log.trace("Executing deleteEntityRelations [{}]", entity);
- validate(entity);
- List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>();
- for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
- inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
- }
- ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
- ListenableFuture<List<Boolean>> inboundDeletions = Futures.transform(inboundRelations, relations ->
- getBooleans(relations, cache, true));
-
- ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction());
- boolean inboundDeleteResult = false;
+ public void deleteEntityRelations(EntityId entityId) {
try {
- inboundDeleteResult = inboundFuture.get();
+ deleteEntityRelationsAsync(entityId).get();
} catch (InterruptedException | ExecutionException e) {
- log.error("Error deleting entity inbound relations", e);
- }
-
- List<ListenableFuture<List<EntityRelation>>> outboundRelationsList = new ArrayList<>();
- for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
- outboundRelationsList.add(relationDao.findAllByFrom(entity, typeGroup));
- }
- ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);
- Futures.transform(outboundRelations, relations -> getBooleans(relations, cache, false));
-
- boolean outboundDeleteResult = relationDao.deleteOutboundRelations(entity);
- return inboundDeleteResult && outboundDeleteResult;
- }
-
- private List<Boolean> getBooleans(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
- List<Boolean> results = new ArrayList<>();
- for (List<EntityRelation> relationList : relations) {
- relationList.forEach(relation -> checkFromDeleteSync(cache, results, relation, isRemove));
+ throw new RuntimeException(e);
}
- return results;
- }
-
- private void checkFromDeleteSync(Cache cache, List<Boolean> results, EntityRelation relation, boolean isRemove) {
- if (isRemove) {
- results.add(relationDao.deleteRelation(relation));
- }
- cacheEviction(relation, cache);
}
@Override
- public ListenableFuture<Boolean> deleteEntityRelationsAsync(EntityId entity) {
+ public ListenableFuture<Void> deleteEntityRelationsAsync(EntityId entityId) {
Cache cache = cacheManager.getCache(RELATIONS_CACHE);
- log.trace("Executing deleteEntityRelationsAsync [{}]", entity);
- validate(entity);
+ log.trace("Executing deleteEntityRelationsAsync [{}]", entityId);
+ validate(entityId);
List<ListenableFuture<List<EntityRelation>>> inboundRelationsList = new ArrayList<>();
for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
- inboundRelationsList.add(relationDao.findAllByTo(entity, typeGroup));
+ inboundRelationsList.add(relationDao.findAllByTo(entityId, typeGroup));
}
+
ListenableFuture<List<List<EntityRelation>>> inboundRelations = Futures.allAsList(inboundRelationsList);
+
+ List<ListenableFuture<List<EntityRelation>>> outboundRelationsList = new ArrayList<>();
+ for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
+ outboundRelationsList.add(relationDao.findAllByFrom(entityId, typeGroup));
+ }
+
+ ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);
+
ListenableFuture<List<Boolean>> inboundDeletions = Futures.transformAsync(inboundRelations,
relations -> {
- List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, true);
+ List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(relations, cache, true);
return Futures.allAsList(results);
});
- ListenableFuture<Boolean> inboundFuture = Futures.transform(inboundDeletions, getListToBooleanFunction());
+ ListenableFuture<List<Boolean>> outboundDeletions = Futures.transformAsync(outboundRelations,
+ relations -> {
+ List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(relations, cache, false);
+ return Futures.allAsList(results);
+ });
- List<ListenableFuture<List<EntityRelation>>> outboundRelationsList = new ArrayList<>();
- for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
- outboundRelationsList.add(relationDao.findAllByFrom(entity, typeGroup));
- }
- ListenableFuture<List<List<EntityRelation>>> outboundRelations = Futures.allAsList(outboundRelationsList);
- Futures.transformAsync(outboundRelations, relations -> {
- List<ListenableFuture<Boolean>> results = getListenableFutures(relations, cache, false);
- return Futures.allAsList(results);
- });
+ ListenableFuture<List<List<Boolean>>> deletionsFuture = Futures.allAsList(inboundDeletions, outboundDeletions);
- ListenableFuture<Boolean> outboundFuture = relationDao.deleteOutboundRelationsAsync(entity);
- return Futures.transform(Futures.allAsList(Arrays.asList(inboundFuture, outboundFuture)), getListToBooleanFunction());
+ return Futures.transform(Futures.transformAsync(deletionsFuture, (deletions) -> relationDao.deleteOutboundRelationsAsync(entityId)), result -> null);
}
- private List<ListenableFuture<Boolean>> getListenableFutures(List<List<EntityRelation>> relations, Cache cache, boolean isRemove) {
+ private List<ListenableFuture<Boolean>> deleteRelationGroupsAsync(List<List<EntityRelation>> relations, Cache cache, boolean deleteFromDb) {
List<ListenableFuture<Boolean>> results = new ArrayList<>();
for (List<EntityRelation> relationList : relations) {
- relationList.forEach(relation -> checkFromDeleteAsync(cache, results, relation, isRemove));
+ relationList.forEach(relation -> results.add(deleteAsync(cache, relation, deleteFromDb)));
}
return results;
}
- private void checkFromDeleteAsync(Cache cache, List<ListenableFuture<Boolean>> results, EntityRelation relation, boolean isRemove) {
- if (isRemove) {
- results.add(relationDao.deleteRelationAsync(relation));
- }
+ private ListenableFuture<Boolean> deleteAsync(Cache cache, EntityRelation relation, boolean deleteFromDb) {
cacheEviction(relation, cache);
+ if (deleteFromDb) {
+ return relationDao.deleteRelationAsync(relation);
+ } else {
+ return Futures.immediateFuture(false);
+ }
}
private void cacheEviction(EntityRelation relation, Cache cache) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationService.java
index ca1b959..da58b11 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationService.java
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import java.util.List;
+import java.util.concurrent.ExecutionException;
/**
* Created by ashvayka on 27.04.17.
@@ -47,9 +48,9 @@ public interface RelationService {
ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
- boolean deleteEntityRelations(EntityId entity);
+ void deleteEntityRelations(EntityId entity);
- ListenableFuture<Boolean> deleteEntityRelationsAsync(EntityId entity);
+ ListenableFuture<Void> deleteEntityRelationsAsync(EntityId entity);
List<EntityRelation> findByFrom(EntityId from, RelationTypeGroup typeGroup);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java
index d57aa45..a8de677 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java
@@ -127,39 +127,35 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
@Override
public boolean deleteRelation(EntityRelation relation) {
RelationCompositeKey key = new RelationCompositeKey(relation);
- boolean relationExistsBeforeDelete = relationRepository.exists(key);
- relationRepository.delete(key);
- return relationExistsBeforeDelete;
+ return deleteRelationIfExists(key);
}
@Override
public ListenableFuture<Boolean> deleteRelationAsync(EntityRelation relation) {
RelationCompositeKey key = new RelationCompositeKey(relation);
return service.submit(
- () -> {
- boolean relationExistsBeforeDelete = relationRepository.exists(key);
- relationRepository.delete(key);
- return relationExistsBeforeDelete;
- });
+ () -> deleteRelationIfExists(key));
}
@Override
public boolean deleteRelation(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
RelationCompositeKey key = getRelationCompositeKey(from, to, relationType, typeGroup);
- boolean relationExistsBeforeDelete = relationRepository.exists(key);
- relationRepository.delete(key);
- return relationExistsBeforeDelete;
+ return deleteRelationIfExists(key);
}
@Override
public ListenableFuture<Boolean> deleteRelationAsync(EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
RelationCompositeKey key = getRelationCompositeKey(from, to, relationType, typeGroup);
return service.submit(
- () -> {
- boolean relationExistsBeforeDelete = relationRepository.exists(key);
- relationRepository.delete(key);
- return relationExistsBeforeDelete;
- });
+ () -> deleteRelationIfExists(key));
+ }
+
+ private boolean deleteRelationIfExists(RelationCompositeKey key) {
+ boolean relationExistsBeforeDelete = relationRepository.exists(key);
+ if (relationExistsBeforeDelete) {
+ relationRepository.delete(key);
+ }
+ return relationExistsBeforeDelete;
}
@Override
@@ -167,7 +163,9 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
boolean relationExistsBeforeDelete = relationRepository
.findAllByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name())
.size() > 0;
- relationRepository.deleteByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name());
+ if (relationExistsBeforeDelete) {
+ relationRepository.deleteByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name());
+ }
return relationExistsBeforeDelete;
}
@@ -178,7 +176,9 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
boolean relationExistsBeforeDelete = relationRepository
.findAllByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name())
.size() > 0;
- relationRepository.deleteByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name());
+ if (relationExistsBeforeDelete) {
+ relationRepository.deleteByFromIdAndFromType(UUIDConverter.fromTimeUUID(entity.getId()), entity.getEntityType().name());
+ }
return relationExistsBeforeDelete;
});
}
diff --git a/dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepositoryTest.java b/dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepositoryTest.java
index 9b743c9..b2f38dc 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepositoryTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/repository/CassandraAckRepositoryTest.java
@@ -21,9 +21,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.util.ReflectionTestUtils;
-import org.thingsboard.server.dao.queue.db.nosql.MsgAck;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
+import org.thingsboard.server.dao.queue.db.MsgAck;
import java.util.List;
import java.util.UUID;
diff --git a/dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/UnprocessedMsgFilterTest.java b/dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/UnprocessedMsgFilterTest.java
index 43c6e72..fd9bf21 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/UnprocessedMsgFilterTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/queue/db/nosql/UnprocessedMsgFilterTest.java
@@ -18,6 +18,8 @@ package org.thingsboard.server.dao.queue.db.nosql;
import com.google.common.collect.Lists;
import org.junit.Test;
import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.dao.queue.db.MsgAck;
+import org.thingsboard.server.dao.queue.db.UnprocessedMsgFilter;
import java.util.Collection;
import java.util.List;
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java
index a774bfd..743aefc 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java
@@ -96,7 +96,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
saveRelation(relationA);
saveRelation(relationB);
- Assert.assertTrue(relationService.deleteEntityRelationsAsync(childId).get());
+ Assert.assertNull(relationService.deleteEntityRelationsAsync(childId).get());
Assert.assertFalse(relationService.checkRelation(parentId, childId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON).get());
diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties
index dbd8b84..f2dab45 100644
--- a/dao/src/test/resources/application-test.properties
+++ b/dao/src/test/resources/application-test.properties
@@ -30,4 +30,3 @@ redis.connection.db=0
redis.connection.password=
rule.queue.type=memory
-rule.queue.max_size=10000
\ No newline at end of file
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
index 15706d4..6c8437c 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportService.java
@@ -27,6 +27,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
import javax.annotation.PostConstruct;
@@ -55,7 +56,7 @@ public class CoapTransportService {
private DeviceAuthService authService;
@Autowired(required = false)
- private QuotaService quotaService;
+ private HostRequestsQuotaService quotaService;
@Value("${coap.bind_address}")
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index 320f06e..4815056 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -50,7 +50,7 @@ import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
-import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import java.util.ArrayList;
import java.util.List;
@@ -134,8 +134,8 @@ public class CoapServerTest {
}
@Bean
- public static QuotaService quotaService() {
- return key -> false;
+ public static HostRequestsQuotaService quotaService() {
+ return new HostRequestsQuotaService(null, null, null, null, false);
}
}
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index 930b442..d26d076 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
import javax.servlet.http.HttpServletRequest;
@@ -61,7 +62,7 @@ public class DeviceApiController {
private DeviceAuthService authService;
@Autowired(required = false)
- private QuotaService quotaService;
+ private HostRequestsQuotaService quotaService;
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index f0129e1..1b37ed4 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -29,7 +29,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
-import org.thingsboard.server.common.transport.quota.QuotaService;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
@@ -67,7 +67,7 @@ public class MqttTransportService {
private MqttSslHandlerProvider sslHandlerProvider;
@Autowired(required = false)
- private QuotaService quotaService;
+ private HostRequestsQuotaService quotaService;
@Value("${mqtt.bind_address}")
private String host;