thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java 1(+1 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
index 7dfe9a5..2af30d4 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
@@ -106,6 +106,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
try {
pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
} catch (Exception ex) {
+ logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex);
RuleToPluginMsg ruleMsg = msg.getMsg();
MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
Integer requestId = 0;
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 07face1..c47fc28 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -133,7 +133,7 @@ quota:
intervalMin: 2
database:
- type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql
+ type: "${DATABASE_TYPE:sql}" # cassandra OR sql
# Cassandra driver configuration parameters
cassandra:
@@ -226,7 +226,7 @@ caffeine:
specs:
relations:
timeToLiveInMinutes: 1440
- maxSize: 0
+ maxSize: 100000
deviceCredentials:
timeToLiveInMinutes: 1440
maxSize: 100000
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
index 8d254a0..3782ed2 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
@@ -61,13 +61,14 @@ public class HostRequestIntervalRegistry {
}
public long tick(String clientHostId) {
+ IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
+ long currentCount = intervalCount.resetIfExpiredAndTick();
if (whiteList.contains(clientHostId)) {
return 0;
} else if (blackList.contains(clientHostId)) {
return Long.MAX_VALUE;
}
- IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
- return intervalCount.resetIfExpiredAndTick();
+ return currentCount;
}
public void clean() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index cf14171..cda4b16 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -439,8 +439,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement getLatestStmt() {
if (latestInsertStmt == null) {
-// latestInsertStmt = new PreparedStatement[DataType.values().length];
-// for (DataType type : DataType.values()) {
latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
"(" + ModelConstants.ENTITY_TYPE_COLUMN +
"," + ModelConstants.ENTITY_ID_COLUMN +
@@ -451,7 +449,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
"," + ModelConstants.LONG_VALUE_COLUMN +
"," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
" VALUES(?, ?, ?, ?, ?, ?, ?, ?)");
-// }
}
return latestInsertStmt;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
index de07dbf..2acd623 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@Component
@Slf4j
+@NoSqlDao
public class BufferedRateLimiter implements AsyncRateLimiter {
private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
@@ -113,6 +114,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
lockedFuture.cancelFuture();
return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
}
+ if(permits.get() < permitsLimit) {
+ reprocessQueue();
+ }
return lockedFuture.future;
} catch (InterruptedException e) {
return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
@@ -130,8 +134,8 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
expiredCount++;
}
}
- log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0),
- maxGrantedPermissions.getAndSet(0), expiredCount);
+ log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0),
+ maxGrantedPermissions.getAndSet(0), expiredCount, permits.get());
}
private class LockedFuture {
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 82fcbe1..737687f 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000
cassandra.query.ts_key_value_partitioning=HOURS
cassandra.query.max_limit_per_request=1000
+cassandra.query.buffer_size=100000
+cassandra.query.concurrent_limit=1000
+cassandra.query.permit_max_wait_time=20000
+cassandra.query.rate_limit_print_interval_ms=30000
+