thingsboard-aplcache

Details

diff --git a/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java b/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java
new file mode 100644
index 0000000..3334dc6
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.exception;
+
+public class BufferLimitException extends RuntimeException {
+
+    private static final long serialVersionUID = 4513762009041887588L;
+
+    public BufferLimitException() {
+        super("Rate Limit Buffer is full");
+    }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
index 2674c6d..d250563 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.thingsboard.server.dao.exception.BufferLimitException;
 import org.thingsboard.server.dao.util.AsyncRateLimiter;
 
 import javax.annotation.Nullable;
@@ -35,9 +36,15 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
     private final ListenableFuture<Void> rateLimitFuture;
 
     public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
-        this.rateLimitFuture = rateLimiter.acquireAsync();
+        this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> {
+            if (!(t instanceof BufferLimitException)) {
+                rateLimiter.release();
+            }
+            return Futures.immediateFailedFuture(t);
+        });
         this.originalFuture = Futures.transform(rateLimitFuture,
                 (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
+
     }
 
     @Override
@@ -108,10 +115,7 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
             try {
                 ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
                 resultSetFuture.addListener(listener, executor);
-            } catch (CancellationException e) {
-                cancel(false);
-                return;
-            } catch (ExecutionException e) {
+            } catch (CancellationException | ExecutionException e) {
                 Futures.immediateFailedFuture(e).addListener(listener, executor);
             }
         }, executor);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
index 2acd623..03eb46f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.thingsboard.server.dao.exception.BufferLimitException;
 
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -41,6 +42,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
 
     private final AtomicInteger maxQueueSize = new AtomicInteger();
     private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
+    private final AtomicInteger totalGranted = new AtomicInteger();
+    private final AtomicInteger totalReleased = new AtomicInteger();
+    private final AtomicInteger totalRequested = new AtomicInteger();
 
     public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
                                @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
@@ -53,11 +57,13 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
 
     @Override
     public ListenableFuture<Void> acquireAsync() {
+        totalRequested.incrementAndGet();
         if (queue.isEmpty()) {
             if (permits.incrementAndGet() <= permitsLimit) {
                 if (permits.get() > maxGrantedPermissions.get()) {
                     maxGrantedPermissions.set(permits.get());
                 }
+                totalGranted.incrementAndGet();
                 return Futures.immediateFuture(null);
             }
             permits.decrementAndGet();
@@ -69,6 +75,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
     @Override
     public void release() {
         permits.decrementAndGet();
+        totalReleased.incrementAndGet();
         reprocessQueue();
     }
 
@@ -80,6 +87,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
                 }
                 LockedFuture lockedFuture = queue.poll();
                 if (lockedFuture != null) {
+                    totalGranted.incrementAndGet();
                     lockedFuture.latch.countDown();
                 } else {
                     permits.decrementAndGet();
@@ -112,17 +120,17 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
                 LockedFuture lockedFuture = createLockedFuture();
                 if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
                     lockedFuture.cancelFuture();
-                    return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
+                    return Futures.immediateFailedFuture(new BufferLimitException());
                 }
                 if(permits.get() < permitsLimit) {
                     reprocessQueue();
                 }
                 return lockedFuture.future;
             } catch (InterruptedException e) {
-                return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
+                return Futures.immediateFailedFuture(new BufferLimitException());
             }
         }
-        return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
+        return Futures.immediateFailedFuture(new BufferLimitException());
     }
 
     @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
@@ -134,8 +142,11 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
                 expiredCount++;
             }
         }
-        log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0),
-                maxGrantedPermissions.getAndSet(0), expiredCount, permits.get());
+        log.info("Permits maxBuffer [{}] maxPermits [{}] expired [{}] currPermits [{}] currBuffer [{}] " +
+                        "totalPermits [{}] totalRequests [{}] totalReleased [{}]",
+                maxQueueSize.getAndSet(0), maxGrantedPermissions.getAndSet(0), expiredCount,
+                permits.get(), queue.size(),
+                totalGranted.getAndSet(0), totalRequested.getAndSet(0), totalReleased.getAndSet(0));
     }
 
     private class LockedFuture {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
index fa62c2b..f49668d 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
@@ -19,16 +19,17 @@ import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
+import org.thingsboard.server.dao.exception.BufferLimitException;
 import org.thingsboard.server.dao.util.AsyncRateLimiter;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -53,7 +54,7 @@ public class RateLimitedResultSetFutureTest {
 
     @Test
     public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
-        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException()));
+        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new BufferLimitException()));
         resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
         Thread.sleep(1000L);
         verify(rateLimiter).acquireAsync();
@@ -153,4 +154,29 @@ public class RateLimitedResultSetFutureTest {
         verify(rateLimiter, times(1)).release();
     }
 
+    @Test
+    public void expiredQueryReturnPermit() throws InterruptedException, ExecutionException {
+        CountDownLatch latch = new CountDownLatch(1);
+        ListenableFuture<Void> future = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)).submit(() -> {
+            latch.await();
+            return null;
+        });
+        when(rateLimiter.acquireAsync()).thenReturn(future);
+        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+
+        ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
+//        TimeUnit.MILLISECONDS.sleep(200);
+        future.cancel(false);
+        latch.countDown();
+
+        try {
+            transform.get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof ExecutionException);
+        }
+        verify(rateLimiter, times(1)).acquireAsync();
+        verify(rateLimiter, times(1)).release();
+    }
+
 }
\ No newline at end of file
diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
index 5bfc3b6..67c3ce8 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.util;
 
 import com.google.common.util.concurrent.*;
 import org.junit.Test;
+import org.thingsboard.server.dao.exception.BufferLimitException;
 
 import javax.annotation.Nullable;
 import java.util.concurrent.ExecutionException;
@@ -61,8 +62,8 @@ public class BufferedRateLimiterTest {
         } catch (Exception e) {
             assertTrue(e instanceof ExecutionException);
             Throwable actualCause = e.getCause();
-            assertTrue(actualCause instanceof IllegalStateException);
-            assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage());
+            assertTrue(actualCause instanceof BufferLimitException);
+            assertEquals("Rate Limit Buffer is full", actualCause.getMessage());
         }
     }