thingsboard-aplcache

Details

diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 42191e5..52f6395 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -145,7 +145,8 @@ cassandra:
     rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
     tenant_rate_limits:
       enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}"
-      configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_VALUE:1000:1,30000:60}"
+      configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}"
+      print_tenant_names: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_PRINT_TENANT_NAMES:false}"
 
 # SQL configuration parameters
 sql:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
index 05370ef..be40bf4 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
@@ -17,21 +17,24 @@ package org.thingsboard.server.dao.nosql;
 
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
-import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.msg.tools.TbRateLimits;
+import org.thingsboard.server.dao.entity.EntityService;
+import org.thingsboard.server.dao.tenant.TenantService;
 import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
 import org.thingsboard.server.dao.util.AsyncTaskContext;
 import org.thingsboard.server.dao.util.NoSqlAnyDao;
 
 import javax.annotation.PreDestroy;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Created by ashvayka on 24.10.18.
@@ -41,6 +44,12 @@ import java.util.concurrent.ConcurrentMap;
 @NoSqlAnyDao
 public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, ResultSetFuture, ResultSet> {
 
+    @Autowired
+    private EntityService entityService;
+    private Map<TenantId, String> tenantNamesCache = new HashMap<>();
+
+    private boolean printTenantNames;
+
     public CassandraBufferedRateExecutor(
             @Value("${cassandra.query.buffer_size}") int queueLimit,
             @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
@@ -49,17 +58,37 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
             @Value("${cassandra.query.callback_threads:2}") int callbackThreads,
             @Value("${cassandra.query.poll_ms:50}") long pollMs,
             @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
-            @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration) {
+            @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
+            @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames) {
         super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration);
+        this.printTenantNames = printTenantNames;
     }
 
     @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
     public void printStats() {
-        log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] totalRateLimited [{}] currBuffer [{}] ",
+        log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] " +
+                        "totalRateLimited [{}] totalRateLimitedTenants [{}] currBuffer [{}] ",
                 getQueueSize(),
                 totalAdded.getAndSet(0), totalLaunched.getAndSet(0), totalReleased.getAndSet(0),
                 totalFailed.getAndSet(0), totalExpired.getAndSet(0), totalRejected.getAndSet(0),
-                totalRateLimited.getAndSet(0), concurrencyLevel.get());
+                totalRateLimited.getAndSet(0), rateLimitedTenants.size(), concurrencyLevel.get());
+
+        rateLimitedTenants.forEach(((tenantId, counter) -> {
+            if (printTenantNames) {
+                String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
+                    try {
+                        return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to get tenant name", tenantId, e);
+                        return "N/A";
+                    }
+                });
+                log.info("[{}][{}] Rate limited requests: {}", tenantId, name, counter);
+            } else {
+                log.info("[{}] Rate limited requests: {}", tenantId, counter);
+            }
+        }));
+        rateLimitedTenants.clear();
     }
 
     @PreDestroy
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
index c4c9b60..65848fc 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java
@@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.msg.tools.TbRateLimits;
 
 import javax.annotation.Nullable;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,6 +37,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Created by ashvayka on 24.10.18.
@@ -53,6 +55,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
     private final boolean perTenantLimitsEnabled;
     private final String perTenantLimitsConfiguration;
     private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<TenantId, AtomicInteger> rateLimitedTenants = new ConcurrentHashMap<>();
 
     protected final AtomicInteger concurrencyLevel = new AtomicInteger();
     protected final AtomicInteger totalAdded = new AtomicInteger();
@@ -90,6 +93,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
             } else if (!task.getTenantId().isNullUid()) {
                 TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(task.getTenantId(), id -> new TbRateLimits(perTenantLimitsConfiguration));
                 if (!rateLimits.tryConsume()) {
+                    rateLimitedTenants.computeIfAbsent(task.getTenantId(), tId -> new AtomicInteger(0)).incrementAndGet();
                     totalRateLimited.incrementAndGet();
                     settableFuture.setException(new TenantRateLimitException());
                     perTenantLimitReached = true;