thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index 856ca8e..79557f6 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -16,7 +16,6 @@
 package org.thingsboard.server.controller;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.web.bind.annotation.*;
@@ -29,23 +28,17 @@ import org.thingsboard.server.common.data.device.DeviceSearchQuery;
 import org.thingsboard.server.common.data.id.CustomerId;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.kv.Aggregation;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.LongDataEntry;
 import org.thingsboard.server.common.data.page.TextPageData;
 import org.thingsboard.server.common.data.page.TextPageLink;
 import org.thingsboard.server.common.data.security.Authority;
 import org.thingsboard.server.common.data.security.DeviceCredentials;
 import org.thingsboard.server.dao.exception.IncorrectParameterException;
 import org.thingsboard.server.dao.model.ModelConstants;
-import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.exception.ThingsboardErrorCode;
 import org.thingsboard.server.exception.ThingsboardException;
 import org.thingsboard.server.service.security.model.SecurityUser;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -53,9 +46,6 @@ import java.util.stream.Collectors;
 @RequestMapping("/api")
 public class DeviceController extends BaseController {
 
-    @Autowired
-    protected TimeseriesService timeseriesService;
-
     public static final String DEVICE_ID = "deviceId";
 
     @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@@ -377,50 +367,4 @@ public class DeviceController extends BaseController {
             throw handleException(e);
         }
     }
-
-    @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
-    @RequestMapping(value = "/device/testSave", method = RequestMethod.GET)
-    @ResponseBody
-    public void testSave() throws ThingsboardException {
-        try {
-            SecurityUser user = getCurrentUser();
-            TenantId tenantId = user.getTenantId();
-
-            Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
-
-            timeseriesService.save(device.getId(), new BasicTsKvEntry(1516892633000L,
-                    new LongDataEntry("test", 1L))).get();
-            timeseriesService.save(device.getId(), new BasicTsKvEntry(1519571033000L,
-                    new LongDataEntry("test", 2L))).get();
-            timeseriesService.save(device.getId(), new BasicTsKvEntry(1521990233000L,
-                    new LongDataEntry("test", 3L))).get();
-            timeseriesService.save(device.getId(), new BasicTsKvEntry(1524668633000L,
-                    new LongDataEntry("test", 4L))).get();
-            timeseriesService.save(device.getId(), new BasicTsKvEntry(1527260633000L,
-                    new LongDataEntry("test", 5L))).get();
-
-        } catch (Exception e) {
-            throw handleException(e);
-        }
-    }
-
-    @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
-    @RequestMapping(value = "/device/testDelete", method = RequestMethod.GET)
-    @ResponseBody
-    public void testDelete() throws ThingsboardException {
-        try {
-            SecurityUser user = getCurrentUser();
-            TenantId tenantId = user.getTenantId();
-
-            Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
-
-            long startTs = 1519561033000L;
-            long endTs = 1528260633000L;
-            timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test",
-                    startTs, endTs, endTs - startTs, 0, Aggregation.NONE, "DESC", true))).get();
-
-        } catch (Exception e) {
-            throw handleException(e);
-        }
-    }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index bef82c3..7915e84 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -300,14 +300,27 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
 
     @Override
     public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
-        //TODO: implement
-        return null;
+        return insertService.submit(() -> {
+            tsKvRepository.delete(
+                    fromTimeUUID(entityId.getId()),
+                    entityId.getEntityType(),
+                    query.getKey(),
+                    query.getStartTs(),
+                    query.getEndTs());
+            return null;
+        });
     }
 
     @Override
     public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
-        //TODO: implement
-        return null;
+        TsKvLatestEntity latestEntity = new TsKvLatestEntity();
+        latestEntity.setEntityType(entityId.getEntityType());
+        latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
+        latestEntity.setKey(query.getKey());
+        return insertService.submit(() -> {
+            tsKvLatestRepository.delete(latestEntity);
+            return null;
+        });
     }
 
     @Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index a1d1920..2b39d25 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -16,10 +16,12 @@
 package org.thingsboard.server.dao.sql.timeseries;
 
 import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
 import org.springframework.data.repository.query.Param;
 import org.springframework.scheduling.annotation.Async;
+import org.springframework.transaction.annotation.Transactional;
 import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
 import org.thingsboard.server.dao.model.sql.TsKvEntity;
@@ -41,6 +43,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
                                       @Param("endTs") long endTs,
                                       Pageable pageable);
 
+    @Transactional
+    @Modifying
+    @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
+            "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
+            "AND tskv.ts > :startTs AND tskv.ts < :endTs")
+    void delete(@Param("entityId") String entityId,
+                @Param("entityType") EntityType entityType,
+                @Param("entityKey") String key,
+                @Param("startTs") long startTs,
+                @Param("endTs") long endTs);
+
     @Async
     @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
@@ -56,30 +69,30 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
     CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
-                       @Param("entityType") EntityType entityType,
-                       @Param("entityKey") String entityKey,
-                       @Param("startTs") long startTs,
-                       @Param("endTs") long endTs);
+                                          @Param("entityType") EntityType entityType,
+                                          @Param("entityKey") String entityKey,
+                                          @Param("startTs") long startTs,
+                                          @Param("endTs") long endTs);
 
     @Async
     @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
     CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
-                                 @Param("entityType") EntityType entityType,
-                                 @Param("entityKey") String entityKey,
-                                 @Param("startTs") long startTs,
-                                 @Param("endTs") long endTs);
+                                            @Param("entityType") EntityType entityType,
+                                            @Param("entityKey") String entityKey,
+                                            @Param("startTs") long startTs,
+                                            @Param("endTs") long endTs);
 
     @Async
     @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
     CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
-                       @Param("entityType") EntityType entityType,
-                       @Param("entityKey") String entityKey,
-                       @Param("startTs") long startTs,
-                       @Param("endTs") long endTs);
+                                          @Param("entityType") EntityType entityType,
+                                          @Param("entityKey") String entityKey,
+                                          @Param("startTs") long startTs,
+                                          @Param("endTs") long endTs);
 
 
     @Async
@@ -87,8 +100,8 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
             "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
     CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
-                       @Param("entityType") EntityType entityType,
-                       @Param("entityKey") String entityKey,
-                       @Param("startTs") long startTs,
-                       @Param("endTs") long endTs);
+                                          @Param("entityType") EntityType entityType,
+                                          @Param("entityKey") String entityKey,
+                                          @Param("startTs") long startTs,
+                                          @Param("endTs") long endTs);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 3e4e2bd..db61088 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -441,7 +441,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     }
 
     private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
-        Statement delete = QueryBuilder.delete().from(ModelConstants.TS_KV_LATEST_CF)
+        Statement delete = QueryBuilder.delete().all().from(ModelConstants.TS_KV_LATEST_CF)
                 .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
                 .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
                 .and(eq(ModelConstants.KEY_COLUMN, key));
@@ -453,25 +453,36 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
         long minPartition = toPartitionTs(query.getStartTs());
         long maxPartition = toPartitionTs(query.getEndTs());
+        if (minPartition == maxPartition) {
+            return Futures.immediateFuture(null);
+        } else {
+            ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
 
-        ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+            final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+            final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
 
-        final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
-        final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
-
-        Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
-            @Override
-            public void onSuccess(@Nullable List<Long> partitions) {
-                TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
-                deletePartitionAsync(cursor, resultFuture);
-            }
+            Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+                @Override
+                public void onSuccess(@Nullable List<Long> partitions) {
+                    int index = 0;
+                    if (minPartition != query.getStartTs()) {
+                        index = 1;
+                    }
+                    List<Long> partitionsToDelete = new ArrayList<>();
+                    for (int i = index; i < partitions.size() - 1; i++) {
+                        partitionsToDelete.add(partitions.get(i));
+                    }
+                    TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
+                    deletePartitionAsync(cursor, resultFuture);
+                }
 
-            @Override
-            public void onFailure(Throwable t) {
-                log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
-            }
-        }, readResultsProcessingExecutor);
-        return resultFuture;
+                @Override
+                public void onFailure(Throwable t) {
+                    log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+                }
+            }, readResultsProcessingExecutor);
+            return resultFuture;
+        }
     }
 
     private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index a8d022b..b3a742c 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -93,24 +93,27 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0));
     }
 
-    //TODO: sql delete implement
-    /*@Test
+    @Test
     public void testDeleteDeviceTsData() throws Exception {
         DeviceId deviceId = new DeviceId(UUIDs.timeBased());
 
+        saveEntries(deviceId, TS - 4);
         saveEntries(deviceId, TS - 3);
         saveEntries(deviceId, TS - 2);
         saveEntries(deviceId, TS - 1);
-        saveEntries(deviceId, TS);
 
         tsService.remove(deviceId, Collections.singletonList(
-                new BaseTsKvQuery(STRING_KEY, TS - 4, TS - 2))).get();
+                new BaseTsKvQuery(STRING_KEY, TS - 4, TS, 60000, 0, Aggregation.NONE, DESC_ORDER,
+                        false))).get();
 
         List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
-                new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER))).get();
+                new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER,
+                        false))).get();
+        Assert.assertEquals(1, list.size());
 
-        Assert.assertEquals(2, list.size());
-    }*/
+        List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
+        Assert.assertEquals(null, latest.get(0).getValueAsString());
+    }
 
     @Test
     public void testFindDeviceTsData() throws Exception {