Details
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 6cb3621..ec5dde5 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -135,12 +135,12 @@ cassandra:
write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
# Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS,INDEFINITE
- ts_key_value_partitioning: "${TS_KV_PARTITIONING:INDEFINITE}"
+ ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
ts_key_value_ttl: "${TS_KV_TTL:0}"
buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
- rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
+ rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
# SQL configuration parameters
sql:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
index d1af167..b38110b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
@@ -35,7 +35,6 @@ import org.thingsboard.server.dao.model.type.ComponentTypeCodec;
import org.thingsboard.server.dao.model.type.DeviceCredentialsTypeCodec;
import org.thingsboard.server.dao.model.type.EntityTypeCodec;
import org.thingsboard.server.dao.model.type.JsonCodec;
-import org.thingsboard.server.dao.util.BufferedRateLimiter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -49,7 +48,7 @@ public abstract class CassandraAbstractDao {
private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
@Autowired
- private BufferedRateLimiter rateLimiter;
+ private CassandraBufferedRateExecutor rateLimiter;
private Session session;
@@ -115,12 +114,12 @@ public abstract class CassandraAbstractDao {
if (statement.getConsistencyLevel() == null) {
statement.setConsistencyLevel(level);
}
- return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
+ return rateLimiter.submit(new CassandraStatementTask(getSession(), statement));
}
private static String statementToString(Statement statement) {
if (statement instanceof BoundStatement) {
- return ((BoundStatement)statement).preparedStatement().getQueryString();
+ return ((BoundStatement) statement).preparedStatement().getQueryString();
} else {
return statement.toString();
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
new file mode 100644
index 0000000..478c76d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.dao.nosql.tmp.AbstractBufferedRateExecutor;
+import org.thingsboard.server.dao.nosql.tmp.AsyncTaskContext;
+import org.thingsboard.server.dao.util.NoSqlAnyDao;
+
+import javax.annotation.PreDestroy;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+@Component
+@Slf4j
+@NoSqlAnyDao
+public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, ResultSetFuture, ResultSet> {
+
+ public CassandraBufferedRateExecutor(
+ @Value("${cassandra.query.buffer_size}") int queueLimit,
+ @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
+ @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime,
+ @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
+ @Value("${cassandra.query.callback_threads:2}") int callbackThreads,
+ @Value("${cassandra.query.poll_ms:50}") long pollMs) {
+ super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs);
+ }
+
+ @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
+ public void printStats() {
+ log.info("Permits totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] currBuffer [{}] ",
+ totalAdded.getAndSet(0), totalLaunched.getAndSet(0), totalReleased.getAndSet(0),
+ totalFailed.getAndSet(0), totalExpired.getAndSet(0), totalRejected.getAndSet(0),
+ concurrencyLevel.get());
+ }
+
+ @PreDestroy
+ public void stop() {
+ super.stop();
+ }
+
+ @Override
+ protected SettableFuture<ResultSet> create() {
+ return SettableFuture.create();
+ }
+
+ @Override
+ protected ResultSetFuture wrap(CassandraStatementTask task, SettableFuture<ResultSet> future) {
+ return new TbResultSetFuture(future);
+ }
+
+ @Override
+ protected ResultSetFuture execute(AsyncTaskContext<CassandraStatementTask, ResultSet> taskCtx) {
+ CassandraStatementTask task = taskCtx.getTask();
+ return task.getSession().executeAsync(task.getStatement());
+ }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java
new file mode 100644
index 0000000..ea13679
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import lombok.Data;
+import org.thingsboard.server.dao.nosql.tmp.AsyncTask;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+@Data
+public class CassandraStatementTask implements AsyncTask {
+
+ private final Session session;
+ private final Statement statement;
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java
new file mode 100644
index 0000000..574a5f5
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+public class TbResultSetFuture implements ResultSetFuture {
+
+ private final SettableFuture<ResultSet> mainFuture;
+
+ public TbResultSetFuture(SettableFuture<ResultSet> mainFuture) {
+ this.mainFuture = mainFuture;
+ }
+
+ @Override
+ public ResultSet getUninterruptibly() {
+ return getSafe();
+ }
+
+ @Override
+ public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
+ return getSafe(timeout, unit);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return mainFuture.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return mainFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return mainFuture.isDone();
+ }
+
+ @Override
+ public ResultSet get() throws InterruptedException, ExecutionException {
+ return mainFuture.get();
+ }
+
+ @Override
+ public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return mainFuture.get(timeout, unit);
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ mainFuture.addListener(listener, executor);
+ }
+
+ private ResultSet getSafe() {
+ try {
+ return mainFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private ResultSet getSafe(long timeout, TimeUnit unit) throws TimeoutException {
+ try {
+ return mainFuture.get(timeout, unit);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java
new file mode 100644
index 0000000..9ad8e18
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java
@@ -0,0 +1,169 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql.tmp;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+@Slf4j
+public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture<V>, V> implements BufferedRateExecutor<T, F> {
+
+ private final long maxWaitTime;
+ private final long pollMs;
+ private final BlockingQueue<AsyncTaskContext<T, V>> queue;
+ private final ExecutorService dispatcherExecutor;
+ private final ExecutorService callbackExecutor;
+ private final ScheduledExecutorService timeoutExecutor;
+ private final int concurrencyLimit;
+
+ protected final AtomicInteger concurrencyLevel = new AtomicInteger();
+ protected final AtomicInteger totalAdded = new AtomicInteger();
+ protected final AtomicInteger totalLaunched = new AtomicInteger();
+ protected final AtomicInteger totalReleased = new AtomicInteger();
+ protected final AtomicInteger totalFailed = new AtomicInteger();
+ protected final AtomicInteger totalExpired = new AtomicInteger();
+ protected final AtomicInteger totalRejected = new AtomicInteger();
+
+ public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs) {
+ this.maxWaitTime = maxWaitTime;
+ this.pollMs = pollMs;
+ this.concurrencyLimit = concurrencyLimit;
+ this.queue = new LinkedBlockingDeque<>(queueLimit);
+ this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
+ this.callbackExecutor = Executors.newFixedThreadPool(callbackThreads);
+ this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+ for (int i = 0; i < dispatcherThreads; i++) {
+ dispatcherExecutor.submit(this::dispatch);
+ }
+ }
+
+ @Override
+ public F submit(T task) {
+ SettableFuture<V> settableFuture = create();
+ F result = wrap(task, settableFuture);
+ try {
+ totalAdded.incrementAndGet();
+ queue.add(new AsyncTaskContext<>(UUID.randomUUID(), task, settableFuture, System.currentTimeMillis()));
+ } catch (IllegalStateException e) {
+ totalRejected.incrementAndGet();
+ settableFuture.setException(e);
+ }
+ return result;
+ }
+
+ public void stop() {
+ if (dispatcherExecutor != null) {
+ dispatcherExecutor.shutdownNow();
+ }
+ if (callbackExecutor != null) {
+ callbackExecutor.shutdownNow();
+ }
+ if (timeoutExecutor != null) {
+ timeoutExecutor.shutdownNow();
+ }
+ }
+
+ protected abstract SettableFuture<V> create();
+
+ protected abstract F wrap(T task, SettableFuture<V> future);
+
+ protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> taskCtx);
+
+ private void dispatch() {
+ log.info("Buffered rate executor thread started");
+ while (!Thread.interrupted()) {
+ int curLvl = concurrencyLevel.get();
+ AsyncTaskContext<T, V> taskCtx = null;
+ try {
+ if (curLvl <= concurrencyLimit) {
+ taskCtx = queue.take();
+ final AsyncTaskContext<T, V> finalTaskCtx = taskCtx;
+ logTask("Processing", finalTaskCtx);
+ concurrencyLevel.incrementAndGet();
+ long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis();
+ if (timeout > 0) {
+ totalLaunched.incrementAndGet();
+ ListenableFuture<V> result = execute(finalTaskCtx);
+ result = Futures.withTimeout(result, timeout, TimeUnit.MILLISECONDS, timeoutExecutor);
+ Futures.addCallback(result, new FutureCallback<V>() {
+ @Override
+ public void onSuccess(@Nullable V result) {
+ logTask("Releasing", finalTaskCtx);
+ totalReleased.incrementAndGet();
+ concurrencyLevel.decrementAndGet();
+ finalTaskCtx.getFuture().set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof TimeoutException) {
+ logTask("Expired During Execution", finalTaskCtx);
+ } else {
+ logTask("Failed", finalTaskCtx);
+ }
+ totalFailed.incrementAndGet();
+ concurrencyLevel.decrementAndGet();
+ finalTaskCtx.getFuture().setException(t);
+ log.debug("[{}] Failed to execute task: {}", finalTaskCtx.getId(), finalTaskCtx.getTask(), t);
+ }
+ }, callbackExecutor);
+ } else {
+ logTask("Expired Before Execution", finalTaskCtx);
+ totalExpired.incrementAndGet();
+ concurrencyLevel.decrementAndGet();
+ taskCtx.getFuture().setException(new TimeoutException());
+ }
+ } else {
+ Thread.sleep(pollMs);
+ }
+ } catch (Throwable e) {
+ if (taskCtx != null) {
+ log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx, e);
+ totalFailed.incrementAndGet();
+ concurrencyLevel.decrementAndGet();
+ } else {
+ log.debug("Failed to queue task:", e);
+ }
+ }
+ }
+ log.info("Buffered rate executor thread stopped");
+ }
+
+ private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
+ if (log.isTraceEnabled()) {
+ log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx);
+ } else {
+ log.debug("[{}] {} task", taskCtx.getId(), action);
+ }
+ }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java
new file mode 100644
index 0000000..a16568b
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql.tmp;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+public interface AsyncTask {
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java
new file mode 100644
index 0000000..c3c98ee
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql.tmp;
+
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.Data;
+
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+@Data
+public class AsyncTaskContext<T extends AsyncTask, V> {
+
+ private final UUID id;
+ private final T task;
+ private final SettableFuture<V> future;
+ private final long createTime;
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java
new file mode 100644
index 0000000..7adc914
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql.tmp;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+public interface BufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture> {
+
+ F submit(T task);
+
+}