thingsboard-aplcache

Details

diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
index 0afe00b..51d4ad2 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
@@ -26,18 +26,20 @@ public class BaseTsKvQuery implements TsKvQuery {
     private final long interval;
     private final int limit;
     private final Aggregation aggregation;
+    private final String orderBy;
 
-    public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
+    public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy) {
         this.key = key;
         this.startTs = startTs;
         this.endTs = endTs;
         this.interval = interval;
         this.limit = limit;
         this.aggregation = aggregation;
+        this.orderBy = orderBy;
     }
 
     public BaseTsKvQuery(String key, long startTs, long endTs) {
-        this(key, startTs, endTs, endTs-startTs, 1, Aggregation.AVG);
+        this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
     }
 
 }
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
index ca9f90c..9b907c3 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
@@ -29,4 +29,5 @@ public interface TsKvQuery {
 
     Aggregation getAggregation();
 
+    String getOrderBy();
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 6350352..5503f49 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -299,6 +299,18 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
         });
     }
 
+    @Override
+    public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
+        //TODO: implement
+        return null;
+    }
+
+    @Override
+    public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
+        //TODO: implement
+        return null;
+    }
+
     @PreDestroy
     void onDestroy() {
         if (insertService != null) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index c981378..a075885 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -40,6 +40,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 public class BaseTimeseriesService implements TimeseriesService {
 
     public static final int INSERTS_PER_ENTRY = 3;
+    public static final int DELETES_PER_ENTRY = 2;
 
     @Autowired
     private TimeseriesDao timeseriesDao;
@@ -95,6 +96,22 @@ public class BaseTimeseriesService implements TimeseriesService {
         futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl));
     }
 
+    @Override
+    public ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> tsKvQueries) {
+        validate(entityId);
+        tsKvQueries.forEach(BaseTimeseriesService::validate);
+        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvQueries.size() * DELETES_PER_ENTRY);
+        for (TsKvQuery tsKvQuery : tsKvQueries) {
+            deleteAndRegisterFutures(futures, entityId, tsKvQuery);
+        }
+        return Futures.allAsList(futures);
+    }
+
+    private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvQuery query) {
+        futures.add(timeseriesDao.remove(entityId, query));
+        futures.add(timeseriesDao.removeLatest(entityId, query));
+    }
+
     private static void validate(EntityId entityId) {
         Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index cda4b16..7895b59 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -62,6 +62,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}";
     public static final String SELECT_PREFIX = "SELECT ";
     public static final String EQUALS_PARAM = " = ? ";
+    public static final String ASC_ORDER = "ASC";
+    public static final String DESC_ORDER = "DESC";
 
     @Autowired
     private Environment environment;
@@ -76,7 +78,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     private PreparedStatement latestInsertStmt;
     private PreparedStatement[] saveStmts;
     private PreparedStatement[] saveTtlStmts;
-    private PreparedStatement[] fetchStmts;
+    private PreparedStatement[] fetchStmtsAsc;
+    private PreparedStatement[] fetchStmtsDesc;
     private PreparedStatement findLatestStmt;
     private PreparedStatement findAllLatestStmt;
 
@@ -88,7 +91,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     public void init() {
         super.startExecutor();
         if (!isInstall()) {
-            getFetchStmt(Aggregation.NONE);
+            getFetchStmt(Aggregation.NONE, DESC_ORDER);
             Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
             if (partition.isPresent()) {
                 tsFormat = partition.get();
@@ -132,7 +135,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
             while (stepTs < query.getEndTs()) {
                 long startTs = stepTs;
                 long endTs = stepTs + step;
-                TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation());
+                TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
                 futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
                 stepTs = endTs;
             }
@@ -181,7 +184,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         if (cursor.isFull() || !cursor.hasNextPartition()) {
             resultFuture.set(cursor.getData());
         } else {
-            PreparedStatement proto = getFetchStmt(Aggregation.NONE);
+            PreparedStatement proto = getFetchStmt(Aggregation.NONE, cursor.getOrderBy());
             BoundStatement stmt = proto.bind();
             stmt.setString(0, cursor.getEntityType());
             stmt.setUUID(1, cursor.getEntityId());
@@ -231,7 +234,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
         return partitions -> {
             try {
-                PreparedStatement proto = getFetchStmt(aggregation);
+                PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER);
                 List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
                 for (Long partition : partitions) {
                     log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId());
@@ -318,6 +321,99 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return getFuture(executeAsyncWrite(stmt), rs -> null);
     }
 
+    @Override
+    public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
+        long minPartition = toPartitionTs(query.getStartTs());
+        long maxPartition = toPartitionTs(query.getEndTs());
+
+        ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+
+        final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+        final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+        Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+            @Override
+            public void onSuccess(@Nullable List<Long> partitions) {
+                TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
+                deleteAsync(cursor, resultFuture);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+            }
+        }, readResultsProcessingExecutor);
+        return resultFuture;
+    }
+
+    private void deleteAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+        if (!cursor.hasNextPartition()) {
+            resultFuture.set(null);
+        } else {
+            PreparedStatement proto = getDeleteStmt();
+            BoundStatement stmt = proto.bind();
+            stmt.setString(0, cursor.getEntityType());
+            stmt.setUUID(1, cursor.getEntityId());
+            stmt.setString(2, cursor.getKey());
+            stmt.setLong(3, cursor.getNextPartition());
+            stmt.setLong(4, cursor.getStartTs());
+            stmt.setLong(5, cursor.getEndTs());
+
+            Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
+                @Override
+                public void onSuccess(@Nullable ResultSet result) {
+                    deleteAsync(cursor, resultFuture);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
+                }
+            }, readResultsProcessingExecutor);
+        }
+    }
+
+    private PreparedStatement getDeleteStmt() {
+        return prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
+                " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+                + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+                + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+                + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+                + "AND " + ModelConstants.TS_COLUMN + " > ? "
+                + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+    }
+
+    @Override
+    public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
+        ListenableFuture<TsKvEntry> future = findLatest(entityId, query.getKey());
+        return Futures.transform(future, new Function<TsKvEntry, Void>() {
+            @Nullable
+            @Override
+            public Void apply(@Nullable TsKvEntry latestEntry) {
+                if (latestEntry != null) {
+                    long ts = latestEntry.getTs();
+                    if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
+                        deleteLatest(entityId, latestEntry.getKey());
+
+                        //TODO: save new latest entry(< query.getStartTs() - if present) to TS_KV_LATEST_CF
+                    } else {
+                        log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
+                    }
+                }
+                return null;
+            }
+        });
+    }
+
+    private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
+        Statement delete = QueryBuilder.delete().from(ModelConstants.TS_KV_LATEST_CF)
+                .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
+                .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
+                .and(eq(ModelConstants.KEY_COLUMN, key));
+        log.debug("Remove request: {}", delete.toString());
+        return getFuture(executeAsyncWrite(delete), rs -> null);
+    }
+
     private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
         List<TsKvEntry> entries = new ArrayList<>(rows.size());
         if (!rows.isEmpty()) {
@@ -413,28 +509,43 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return saveTtlStmts[dataType.ordinal()];
     }
 
-    private PreparedStatement getFetchStmt(Aggregation aggType) {
-        if (fetchStmts == null) {
-            fetchStmts = new PreparedStatement[Aggregation.values().length];
-            for (Aggregation type : Aggregation.values()) {
-                if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
-                    fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
-                } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
-                    fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
-                } else {
-                    fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
-                            String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
-                            + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
-                            + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
-                            + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
-                            + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
-                            + "AND " + ModelConstants.TS_COLUMN + " > ? "
-                            + "AND " + ModelConstants.TS_COLUMN + " <= ?"
-                            + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
+    private PreparedStatement getFetchStmt(Aggregation aggType, String orderBy) {
+        switch (orderBy) {
+            case ASC_ORDER:
+                if (fetchStmtsAsc == null) {
+                    fetchStmtsAsc = initFetchStmt(orderBy);
+                }
+                return fetchStmtsAsc[aggType.ordinal()];
+            case DESC_ORDER:
+                if (fetchStmtsDesc == null) {
+                    fetchStmtsDesc = initFetchStmt(orderBy);
                 }
+                return fetchStmtsDesc[aggType.ordinal()];
+            default:
+                throw new RuntimeException("Not supported" + orderBy + "order!");
+        }
+    }
+
+    private PreparedStatement[] initFetchStmt(String orderBy) {
+        PreparedStatement[] fetchStmts = new PreparedStatement[Aggregation.values().length];
+        for (Aggregation type : Aggregation.values()) {
+            if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
+                fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
+            } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
+                fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
+            } else {
+                fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
+                        String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+                        + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+                        + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+                        + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+                        + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+                        + "AND " + ModelConstants.TS_COLUMN + " > ? "
+                        + "AND " + ModelConstants.TS_COLUMN + " <= ?"
+                        + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
             }
         }
-        return fetchStmts[aggType.ordinal()];
+        return fetchStmts;
     }
 
     private PreparedStatement getLatestStmt() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 1e3f4ce..22bb166 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -38,4 +38,8 @@ public interface TimeseriesDao {
     ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl);
 
     ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
+
+    ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query);
+
+    ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index 2cd2d8d..a149191 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -37,4 +37,6 @@ public interface TimeseriesService {
     ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
 
     ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
+
+    ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> queries);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
index d6b6bbd..c4925ee 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.DESC_ORDER;
+
 /**
  * Created by ashvayka on 21.02.17.
  */
@@ -40,6 +42,8 @@ public class TsKvQueryCursor {
     private final List<Long> partitions;
     @Getter
     private final List<TsKvEntry> data;
+    @Getter
+    private String orderBy;
 
     private int partitionIndex;
     private int currentLimit;
@@ -51,13 +55,14 @@ public class TsKvQueryCursor {
         this.startTs = baseQuery.getStartTs();
         this.endTs = baseQuery.getEndTs();
         this.partitions = partitions;
-        this.partitionIndex = partitions.size() - 1;
+        this.orderBy = baseQuery.getOrderBy();
+        this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
         this.data = new ArrayList<>();
         this.currentLimit = baseQuery.getLimit();
     }
 
     public boolean hasNextPartition() {
-        return partitionIndex >= 0;
+        return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1;
     }
 
     public boolean isFull() {
@@ -66,7 +71,11 @@ public class TsKvQueryCursor {
 
     public long getNextPartition() {
         long partition = partitions.get(partitionIndex);
-        partitionIndex--;
+        if (isDesc()) {
+            partitionIndex--;
+        } else {
+            partitionIndex++;
+        }
         return partition;
     }
 
@@ -78,4 +87,8 @@ public class TsKvQueryCursor {
         currentLimit -= newData.size();
         data.addAll(newData);
     }
+
+    private boolean isDesc() {
+        return orderBy.equals(DESC_ORDER);
+    }
 }
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index 0cb3f7f..2130ab9 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -45,6 +45,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
     private static final String BOOLEAN_KEY = "booleanKey";
 
     private static final long TS = 42L;
+    private static final String DESC_ORDER = "DESC";
 
     KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
     KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);
@@ -93,6 +94,24 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
     }
 
     @Test
+    public void testDeleteDeviceTsData() throws Exception {
+        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+
+        saveEntries(deviceId, TS - 3);
+        saveEntries(deviceId, TS - 2);
+        saveEntries(deviceId, TS - 1);
+        saveEntries(deviceId, TS);
+
+        tsService.remove(deviceId, Collections.singletonList(
+                new BaseTsKvQuery(STRING_KEY, TS - 4, TS - 2))).get();
+
+        List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
+                new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER))).get();
+
+        Assert.assertEquals(2, list.size());
+    }
+
+    @Test
     public void testFindDeviceTsData() throws Exception {
         DeviceId deviceId = new DeviceId(UUIDs.timeBased());
         List<TsKvEntry> entries = new ArrayList<>();
@@ -107,7 +126,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         entries.add(save(deviceId, 55000, 600));
 
         List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.NONE))).get();
+                60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get();
         assertEquals(3, list.size());
         assertEquals(55000, list.get(0).getTs());
         assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
@@ -119,7 +138,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.AVG))).get();
+                60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get();
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
         assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
@@ -131,7 +150,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.SUM))).get();
+                60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -144,7 +163,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.MIN))).get();
+                60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -157,7 +176,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.MAX))).get();
+                60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -170,7 +189,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.COUNT))).get();
+                60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index 3ea754a..0c7e387 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -140,7 +140,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
             Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) :
                                                                               Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
 
-            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg))
+            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC"))
                     .collect(Collectors.toList());
             ctx.loadTimeseries(entityId, queries, getTsKvListCallback(msg));
         } else {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 1374ef6..bf75c5d 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -54,6 +54,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
     public static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!";
     public static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!";
     public static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!";
+    public static final String ORDER_BY = "DESC";
 
     private final SubscriptionManager subscriptionManager;
 
@@ -216,7 +217,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
             log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
             startTs = cmd.getStartTs();
             long endTs = cmd.getStartTs() + cmd.getTimeWindow();
-            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
+            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY)).collect(Collectors.toList());
             ctx.loadTimeseries(entityId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys));
         } else {
             List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
@@ -300,7 +301,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
         }
         EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
-        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
+        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY))
                 .collect(Collectors.toList());
         ctx.loadTimeseries(entityId, queries, new PluginCallback<List<TsKvEntry>>() {
             @Override