thingsboard-memoizeit

Merge pull request #1055 from dmytro-landiak/master OrderBy

9/10/2018 1:42:21 PM

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index a6daeec..24608cb 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -76,7 +76,7 @@ public class DeviceController extends BaseController {
             device.setTenantId(getCurrentUser().getTenantId());
             if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
                 if (device.getId() == null || device.getId().isNullUid() ||
-                    device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
+                        device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
                     throw new ThingsboardException("You don't have permission to perform this operation!",
                             ThingsboardErrorCode.PERMISSION_DENIED);
                 } else {
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index 10a16ec..7062ca9 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -49,15 +49,15 @@ import org.thingsboard.server.common.data.kv.Aggregation;
 import org.thingsboard.server.common.data.kv.AttributeKey;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
 import org.thingsboard.server.common.data.kv.KvEntry;
 import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@@ -81,7 +81,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -201,7 +200,7 @@ public class TelemetryController extends BaseController {
                 (result, entityId) -> {
                     // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
                     Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
-                    List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg))
+                    List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg))
                             .collect(Collectors.toList());
 
                     Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 5c85036..548c417 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -35,16 +35,16 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
 import org.thingsboard.server.common.data.kv.DataType;
 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
 import org.thingsboard.server.common.data.kv.KvEntry;
 import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -370,9 +370,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
                     e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor);
         } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
             long curTs = System.currentTimeMillis();
-            List<TsKvQuery> queries = new ArrayList<>();
+            List<ReadTsKvQuery> queries = new ArrayList<>();
             subscription.getKeyStates().entrySet().forEach(e -> {
-                queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
+                queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
             });
 
             DonAsynchron.withCallback(tsService.findAll(entityId, queries),
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 3b2d690..2ff8e89 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -30,10 +30,10 @@ import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.kv.Aggregation;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.dao.attributes.AttributesService;
 import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.service.security.AccessValidator;
@@ -251,7 +251,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
         }
         EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
-        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
+        List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
                 .collect(Collectors.toList());
 
         FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
@@ -337,7 +337,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
             log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
             startTs = cmd.getStartTs();
             long endTs = cmd.getStartTs() + cmd.getTimeWindow();
-            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
+            List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(),
                     getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
 
             final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java
new file mode 100644
index 0000000..11b95ca
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.kv;
+
+import lombok.Data;
+
+@Data
+public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuery {
+
+    private final Boolean rewriteLatestIfDeleted;
+
+    public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted) {
+        super(key, startTs, endTs);
+        this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
+    }
+
+    public BaseDeleteTsKvQuery(String key, long startTs, long endTs) {
+        this(key, startTs, endTs, false);
+    }
+
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java
new file mode 100644
index 0000000..3c48adf
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.kv;
+
+import lombok.Data;
+
+@Data
+public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
+
+    private final long interval;
+    private final int limit;
+    private final Aggregation aggregation;
+    private final String orderBy;
+
+    public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
+        this(key, startTs, endTs, interval, limit, aggregation, "DESC");
+    }
+
+    public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation,
+                             String orderBy) {
+        super(key, startTs, endTs);
+        this.interval = interval;
+        this.limit = limit;
+        this.aggregation = aggregation;
+        this.orderBy = orderBy;
+    }
+
+    public BaseReadTsKvQuery(String key, long startTs, long endTs) {
+        this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
+    }
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
index 0afe00b..c4fa69c 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
@@ -23,21 +23,11 @@ public class BaseTsKvQuery implements TsKvQuery {
     private final String key;
     private final long startTs;
     private final long endTs;
-    private final long interval;
-    private final int limit;
-    private final Aggregation aggregation;
 
-    public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
+    public BaseTsKvQuery(String key, long startTs, long endTs) {
         this.key = key;
         this.startTs = startTs;
         this.endTs = endTs;
-        this.interval = interval;
-        this.limit = limit;
-        this.aggregation = aggregation;
-    }
-
-    public BaseTsKvQuery(String key, long startTs, long endTs) {
-        this(key, startTs, endTs, endTs-startTs, 1, Aggregation.AVG);
     }
 
 }
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java
new file mode 100644
index 0000000..7607a59
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.kv;
+
+public interface DeleteTsKvQuery extends TsKvQuery {
+
+    Boolean getRewriteLatestIfDeleted();
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java
new file mode 100644
index 0000000..7d54745
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.kv;
+
+public interface ReadTsKvQuery extends TsKvQuery {
+
+    long getInterval();
+
+    int getLimit();
+
+    Aggregation getAggregation();
+
+    String getOrderBy();
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
index ca9f90c..9df514e 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
@@ -23,10 +23,4 @@ public interface TsKvQuery {
 
     long getEndTs();
 
-    long getInterval();
-
-    int getLimit();
-
-    Aggregation getAggregation();
-
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index e5f145f..1eb2f00 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -31,9 +31,10 @@ import org.thingsboard.server.common.data.UUIDConverter;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.Aggregation;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.dao.DaoUtil;
 import org.thingsboard.server.dao.model.sql.TsKvEntity;
 import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
@@ -102,7 +103,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
     }
 
     @Override
-    public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
+    public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries) {
         List<ListenableFuture<List<TsKvEntry>>> futures = queries
                 .stream()
                 .map(query -> findAllAsync(entityId, query))
@@ -121,7 +122,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
         }, service);
     }
 
-    private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, TsKvQuery query) {
+    private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query) {
         if (query.getAggregation() == Aggregation.NONE) {
             return findAllAsyncWithLimit(entityId, query);
         } else {
@@ -228,7 +229,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
         });
     }
 
-    private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
+    private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
         return Futures.immediateFuture(
                 DaoUtil.convertDataList(
                         tsKvRepository.findAllWithLimit(
@@ -306,6 +307,36 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
         });
     }
 
+    @Override
+    public ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query) {
+        return service.submit(() -> {
+            tsKvRepository.delete(
+                    fromTimeUUID(entityId.getId()),
+                    entityId.getEntityType(),
+                    query.getKey(),
+                    query.getStartTs(),
+                    query.getEndTs());
+            return null;
+        });
+    }
+
+    @Override
+    public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
+        TsKvLatestEntity latestEntity = new TsKvLatestEntity();
+        latestEntity.setEntityType(entityId.getEntityType());
+        latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
+        latestEntity.setKey(query.getKey());
+        return service.submit(() -> {
+            tsKvLatestRepository.delete(latestEntity);
+            return null;
+        });
+    }
+
+    @Override
+    public ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query) {
+        return service.submit(() -> null);
+    }
+
     @PreDestroy
     void onDestroy() {
         if (insertService != null) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index a1d1920..2b39d25 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -16,10 +16,12 @@
 package org.thingsboard.server.dao.sql.timeseries;
 
 import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 import org.springframework.data.repository.query.Param;
 import org.springframework.scheduling.annotation.Async;
+import org.springframework.transaction.annotation.Transactional;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
 import org.thingsboard.server.dao.model.sql.TsKvEntity;
@@ -41,6 +43,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
                                       @Param("endTs") long endTs,
                                       Pageable pageable);
 
+    @Transactional
+    @Modifying
+    @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
+            "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
+            "AND tskv.ts > :startTs AND tskv.ts < :endTs")
+    void delete(@Param("entityId") String entityId,
+                @Param("entityType") EntityType entityType,
+                @Param("entityKey") String key,
+                @Param("startTs") long startTs,
+                @Param("endTs") long endTs);
+
     @Async
     @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
@@ -56,30 +69,30 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
     CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
-                       @Param("entityType") EntityType entityType,
-                       @Param("entityKey") String entityKey,
-                       @Param("startTs") long startTs,
-                       @Param("endTs") long endTs);
+                                          @Param("entityType") EntityType entityType,
+                                          @Param("entityKey") String entityKey,
+                                          @Param("startTs") long startTs,
+                                          @Param("endTs") long endTs);
 
     @Async
     @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
     CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
-                                 @Param("entityType") EntityType entityType,
-                                 @Param("entityKey") String entityKey,
-                                 @Param("startTs") long startTs,
-                                 @Param("endTs") long endTs);
+                                            @Param("entityType") EntityType entityType,
+                                            @Param("entityKey") String entityKey,
+                                            @Param("startTs") long startTs,
+                                            @Param("endTs") long endTs);
 
     @Async
     @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
     CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
-                       @Param("entityType") EntityType entityType,
-                       @Param("entityKey") String entityKey,
-                       @Param("startTs") long startTs,
-                       @Param("endTs") long endTs);
+                                          @Param("entityType") EntityType entityType,
+                                          @Param("entityKey") String entityKey,
+                                          @Param("startTs") long startTs,
+                                          @Param("endTs") long endTs);
 
 
     @Async
@@ -87,8 +100,8 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
     CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
-                       @Param("entityType") EntityType entityType,
-                       @Param("entityKey") String entityKey,
-                       @Param("startTs") long startTs,
-                       @Param("endTs") long endTs);
+                                          @Param("entityType") EntityType entityType,
+                                          @Param("entityKey") String entityKey,
+                                          @Param("startTs") long startTs,
+                                          @Param("endTs") long endTs);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index c981378..98e859f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -22,8 +22,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.dao.exception.IncorrectParameterException;
 import org.thingsboard.server.dao.service.Validator;
 
@@ -40,14 +41,15 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 public class BaseTimeseriesService implements TimeseriesService {
 
     public static final int INSERTS_PER_ENTRY = 3;
+    public static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
 
     @Autowired
     private TimeseriesDao timeseriesDao;
 
     @Override
-    public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries) {
+    public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<ReadTsKvQuery> queries) {
         validate(entityId);
-        queries.forEach(query -> validate(query));
+        queries.forEach(BaseTimeseriesService::validate);
         return timeseriesDao.findAllAsync(entityId, queries);
     }
 
@@ -95,17 +97,42 @@ public class BaseTimeseriesService implements TimeseriesService {
         futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl));
     }
 
+    @Override
+    public ListenableFuture<List<Void>> remove(EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
+        validate(entityId);
+        deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
+        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
+        for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
+            deleteAndRegisterFutures(futures, entityId, tsKvQuery);
+        }
+        return Futures.allAsList(futures);
+    }
+
+    private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
+        futures.add(timeseriesDao.remove(entityId, query));
+        futures.add(timeseriesDao.removeLatest(entityId, query));
+        futures.add(timeseriesDao.removePartition(entityId, query));
+    }
+
     private static void validate(EntityId entityId) {
         Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
     }
 
-    private static void validate(TsKvQuery query) {
+    private static void validate(ReadTsKvQuery query) {
         if (query == null) {
-            throw new IncorrectParameterException("TsKvQuery can't be null");
+            throw new IncorrectParameterException("ReadTsKvQuery can't be null");
         } else if (isBlank(query.getKey())) {
-            throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
+            throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Key can't be empty");
         } else if (query.getAggregation() == null) {
-            throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
+            throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty");
+        }
+    }
+
+    private static void validate(DeleteTsKvQuery query) {
+        if (query == null) {
+            throw new IncorrectParameterException("DeleteTsKvQuery can't be null");
+        } else if (isBlank(query.getKey())) {
+            throw new IncorrectParameterException("Incorrect DeleteTsKvQuery. Key can't be empty");
         }
     }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index c025e64..03d569d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -20,6 +20,7 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select;
 import com.google.common.base.Function;
@@ -34,16 +35,17 @@ import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.Aggregation;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
 import org.thingsboard.server.common.data.kv.DataType;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
 import org.thingsboard.server.common.data.kv.KvEntry;
 import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 import org.thingsboard.server.dao.model.ModelConstants;
 import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao;
 import org.thingsboard.server.dao.util.NoSqlDao;
@@ -54,11 +56,11 @@ import javax.annotation.PreDestroy;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Optional;
-import java.util.Collections;
 import java.util.stream.Collectors;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -76,8 +78,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}";
     public static final String SELECT_PREFIX = "SELECT ";
     public static final String EQUALS_PARAM = " = ? ";
-
-
+    public static final String ASC_ORDER = "ASC";
+    public static final String DESC_ORDER = "DESC";
     private static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
 
     @Autowired
@@ -96,9 +98,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     private PreparedStatement latestInsertStmt;
     private PreparedStatement[] saveStmts;
     private PreparedStatement[] saveTtlStmts;
-    private PreparedStatement[] fetchStmts;
+    private PreparedStatement[] fetchStmtsAsc;
+    private PreparedStatement[] fetchStmtsDesc;
     private PreparedStatement findLatestStmt;
     private PreparedStatement findAllLatestStmt;
+    private PreparedStatement deleteStmt;
+    private PreparedStatement deletePartitionStmt;
 
     private boolean isInstall() {
         return environment.acceptsProfiles("install");
@@ -108,7 +113,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     public void init() {
         super.startExecutor();
         if (!isInstall()) {
-            getFetchStmt(Aggregation.NONE);
+            getFetchStmt(Aggregation.NONE, DESC_ORDER);
             Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
             if (partition.isPresent()) {
                 tsFormat = partition.get();
@@ -125,7 +130,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     }
 
     @Override
-    public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
+    public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries) {
         List<ListenableFuture<List<TsKvEntry>>> futures = queries.stream().map(query -> findAllAsync(entityId, query)).collect(Collectors.toList());
         return Futures.transform(Futures.allAsList(futures), new Function<List<List<TsKvEntry>>, List<TsKvEntry>>() {
             @Nullable
@@ -142,7 +147,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     }
 
 
-    private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, TsKvQuery query) {
+    private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query) {
         if (query.getAggregation() == Aggregation.NONE) {
             return findAllAsyncWithLimit(entityId, query);
         } else {
@@ -152,7 +157,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
             while (stepTs < query.getEndTs()) {
                 long startTs = stepTs;
                 long endTs = stepTs + step;
-                TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation());
+                ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
                 futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
                 stepTs = endTs;
             }
@@ -171,7 +176,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return tsFormat.getTruncateUnit().equals(TsPartitionDate.EPOCH_START);
     }
 
-    private ListenableFuture<List<Long>> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
+    private ListenableFuture<List<Long>> getPartitionsFuture(ReadTsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
         if (isFixedPartitioning()) { //no need to fetch partitions from DB
             return Futures.immediateFuture(FIXED_PARTITION);
         }
@@ -179,11 +184,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
     }
 
-    private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
-
+    private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
         long minPartition = toPartitionTs(query.getStartTs());
         long maxPartition = toPartitionTs(query.getEndTs());
-
         final ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
         final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>();
 
@@ -212,7 +215,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         if (cursor.isFull() || !cursor.hasNextPartition()) {
             resultFuture.set(cursor.getData());
         } else {
-            PreparedStatement proto = getFetchStmt(Aggregation.NONE);
+            PreparedStatement proto = getFetchStmt(Aggregation.NONE, cursor.getOrderBy());
             BoundStatement stmt = proto.bind();
             stmt.setString(0, cursor.getEntityType());
             stmt.setUUID(1, cursor.getEntityId());
@@ -237,14 +240,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         }
     }
 
-    private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, TsKvQuery query, long minPartition, long maxPartition) {
+    private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, ReadTsKvQuery query, long minPartition, long maxPartition) {
         final Aggregation aggregation = query.getAggregation();
         final String key = query.getKey();
         final long startTs = query.getStartTs();
         final long endTs = query.getEndTs();
         final long ts = startTs + (endTs - startTs) / 2;
-
-
         ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
         ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
                 getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
@@ -260,7 +261,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
         return partitions -> {
             try {
-                PreparedStatement proto = getFetchStmt(aggregation);
+                PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER);
                 List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
                 for (Long partition : partitions) {
                     log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId());
@@ -363,6 +364,204 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return getFuture(executeAsyncWrite(stmt), rs -> null);
     }
 
+    @Override
+    public ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query) {
+        long minPartition = toPartitionTs(query.getStartTs());
+        long maxPartition = toPartitionTs(query.getEndTs());
+
+        ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+
+        final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+        final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+        Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+            @Override
+            public void onSuccess(@Nullable List<Long> partitions) {
+                QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
+                deleteAsync(cursor, resultFuture);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+            }
+        }, readResultsProcessingExecutor);
+        return resultFuture;
+    }
+
+    private void deleteAsync(final QueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+        if (!cursor.hasNextPartition()) {
+            resultFuture.set(null);
+        } else {
+            PreparedStatement proto = getDeleteStmt();
+            BoundStatement stmt = proto.bind();
+            stmt.setString(0, cursor.getEntityType());
+            stmt.setUUID(1, cursor.getEntityId());
+            stmt.setString(2, cursor.getKey());
+            stmt.setLong(3, cursor.getNextPartition());
+            stmt.setLong(4, cursor.getStartTs());
+            stmt.setLong(5, cursor.getEndTs());
+
+            Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
+                @Override
+                public void onSuccess(@Nullable ResultSet result) {
+                    deleteAsync(cursor, resultFuture);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
+                }
+            }, readResultsProcessingExecutor);
+        }
+    }
+
+    private PreparedStatement getDeleteStmt() {
+        if (deleteStmt == null) {
+            deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
+                    " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.TS_COLUMN + " > ? "
+                    + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+        }
+        return deleteStmt;
+    }
+
+    @Override
+    public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
+        ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
+
+        ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
+            long ts = latestEntry.getTs();
+            if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
+                return Futures.immediateFuture(true);
+            } else {
+                log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
+            }
+            return Futures.immediateFuture(false);
+        }, readResultsProcessingExecutor);
+
+        ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+            if (isRemove) {
+                return deleteLatest(entityId, query.getKey());
+            }
+            return Futures.immediateFuture(null);
+        }, readResultsProcessingExecutor);
+
+        if (query.getRewriteLatestIfDeleted()) {
+            ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+                if (isRemove) {
+                    return getNewLatestEntryFuture(entityId, query);
+                }
+                return Futures.immediateFuture(null);
+            }, readResultsProcessingExecutor);
+
+            return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
+                    list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
+        }
+        return removedLatestFuture;
+    }
+
+    private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
+        long startTs = 0;
+        long endTs = query.getStartTs() - 1;
+        ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
+                Aggregation.NONE, DESC_ORDER);
+        ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
+
+        return Futures.transformAsync(future, entryList -> {
+            if (entryList.size() == 1) {
+                return saveLatest(entityId, entryList.get(0));
+            } else {
+                log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
+            }
+            return Futures.immediateFuture(null);
+        }, readResultsProcessingExecutor);
+    }
+
+    private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
+        Statement delete = QueryBuilder.delete().all().from(ModelConstants.TS_KV_LATEST_CF)
+                .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
+                .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
+                .and(eq(ModelConstants.KEY_COLUMN, key));
+        log.debug("Remove request: {}", delete.toString());
+        return getFuture(executeAsyncWrite(delete), rs -> null);
+    }
+
+    @Override
+    public ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query) {
+        long minPartition = toPartitionTs(query.getStartTs());
+        long maxPartition = toPartitionTs(query.getEndTs());
+        if (minPartition == maxPartition) {
+            return Futures.immediateFuture(null);
+        } else {
+            ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+
+            final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+            final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+            Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+                @Override
+                public void onSuccess(@Nullable List<Long> partitions) {
+                    int index = 0;
+                    if (minPartition != query.getStartTs()) {
+                        index = 1;
+                    }
+                    List<Long> partitionsToDelete = new ArrayList<>();
+                    for (int i = index; i < partitions.size() - 1; i++) {
+                        partitionsToDelete.add(partitions.get(i));
+                    }
+                    QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
+                    deletePartitionAsync(cursor, resultFuture);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+                }
+            }, readResultsProcessingExecutor);
+            return resultFuture;
+        }
+    }
+
+    private void deletePartitionAsync(final QueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+        if (!cursor.hasNextPartition()) {
+            resultFuture.set(null);
+        } else {
+            PreparedStatement proto = getDeletePartitionStmt();
+            BoundStatement stmt = proto.bind();
+            stmt.setString(0, cursor.getEntityType());
+            stmt.setUUID(1, cursor.getEntityId());
+            stmt.setLong(2, cursor.getNextPartition());
+            stmt.setString(3, cursor.getKey());
+
+            Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
+                @Override
+                public void onSuccess(@Nullable ResultSet result) {
+                    deletePartitionAsync(cursor, resultFuture);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
+                }
+            }, readResultsProcessingExecutor);
+        }
+    }
+
+    private PreparedStatement getDeletePartitionStmt() {
+        if (deletePartitionStmt == null) {
+            deletePartitionStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_PARTITIONS_CF +
+                    " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM);
+        }
+        return deletePartitionStmt;
+    }
+
     private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
         List<TsKvEntry> entries = new ArrayList<>(rows.size());
         if (!rows.isEmpty()) {
@@ -458,28 +657,43 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return saveTtlStmts[dataType.ordinal()];
     }
 
-    private PreparedStatement getFetchStmt(Aggregation aggType) {
-        if (fetchStmts == null) {
-            fetchStmts = new PreparedStatement[Aggregation.values().length];
-            for (Aggregation type : Aggregation.values()) {
-                if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
-                    fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
-                } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
-                    fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
-                } else {
-                    fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
-                            String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
-                            + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
-                            + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
-                            + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
-                            + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
-                            + "AND " + ModelConstants.TS_COLUMN + " > ? "
-                            + "AND " + ModelConstants.TS_COLUMN + " <= ?"
-                            + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
+    private PreparedStatement getFetchStmt(Aggregation aggType, String orderBy) {
+        switch (orderBy) {
+            case ASC_ORDER:
+                if (fetchStmtsAsc == null) {
+                    fetchStmtsAsc = initFetchStmt(orderBy);
+                }
+                return fetchStmtsAsc[aggType.ordinal()];
+            case DESC_ORDER:
+                if (fetchStmtsDesc == null) {
+                    fetchStmtsDesc = initFetchStmt(orderBy);
                 }
+                return fetchStmtsDesc[aggType.ordinal()];
+            default:
+                throw new RuntimeException("Not supported" + orderBy + "order!");
+        }
+    }
+
+    private PreparedStatement[] initFetchStmt(String orderBy) {
+        PreparedStatement[] fetchStmts = new PreparedStatement[Aggregation.values().length];
+        for (Aggregation type : Aggregation.values()) {
+            if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
+                fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
+            } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
+                fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
+            } else {
+                fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
+                        String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+                        + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+                        + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+                        + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+                        + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+                        + "AND " + ModelConstants.TS_COLUMN + " > ? "
+                        + "AND " + ModelConstants.TS_COLUMN + " <= ?"
+                        + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
             }
         }
-        return fetchStmts[aggType.ordinal()];
+        return fetchStmts;
     }
 
     private PreparedStatement getLatestStmt() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java
new file mode 100644
index 0000000..f210608
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import lombok.Getter;
+import org.thingsboard.server.common.data.kv.TsKvQuery;
+
+import java.util.List;
+import java.util.UUID;
+
+public class QueryCursor {
+
+    @Getter
+    protected final String entityType;
+    @Getter
+    protected final UUID entityId;
+    @Getter
+    protected final String key;
+    @Getter
+    private final long startTs;
+    @Getter
+    private final long endTs;
+
+    final List<Long> partitions;
+    private int partitionIndex;
+
+    public QueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
+        this.entityType = entityType;
+        this.entityId = entityId;
+        this.key = baseQuery.getKey();
+        this.startTs = baseQuery.getStartTs();
+        this.endTs = baseQuery.getEndTs();
+        this.partitions = partitions;
+        this.partitionIndex = partitions.size() - 1;
+    }
+
+    public boolean hasNextPartition() {
+        return partitionIndex >= 0;
+    }
+
+    public long getNextPartition() {
+        long partition = partitions.get(partitionIndex);
+        partitionIndex--;
+        return partition;
+    }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 1e3f4ce..a8cb547 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -17,8 +17,9 @@ package org.thingsboard.server.dao.timeseries;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 
 import java.util.List;
 
@@ -27,7 +28,7 @@ import java.util.List;
  */
 public interface TimeseriesDao {
 
-    ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries);
+    ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries);
 
     ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key);
 
@@ -38,4 +39,10 @@ public interface TimeseriesDao {
     ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl);
 
     ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
+
+    ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query);
+
+    ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query);
+
+    ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index 2cd2d8d..c5076d2 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -17,8 +17,9 @@ package org.thingsboard.server.dao.timeseries;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 
 import java.util.Collection;
 import java.util.List;
@@ -28,7 +29,7 @@ import java.util.List;
  */
 public interface TimeseriesService {
 
-    ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries);
+    ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<ReadTsKvQuery> queries);
 
     ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys);
 
@@ -37,4 +38,6 @@ public interface TimeseriesService {
     ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
 
     ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
+
+    ListenableFuture<List<Void>> remove(EntityId entityId, List<DeleteTsKvQuery> queries);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
index d6b6bbd..b56be56 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
@@ -16,57 +16,53 @@
 package org.thingsboard.server.dao.timeseries;
 
 import lombok.Getter;
+import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.DESC_ORDER;
+
 /**
  * Created by ashvayka on 21.02.17.
  */
-public class TsKvQueryCursor {
-    @Getter
-    private final String entityType;
-    @Getter
-    private final UUID entityId;
-    @Getter
-    private final String key;
-    @Getter
-    private final long startTs;
-    @Getter
-    private final long endTs;
-    private final List<Long> partitions;
+public class TsKvQueryCursor extends QueryCursor {
+
     @Getter
     private final List<TsKvEntry> data;
+    @Getter
+    private String orderBy;
 
     private int partitionIndex;
     private int currentLimit;
 
-    public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
-        this.entityType = entityType;
-        this.entityId = entityId;
-        this.key = baseQuery.getKey();
-        this.startTs = baseQuery.getStartTs();
-        this.endTs = baseQuery.getEndTs();
-        this.partitions = partitions;
-        this.partitionIndex = partitions.size() - 1;
+    public TsKvQueryCursor(String entityType, UUID entityId, ReadTsKvQuery baseQuery, List<Long> partitions) {
+        super(entityType, entityId, baseQuery, partitions);
+        this.orderBy = baseQuery.getOrderBy();
+        this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
         this.data = new ArrayList<>();
         this.currentLimit = baseQuery.getLimit();
     }
 
+    @Override
     public boolean hasNextPartition() {
-        return partitionIndex >= 0;
+        return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1;
     }
 
     public boolean isFull() {
         return currentLimit <= 0;
     }
 
+    @Override
     public long getNextPartition() {
         long partition = partitions.get(partitionIndex);
-        partitionIndex--;
+        if (isDesc()) {
+            partitionIndex--;
+        } else {
+            partitionIndex++;
+        }
         return partition;
     }
 
@@ -78,4 +74,8 @@ public class TsKvQueryCursor {
         currentLimit -= newData.size();
         data.addAll(newData);
     }
+
+    private boolean isDesc() {
+        return orderBy.equals(DESC_ORDER);
+    }
 }
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index f045bf1..4528d9a 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -21,7 +21,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.kv.Aggregation;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
+import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
@@ -53,6 +54,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
     private static final String BOOLEAN_KEY = "booleanKey";
 
     private static final long TS = 42L;
+    private static final String DESC_ORDER = "DESC";
 
     KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
     KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);
@@ -101,6 +103,26 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
     }
 
     @Test
+    public void testDeleteDeviceTsData() throws Exception {
+        DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+
+        saveEntries(deviceId, 10000);
+        saveEntries(deviceId, 20000);
+        saveEntries(deviceId, 30000);
+        saveEntries(deviceId, 40000);
+
+        tsService.remove(deviceId, Collections.singletonList(
+                new BaseDeleteTsKvQuery(STRING_KEY, 15000, 45000))).get();
+
+        List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
+                new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get();
+        Assert.assertEquals(1, list.size());
+
+        List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
+        Assert.assertEquals(null, latest.get(0).getValueAsString());
+    }
+
+    @Test
     public void testFindDeviceTsData() throws Exception {
         DeviceId deviceId = new DeviceId(UUIDs.timeBased());
         List<TsKvEntry> entries = new ArrayList<>();
@@ -114,7 +136,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         entries.add(save(deviceId, 45000, 500));
         entries.add(save(deviceId, 55000, 600));
 
-        List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+        List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
                 60000, 20000, 3, Aggregation.NONE))).get();
         assertEquals(3, list.size());
         assertEquals(55000, list.get(0).getTs());
@@ -126,7 +148,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(35000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
 
-        list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+        list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
                 60000, 20000, 3, Aggregation.AVG))).get();
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -138,7 +160,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
 
-        list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+        list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
                 60000, 20000, 3, Aggregation.SUM))).get();
 
         assertEquals(3, list.size());
@@ -151,7 +173,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
 
-        list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+        list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
                 60000, 20000, 3, Aggregation.MIN))).get();
 
         assertEquals(3, list.size());
@@ -164,7 +186,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
 
-        list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+        list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
                 60000, 20000, 3, Aggregation.MAX))).get();
 
         assertEquals(3, list.size());
@@ -177,7 +199,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
 
-        list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
+        list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
                 60000, 20000, 3, Aggregation.COUNT))).get();
 
         assertEquals(3, list.size());