Details
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index 856ca8e..79557f6 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -16,7 +16,6 @@
package org.thingsboard.server.controller;
import com.google.common.util.concurrent.ListenableFuture;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
@@ -29,23 +28,17 @@ import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.kv.Aggregation;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
-import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.exception.ThingsboardErrorCode;
import org.thingsboard.server.exception.ThingsboardException;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -53,9 +46,6 @@ import java.util.stream.Collectors;
@RequestMapping("/api")
public class DeviceController extends BaseController {
- @Autowired
- protected TimeseriesService timeseriesService;
-
public static final String DEVICE_ID = "deviceId";
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@@ -377,50 +367,4 @@ public class DeviceController extends BaseController {
throw handleException(e);
}
}
-
- @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
- @RequestMapping(value = "/device/testSave", method = RequestMethod.GET)
- @ResponseBody
- public void testSave() throws ThingsboardException {
- try {
- SecurityUser user = getCurrentUser();
- TenantId tenantId = user.getTenantId();
-
- Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
-
- timeseriesService.save(device.getId(), new BasicTsKvEntry(1516892633000L,
- new LongDataEntry("test", 1L))).get();
- timeseriesService.save(device.getId(), new BasicTsKvEntry(1519571033000L,
- new LongDataEntry("test", 2L))).get();
- timeseriesService.save(device.getId(), new BasicTsKvEntry(1521990233000L,
- new LongDataEntry("test", 3L))).get();
- timeseriesService.save(device.getId(), new BasicTsKvEntry(1524668633000L,
- new LongDataEntry("test", 4L))).get();
- timeseriesService.save(device.getId(), new BasicTsKvEntry(1527260633000L,
- new LongDataEntry("test", 5L))).get();
-
- } catch (Exception e) {
- throw handleException(e);
- }
- }
-
- @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
- @RequestMapping(value = "/device/testDelete", method = RequestMethod.GET)
- @ResponseBody
- public void testDelete() throws ThingsboardException {
- try {
- SecurityUser user = getCurrentUser();
- TenantId tenantId = user.getTenantId();
-
- Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
-
- long startTs = 1519561033000L;
- long endTs = 1528260633000L;
- timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test",
- startTs, endTs, endTs - startTs, 0, Aggregation.NONE, "DESC", true))).get();
-
- } catch (Exception e) {
- throw handleException(e);
- }
- }
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index bef82c3..7915e84 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -300,14 +300,27 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
@Override
public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
- //TODO: implement
- return null;
+ return insertService.submit(() -> {
+ tsKvRepository.delete(
+ fromTimeUUID(entityId.getId()),
+ entityId.getEntityType(),
+ query.getKey(),
+ query.getStartTs(),
+ query.getEndTs());
+ return null;
+ });
}
@Override
public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
- //TODO: implement
- return null;
+ TsKvLatestEntity latestEntity = new TsKvLatestEntity();
+ latestEntity.setEntityType(entityId.getEntityType());
+ latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
+ latestEntity.setKey(query.getKey());
+ return insertService.submit(() -> {
+ tsKvLatestRepository.delete(latestEntity);
+ return null;
+ });
}
@Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index a1d1920..2b39d25 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -16,10 +16,12 @@
package org.thingsboard.server.dao.sql.timeseries;
import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.scheduling.annotation.Async;
+import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
import org.thingsboard.server.dao.model.sql.TsKvEntity;
@@ -41,6 +43,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Param("endTs") long endTs,
Pageable pageable);
+ @Transactional
+ @Modifying
+ @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
+ "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
+ "AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ void delete(@Param("entityId") String entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String key,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
+
@Async
@Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
@@ -56,30 +69,30 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
@Async
@@ -87,8 +100,8 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 3e4e2bd..db61088 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -441,7 +441,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
- Statement delete = QueryBuilder.delete().from(ModelConstants.TS_KV_LATEST_CF)
+ Statement delete = QueryBuilder.delete().all().from(ModelConstants.TS_KV_LATEST_CF)
.where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
.and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
.and(eq(ModelConstants.KEY_COLUMN, key));
@@ -453,25 +453,36 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs());
+ if (minPartition == maxPartition) {
+ return Futures.immediateFuture(null);
+ } else {
+ ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
- ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+ final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+ final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
- final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
- final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
-
- Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
- @Override
- public void onSuccess(@Nullable List<Long> partitions) {
- TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
- deletePartitionAsync(cursor, resultFuture);
- }
+ Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+ @Override
+ public void onSuccess(@Nullable List<Long> partitions) {
+ int index = 0;
+ if (minPartition != query.getStartTs()) {
+ index = 1;
+ }
+ List<Long> partitionsToDelete = new ArrayList<>();
+ for (int i = index; i < partitions.size() - 1; i++) {
+ partitionsToDelete.add(partitions.get(i));
+ }
+ TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
+ deletePartitionAsync(cursor, resultFuture);
+ }
- @Override
- public void onFailure(Throwable t) {
- log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
- }
- }, readResultsProcessingExecutor);
- return resultFuture;
+ @Override
+ public void onFailure(Throwable t) {
+ log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+ }
+ }, readResultsProcessingExecutor);
+ return resultFuture;
+ }
}
private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index a8d022b..b3a742c 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -93,24 +93,27 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0));
}
- //TODO: sql delete implement
- /*@Test
+ @Test
public void testDeleteDeviceTsData() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+ saveEntries(deviceId, TS - 4);
saveEntries(deviceId, TS - 3);
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1);
- saveEntries(deviceId, TS);
tsService.remove(deviceId, Collections.singletonList(
- new BaseTsKvQuery(STRING_KEY, TS - 4, TS - 2))).get();
+ new BaseTsKvQuery(STRING_KEY, TS - 4, TS, 60000, 0, Aggregation.NONE, DESC_ORDER,
+ false))).get();
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
- new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER))).get();
+ new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER,
+ false))).get();
+ Assert.assertEquals(1, list.size());
- Assert.assertEquals(2, list.size());
- }*/
+ List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
+ Assert.assertEquals(null, latest.get(0).getValueAsString());
+ }
@Test
public void testFindDeviceTsData() throws Exception {