Details
diff --git a/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java b/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java
new file mode 100644
index 0000000..3334dc6
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.exception;
+
+public class BufferLimitException extends RuntimeException {
+
+ private static final long serialVersionUID = 4513762009041887588L;
+
+ public BufferLimitException() {
+ super("Rate Limit Buffer is full");
+ }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
index 2674c6d..d250563 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.thingsboard.server.dao.exception.BufferLimitException;
import org.thingsboard.server.dao.util.AsyncRateLimiter;
import javax.annotation.Nullable;
@@ -35,9 +36,15 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
private final ListenableFuture<Void> rateLimitFuture;
public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
- this.rateLimitFuture = rateLimiter.acquireAsync();
+ this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> {
+ if (!(t instanceof BufferLimitException)) {
+ rateLimiter.release();
+ }
+ return Futures.immediateFailedFuture(t);
+ });
this.originalFuture = Futures.transform(rateLimitFuture,
(Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
+
}
@Override
@@ -108,10 +115,7 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
try {
ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
resultSetFuture.addListener(listener, executor);
- } catch (CancellationException e) {
- cancel(false);
- return;
- } catch (ExecutionException e) {
+ } catch (CancellationException | ExecutionException e) {
Futures.immediateFailedFuture(e).addListener(listener, executor);
}
}, executor);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
index 2acd623..03eb46f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import org.thingsboard.server.dao.exception.BufferLimitException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,6 +42,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
private final AtomicInteger maxQueueSize = new AtomicInteger();
private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
+ private final AtomicInteger totalGranted = new AtomicInteger();
+ private final AtomicInteger totalReleased = new AtomicInteger();
+ private final AtomicInteger totalRequested = new AtomicInteger();
public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
@Value("${cassandra.query.concurrent_limit}") int permitsLimit,
@@ -53,11 +57,13 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
@Override
public ListenableFuture<Void> acquireAsync() {
+ totalRequested.incrementAndGet();
if (queue.isEmpty()) {
if (permits.incrementAndGet() <= permitsLimit) {
if (permits.get() > maxGrantedPermissions.get()) {
maxGrantedPermissions.set(permits.get());
}
+ totalGranted.incrementAndGet();
return Futures.immediateFuture(null);
}
permits.decrementAndGet();
@@ -69,6 +75,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
@Override
public void release() {
permits.decrementAndGet();
+ totalReleased.incrementAndGet();
reprocessQueue();
}
@@ -80,6 +87,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
}
LockedFuture lockedFuture = queue.poll();
if (lockedFuture != null) {
+ totalGranted.incrementAndGet();
lockedFuture.latch.countDown();
} else {
permits.decrementAndGet();
@@ -112,17 +120,17 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
LockedFuture lockedFuture = createLockedFuture();
if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
lockedFuture.cancelFuture();
- return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
+ return Futures.immediateFailedFuture(new BufferLimitException());
}
if(permits.get() < permitsLimit) {
reprocessQueue();
}
return lockedFuture.future;
} catch (InterruptedException e) {
- return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
+ return Futures.immediateFailedFuture(new BufferLimitException());
}
}
- return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
+ return Futures.immediateFailedFuture(new BufferLimitException());
}
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
@@ -134,8 +142,11 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
expiredCount++;
}
}
- log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0),
- maxGrantedPermissions.getAndSet(0), expiredCount, permits.get());
+ log.info("Permits maxBuffer [{}] maxPermits [{}] expired [{}] currPermits [{}] currBuffer [{}] " +
+ "totalPermits [{}] totalRequests [{}] totalReleased [{}]",
+ maxQueueSize.getAndSet(0), maxGrantedPermissions.getAndSet(0), expiredCount,
+ permits.get(), queue.size(),
+ totalGranted.getAndSet(0), totalRequested.getAndSet(0), totalReleased.getAndSet(0));
}
private class LockedFuture {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
index fa62c2b..f49668d 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
@@ -19,16 +19,17 @@ import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import org.thingsboard.server.dao.exception.BufferLimitException;
import org.thingsboard.server.dao.util.AsyncRateLimiter;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -53,7 +54,7 @@ public class RateLimitedResultSetFutureTest {
@Test
public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
- when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException()));
+ when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new BufferLimitException()));
resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
Thread.sleep(1000L);
verify(rateLimiter).acquireAsync();
@@ -153,4 +154,29 @@ public class RateLimitedResultSetFutureTest {
verify(rateLimiter, times(1)).release();
}
+ @Test
+ public void expiredQueryReturnPermit() throws InterruptedException, ExecutionException {
+ CountDownLatch latch = new CountDownLatch(1);
+ ListenableFuture<Void> future = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)).submit(() -> {
+ latch.await();
+ return null;
+ });
+ when(rateLimiter.acquireAsync()).thenReturn(future);
+ resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+
+ ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
+// TimeUnit.MILLISECONDS.sleep(200);
+ future.cancel(false);
+ latch.countDown();
+
+ try {
+ transform.get();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof ExecutionException);
+ }
+ verify(rateLimiter, times(1)).acquireAsync();
+ verify(rateLimiter, times(1)).release();
+ }
+
}
\ No newline at end of file
diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
index 5bfc3b6..67c3ce8 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.util;
import com.google.common.util.concurrent.*;
import org.junit.Test;
+import org.thingsboard.server.dao.exception.BufferLimitException;
import javax.annotation.Nullable;
import java.util.concurrent.ExecutionException;
@@ -61,8 +62,8 @@ public class BufferedRateLimiterTest {
} catch (Exception e) {
assertTrue(e instanceof ExecutionException);
Throwable actualCause = e.getCause();
- assertTrue(actualCause instanceof IllegalStateException);
- assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage());
+ assertTrue(actualCause instanceof BufferLimitException);
+ assertEquals("Rate Limit Buffer is full", actualCause.getMessage());
}
}