thingsboard-memoizeit

Details

diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 6cb3621..ec5dde5 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -135,12 +135,12 @@ cassandra:
     write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
     default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
     # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS,INDEFINITE
-    ts_key_value_partitioning: "${TS_KV_PARTITIONING:INDEFINITE}"
+    ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
     ts_key_value_ttl: "${TS_KV_TTL:0}"
     buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
     concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
     permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
-    rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
+    rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
 
 # SQL configuration parameters
 sql:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
index d1af167..b38110b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
@@ -35,7 +35,6 @@ import org.thingsboard.server.dao.model.type.ComponentTypeCodec;
 import org.thingsboard.server.dao.model.type.DeviceCredentialsTypeCodec;
 import org.thingsboard.server.dao.model.type.EntityTypeCodec;
 import org.thingsboard.server.dao.model.type.JsonCodec;
-import org.thingsboard.server.dao.util.BufferedRateLimiter;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -49,7 +48,7 @@ public abstract class CassandraAbstractDao {
     private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
 
     @Autowired
-    private BufferedRateLimiter rateLimiter;
+    private CassandraBufferedRateExecutor rateLimiter;
 
     private Session session;
 
@@ -115,12 +114,12 @@ public abstract class CassandraAbstractDao {
         if (statement.getConsistencyLevel() == null) {
             statement.setConsistencyLevel(level);
         }
-        return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
+        return rateLimiter.submit(new CassandraStatementTask(getSession(), statement));
     }
 
     private static String statementToString(Statement statement) {
         if (statement instanceof BoundStatement) {
-            return ((BoundStatement)statement).preparedStatement().getQueryString();
+            return ((BoundStatement) statement).preparedStatement().getQueryString();
         } else {
             return statement.toString();
         }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
new file mode 100644
index 0000000..478c76d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.dao.nosql.tmp.AbstractBufferedRateExecutor;
+import org.thingsboard.server.dao.nosql.tmp.AsyncTaskContext;
+import org.thingsboard.server.dao.util.NoSqlAnyDao;
+
+import javax.annotation.PreDestroy;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+@Component
+@Slf4j
+@NoSqlAnyDao
+public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, ResultSetFuture, ResultSet> {
+
+    public CassandraBufferedRateExecutor(
+            @Value("${cassandra.query.buffer_size}") int queueLimit,
+            @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
+            @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime,
+            @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
+            @Value("${cassandra.query.callback_threads:2}") int callbackThreads,
+            @Value("${cassandra.query.poll_ms:50}") long pollMs) {
+        super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs);
+    }
+
+    @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
+    public void printStats() {
+        log.info("Permits totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] currBuffer [{}] ",
+                totalAdded.getAndSet(0), totalLaunched.getAndSet(0), totalReleased.getAndSet(0),
+                totalFailed.getAndSet(0), totalExpired.getAndSet(0), totalRejected.getAndSet(0),
+                concurrencyLevel.get());
+    }
+
+    @PreDestroy
+    public void stop() {
+        super.stop();
+    }
+
+    @Override
+    protected SettableFuture<ResultSet> create() {
+        return SettableFuture.create();
+    }
+
+    @Override
+    protected ResultSetFuture wrap(CassandraStatementTask task, SettableFuture<ResultSet> future) {
+        return new TbResultSetFuture(future);
+    }
+
+    @Override
+    protected ResultSetFuture execute(AsyncTaskContext<CassandraStatementTask, ResultSet> taskCtx) {
+        CassandraStatementTask task = taskCtx.getTask();
+        return task.getSession().executeAsync(task.getStatement());
+    }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java
new file mode 100644
index 0000000..ea13679
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import lombok.Data;
+import org.thingsboard.server.dao.nosql.tmp.AsyncTask;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+@Data
+public class CassandraStatementTask implements AsyncTask {
+
+    private final Session session;
+    private final Statement statement;
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java
new file mode 100644
index 0000000..574a5f5
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+public class TbResultSetFuture implements ResultSetFuture {
+
+    private final SettableFuture<ResultSet> mainFuture;
+
+    public TbResultSetFuture(SettableFuture<ResultSet> mainFuture) {
+        this.mainFuture = mainFuture;
+    }
+
+    @Override
+    public ResultSet getUninterruptibly() {
+        return getSafe();
+    }
+
+    @Override
+    public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
+        return getSafe(timeout, unit);
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return mainFuture.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return mainFuture.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+        return mainFuture.isDone();
+    }
+
+    @Override
+    public ResultSet get() throws InterruptedException, ExecutionException {
+        return mainFuture.get();
+    }
+
+    @Override
+    public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        return mainFuture.get(timeout, unit);
+    }
+
+    @Override
+    public void addListener(Runnable listener, Executor executor) {
+        mainFuture.addListener(listener, executor);
+    }
+
+    private ResultSet getSafe() {
+        try {
+            return mainFuture.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private ResultSet getSafe(long timeout, TimeUnit unit) throws TimeoutException {
+        try {
+            return mainFuture.get(timeout, unit);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java
new file mode 100644
index 0000000..9ad8e18
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java
@@ -0,0 +1,169 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql.tmp;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+@Slf4j
+public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture<V>, V> implements BufferedRateExecutor<T, F> {
+
+    private final long maxWaitTime;
+    private final long pollMs;
+    private final BlockingQueue<AsyncTaskContext<T, V>> queue;
+    private final ExecutorService dispatcherExecutor;
+    private final ExecutorService callbackExecutor;
+    private final ScheduledExecutorService timeoutExecutor;
+    private final int concurrencyLimit;
+
+    protected final AtomicInteger concurrencyLevel = new AtomicInteger();
+    protected final AtomicInteger totalAdded = new AtomicInteger();
+    protected final AtomicInteger totalLaunched = new AtomicInteger();
+    protected final AtomicInteger totalReleased = new AtomicInteger();
+    protected final AtomicInteger totalFailed = new AtomicInteger();
+    protected final AtomicInteger totalExpired = new AtomicInteger();
+    protected final AtomicInteger totalRejected = new AtomicInteger();
+
+    public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs) {
+        this.maxWaitTime = maxWaitTime;
+        this.pollMs = pollMs;
+        this.concurrencyLimit = concurrencyLimit;
+        this.queue = new LinkedBlockingDeque<>(queueLimit);
+        this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
+        this.callbackExecutor = Executors.newFixedThreadPool(callbackThreads);
+        this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+        for (int i = 0; i < dispatcherThreads; i++) {
+            dispatcherExecutor.submit(this::dispatch);
+        }
+    }
+
+    @Override
+    public F submit(T task) {
+        SettableFuture<V> settableFuture = create();
+        F result = wrap(task, settableFuture);
+        try {
+            totalAdded.incrementAndGet();
+            queue.add(new AsyncTaskContext<>(UUID.randomUUID(), task, settableFuture, System.currentTimeMillis()));
+        } catch (IllegalStateException e) {
+            totalRejected.incrementAndGet();
+            settableFuture.setException(e);
+        }
+        return result;
+    }
+
+    public void stop() {
+        if (dispatcherExecutor != null) {
+            dispatcherExecutor.shutdownNow();
+        }
+        if (callbackExecutor != null) {
+            callbackExecutor.shutdownNow();
+        }
+        if (timeoutExecutor != null) {
+            timeoutExecutor.shutdownNow();
+        }
+    }
+
+    protected abstract SettableFuture<V> create();
+
+    protected abstract F wrap(T task, SettableFuture<V> future);
+
+    protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> taskCtx);
+
+    private void dispatch() {
+        log.info("Buffered rate executor thread started");
+        while (!Thread.interrupted()) {
+            int curLvl = concurrencyLevel.get();
+            AsyncTaskContext<T, V> taskCtx = null;
+            try {
+                if (curLvl <= concurrencyLimit) {
+                    taskCtx = queue.take();
+                    final AsyncTaskContext<T, V> finalTaskCtx = taskCtx;
+                    logTask("Processing", finalTaskCtx);
+                    concurrencyLevel.incrementAndGet();
+                    long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis();
+                    if (timeout > 0) {
+                        totalLaunched.incrementAndGet();
+                        ListenableFuture<V> result = execute(finalTaskCtx);
+                        result = Futures.withTimeout(result, timeout, TimeUnit.MILLISECONDS, timeoutExecutor);
+                        Futures.addCallback(result, new FutureCallback<V>() {
+                            @Override
+                            public void onSuccess(@Nullable V result) {
+                                logTask("Releasing", finalTaskCtx);
+                                totalReleased.incrementAndGet();
+                                concurrencyLevel.decrementAndGet();
+                                finalTaskCtx.getFuture().set(result);
+                            }
+
+                            @Override
+                            public void onFailure(Throwable t) {
+                                if (t instanceof TimeoutException) {
+                                    logTask("Expired During Execution", finalTaskCtx);
+                                } else {
+                                    logTask("Failed", finalTaskCtx);
+                                }
+                                totalFailed.incrementAndGet();
+                                concurrencyLevel.decrementAndGet();
+                                finalTaskCtx.getFuture().setException(t);
+                                log.debug("[{}] Failed to execute task: {}", finalTaskCtx.getId(), finalTaskCtx.getTask(), t);
+                            }
+                        }, callbackExecutor);
+                    } else {
+                        logTask("Expired Before Execution", finalTaskCtx);
+                        totalExpired.incrementAndGet();
+                        concurrencyLevel.decrementAndGet();
+                        taskCtx.getFuture().setException(new TimeoutException());
+                    }
+                } else {
+                    Thread.sleep(pollMs);
+                }
+            } catch (Throwable e) {
+                if (taskCtx != null) {
+                    log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx, e);
+                    totalFailed.incrementAndGet();
+                    concurrencyLevel.decrementAndGet();
+                } else {
+                    log.debug("Failed to queue task:", e);
+                }
+            }
+        }
+        log.info("Buffered rate executor thread stopped");
+    }
+
+    private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
+        if (log.isTraceEnabled()) {
+            log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx);
+        } else {
+            log.debug("[{}] {} task", taskCtx.getId(), action);
+        }
+    }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java
new file mode 100644
index 0000000..a16568b
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql.tmp;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+public interface AsyncTask {
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java
new file mode 100644
index 0000000..c3c98ee
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql.tmp;
+
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.Data;
+
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+@Data
+public class AsyncTaskContext<T extends AsyncTask, V> {
+
+    private final UUID id;
+    private final T task;
+    private final SettableFuture<V> future;
+    private final long createTime;
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java
new file mode 100644
index 0000000..7adc914
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql.tmp;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Created by ashvayka on 24.10.18.
+ */
+public interface BufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture> {
+
+    F submit(T task);
+
+}