thingsboard-aplcache

Implementation

2/21/2017 7:26:15 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 6490124..fbd0cce 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index d2cac80..42b37db 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -140,7 +140,7 @@ cassandra:
     # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
     ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
     # Specify max data points per request
-    max_limit_per_request: "${TS_KV_MAX_LIMIT_PER_REQUEST:86400}"
+    min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:100}"
 
 # Actor system parameters
 actors:
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java
index f8fad6c..479a49a 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.common.data.kv;
 
 /**
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
index 78c887f..ed48206 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index 0f8418a..d3ed5d1 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,6 +19,7 @@ import java.util.UUID;
 
 import com.datastax.driver.core.utils.UUIDs;
 import org.apache.commons.lang3.ArrayUtils;
+import org.thingsboard.server.common.data.kv.Aggregation;
 
 public class ModelConstants {
 
@@ -261,16 +262,17 @@ public class ModelConstants {
     public static final String LONG_VALUE_COLUMN = "long_v";
     public static final String DOUBLE_VALUE_COLUMN = "dbl_v";
 
+    public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN, KEY_COLUMN, TS_COLUMN};
+
     public static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)};
 
-    public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN,};
     public static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
             new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)});
     public static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
             new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)});
     public static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
             new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)});
-    public static final String[] AVG_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, SUM_AGGREGATION_COLUMNS);
+    public static final String[] AVG_AGGREGATION_COLUMNS = SUM_AGGREGATION_COLUMNS;
 
     public static String min(String s) {
         return "min(" + s + ")";
@@ -287,4 +289,23 @@ public class ModelConstants {
     public static String count(String s) {
         return "count(" + s + ")";
     }
+
+    public static String[] getFetchColumnNames(Aggregation aggregation) {
+        switch (aggregation) {
+            case NONE:
+                return NONE_AGGREGATION_COLUMNS;
+            case MIN:
+                return MIN_AGGREGATION_COLUMNS;
+            case MAX:
+                return MAX_AGGREGATION_COLUMNS;
+            case SUM:
+                return SUM_AGGREGATION_COLUMNS;
+            case COUNT:
+                return COUNT_AGGREGATION_COLUMNS;
+            case AVG:
+                return AVG_AGGREGATION_COLUMNS;
+            default:
+                throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!");
+        }
+    }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
index 9ee9022..f099eec 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.dao.timeseries;
 
 import com.datastax.driver.core.ResultSet;
@@ -84,8 +99,11 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
                     count += curCount;
                 } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
                     count += curCount;
-                    dValue = dValue == null ? curDValue : dValue + curDValue;
-                    lValue = lValue == null ? curLValue : lValue + curLValue;
+                    if (curDValue != null) {
+                        dValue = dValue == null ? curDValue : dValue + curDValue;
+                    } else if (curLValue != null) {
+                        lValue = lValue == null ? curLValue : lValue + curLValue;
+                    }
                 } else if (aggregation == Aggregation.MIN) {
                     if (curDValue != null) {
                         dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 3419729..5cf71fc 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select;
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
@@ -51,14 +52,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 @Slf4j
 public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
 
-    @Value("${cassandra.query.max_limit_per_request}")
-    protected Integer maxLimitPerRequest;
-
-    @Value("${cassandra.query.read_result_processing_threads}")
-    private int readResultsProcessingThreads;
-
-    @Value("${cassandra.query.min_read_step}")
-    private int minReadStep;
+    @Value("${cassandra.query.min_aggregation_step_ms}")
+    private int minAggregationStepMs;
 
     @Value("${cassandra.query.ts_key_value_partitioning}")
     private String partitioning;
@@ -77,7 +72,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
     @PostConstruct
     public void init() {
         getFetchStmt(Aggregation.NONE);
-        readResultsProcessingExecutor = Executors.newFixedThreadPool(readResultsProcessingThreads);
+        readResultsProcessingExecutor = Executors.newCachedThreadPool();
         Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
         if (partition.isPresent()) {
             tsFormat = partition.get();
@@ -100,33 +95,12 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
         return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
     }
 
-
-    private static String[] getFetchColumnNames(Aggregation aggregation) {
-        switch (aggregation) {
-            case NONE:
-                return ModelConstants.NONE_AGGREGATION_COLUMNS;
-            case MIN:
-                return ModelConstants.MIN_AGGREGATION_COLUMNS;
-            case MAX:
-                return ModelConstants.MAX_AGGREGATION_COLUMNS;
-            case SUM:
-                return ModelConstants.SUM_AGGREGATION_COLUMNS;
-            case COUNT:
-                return ModelConstants.COUNT_AGGREGATION_COLUMNS;
-            case AVG:
-                return ModelConstants.AVG_AGGREGATION_COLUMNS;
-            default:
-                throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!");
-        }
-    }
-
     @Override
-    public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
+    public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
         if (query.getAggregation() == Aggregation.NONE) {
-            //TODO:
-            return null;
+            return findAllAsyncWithLimit(entityType, entityId, query);
         } else {
-            long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minReadStep);
+            long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minAggregationStepMs);
             long stepTs = query.getStartTs();
             List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
             while (stepTs < query.getEndTs()) {
@@ -143,23 +117,88 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
                 public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> input) {
                     return input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList());
                 }
-            });
+            }, readResultsProcessingExecutor);
+        }
+    }
+
+    private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(String entityType, UUID entityId, TsKvQuery query) {
+        long minPartition = query.getStartTs();
+        long maxPartition = query.getEndTs();
+
+        ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
+
+        final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>();
+        final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+        Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+            @Override
+            public void onSuccess(@Nullable List<Long> partitions) {
+                TsKvQueryCursor cursor = new TsKvQueryCursor(entityType, entityId, query, partitions);
+                findAllAsyncSequentiallyWithLimit(cursor, resultFuture);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityType, entityId, minPartition, maxPartition, t);
+            }
+        }, readResultsProcessingExecutor);
+
+        return resultFuture;
+    }
+
+    private void findAllAsyncSequentiallyWithLimit(final TsKvQueryCursor cursor, final SimpleListenableFuture<List<TsKvEntry>> resultFuture) {
+        if (cursor.isFull() || !cursor.hasNextPartition()) {
+            resultFuture.set(cursor.getData());
+        } else {
+            PreparedStatement proto = getFetchStmt(Aggregation.NONE);
+            BoundStatement stmt = proto.bind();
+            stmt.setString(0, cursor.getEntityType());
+            stmt.setUUID(1, cursor.getEntityId());
+            stmt.setString(2, cursor.getKey());
+            stmt.setLong(3, cursor.getNextPartition());
+            stmt.setLong(4, cursor.getStartTs());
+            stmt.setLong(5, cursor.getEndTs());
+            stmt.setInt(6, cursor.getCurrentLimit());
+
+            Futures.addCallback(executeAsyncRead(stmt), new FutureCallback<ResultSet>() {
+                @Override
+                public void onSuccess(@Nullable ResultSet result) {
+                    cursor.addData(convertResultToTsKvEntryList(result.all()));
+                    findAllAsyncSequentiallyWithLimit(cursor, resultFuture);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    log.error("[{}][{}] Failed to fetch data for query {}-{}", stmt, t);
+                }
+            }, readResultsProcessingExecutor);
         }
     }
 
     private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
         final Aggregation aggregation = query.getAggregation();
+        final String key = query.getKey();
         final long startTs = query.getStartTs();
         final long endTs = query.getEndTs();
         final long ts = startTs + (endTs - startTs) / 2;
 
-        ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
-        com.google.common.base.Function<ResultSet, List<Long>> toArrayFunction = rows -> rows.all().stream()
-                .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList());
+        ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, key, minPartition, maxPartition);
+
+        ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
 
-        ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, toArrayFunction, readResultsProcessingExecutor);
+        ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture,
+                getFetchChunksAsyncFunction(entityType, entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
 
-        AsyncFunction<List<Long>, List<ResultSet>> fetchChunksFunction = partitions -> {
+        return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts), readResultsProcessingExecutor);
+    }
+
+    private Function<ResultSet, List<Long>> getPartitionsArrayFunction() {
+        return rows -> rows.all().stream()
+                .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList());
+    }
+
+    private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(String entityType, UUID entityId, String key, Aggregation aggregation, long startTs, long endTs) {
+        return partitions -> {
             try {
                 PreparedStatement proto = getFetchStmt(aggregation);
                 List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
@@ -167,7 +206,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
                     BoundStatement stmt = proto.bind();
                     stmt.setString(0, entityType);
                     stmt.setUUID(1, entityId);
-                    stmt.setString(2, query.getKey());
+                    stmt.setString(2, key);
                     stmt.setLong(3, partition);
                     stmt.setLong(4, startTs);
                     stmt.setLong(5, endTs);
@@ -180,10 +219,6 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
                 throw e;
             }
         };
-
-        ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture, fetchChunksFunction, readResultsProcessingExecutor);
-
-        return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, query.getKey(), ts), readResultsProcessingExecutor);
     }
 
     @Override
@@ -320,14 +355,21 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
         if (fetchStmts == null) {
             fetchStmts = new PreparedStatement[Aggregation.values().length];
             for (Aggregation type : Aggregation.values()) {
-                fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
-                        String.join(", ", getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
-                        + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
-                        + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
-                        + "AND " + ModelConstants.KEY_COLUMN + " = ? "
-                        + "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
-                        + "AND " + ModelConstants.TS_COLUMN + " > ? "
-                        + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+                if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
+                    fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
+                } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
+                    fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
+                } else {
+                    fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
+                            String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+                            + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
+                            + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
+                            + "AND " + ModelConstants.KEY_COLUMN + " = ? "
+                            + "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
+                            + "AND " + ModelConstants.TS_COLUMN + " > ? "
+                            + "AND " + ModelConstants.TS_COLUMN + " <= ?"
+                            + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
+                }
             }
         }
         return fetchStmts[aggType.ordinal()];
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index a8b4ef5..1d8c3df 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2017 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,7 +59,7 @@ public class BaseTimeseriesService implements TimeseriesService {
     public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
         validate(entityType, entityId);
         validate(query);
-        return timeseriesDao.findAllAsync(entityType, entityId.getId(), query, timeseriesDao.toPartitionTs(query.getStartTs()), timeseriesDao.toPartitionTs(query.getEndTs()));
+        return timeseriesDao.findAllAsync(entityType, entityId.getId(), query);
     }
 
     @Override
@@ -132,7 +132,8 @@ public class BaseTimeseriesService implements TimeseriesService {
             throw new IncorrectParameterException("TsKvQuery can't be null");
         } else if (isBlank(query.getKey())) {
             throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
+        } else  if (query.getAggregation() == null){
+            throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
         }
-        //TODO: add validation of all params
     }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java
new file mode 100644
index 0000000..e10a40d
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/SimpleListenableFuture.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class SimpleListenableFuture<V> extends AbstractFuture<V> {
+
+    public SimpleListenableFuture() {
+
+    }
+
+    public boolean set(V value) {
+        return super.set(value);
+    }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 83b78da..7a6eed7 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -33,7 +33,7 @@ public interface TimeseriesDao {
 
     long toPartitionTs(long ts);
 
-    ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition);
+    ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query);
 
 //    List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
new file mode 100644
index 0000000..cad1232
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import lombok.Data;
+import lombok.Getter;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Created by ashvayka on 21.02.17.
+ */
+public class TsKvQueryCursor {
+    @Getter
+    private final String entityType;
+    @Getter
+    private final UUID entityId;
+    @Getter
+    private final String key;
+    @Getter
+    private final long startTs;
+    @Getter
+    private final long endTs;
+    private final List<Long> partitions;
+    @Getter
+    private final List<TsKvEntry> data;
+
+    private int partitionIndex;
+    private int currentLimit;
+
+    public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
+        this.entityType = entityType;
+        this.entityId = entityId;
+        this.key = baseQuery.getKey();
+        this.startTs = baseQuery.getStartTs();
+        this.endTs = baseQuery.getEndTs();
+        this.partitions = partitions;
+        this.partitionIndex = partitions.size() - 1;
+        this.data = new ArrayList<>();
+        this.currentLimit = baseQuery.getLimit();
+    }
+
+    public boolean hasNextPartition() {
+        return partitionIndex >= 0;
+    }
+
+    public boolean isFull() {
+        return currentLimit <= 0;
+    }
+
+    public long getNextPartition() {
+        long partition = partitions.get(partitionIndex);
+        partitionIndex--;
+        return partition;
+    }
+
+    public int getCurrentLimit() {
+        return currentLimit;
+    }
+
+    public void addData(List<TsKvEntry> newData) {
+        currentLimit -= newData.size();
+        data.addAll(newData);
+    }
+}
diff --git a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
index ac48536..b150259 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
@@ -25,11 +25,11 @@ import java.util.Arrays;
 
 @RunWith(ClasspathSuite.class)
 @ClassnameFilters({
-//        "org.thingsboard.server.dao.service.*Test",
-//        "org.thingsboard.server.dao.kv.*Test",
-//        "org.thingsboard.server.dao.plugin.*Test",
-//        "org.thingsboard.server.dao.rule.*Test",
-//        "org.thingsboard.server.dao.attributes.*Test",
+        "org.thingsboard.server.dao.service.*Test",
+        "org.thingsboard.server.dao.kv.*Test",
+        "org.thingsboard.server.dao.plugin.*Test",
+        "org.thingsboard.server.dao.rule.*Test",
+        "org.thingsboard.server.dao.attributes.*Test",
         "org.thingsboard.server.dao.timeseries.*Test"
 })
 public class DaoTestSuite {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
index 51fce6f..13c25c3 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
@@ -51,8 +51,6 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
     private static final String DOUBLE_KEY = "doubleKey";
     private static final String BOOLEAN_KEY = "booleanKey";
 
-    public static final int PARTITION_MINUTES = 1100;
-
     private static final long TS = 42L;
 
     KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
@@ -103,49 +101,101 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
     }
 
     @Test
-    public void testFindDeviceTsDataByQuery() throws Exception {
+    public void testFindDeviceTsData() throws Exception {
         DeviceId deviceId = new DeviceId(UUIDs.timeBased());
-        LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES);
-        log.debug("Start event time is {}", localDateTime);
-        List<TsKvEntry> entries = new ArrayList<>(PARTITION_MINUTES);
-
-        for (int i = 0; i < PARTITION_MINUTES; i++) {
-            long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli();
-            BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry);
-            tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get();
-            entries.add(tsKvEntry);
-        }
-        log.debug("Saved all records {}", localDateTime);
-        List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
-                LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
-        log.debug("Fetched records {}", localDateTime);
-        List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
-        assertEquals(expected.size(), list.size());
-        assertEquals(expected, list);
-    }
+        List<TsKvEntry> entries = new ArrayList<>();
+
+        entries.add(save(deviceId, 5000, 100));
+        entries.add(save(deviceId, 15000, 200));
+
+        entries.add(save(deviceId, 25000, 300));
+        entries.add(save(deviceId, 35000, 400));
+
+        entries.add(save(deviceId, 45000, 500));
+        entries.add(save(deviceId, 55000, 600));
+
+        List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.NONE)).get();
+        assertEquals(3, list.size());
+        assertEquals(55000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
+
+        assertEquals(45000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of(500L), list.get(1).getLongValue());
+
+        assertEquals(35000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
+
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.AVG)).get();
+        assertEquals(3, list.size());
+        assertEquals(10000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
+
+        assertEquals(30000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of(350L), list.get(1).getLongValue());
+
+        assertEquals(50000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
+
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.SUM)).get();
+
+        assertEquals(3, list.size());
+        assertEquals(10000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of(300L), list.get(0).getLongValue());
+
+        assertEquals(30000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of(700L), list.get(1).getLongValue());
 
-//    @Test
-//    public void testFindDeviceTsDataByQuery() throws Exception {
-//        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
-//        LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES);
-//        log.debug("Start event time is {}", localDateTime);
-//        List<TsKvEntry> entries = new ArrayList<>(PARTITION_MINUTES);
-//
-//        for (int i = 0; i < PARTITION_MINUTES; i++) {
-//            long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli();
-//            BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry);
-//            tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get();
-//            entries.add(tsKvEntry);
-//        }
-//        log.debug("Saved all records {}", localDateTime);
-//        List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
-//                LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
-//        log.debug("Fetched records {}", localDateTime);
-//        List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
-//        assertEquals(expected.size(), list.size());
-//        assertEquals(expected, list);
-//    }
+        assertEquals(50000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
 
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.MIN)).get();
+
+        assertEquals(3, list.size());
+        assertEquals(10000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of(100L), list.get(0).getLongValue());
+
+        assertEquals(30000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of(300L), list.get(1).getLongValue());
+
+        assertEquals(50000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
+
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.MAX)).get();
+
+        assertEquals(3, list.size());
+        assertEquals(10000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of(200L), list.get(0).getLongValue());
+
+        assertEquals(30000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of(400L), list.get(1).getLongValue());
+
+        assertEquals(50000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
+
+        list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
+                60000, 3, Aggregation.COUNT)).get();
+
+        assertEquals(3, list.size());
+        assertEquals(10000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of(2L), list.get(0).getLongValue());
+
+        assertEquals(30000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of(2L), list.get(1).getLongValue());
+
+        assertEquals(50000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
+    }
+
+    private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception {
+        TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value));
+        tsService.save(DataConstants.DEVICE, deviceId, entry).get();
+        return entry;
+    }
 
     private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
         tsService.save(DataConstants.DEVICE, deviceId, toTsEntry(ts, stringKvEntry)).get();
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 0a207e7..210d2c0 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -48,6 +48,4 @@ cassandra.query.ts_key_value_partitioning=HOURS
 
 cassandra.query.max_limit_per_request=1000
 
-cassandra.query.read_result_processing_threads=3
-
-cassandra.query.min_read_step=100
\ No newline at end of file
+cassandra.query.min_aggregation_step_ms=100
\ No newline at end of file