thingsboard-aplcache

Aggregation

2/20/2017 2:02:03 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 92e0ec4..6490124 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@ package org.thingsboard.server.actors.plugin;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -152,7 +153,19 @@ public final class PluginProcessingContext implements PluginContext {
     @Override
     public List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query) {
         validate(deviceId);
-        return pluginCtx.tsService.find(DataConstants.DEVICE, deviceId, query);
+        try {
+            return pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query).get();
+        } catch (Exception e) {
+            log.error("TODO", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback) {
+        validate(deviceId);
+        ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query);
+        Futures.addCallback(future, getCallback(callback, v -> v), executor);
     }
 
     @Override
@@ -235,10 +248,10 @@ public final class PluginProcessingContext implements PluginContext {
         };
     }
 
-    private <T> FutureCallback<ResultSet> getCallback(final PluginCallback<T> callback, Function<ResultSet, T> transformer) {
-        return new FutureCallback<ResultSet>() {
+    private <T, R> FutureCallback<R> getCallback(final PluginCallback<T> callback, Function<R, T> transformer) {
+        return new FutureCallback<R>() {
             @Override
-            public void onSuccess(@Nullable ResultSet result) {
+            public void onSuccess(@Nullable R result) {
                 pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
             }
 
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java
new file mode 100644
index 0000000..f8fad6c
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java
@@ -0,0 +1,10 @@
+package org.thingsboard.server.common.data.kv;
+
+/**
+ * Created by ashvayka on 20.02.17.
+ */
+public enum Aggregation {
+
+    MIN, MAX, AVG, SUM, COUNT, NONE;
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
index 7bb1a3f..78c887f 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,59 +15,27 @@
  */
 package org.thingsboard.server.common.data.kv;
 
-import java.util.Optional;
+import lombok.Data;
 
+@Data
 public class BaseTsKvQuery implements TsKvQuery {
 
-    private String key;
-    private Optional<Long> startTs;
-    private Optional<Long> endTs;
-    private Optional<Integer> limit;
+    private final String key;
+    private final long startTs;
+    private final long endTs;
+    private final int limit;
+    private final Aggregation aggregation;
 
-    public BaseTsKvQuery(String key, Optional<Long> startTs, Optional<Long> endTs, Optional<Integer> limit) {
+    public BaseTsKvQuery(String key, long startTs, long endTs, int limit, Aggregation aggregation) {
         this.key = key;
         this.startTs = startTs;
         this.endTs = endTs;
         this.limit = limit;
-    }
-    
-    public BaseTsKvQuery(String key, Long startTs, Long endTs, Integer limit) {
-        this(key, Optional.ofNullable(startTs), Optional.ofNullable(endTs), Optional.ofNullable(limit));
-    }
-
-    public BaseTsKvQuery(String key, Long startTs, Integer limit) {
-        this(key, startTs, null, limit);
-    }
-
-    public BaseTsKvQuery(String key, Long startTs, Long endTs) {
-        this(key, startTs, endTs, null);
-    }
-
-    public BaseTsKvQuery(String key, Long startTs) {
-        this(key, startTs, null, null);
+        this.aggregation = aggregation;
     }
 
-    public BaseTsKvQuery(String key, Integer limit) {
-        this(key, null, null, limit);
+    public BaseTsKvQuery(String key, long startTs, long endTs) {
+        this(key, startTs, endTs, 1, Aggregation.AVG);
     }
 
-    @Override
-    public String getKey() {
-        return key;
-    }
-
-    @Override
-    public Optional<Long> getStartTs() {
-        return startTs;
-    }
-
-    @Override
-    public Optional<Long> getEndTs() {
-        return endTs;
-    }
-
-    @Override
-    public Optional<Integer> getLimit() {
-        return limit;
-    }
 }
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
index 1303117..10a13ce 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
@@ -21,10 +21,12 @@ public interface TsKvQuery {
 
     String getKey();
 
-    Optional<Long> getStartTs();
+    long getStartTs();
 
-    Optional<Long> getEndTs();
+    long getEndTs();
 
-    Optional<Integer> getLimit();
+    int getLimit();
+
+    Aggregation getAggregation();
 
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index b68fb75..0f8418a 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,14 +18,15 @@ package org.thingsboard.server.dao.model;
 import java.util.UUID;
 
 import com.datastax.driver.core.utils.UUIDs;
+import org.apache.commons.lang3.ArrayUtils;
 
 public class ModelConstants {
 
     private ModelConstants() {
     }
-    
+
     public static UUID NULL_UUID = UUIDs.startOf(0);
-    
+
     /**
      * Generic constants.
      */
@@ -38,7 +39,7 @@ public class ModelConstants {
     public static final String ALIAS_PROPERTY = "alias";
     public static final String SEARCH_TEXT_PROPERTY = "search_text";
     public static final String ADDITIONAL_INFO_PROPERTY = "additional_info";
-    
+
     /**
      * Cassandra user constants.
      */
@@ -50,11 +51,11 @@ public class ModelConstants {
     public static final String USER_FIRST_NAME_PROPERTY = "first_name";
     public static final String USER_LAST_NAME_PROPERTY = "last_name";
     public static final String USER_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
-    
+
     public static final String USER_BY_EMAIL_COLUMN_FAMILY_NAME = "user_by_email";
     public static final String USER_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "user_by_tenant_and_search_text";
     public static final String USER_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "user_by_customer_and_search_text";
-    
+
     /**
      * Cassandra user_credentials constants.
      */
@@ -64,20 +65,20 @@ public class ModelConstants {
     public static final String USER_CREDENTIALS_PASSWORD_PROPERTY = "password";
     public static final String USER_CREDENTIALS_ACTIVATE_TOKEN_PROPERTY = "activate_token";
     public static final String USER_CREDENTIALS_RESET_TOKEN_PROPERTY = "reset_token";
-    
+
     public static final String USER_CREDENTIALS_BY_USER_COLUMN_FAMILY_NAME = "user_credentials_by_user";
     public static final String USER_CREDENTIALS_BY_ACTIVATE_TOKEN_COLUMN_FAMILY_NAME = "user_credentials_by_activate_token";
     public static final String USER_CREDENTIALS_BY_RESET_TOKEN_COLUMN_FAMILY_NAME = "user_credentials_by_reset_token";
-    
+
     /**
      * Cassandra admin_settings constants.
      */
     public static final String ADMIN_SETTINGS_COLUMN_FAMILY_NAME = "admin_settings";
     public static final String ADMIN_SETTINGS_KEY_PROPERTY = "key";
     public static final String ADMIN_SETTINGS_JSON_VALUE_PROPERTY = "json_value";
-    
+
     public static final String ADMIN_SETTINGS_BY_KEY_COLUMN_FAMILY_NAME = "admin_settings_by_key";
-    
+
     /**
      * Cassandra contact constants.
      */
@@ -97,9 +98,9 @@ public class ModelConstants {
     public static final String TENANT_TITLE_PROPERTY = TITLE_PROPERTY;
     public static final String TENANT_REGION_PROPERTY = "region";
     public static final String TENANT_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
-    
+
     public static final String TENANT_BY_REGION_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "tenant_by_region_and_search_text";
-    
+
     /**
      * Cassandra customer constants.
      */
@@ -107,9 +108,9 @@ public class ModelConstants {
     public static final String CUSTOMER_TENANT_ID_PROPERTY = TENTANT_ID_PROPERTY;
     public static final String CUSTOMER_TITLE_PROPERTY = TITLE_PROPERTY;
     public static final String CUSTOMER_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
-    
+
     public static final String CUSTOMER_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "customer_by_tenant_and_search_text";
-    
+
     /**
      * Cassandra device constants.
      */
@@ -118,12 +119,12 @@ public class ModelConstants {
     public static final String DEVICE_CUSTOMER_ID_PROPERTY = CUSTOMER_ID_PROPERTY;
     public static final String DEVICE_NAME_PROPERTY = "name";
     public static final String DEVICE_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
-    
+
     public static final String DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_and_search_text";
     public static final String DEVICE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_customer_and_search_text";
     public static final String DEVICE_BY_TENANT_AND_NAME_VIEW_NAME = "device_by_tenant_and_name";
 
-    
+
     /**
      * Cassandra device_credentials constants.
      */
@@ -132,7 +133,7 @@ public class ModelConstants {
     public static final String DEVICE_CREDENTIALS_CREDENTIALS_TYPE_PROPERTY = "credentials_type";
     public static final String DEVICE_CREDENTIALS_CREDENTIALS_ID_PROPERTY = "credentials_id";
     public static final String DEVICE_CREDENTIALS_CREDENTIALS_VALUE_PROPERTY = "credentials_value";
-    
+
     public static final String DEVICE_CREDENTIALS_BY_DEVICE_COLUMN_FAMILY_NAME = "device_credentials_by_device";
     public static final String DEVICE_CREDENTIALS_BY_CREDENTIALS_ID_COLUMN_FAMILY_NAME = "device_credentials_by_credentials_id";
 
@@ -203,9 +204,9 @@ public class ModelConstants {
     public static final String COMPONENT_DESCRIPTOR_BY_SCOPE_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "component_desc_by_scope_type_search_text";
     public static final String COMPONENT_DESCRIPTOR_BY_ID = "component_desc_by_id";
 
-  /**
-   * Cassandra rule metadata constants.
-   */
+    /**
+     * Cassandra rule metadata constants.
+     */
     public static final String RULE_COLUMN_FAMILY_NAME = "rule";
     public static final String RULE_TENANT_ID_PROPERTY = TENTANT_ID_PROPERTY;
     public static final String RULE_NAME_PROPERTY = "name";
@@ -259,4 +260,31 @@ public class ModelConstants {
     public static final String STRING_VALUE_COLUMN = "str_v";
     public static final String LONG_VALUE_COLUMN = "long_v";
     public static final String DOUBLE_VALUE_COLUMN = "dbl_v";
+
+    public static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)};
+
+    public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN,};
+    public static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
+            new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)});
+    public static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
+            new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)});
+    public static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
+            new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)});
+    public static final String[] AVG_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, SUM_AGGREGATION_COLUMNS);
+
+    public static String min(String s) {
+        return "min(" + s + ")";
+    }
+
+    public static String max(String s) {
+        return "max(" + s + ")";
+    }
+
+    public static String sum(String s) {
+        return "sum(" + s + ")";
+    }
+
+    public static String count(String s) {
+        return "count(" + s + ")";
+    }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
new file mode 100644
index 0000000..9ee9022
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
@@ -0,0 +1,175 @@
+package org.thingsboard.server.dao.timeseries;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.thingsboard.server.common.data.kv.*;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 20.02.17.
+ */
+public class AggregatePartitionsFunction implements com.google.common.base.Function<List<ResultSet>, Optional<TsKvEntry>> {
+
+    private static final int LONG_CNT_POS = 0;
+    private static final int DOUBLE_CNT_POS = 1;
+    private static final int BOOL_CNT_POS = 2;
+    private static final int STR_CNT_POS = 3;
+    private static final int LONG_POS = 4;
+    private static final int DOUBLE_POS = 5;
+    private static final int BOOL_POS = 6;
+    private static final int STR_POS = 7;
+
+    private final Aggregation aggregation;
+    private final String key;
+    private final long ts;
+
+    public AggregatePartitionsFunction(Aggregation aggregation, String key, long ts) {
+        this.aggregation = aggregation;
+        this.key = key;
+        this.ts = ts;
+    }
+
+    @Nullable
+    @Override
+    public Optional<TsKvEntry> apply(@Nullable List<ResultSet> rsList) {
+        if (rsList == null || rsList.size() == 0) {
+            return Optional.empty();
+        }
+        long count = 0;
+        DataType dataType = null;
+
+        Boolean bValue = null;
+        String sValue = null;
+        Double dValue = null;
+        Long lValue = null;
+
+        for (ResultSet rs : rsList) {
+            for (Row row : rs.all()) {
+                long curCount;
+
+                Long curLValue = null;
+                Double curDValue = null;
+                Boolean curBValue = null;
+                String curSValue = null;
+
+                long longCount = row.getLong(LONG_CNT_POS);
+                long doubleCount = row.getLong(DOUBLE_CNT_POS);
+                long boolCount = row.getLong(BOOL_CNT_POS);
+                long strCount = row.getLong(STR_CNT_POS);
+
+                if (longCount > 0) {
+                    dataType = DataType.LONG;
+                    curCount = longCount;
+                    curLValue = getLongValue(row);
+                } else if (doubleCount > 0) {
+                    dataType = DataType.DOUBLE;
+                    curCount = doubleCount;
+                    curDValue = getDoubleValue(row);
+                } else if (boolCount > 0) {
+                    dataType = DataType.BOOLEAN;
+                    curCount = boolCount;
+                    curBValue = getBooleanValue(row);
+                } else if (strCount > 0) {
+                    dataType = DataType.STRING;
+                    curCount = strCount;
+                    curSValue = getStringValue(row);
+                } else {
+                    continue;
+                }
+
+                if (aggregation == Aggregation.COUNT) {
+                    count += curCount;
+                } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
+                    count += curCount;
+                    dValue = dValue == null ? curDValue : dValue + curDValue;
+                    lValue = lValue == null ? curLValue : lValue + curLValue;
+                } else if (aggregation == Aggregation.MIN) {
+                    if (curDValue != null) {
+                        dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
+                    } else if (curLValue != null) {
+                        lValue = lValue == null ? curLValue : Math.min(lValue, curLValue);
+                    } else if (curBValue != null) {
+                        bValue = bValue == null ? curBValue : bValue && curBValue;
+                    } else if (curSValue != null) {
+                        if (sValue == null || curSValue.compareTo(sValue) < 0) {
+                            sValue = curSValue;
+                        }
+                    }
+                } else if (aggregation == Aggregation.MAX) {
+                    if (curDValue != null) {
+                        dValue = dValue == null ? curDValue : Math.max(dValue, curDValue);
+                    } else if (curLValue != null) {
+                        lValue = lValue == null ? curLValue : Math.max(lValue, curLValue);
+                    } else if (curBValue != null) {
+                        bValue = bValue == null ? curBValue : bValue || curBValue;
+                    } else if (curSValue != null) {
+                        if (sValue == null || curSValue.compareTo(sValue) > 0) {
+                            sValue = curSValue;
+                        }
+                    }
+                }
+            }
+        }
+        if (dataType == null) {
+            return Optional.empty();
+        } else if (aggregation == Aggregation.COUNT) {
+            return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count)));
+        } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
+            if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) {
+                return Optional.empty();
+            } else if (dataType == DataType.DOUBLE) {
+                return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count))));
+            } else if (dataType == DataType.LONG) {
+                return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count))));
+            }
+        } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
+            if (dataType == DataType.DOUBLE) {
+                return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue)));
+            } else if (dataType == DataType.LONG) {
+                return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue)));
+            } else if (dataType == DataType.STRING) {
+                return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue)));
+            } else {
+                return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue)));
+            }
+        }
+        return null;
+    }
+
+    private Boolean getBooleanValue(Row row) {
+        if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
+            return row.getBool(BOOL_POS);
+        } else {
+            return null;
+        }
+    }
+
+    private String getStringValue(Row row) {
+        if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
+            return row.getString(STR_POS);
+        } else {
+            return null;
+        }
+    }
+
+    private Long getLongValue(Row row) {
+        if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX
+                || aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) {
+            return row.getLong(LONG_POS);
+        } else {
+            return null;
+        }
+    }
+
+    private Double getDoubleValue(Row row) {
+        if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX
+                || aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) {
+            return row.getDouble(DOUBLE_POS);
+        } else {
+            return null;
+        }
+    }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 09c415c..3419729 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,10 @@ package org.thingsboard.server.dao.timeseries;
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -26,7 +30,16 @@ import org.thingsboard.server.common.data.kv.DataType;
 import org.thingsboard.server.dao.AbstractDao;
 import org.thingsboard.server.dao.model.ModelConstants;
 
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@@ -41,48 +54,136 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
     @Value("${cassandra.query.max_limit_per_request}")
     protected Integer maxLimitPerRequest;
 
+    @Value("${cassandra.query.read_result_processing_threads}")
+    private int readResultsProcessingThreads;
+
+    @Value("${cassandra.query.min_read_step}")
+    private int minReadStep;
+
+    @Value("${cassandra.query.ts_key_value_partitioning}")
+    private String partitioning;
+
+    private TsPartitionDate tsFormat;
+
+    private ExecutorService readResultsProcessingExecutor;
+
     private PreparedStatement partitionInsertStmt;
     private PreparedStatement[] latestInsertStmts;
     private PreparedStatement[] saveStmts;
+    private PreparedStatement[] fetchStmts;
     private PreparedStatement findLatestStmt;
     private PreparedStatement findAllLatestStmt;
 
+    @PostConstruct
+    public void init() {
+        getFetchStmt(Aggregation.NONE);
+        readResultsProcessingExecutor = Executors.newFixedThreadPool(readResultsProcessingThreads);
+        Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
+        if (partition.isPresent()) {
+            tsFormat = partition.get();
+        } else {
+            log.warn("Incorrect configuration of partitioning {}", partitioning);
+            throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
+        }
+    }
+
+    @PreDestroy
+    public void stop() {
+        if (readResultsProcessingExecutor != null) {
+            readResultsProcessingExecutor.shutdownNow();
+        }
+    }
+
     @Override
-    public List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition) {
-        List<Row> rows = Collections.emptyList();
-        Long[] parts = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
-        int partsLength = parts.length;
-        if (parts != null && partsLength > 0) {
-            int limit = maxLimitPerRequest;
-            Optional<Integer> lim = query.getLimit();
-            if (lim.isPresent() && lim.get() < maxLimitPerRequest) {
-                limit = lim.get();
-            }
+    public long toPartitionTs(long ts) {
+        LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
+        return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
+    }
 
-            rows = new ArrayList<>(limit);
-            int lastIdx = partsLength - 1;
-            for (int i = 0; i < partsLength; i++) {
-                int currentLimit;
-                if (rows.size() >= limit) {
-                    break;
-                } else {
-                    currentLimit = limit - rows.size();
+
+    private static String[] getFetchColumnNames(Aggregation aggregation) {
+        switch (aggregation) {
+            case NONE:
+                return ModelConstants.NONE_AGGREGATION_COLUMNS;
+            case MIN:
+                return ModelConstants.MIN_AGGREGATION_COLUMNS;
+            case MAX:
+                return ModelConstants.MAX_AGGREGATION_COLUMNS;
+            case SUM:
+                return ModelConstants.SUM_AGGREGATION_COLUMNS;
+            case COUNT:
+                return ModelConstants.COUNT_AGGREGATION_COLUMNS;
+            case AVG:
+                return ModelConstants.AVG_AGGREGATION_COLUMNS;
+            default:
+                throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!");
+        }
+    }
+
+    @Override
+    public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
+        if (query.getAggregation() == Aggregation.NONE) {
+            //TODO:
+            return null;
+        } else {
+            long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minReadStep);
+            long stepTs = query.getStartTs();
+            List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
+            while (stepTs < query.getEndTs()) {
+                long startTs = stepTs;
+                long endTs = stepTs + step;
+                TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, 1, query.getAggregation());
+                futures.add(findAndAggregateAsync(entityType, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
+                stepTs = endTs;
+            }
+            ListenableFuture<List<Optional<TsKvEntry>>> future = Futures.allAsList(futures);
+            return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
+                @Nullable
+                @Override
+                public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> input) {
+                    return input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList());
                 }
-                Long partition = parts[i];
-                Select.Where where = select().from(ModelConstants.TS_KV_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
-                        .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId))
-                        .and(eq(ModelConstants.KEY_COLUMN, query.getKey()))
-                        .and(eq(ModelConstants.PARTITION_COLUMN, partition));
-                if (i == 0 && query.getStartTs().isPresent()) {
-                    where.and(QueryBuilder.gt(ModelConstants.TS_COLUMN, query.getStartTs().get()));
-                } else if (i == lastIdx && query.getEndTs().isPresent()) {
-                    where.and(QueryBuilder.lte(ModelConstants.TS_COLUMN, query.getEndTs().get()));
+            });
+        }
+    }
+
+    private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
+        final Aggregation aggregation = query.getAggregation();
+        final long startTs = query.getStartTs();
+        final long endTs = query.getEndTs();
+        final long ts = startTs + (endTs - startTs) / 2;
+
+        ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
+        com.google.common.base.Function<ResultSet, List<Long>> toArrayFunction = rows -> rows.all().stream()
+                .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList());
+
+        ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, toArrayFunction, readResultsProcessingExecutor);
+
+        AsyncFunction<List<Long>, List<ResultSet>> fetchChunksFunction = partitions -> {
+            try {
+                PreparedStatement proto = getFetchStmt(aggregation);
+                List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
+                for (Long partition : partitions) {
+                    BoundStatement stmt = proto.bind();
+                    stmt.setString(0, entityType);
+                    stmt.setUUID(1, entityId);
+                    stmt.setString(2, query.getKey());
+                    stmt.setLong(3, partition);
+                    stmt.setLong(4, startTs);
+                    stmt.setLong(5, endTs);
+                    log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId);
+                    futures.add(executeAsyncRead(stmt));
                 }
-                where.limit(currentLimit);
-                rows.addAll(executeRead(where).all());
+                return Futures.allAsList(futures);
+            } catch (Throwable e) {
+                log.error("Failed to fetch data", e);
+                throw e;
             }
-        }
-        return convertResultToTsKvEntryList(rows);
+        };
+
+        ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture, fetchChunksFunction, readResultsProcessingExecutor);
+
+        return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, query.getKey(), ts), readResultsProcessingExecutor);
     }
 
     @Override
@@ -190,13 +291,12 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
      * Select existing partitions from the table
      * <code>{@link ModelConstants#TS_KV_PARTITIONS_CF}</code> for the given entity
      */
-    private Long[] fetchPartitions(String entityType, UUID entityId, String key, Optional<Long> minPartition, Optional<Long> maxPartition) {
+    private ResultSetFuture fetchPartitions(String entityType, UUID entityId, String key, long minPartition, long maxPartition) {
         Select.Where select = QueryBuilder.select(ModelConstants.PARTITION_COLUMN).from(ModelConstants.TS_KV_PARTITIONS_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
                 .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)).and(eq(ModelConstants.KEY_COLUMN, key));
-        minPartition.ifPresent(startTs -> select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition.get())));
-        maxPartition.ifPresent(endTs -> select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition.get())));
-        ResultSet resultSet = executeRead(select);
-        return resultSet.all().stream().map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).toArray(Long[]::new);
+        select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
+        select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
+        return executeAsyncRead(select);
     }
 
     private PreparedStatement getSaveStmt(DataType dataType) {
@@ -216,6 +316,23 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
         return saveStmts[dataType.ordinal()];
     }
 
+    private PreparedStatement getFetchStmt(Aggregation aggType) {
+        if (fetchStmts == null) {
+            fetchStmts = new PreparedStatement[Aggregation.values().length];
+            for (Aggregation type : Aggregation.values()) {
+                fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
+                        String.join(", ", getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+                        + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
+                        + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
+                        + "AND " + ModelConstants.KEY_COLUMN + " = ? "
+                        + "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
+                        + "AND " + ModelConstants.TS_COLUMN + " > ? "
+                        + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+            }
+        }
+        return fetchStmts[aggType.ordinal()];
+    }
+
     private PreparedStatement getLatestStmt(DataType dataType) {
         if (latestInsertStmts == null) {
             latestInsertStmts = new PreparedStatement[DataType.values().length];
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 419b534..a8b4ef5 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,21 +23,23 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.dao.exception.IncorrectParameterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.dao.service.Validator;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
@@ -50,38 +52,14 @@ public class BaseTimeseriesService implements TimeseriesService {
 
     public static final int INSERTS_PER_ENTRY = 3;
 
-    @Value("${cassandra.query.ts_key_value_partitioning}")
-    private String partitioning;
-
     @Autowired
     private TimeseriesDao timeseriesDao;
 
-    private TsPartitionDate tsFormat;
-
-    @PostConstruct
-    public void init() {
-        Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
-        if (partition.isPresent()) {
-            tsFormat = partition.get();
-        } else {
-            log.warn("Incorrect configuration of partitioning {}", partitioning);
-            throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
-        }
-    }
-
     @Override
-    public List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query) {
+    public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
         validate(entityType, entityId);
         validate(query);
-        return timeseriesDao.find(entityType, entityId.getId(), query, toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()));
-    }
-
-    private Optional<Long> toPartitionTs(Optional<Long> ts) {
-        if (ts.isPresent()) {
-            return Optional.of(toPartitionTs(ts.get()));
-        } else {
-            return Optional.empty();
-        }
+        return timeseriesDao.findAllAsync(entityType, entityId.getId(), query, timeseriesDao.toPartitionTs(query.getStartTs()), timeseriesDao.toPartitionTs(query.getEndTs()));
     }
 
     @Override
@@ -106,7 +84,7 @@ public class BaseTimeseriesService implements TimeseriesService {
             throw new IncorrectParameterException("Key value entry can't be null");
         }
         UUID uid = entityId.getId();
-        long partitionTs = toPartitionTs(tsKvEntry.getTs());
+        long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
 
         List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
         saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
@@ -122,7 +100,7 @@ public class BaseTimeseriesService implements TimeseriesService {
                 throw new IncorrectParameterException("Key value entry can't be null");
             }
             UUID uid = entityId.getId();
-            long partitionTs = toPartitionTs(tsKvEntry.getTs());
+            long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
             saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
         }
         return Futures.allAsList(futures);
@@ -144,14 +122,6 @@ public class BaseTimeseriesService implements TimeseriesService {
         futures.add(timeseriesDao.save(entityType, uid, partitionTs, tsKvEntry));
     }
 
-    private long toPartitionTs(long ts) {
-        LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
-
-        LocalDateTime parititonTime = tsFormat.truncatedTo(time);
-
-        return parititonTime.toInstant(ZoneOffset.UTC).toEpochMilli();
-    }
-
     private static void validate(String entityType, UUIDBased entityId) {
         Validator.validateString(entityType, "Incorrect entityType " + entityType);
         Validator.validateId(entityId, "Incorrect entityId " + entityId);
@@ -163,5 +133,6 @@ public class BaseTimeseriesService implements TimeseriesService {
         } else if (isBlank(query.getKey())) {
             throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
         }
+        //TODO: add validation of all params
     }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 294f574..83b78da 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
 
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Row;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvQuery;
 
@@ -30,7 +31,11 @@ import java.util.UUID;
  */
 public interface TimeseriesDao {
 
-    List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
+    long toPartitionTs(long ts);
+
+    ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition);
+
+//    List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
 
     ResultSetFuture findLatest(String entityType, UUID entityId, String key);
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index d8b31af..1bafdea 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -19,6 +19,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Row;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.UUIDBased;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvQuery;
@@ -32,8 +33,7 @@ import java.util.Set;
  */
 public interface TimeseriesService {
 
-    //TODO: Replace this with async operation
-    List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query);
+    ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query);
 
     ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys);
 
diff --git a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
index b150259..ac48536 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
@@ -25,11 +25,11 @@ import java.util.Arrays;
 
 @RunWith(ClasspathSuite.class)
 @ClassnameFilters({
-        "org.thingsboard.server.dao.service.*Test",
-        "org.thingsboard.server.dao.kv.*Test",
-        "org.thingsboard.server.dao.plugin.*Test",
-        "org.thingsboard.server.dao.rule.*Test",
-        "org.thingsboard.server.dao.attributes.*Test",
+//        "org.thingsboard.server.dao.service.*Test",
+//        "org.thingsboard.server.dao.kv.*Test",
+//        "org.thingsboard.server.dao.plugin.*Test",
+//        "org.thingsboard.server.dao.rule.*Test",
+//        "org.thingsboard.server.dao.attributes.*Test",
         "org.thingsboard.server.dao.timeseries.*Test"
 })
 public class DaoTestSuite {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
index 9e4f492..51fce6f 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
@@ -116,14 +116,36 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
             entries.add(tsKvEntry);
         }
         log.debug("Saved all records {}", localDateTime);
-        List<TsKvEntry> list = tsService.find(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
-                LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli()));
+        List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
+                LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
         log.debug("Fetched records {}", localDateTime);
         List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
         assertEquals(expected.size(), list.size());
         assertEquals(expected, list);
     }
 
+//    @Test
+//    public void testFindDeviceTsDataByQuery() throws Exception {
+//        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+//        LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES);
+//        log.debug("Start event time is {}", localDateTime);
+//        List<TsKvEntry> entries = new ArrayList<>(PARTITION_MINUTES);
+//
+//        for (int i = 0; i < PARTITION_MINUTES; i++) {
+//            long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli();
+//            BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry);
+//            tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get();
+//            entries.add(tsKvEntry);
+//        }
+//        log.debug("Saved all records {}", localDateTime);
+//        List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
+//                LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
+//        log.debug("Fetched records {}", localDateTime);
+//        List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
+//        assertEquals(expected.size(), list.size());
+//        assertEquals(expected, list);
+//    }
+
 
     private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
         tsService.save(DataConstants.DEVICE, deviceId, toTsEntry(ts, stringKvEntry)).get();
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 82fcbe1..0a207e7 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster
 
 cassandra.keyspace_name=thingsboard
 
-cassandra.url=127.0.0.1:9142
+cassandra.url=127.0.0.1:9042
 
 cassandra.ssl=false
 
@@ -47,3 +47,7 @@ cassandra.query.default_fetch_size=2000
 cassandra.query.ts_key_value_partitioning=HOURS
 
 cassandra.query.max_limit_per_request=1000
+
+cassandra.query.read_result_processing_threads=3
+
+cassandra.query.min_read_step=100
\ No newline at end of file
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index fb6ae07..b5f27a1 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -84,6 +84,8 @@ public interface PluginContext {
 
     List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query);
 
+    void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback);
+
     void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback);
 
     void loadLatestTimeseries(DeviceId deviceId, PluginCallback<List<TsKvEntry>> callback);
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index d441e81..e93f0b5 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -95,8 +95,9 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                     Optional<Integer> limit = request.getIntParamValue("limit");
                     Map<String, List<TsData>> data = new LinkedHashMap<>();
                     for (String key : keys.split(",")) {
-                        List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
-                        data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
+                        //TODO: refactoring
+//                        List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
+//                        data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
                     }
                     msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
                 } else if ("attributes".equals(entity)) {