thingsboard-aplcache

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
index 6e78e20..7dfe9a5 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginActorMessageProcessor.java
@@ -28,9 +28,14 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
 import org.thingsboard.server.common.data.plugin.PluginMetaData;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
+import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
 import org.thingsboard.server.extensions.api.plugins.Plugin;
 import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
+import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
@@ -98,7 +103,19 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
 
     public void onRuleToPluginMsg(RuleToPluginMsgWrapper msg) throws RuleException {
         if (state == ComponentLifecycleState.ACTIVE) {
-            pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
+            try {
+                pluginImpl.process(trustedCtx, msg.getRuleTenantId(), msg.getRuleId(), msg.getMsg());
+            } catch (Exception ex) {
+                RuleToPluginMsg ruleMsg = msg.getMsg();
+                MsgType responceMsgType = MsgType.RULE_ENGINE_ERROR;
+                Integer requestId = 0;
+                if (ruleMsg.getPayload() instanceof FromDeviceRequestMsg) {
+                    requestId = ((FromDeviceRequestMsg) ruleMsg.getPayload()).getRequestId();
+                }
+                trustedCtx.reply(
+                        new ResponsePluginToRuleMsg(ruleMsg.getUid(), tenantId, msg.getRuleId(),
+                                BasicStatusCodeResponse.onError(responceMsgType, requestId, ex)));
+            }
         } else {
             //TODO: reply with plugin suspended message
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
index 2ebebfc..fd84fe9 100644
--- a/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/rule/RuleActorMessageProcessor.java
@@ -15,8 +15,9 @@
  */
 package org.thingsboard.server.actors.rule;
 
-import java.util.*;
-
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.event.LoggingAdapter;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.springframework.util.StringUtils;
 import org.thingsboard.server.actors.ActorSystemContext;
@@ -29,23 +30,17 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
 import org.thingsboard.server.common.data.plugin.PluginMetaData;
 import org.thingsboard.server.common.data.rule.RuleMetaData;
 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
-import org.thingsboard.server.common.msg.core.BasicRequest;
 import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
 import org.thingsboard.server.common.msg.core.RuleEngineError;
 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
-import org.thingsboard.server.common.msg.session.MsgType;
 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
-import org.thingsboard.server.common.msg.session.ex.ProcessingTimeoutException;
-import org.thingsboard.server.extensions.api.rules.*;
 import org.thingsboard.server.extensions.api.plugins.PluginAction;
 import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
 import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
+import org.thingsboard.server.extensions.api.rules.*;
 
-import com.fasterxml.jackson.databind.JsonNode;
-
-import akka.actor.ActorContext;
-import akka.actor.ActorRef;
-import akka.event.LoggingAdapter;
+import java.util.*;
 
 class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
 
@@ -190,18 +185,32 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
         RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid());
         if (pendingMsg != null) {
             ChainProcessingContext ctx = pendingMsg.getCtx();
-            Optional<ToDeviceMsg> ruleResponseOptional = action.convert(msg);
-            if (ruleResponseOptional.isPresent()) {
-                ctx.mergeResponse(ruleResponseOptional.get());
-                pushToNextRule(context, ctx, null);
-            } else {
+            if (isErrorResponce(msg)) {
                 pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS);
+            } else {
+                Optional<ToDeviceMsg> ruleResponseOptional = action.convert(msg);
+                if (ruleResponseOptional.isPresent()) {
+                    ctx.mergeResponse(ruleResponseOptional.get());
+                    pushToNextRule(context, ctx, null);
+                } else {
+                    pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS);
+                }
             }
         } else {
             logger.warning("[{}] Processing timeout detected: [{}]", entityId, msg.getUid());
         }
     }
 
+    private boolean isErrorResponce(PluginToRuleMsg<?> msg) {
+        if (msg instanceof ResponsePluginToRuleMsg) {
+            if (((ResponsePluginToRuleMsg) msg).getPayload() instanceof BasicStatusCodeResponse) {
+                BasicStatusCodeResponse responce = (BasicStatusCodeResponse) ((ResponsePluginToRuleMsg) msg).getPayload();
+                return !responce.isSuccess();
+            }
+        }
+        return false;
+    }
+
     void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) {
         RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId());
         if (pendingMsg != null) {
diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
index c3444d4..751bde6 100644
--- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
+++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
@@ -19,6 +19,7 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.SpringBootConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
 import springfox.documentation.swagger2.annotations.EnableSwagger2;
 
 import java.util.Arrays;
@@ -26,6 +27,7 @@ import java.util.Arrays;
 @SpringBootConfiguration
 @EnableAsync
 @EnableSwagger2
+@EnableScheduling
 @ComponentScan({"org.thingsboard.server"})
 public class ThingsboardServerApplication {
 
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 77f06e9..07face1 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -133,7 +133,7 @@ quota:
     intervalMin: 2
 
 database:
-  type: "${DATABASE_TYPE:sql}" # cassandra OR sql
+  type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql
 
 # Cassandra driver configuration parameters
 cassandra:
@@ -181,6 +181,10 @@ cassandra:
     default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
     # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
     ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
+    buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
+    concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
+    permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
+    rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}"
 
 # SQL configuration parameters
 sql:
@@ -222,7 +226,7 @@ caffeine:
   specs:
     relations:
       timeToLiveInMinutes: 1440
-      maxSize: 100000
+      maxSize: 0
     deviceCredentials:
       timeToLiveInMinutes: 1440
       maxSize: 100000
diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java
index 4f923fe..64ec718 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java
@@ -148,7 +148,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit
         query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
         query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET));
         query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
-        ResultSetFuture resultSetFuture = getSession().executeAsync(query);
+        ResultSetFuture resultSetFuture = executeAsyncRead(query);
         return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
             @Nullable
             @Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
index 932d6b9..8ae9dc8 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java
@@ -147,12 +147,12 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
                 .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
                 .and(eq(ATTRIBUTE_KEY_COLUMN, key));
         log.debug("Remove request: {}", delete.toString());
-        return getFuture(getSession().executeAsync(delete), rs -> null);
+        return getFuture(executeAsyncWrite(delete), rs -> null);
     }
 
     private PreparedStatement getSaveStmt() {
         if (saveStmt == null) {
-            saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
+            saveStmt = prepare("INSERT INTO " + ModelConstants.ATTRIBUTES_KV_CF +
                     "(" + ENTITY_TYPE_COLUMN +
                     "," + ENTITY_ID_COLUMN +
                     "," + ATTRIBUTE_TYPE_COLUMN +
diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java
index 27f7adc..fd02b5f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java
@@ -244,12 +244,12 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
             values.add("?");
         }
         String statementString = INSERT_INTO + cfName + " (" + String.join(",", columnsList) + ") VALUES (" + values.toString() + ")";
-        return getSession().prepare(statementString);
+        return prepare(statementString);
     }
 
     private PreparedStatement getPartitionInsertStmt() {
         if (partitionInsertStmt == null) {
-            partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
+            partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.AUDIT_LOG_BY_TENANT_ID_PARTITIONS_CF +
                     "(" + ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY +
                     "," + ModelConstants.AUDIT_LOG_PARTITION_PROPERTY + ")" +
                     " VALUES(?, ?)");
@@ -343,7 +343,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
                 .where(eq(ModelConstants.AUDIT_LOG_TENANT_ID_PROPERTY, tenantId));
         select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
         select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
-        return getSession().execute(select);
+        return executeRead(select);
     }
 
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java b/dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java
index 5e03545..b5b9f15 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/component/CassandraBaseComponentDescriptorDao.java
@@ -130,7 +130,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
     public boolean removeById(UUID key) {
         Statement delete = QueryBuilder.delete().all().from(ModelConstants.COMPONENT_DESCRIPTOR_BY_ID).where(eq(ModelConstants.ID_PROPERTY, key));
         log.debug("Remove request: {}", delete.toString());
-        return getSession().execute(delete).wasApplied();
+        return executeWrite(delete).wasApplied();
     }
 
     @Override
@@ -145,7 +145,7 @@ public class CassandraBaseComponentDescriptorDao extends CassandraAbstractSearch
         log.debug("Delete plugin meta-data entity by id [{}]", clazz);
         Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.COMPONENT_DESCRIPTOR_CLASS_PROPERTY, clazz));
         log.debug("Remove request: {}", delete.toString());
-        ResultSet resultSet = getSession().execute(delete);
+        ResultSet resultSet = executeWrite(delete);
         log.debug("Delete result: [{}]", resultSet.wasApplied());
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
index ac72ae8..641c464 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java
@@ -148,7 +148,7 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
         query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId));
         query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE));
         query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
-        ResultSetFuture resultSetFuture = getSession().executeAsync(query);
+        ResultSetFuture resultSetFuture = executeAsyncRead(query);
         return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() {
             @Nullable
             @Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
index 94299ca..5c93066 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.thingsboard.server.dao.cassandra.CassandraCluster;
 import org.thingsboard.server.dao.model.type.*;
+import org.thingsboard.server.dao.util.BufferedRateLimiter;
 
 @Slf4j
 public abstract class CassandraAbstractDao {
@@ -28,12 +29,15 @@ public abstract class CassandraAbstractDao {
     @Autowired
     protected CassandraCluster cluster;
 
+    @Autowired
+    private BufferedRateLimiter rateLimiter;
+
     private Session session;
 
     private ConsistencyLevel defaultReadLevel;
     private ConsistencyLevel defaultWriteLevel;
 
-    protected Session getSession() {
+    private Session getSession() {
         if (session == null) {
             session = cluster.getSession();
             defaultReadLevel = cluster.getDefaultReadConsistencyLevel();
@@ -50,6 +54,10 @@ public abstract class CassandraAbstractDao {
         return session;
     }
 
+    protected PreparedStatement prepare(String query) {
+        return getSession().prepare(query);
+    }
+
     private void registerCodecIfNotFound(CodecRegistry registry, TypeCodec<?> codec) {
         try {
             registry.codecFor(codec.getCqlType(), codec.getJavaType());
@@ -76,10 +84,7 @@ public abstract class CassandraAbstractDao {
 
     private ResultSet execute(Statement statement, ConsistencyLevel level) {
         log.debug("Execute cassandra statement {}", statement);
-        if (statement.getConsistencyLevel() == null) {
-            statement.setConsistencyLevel(level);
-        }
-        return getSession().execute(statement);
+        return executeAsync(statement, level).getUninterruptibly();
     }
 
     private ResultSetFuture executeAsync(Statement statement, ConsistencyLevel level) {
@@ -87,6 +92,6 @@ public abstract class CassandraAbstractDao {
         if (statement.getConsistencyLevel() == null) {
             statement.setConsistencyLevel(level);
         }
-        return getSession().executeAsync(statement);
+        return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement);
     }
 }
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java
index bad7b9e..000316b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractModelDao.java
@@ -60,7 +60,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
         List<E> list = Collections.emptyList();
         if (statement != null) {
             statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
-            ResultSet resultSet = getSession().execute(statement);
+            ResultSet resultSet = executeRead(statement);
             Result<E> result = getMapper().map(resultSet);
             if (result != null) {
                 list = result.all();
@@ -72,7 +72,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
     protected ListenableFuture<List<D>> findListByStatementAsync(Statement statement) {
         if (statement != null) {
             statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
-            ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
+            ResultSetFuture resultSetFuture = executeAsyncRead(statement);
             return Futures.transform(resultSetFuture, new Function<ResultSet, List<D>>() {
                 @Nullable
                 @Override
@@ -94,7 +94,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
         E object = null;
         if (statement != null) {
             statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
-            ResultSet resultSet = getSession().execute(statement);
+            ResultSet resultSet = executeRead(statement);
             Result<E> result = getMapper().map(resultSet);
             if (result != null) {
                 object = result.one();
@@ -106,7 +106,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
     protected ListenableFuture<D> findOneByStatementAsync(Statement statement) {
         if (statement != null) {
             statement.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel());
-            ResultSetFuture resultSetFuture = getSession().executeAsync(statement);
+            ResultSetFuture resultSetFuture = executeAsyncRead(statement);
             return Futures.transform(resultSetFuture, new Function<ResultSet, D>() {
                 @Nullable
                 @Override
@@ -181,7 +181,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
     public boolean removeById(UUID key) {
         Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, key));
         log.debug("Remove request: {}", delete.toString());
-        return getSession().execute(delete).wasApplied();
+        return executeWrite(delete).wasApplied();
     }
 
     @Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
new file mode 100644
index 0000000..2674c6d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.thingsboard.server.dao.util.AsyncRateLimiter;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.*;
+
+public class RateLimitedResultSetFuture implements ResultSetFuture {
+
+    private final ListenableFuture<ResultSetFuture> originalFuture;
+    private final ListenableFuture<Void> rateLimitFuture;
+
+    public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
+        this.rateLimitFuture = rateLimiter.acquireAsync();
+        this.originalFuture = Futures.transform(rateLimitFuture,
+                (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
+    }
+
+    @Override
+    public ResultSet getUninterruptibly() {
+        return safeGet().getUninterruptibly();
+    }
+
+    @Override
+    public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
+        long rateLimitStart = System.nanoTime();
+        ResultSetFuture resultSetFuture = null;
+        try {
+            resultSetFuture = originalFuture.get(timeout, unit);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IllegalStateException(e);
+        }
+        long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
+        long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
+        if (innerTimeoutNano > 0) {
+            return resultSetFuture.getUninterruptibly(innerTimeoutNano, TimeUnit.NANOSECONDS);
+        }
+        throw new TimeoutException("Timeout waiting for task.");
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        if (originalFuture.isDone()) {
+            return safeGet().cancel(mayInterruptIfRunning);
+        } else {
+            return originalFuture.cancel(mayInterruptIfRunning);
+        }
+    }
+
+    @Override
+    public boolean isCancelled() {
+        if (originalFuture.isDone()) {
+            return safeGet().isCancelled();
+        }
+
+        return originalFuture.isCancelled();
+    }
+
+    @Override
+    public boolean isDone() {
+        return originalFuture.isDone() && safeGet().isDone();
+    }
+
+    @Override
+    public ResultSet get() throws InterruptedException, ExecutionException {
+        return safeGet().get();
+    }
+
+    @Override
+    public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        long rateLimitStart = System.nanoTime();
+        ResultSetFuture resultSetFuture = originalFuture.get(timeout, unit);
+        long rateLimitDurationNano = System.nanoTime() - rateLimitStart;
+        long innerTimeoutNano = unit.toNanos(timeout) - rateLimitDurationNano;
+        if (innerTimeoutNano > 0) {
+            return resultSetFuture.get(innerTimeoutNano, TimeUnit.NANOSECONDS);
+        }
+        throw new TimeoutException("Timeout waiting for task.");
+    }
+
+    @Override
+    public void addListener(Runnable listener, Executor executor) {
+        originalFuture.addListener(() -> {
+            try {
+                ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
+                resultSetFuture.addListener(listener, executor);
+            } catch (CancellationException e) {
+                cancel(false);
+                return;
+            } catch (ExecutionException e) {
+                Futures.immediateFailedFuture(e).addListener(listener, executor);
+            }
+        }, executor);
+    }
+
+    private ResultSetFuture safeGet() {
+        try {
+            return originalFuture.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private ResultSetFuture executeAsyncWithRelease(AsyncRateLimiter rateLimiter, Session session, Statement statement) {
+        try {
+            ResultSetFuture resultSetFuture = session.executeAsync(statement);
+            Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
+                @Override
+                public void onSuccess(@Nullable ResultSet result) {
+                    rateLimiter.release();
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    rateLimiter.release();
+                }
+            });
+            return resultSetFuture;
+        } catch (RuntimeException re) {
+            rateLimiter.release();
+            throw re;
+        }
+    }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
index 9e25241..55838d6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
@@ -242,7 +242,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 
     private PreparedStatement getSaveStmt() {
         if (saveStmt == null) {
-            saveStmt = getSession().prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
+            saveStmt = prepare("INSERT INTO " + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
                     "(" + ModelConstants.RELATION_FROM_ID_PROPERTY +
                     "," + ModelConstants.RELATION_FROM_TYPE_PROPERTY +
                     "," + ModelConstants.RELATION_TO_ID_PROPERTY +
@@ -257,7 +257,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 
     private PreparedStatement getDeleteStmt() {
         if (deleteStmt == null) {
-            deleteStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
+            deleteStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
                     WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
                     AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?" +
                     AND + ModelConstants.RELATION_TO_ID_PROPERTY + " = ?" +
@@ -270,7 +270,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 
     private PreparedStatement getDeleteAllByEntityStmt() {
         if (deleteAllByEntityStmt == null) {
-            deleteAllByEntityStmt = getSession().prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
+            deleteAllByEntityStmt = prepare("DELETE FROM " + ModelConstants.RELATION_COLUMN_FAMILY_NAME +
                     WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + " = ?" +
                     AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + " = ?");
         }
@@ -279,7 +279,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 
     private PreparedStatement getFindAllByFromStmt() {
         if (findAllByFromStmt == null) {
-            findAllByFromStmt = getSession().prepare(SELECT_COLUMNS + " " +
+            findAllByFromStmt = prepare(SELECT_COLUMNS + " " +
                     FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
                     WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
                     AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -290,7 +290,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 
     private PreparedStatement getFindAllByFromAndTypeStmt() {
         if (findAllByFromAndTypeStmt == null) {
-            findAllByFromAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " +
+            findAllByFromAndTypeStmt = prepare(SELECT_COLUMNS + " " +
                     FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
                     WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
                     AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -303,7 +303,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 
     private PreparedStatement getFindAllByToStmt() {
         if (findAllByToStmt == null) {
-            findAllByToStmt = getSession().prepare(SELECT_COLUMNS + " " +
+            findAllByToStmt = prepare(SELECT_COLUMNS + " " +
                     FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
                     WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
                     AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -314,7 +314,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 
     private PreparedStatement getFindAllByToAndTypeStmt() {
         if (findAllByToAndTypeStmt == null) {
-            findAllByToAndTypeStmt = getSession().prepare(SELECT_COLUMNS + " " +
+            findAllByToAndTypeStmt = prepare(SELECT_COLUMNS + " " +
                     FROM + ModelConstants.RELATION_REVERSE_VIEW_NAME + " " +
                     WHERE + ModelConstants.RELATION_TO_ID_PROPERTY + EQUAL_TO_PARAM +
                     AND + ModelConstants.RELATION_TO_TYPE_PROPERTY + EQUAL_TO_PARAM +
@@ -327,7 +327,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
 
     private PreparedStatement getCheckRelationStmt() {
         if (checkRelationStmt == null) {
-            checkRelationStmt = getSession().prepare(SELECT_COLUMNS + " " +
+            checkRelationStmt = prepare(SELECT_COLUMNS + " " +
                     FROM + ModelConstants.RELATION_COLUMN_FAMILY_NAME + " " +
                     WHERE + ModelConstants.RELATION_FROM_ID_PROPERTY + EQUAL_TO_PARAM +
                     AND + ModelConstants.RELATION_FROM_TYPE_PROPERTY + EQUAL_TO_PARAM +
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index d620e11..aba3eef 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -73,7 +73,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     private PreparedStatement partitionInsertStmt;
     private PreparedStatement partitionInsertTtlStmt;
-    private PreparedStatement[] latestInsertStmts;
+    private PreparedStatement latestInsertStmt;
     private PreparedStatement[] saveStmts;
     private PreparedStatement[] saveTtlStmts;
     private PreparedStatement[] fetchStmts;
@@ -306,13 +306,15 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     @Override
     public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
-        DataType type = tsKvEntry.getDataType();
-        BoundStatement stmt = getLatestStmt(type).bind()
+        BoundStatement stmt = getLatestStmt().bind()
                 .setString(0, entityId.getEntityType().name())
                 .setUUID(1, entityId.getId())
                 .setString(2, tsKvEntry.getKey())
-                .setLong(3, tsKvEntry.getTs());
-        addValue(tsKvEntry, stmt, 4);
+                .setLong(3, tsKvEntry.getTs())
+                .set(4, tsKvEntry.getBooleanValue().orElse(null), Boolean.class)
+                .set(5, tsKvEntry.getStrValue().orElse(null), String.class)
+                .set(6, tsKvEntry.getLongValue().orElse(null), Long.class)
+                .set(7, tsKvEntry.getDoubleValue().orElse(null), Double.class);
         return getFuture(executeAsyncWrite(stmt), rs -> null);
     }
 
@@ -381,7 +383,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         if (saveStmts == null) {
             saveStmts = new PreparedStatement[DataType.values().length];
             for (DataType type : DataType.values()) {
-                saveStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
+                saveStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
                         "(" + ModelConstants.ENTITY_TYPE_COLUMN +
                         "," + ModelConstants.ENTITY_ID_COLUMN +
                         "," + ModelConstants.KEY_COLUMN +
@@ -398,7 +400,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         if (saveTtlStmts == null) {
             saveTtlStmts = new PreparedStatement[DataType.values().length];
             for (DataType type : DataType.values()) {
-                saveTtlStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
+                saveTtlStmts[type.ordinal()] = prepare(INSERT_INTO + ModelConstants.TS_KV_CF +
                         "(" + ModelConstants.ENTITY_TYPE_COLUMN +
                         "," + ModelConstants.ENTITY_ID_COLUMN +
                         "," + ModelConstants.KEY_COLUMN +
@@ -420,7 +422,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
                 } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
                     fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
                 } else {
-                    fetchStmts[type.ordinal()] = getSession().prepare(SELECT_PREFIX +
+                    fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
                             String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
                             + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
                             + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
@@ -435,26 +437,29 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return fetchStmts[aggType.ordinal()];
     }
 
-    private PreparedStatement getLatestStmt(DataType dataType) {
-        if (latestInsertStmts == null) {
-            latestInsertStmts = new PreparedStatement[DataType.values().length];
-            for (DataType type : DataType.values()) {
-                latestInsertStmts[type.ordinal()] = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
-                        "(" + ModelConstants.ENTITY_TYPE_COLUMN +
-                        "," + ModelConstants.ENTITY_ID_COLUMN +
-                        "," + ModelConstants.KEY_COLUMN +
-                        "," + ModelConstants.TS_COLUMN +
-                        "," + getColumnName(type) + ")" +
-                        " VALUES(?, ?, ?, ?, ?)");
-            }
+    private PreparedStatement getLatestStmt() {
+        if (latestInsertStmt == null) {
+//            latestInsertStmt = new PreparedStatement[DataType.values().length];
+//            for (DataType type : DataType.values()) {
+            latestInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_LATEST_CF +
+                    "(" + ModelConstants.ENTITY_TYPE_COLUMN +
+                    "," + ModelConstants.ENTITY_ID_COLUMN +
+                    "," + ModelConstants.KEY_COLUMN +
+                    "," + ModelConstants.TS_COLUMN +
+                    "," + ModelConstants.BOOLEAN_VALUE_COLUMN +
+                    "," + ModelConstants.STRING_VALUE_COLUMN +
+                    "," + ModelConstants.LONG_VALUE_COLUMN +
+                    "," + ModelConstants.DOUBLE_VALUE_COLUMN + ")" +
+                    " VALUES(?, ?, ?, ?, ?)");
+//            }
         }
-        return latestInsertStmts[dataType.ordinal()];
+        return latestInsertStmt;
     }
 
 
     private PreparedStatement getPartitionInsertStmt() {
         if (partitionInsertStmt == null) {
-            partitionInsertStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
+            partitionInsertStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
                     "(" + ModelConstants.ENTITY_TYPE_COLUMN +
                     "," + ModelConstants.ENTITY_ID_COLUMN +
                     "," + ModelConstants.PARTITION_COLUMN +
@@ -466,7 +471,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     private PreparedStatement getPartitionInsertTtlStmt() {
         if (partitionInsertTtlStmt == null) {
-            partitionInsertTtlStmt = getSession().prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
+            partitionInsertTtlStmt = prepare(INSERT_INTO + ModelConstants.TS_KV_PARTITIONS_CF +
                     "(" + ModelConstants.ENTITY_TYPE_COLUMN +
                     "," + ModelConstants.ENTITY_ID_COLUMN +
                     "," + ModelConstants.PARTITION_COLUMN +
@@ -479,7 +484,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     private PreparedStatement getFindLatestStmt() {
         if (findLatestStmt == null) {
-            findLatestStmt = getSession().prepare(SELECT_PREFIX +
+            findLatestStmt = prepare(SELECT_PREFIX +
                     ModelConstants.KEY_COLUMN + "," +
                     ModelConstants.TS_COLUMN + "," +
                     ModelConstants.STRING_VALUE_COLUMN + "," +
@@ -496,7 +501,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     private PreparedStatement getFindAllLatestStmt() {
         if (findAllLatestStmt == null) {
-            findAllLatestStmt = getSession().prepare(SELECT_PREFIX +
+            findAllLatestStmt = prepare(SELECT_PREFIX +
                     ModelConstants.KEY_COLUMN + "," +
                     ModelConstants.TS_COLUMN + "," +
                     ModelConstants.STRING_VALUE_COLUMN + "," +
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java
new file mode 100644
index 0000000..6fb21d6
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/AsyncRateLimiter.java
@@ -0,0 +1,25 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.util;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public interface AsyncRateLimiter {
+
+    ListenableFuture<Void> acquireAsync();
+
+    void release();
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
new file mode 100644
index 0000000..de07dbf
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java
@@ -0,0 +1,160 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.util;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Component
+@Slf4j
+public class BufferedRateLimiter implements AsyncRateLimiter {
+
+    private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
+
+    private final int permitsLimit;
+    private final int maxPermitWaitTime;
+    private final AtomicInteger permits;
+    private final BlockingQueue<LockedFuture> queue;
+
+    private final AtomicInteger maxQueueSize = new AtomicInteger();
+    private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
+
+    public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
+                               @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
+                               @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) {
+        this.permitsLimit = permitsLimit;
+        this.maxPermitWaitTime = maxPermitWaitTime;
+        this.permits = new AtomicInteger();
+        this.queue = new LinkedBlockingQueue<>(queueLimit);
+    }
+
+    @Override
+    public ListenableFuture<Void> acquireAsync() {
+        if (queue.isEmpty()) {
+            if (permits.incrementAndGet() <= permitsLimit) {
+                if (permits.get() > maxGrantedPermissions.get()) {
+                    maxGrantedPermissions.set(permits.get());
+                }
+                return Futures.immediateFuture(null);
+            }
+            permits.decrementAndGet();
+        }
+
+        return putInQueue();
+    }
+
+    @Override
+    public void release() {
+        permits.decrementAndGet();
+        reprocessQueue();
+    }
+
+    private void reprocessQueue() {
+        while (permits.get() < permitsLimit) {
+            if (permits.incrementAndGet() <= permitsLimit) {
+                if (permits.get() > maxGrantedPermissions.get()) {
+                    maxGrantedPermissions.set(permits.get());
+                }
+                LockedFuture lockedFuture = queue.poll();
+                if (lockedFuture != null) {
+                    lockedFuture.latch.countDown();
+                } else {
+                    permits.decrementAndGet();
+                    break;
+                }
+            } else {
+                permits.decrementAndGet();
+            }
+        }
+    }
+
+    private LockedFuture createLockedFuture() {
+        CountDownLatch latch = new CountDownLatch(1);
+        ListenableFuture<Void> future = pool.submit(() -> {
+            latch.await();
+            return null;
+        });
+        return new LockedFuture(latch, future, System.currentTimeMillis());
+    }
+
+    private ListenableFuture<Void> putInQueue() {
+
+        int size = queue.size();
+        if (size > maxQueueSize.get()) {
+            maxQueueSize.set(size);
+        }
+
+        if (queue.remainingCapacity() > 0) {
+            try {
+                LockedFuture lockedFuture = createLockedFuture();
+                if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
+                    lockedFuture.cancelFuture();
+                    return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
+                }
+                return lockedFuture.future;
+            } catch (InterruptedException e) {
+                return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject"));
+            }
+        }
+        return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject"));
+    }
+
+    @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
+    public void printStats() {
+        int expiredCount = 0;
+        for (LockedFuture lockedFuture : queue) {
+            if (lockedFuture.isExpired()) {
+                lockedFuture.cancelFuture();
+                expiredCount++;
+            }
+        }
+        log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}]", maxQueueSize.getAndSet(0),
+                maxGrantedPermissions.getAndSet(0), expiredCount);
+    }
+
+    private class LockedFuture {
+        final CountDownLatch latch;
+        final ListenableFuture<Void> future;
+        final long createTime;
+
+        public LockedFuture(CountDownLatch latch, ListenableFuture<Void> future, long createTime) {
+            this.latch = latch;
+            this.future = future;
+            this.createTime = createTime;
+        }
+
+        void cancelFuture() {
+            future.cancel(false);
+            latch.countDown();
+        }
+
+        boolean isExpired() {
+            return (System.currentTimeMillis() - createTime) > maxPermitWaitTime;
+        }
+
+    }
+
+
+}
diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
new file mode 100644
index 0000000..fa62c2b
--- /dev/null
+++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java
@@ -0,0 +1,156 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.nosql;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.thingsboard.server.dao.util.AsyncRateLimiter;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RateLimitedResultSetFutureTest {
+
+    private RateLimitedResultSetFuture resultSetFuture;
+
+    @Mock
+    private AsyncRateLimiter rateLimiter;
+    @Mock
+    private Session session;
+    @Mock
+    private Statement statement;
+    @Mock
+    private ResultSetFuture realFuture;
+    @Mock
+    private ResultSet rows;
+    @Mock
+    private Row row;
+
+    @Test
+    public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
+        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException()));
+        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+        Thread.sleep(1000L);
+        verify(rateLimiter).acquireAsync();
+        try {
+            assertTrue(resultSetFuture.isDone());
+            fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof IllegalStateException);
+            Throwable actualCause = e.getCause();
+            assertTrue(actualCause instanceof ExecutionException);
+        }
+        verifyNoMoreInteractions(session, rateLimiter, statement);
+
+    }
+
+    @Test
+    public void getUninterruptiblyDelegateToCassandra() throws InterruptedException, ExecutionException {
+        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
+        when(session.executeAsync(statement)).thenReturn(realFuture);
+        Mockito.doAnswer((Answer<Void>) invocation -> {
+            Object[] args = invocation.getArguments();
+            Runnable task = (Runnable) args[0];
+            task.run();
+            return null;
+        }).when(realFuture).addListener(Mockito.any(), Mockito.any());
+
+        when(realFuture.getUninterruptibly()).thenReturn(rows);
+
+        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+        ResultSet actual = resultSetFuture.getUninterruptibly();
+        assertSame(rows, actual);
+        verify(rateLimiter, times(1)).acquireAsync();
+        verify(rateLimiter, times(1)).release();
+    }
+
+    @Test
+    public void addListenerAllowsFutureTransformation() throws InterruptedException, ExecutionException {
+        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
+        when(session.executeAsync(statement)).thenReturn(realFuture);
+        Mockito.doAnswer((Answer<Void>) invocation -> {
+            Object[] args = invocation.getArguments();
+            Runnable task = (Runnable) args[0];
+            task.run();
+            return null;
+        }).when(realFuture).addListener(Mockito.any(), Mockito.any());
+
+        when(realFuture.get()).thenReturn(rows);
+        when(rows.one()).thenReturn(row);
+
+        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+
+        ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
+        Row actualRow = transform.get();
+
+        assertSame(row, actualRow);
+        verify(rateLimiter, times(1)).acquireAsync();
+        verify(rateLimiter, times(1)).release();
+    }
+
+    @Test
+    public void immidiateCassandraExceptionReturnsPermit() throws InterruptedException, ExecutionException {
+        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
+        when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg"));
+        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+        ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
+        try {
+            transform.get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof ExecutionException);
+        }
+        verify(rateLimiter, times(1)).acquireAsync();
+        verify(rateLimiter, times(1)).release();
+    }
+
+    @Test
+    public void queryTimeoutReturnsPermit() throws InterruptedException, ExecutionException {
+        when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
+        when(session.executeAsync(statement)).thenReturn(realFuture);
+        Mockito.doAnswer((Answer<Void>) invocation -> {
+            Object[] args = invocation.getArguments();
+            Runnable task = (Runnable) args[0];
+            task.run();
+            return null;
+        }).when(realFuture).addListener(Mockito.any(), Mockito.any());
+
+        when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout")));
+        resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
+        ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
+        try {
+            transform.get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof ExecutionException);
+        }
+        verify(rateLimiter, times(1)).acquireAsync();
+        verify(rateLimiter, times(1)).release();
+    }
+
+}
\ No newline at end of file
diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
new file mode 100644
index 0000000..5bfc3b6
--- /dev/null
+++ b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.util;
+
+import com.google.common.util.concurrent.*;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+
+
+public class BufferedRateLimiterTest {
+
+    @Test
+    public void finishedFutureReturnedIfPermitsAreGranted() {
+        BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100);
+        ListenableFuture<Void> actual = limiter.acquireAsync();
+        assertTrue(actual.isDone());
+    }
+
+    @Test
+    public void notFinishedFutureReturnedIfPermitsAreNotGranted() {
+        BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100);
+        ListenableFuture<Void> actual1 = limiter.acquireAsync();
+        ListenableFuture<Void> actual2 = limiter.acquireAsync();
+        assertTrue(actual1.isDone());
+        assertFalse(actual2.isDone());
+    }
+
+    @Test
+    public void failedFutureReturnedIfQueueIsfull() {
+        BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100);
+        ListenableFuture<Void> actual1 = limiter.acquireAsync();
+        ListenableFuture<Void> actual2 = limiter.acquireAsync();
+        ListenableFuture<Void> actual3 = limiter.acquireAsync();
+
+        assertTrue(actual1.isDone());
+        assertFalse(actual2.isDone());
+        assertTrue(actual3.isDone());
+        try {
+            actual3.get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof ExecutionException);
+            Throwable actualCause = e.getCause();
+            assertTrue(actualCause instanceof IllegalStateException);
+            assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage());
+        }
+    }
+
+    @Test
+    public void releasedPermitTriggerTasksFromQueue() throws InterruptedException {
+        BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
+        ListenableFuture<Void> actual1 = limiter.acquireAsync();
+        ListenableFuture<Void> actual2 = limiter.acquireAsync();
+        ListenableFuture<Void> actual3 = limiter.acquireAsync();
+        ListenableFuture<Void> actual4 = limiter.acquireAsync();
+        assertTrue(actual1.isDone());
+        assertTrue(actual2.isDone());
+        assertFalse(actual3.isDone());
+        assertFalse(actual4.isDone());
+        limiter.release();
+        TimeUnit.MILLISECONDS.sleep(100L);
+        assertTrue(actual3.isDone());
+        assertFalse(actual4.isDone());
+        limiter.release();
+        TimeUnit.MILLISECONDS.sleep(100L);
+        assertTrue(actual4.isDone());
+    }
+
+    @Test
+    public void permitsReleasedInConcurrentMode() throws InterruptedException {
+        BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100);
+        AtomicInteger actualReleased = new AtomicInteger();
+        AtomicInteger actualRejected = new AtomicInteger();
+        ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
+        for (int i = 0; i < 100; i++) {
+            ListenableFuture<ListenableFuture<Void>> submit = pool.submit(limiter::acquireAsync);
+            Futures.addCallback(submit, new FutureCallback<ListenableFuture<Void>>() {
+                @Override
+                public void onSuccess(@Nullable ListenableFuture<Void> result) {
+                    Futures.addCallback(result, new FutureCallback<Void>() {
+                        @Override
+                        public void onSuccess(@Nullable Void result) {
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(100);
+                            } catch (InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                            limiter.release();
+                            actualReleased.incrementAndGet();
+                        }
+
+                        @Override
+                        public void onFailure(Throwable t) {
+                            actualRejected.incrementAndGet();
+                        }
+                    });
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                }
+            });
+        }
+
+        TimeUnit.SECONDS.sleep(2);
+        assertTrue("Unexpected released count " + actualReleased.get(),
+                actualReleased.get() > 10 && actualReleased.get() < 20);
+        assertTrue("Unexpected rejected count " + actualRejected.get(),
+                actualRejected.get() > 80 && actualRejected.get() < 90);
+
+    }
+
+
+}
\ No newline at end of file