thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
index 7dfe9a5..2af30d4 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
@@ -106,6 +106,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
             try {
                 pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
             } catch (Exception ex) {
+                logger.debug("[{}] Failed to process RuleToPlugin msg: [{}] [{}]", tenantId, msg.getMsg(), ex);
                 RuleToPluginMsg ruleMsg = msg.getMsg();
                 MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
                 Integer requestId = 0;
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 07face1..c47fc28 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -133,7 +133,7 @@ quota:
     intervalMin: 2
 
 database:
-  type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql
+  type: "${DATABASE_TYPE:sql}" # cassandra OR sql
 
 # Cassandra driver configuration parameters
 cassandra:
@@ -226,7 +226,7 @@ caffeine:
   specs:
     relations:
       timeToLiveInMinutes: 1440
-      maxSize: 0
+      maxSize: 100000
     deviceCredentials:
       timeToLiveInMinutes: 1440
       maxSize: 100000
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
index 8d254a0..3782ed2 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/inmemory/HostRequestIntervalRegistry.java
@@ -61,13 +61,14 @@ public class HostRequestIntervalRegistry {
     }
 
     public long tick(String clientHostId) {
+        IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
+        long currentCount = intervalCount.resetIfExpiredAndTick();
         if (whiteList.contains(clientHostId)) {
             return 0;
         } else if (blackList.contains(clientHostId)) {
             return Long.MAX_VALUE;
         }
-        IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
-        return intervalCount.resetIfExpiredAndTick();
+        return currentCount;
     }
 
     public void clean() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index cf14171..cda4b16 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -439,8 +439,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     private PreparedStatement getLatestStmt() {
         if (latestInsertStmt == null) {
-//            latestInsertStmt = new PreparedStatement[DataType.values().length];
-//            for (DataType type : DataType.values()) {
             latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
                     "(" + ModelConstants.ENTITY_TYPE_COLUMN +
                     "," + ModelConstants.ENTITY_ID_COLUMN +
@@ -451,7 +449,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
                     "," + ModelConstants.LONG_VALUE_COLUMN +
                     "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
                     " VALUES(?, ?, ?, ?, ?, ?, ?, ?)");
-//            }
         }
         return latestInsertStmt;
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
index de07dbf..2acd623 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 @Component
 @Slf4j
+@NoSqlDao
 public class BufferedRateLimiter implements AsyncRateLimiter {
 
     private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
@@ -113,6 +114,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
                     lockedFuture.cancelFuture();
                     return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
                 }
+                if(permits.get() < permitsLimit) {
+                    reprocessQueue();
+                }
                 return lockedFuture.future;
             } catch (InterruptedException e) {
                 return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
@@ -130,8 +134,8 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
                 expiredCount++;
             }
         }
-        log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0),
-                maxGrantedPermissions.getAndSet(0), expiredCount);
+        log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0),
+                maxGrantedPermissions.getAndSet(0), expiredCount, permits.get());
     }
 
     private class LockedFuture {
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 82fcbe1..737687f 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -47,3 +47,8 @@ cassandra.query.default_fetch_size=2000
 cassandra.query.ts_key_value_partitioning=HOURS
 
 cassandra.query.max_limit_per_request=1000
+cassandra.query.buffer_size=100000
+cassandra.query.concurrent_limit=1000
+cassandra.query.permit_max_wait_time=20000
+cassandra.query.rate_limit_print_interval_ms=30000
+