diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index bceea54..ed9bf77 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.controller;
import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
@@ -23,23 +24,27 @@ import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
-import org.thingsboard.server.common.data.audit.ActionStatus;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.exception.ThingsboardErrorCode;
import org.thingsboard.server.exception.ThingsboardException;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -47,6 +52,9 @@ import java.util.stream.Collectors;
@RequestMapping("/api")
public class DeviceController extends BaseController {
+ @Autowired
+ protected TimeseriesService timeseriesService;
+
public static final String DEVICE_ID = "deviceId";
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@@ -70,7 +78,7 @@ public class DeviceController extends BaseController {
device.setTenantId(getCurrentUser().getTenantId());
if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
if (device.getId() == null || device.getId().isNullUid() ||
- device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
+ device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
throw new ThingsboardException("You don't have permission to perform this operation!",
ThingsboardErrorCode.PERMISSION_DENIED);
} else {
@@ -368,4 +376,48 @@ public class DeviceController extends BaseController {
throw handleException(e);
}
}
+
+ @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/device/testSave", method = RequestMethod.GET)
+ @ResponseBody
+ public void testSave() throws ThingsboardException {
+ try {
+ SecurityUser user = getCurrentUser();
+ TenantId tenantId = user.getTenantId();
+
+ Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
+
+ timeseriesService.save(device.getId(), new BasicTsKvEntry(1516892633000L,
+ new LongDataEntry("test", 1L))).get();
+ timeseriesService.save(device.getId(), new BasicTsKvEntry(1519571033000L,
+ new LongDataEntry("test", 2L))).get();
+ timeseriesService.save(device.getId(), new BasicTsKvEntry(1521990233000L,
+ new LongDataEntry("test", 3L))).get();
+ timeseriesService.save(device.getId(), new BasicTsKvEntry(1524668633000L,
+ new LongDataEntry("test", 4L))).get();
+ timeseriesService.save(device.getId(), new BasicTsKvEntry(1527260633000L,
+ new LongDataEntry("test", 5L))).get();
+
+ } catch (Exception e) {
+ throw handleException(e);
+ }
+ }
+
+ @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/device/testDelete", method = RequestMethod.GET)
+ @ResponseBody
+ public void testDelete() throws ThingsboardException {
+ try {
+ SecurityUser user = getCurrentUser();
+ TenantId tenantId = user.getTenantId();
+
+ Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
+
+ timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test",
+ 1519139033000L, 1524668633000L))).get();
+
+ } catch (Exception e) {
+ throw handleException(e);
+ }
+ }
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index a075885..f035fd8 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -40,7 +40,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class BaseTimeseriesService implements TimeseriesService {
public static final int INSERTS_PER_ENTRY = 3;
- public static final int DELETES_PER_ENTRY = 2;
+ public static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
@Autowired
private TimeseriesDao timeseriesDao;
@@ -110,6 +110,7 @@ public class BaseTimeseriesService implements TimeseriesService {
private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvQuery query) {
futures.add(timeseriesDao.remove(entityId, query));
futures.add(timeseriesDao.removeLatest(entityId, query));
+ futures.add(timeseriesDao.removePartition(entityId, query));
}
private static void validate(EntityId entityId) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 7895b59..423e90e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -41,10 +41,7 @@ import javax.annotation.PreDestroy;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
import java.util.stream.Collectors;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -82,6 +79,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement[] fetchStmtsDesc;
private PreparedStatement findLatestStmt;
private PreparedStatement findAllLatestStmt;
+ private PreparedStatement deleteStmt;
+ private PreparedStatement deletePartitionStmt;
private boolean isInstall() {
return environment.acceptsProfiles("install");
@@ -374,35 +373,68 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
private PreparedStatement getDeleteStmt() {
- return prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
- " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
- + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
- + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
- + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
- + "AND " + ModelConstants.TS_COLUMN + " > ? "
- + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+ if (deleteStmt == null) {
+ deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.TS_COLUMN + " > ? "
+ + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+ }
+ return deleteStmt;
}
@Override
public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
- ListenableFuture<TsKvEntry> future = findLatest(entityId, query.getKey());
- return Futures.transform(future, new Function<TsKvEntry, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable TsKvEntry latestEntry) {
- if (latestEntry != null) {
+ ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
+
+ ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture,
+ (AsyncFunction<TsKvEntry, Boolean>) latestEntry -> {
long ts = latestEntry.getTs();
if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
- deleteLatest(entityId, latestEntry.getKey());
-
- //TODO: save new latest entry(< query.getStartTs() - if present) to TS_KV_LATEST_CF
+ return Futures.immediateFuture(true);
} else {
log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
}
- }
- return null;
+ return Futures.immediateFuture(false);
+ }, readResultsProcessingExecutor);
+
+
+ ListenableFuture<Void> savedLatestFuture = Futures.transform(booleanFuture,
+ (AsyncFunction<Boolean, Void>) isRemove -> {
+ if (isRemove) {
+ return getNewLatestEntryFuture(entityId, query);
+ }
+ return Futures.immediateFuture(null);
+ }, readResultsProcessingExecutor);
+
+ ListenableFuture<Void> removedLatestFuture = Futures.transform(booleanFuture,
+ (AsyncFunction<Boolean, Void>) isRemove -> {
+ if (isRemove) {
+ return deleteLatest(entityId, query.getKey());
+ }
+ return Futures.immediateFuture(null);
+ }, readResultsProcessingExecutor);
+ return Futures.transform(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
+ (AsyncFunction<List<Void>, Void>) list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
+ }
+
+ private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
+ long startTs = 0;
+ long endTs = query.getStartTs() - 1;
+ TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
+ Aggregation.NONE, DESC_ORDER);
+ ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
+
+ return Futures.transform(future, (AsyncFunction<List<TsKvEntry>, Void>) entryList -> {
+ if (entryList.size() == 1) {
+ return saveLatest(entityId, entryList.get(0));
+ } else {
+ log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
}
- });
+ return Futures.immediateFuture(null);
+ }, readResultsProcessingExecutor);
}
private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
@@ -414,6 +446,67 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return getFuture(executeAsyncWrite(delete), rs -> null);
}
+ @Override
+ public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
+ long minPartition = toPartitionTs(query.getStartTs());
+ long maxPartition = toPartitionTs(query.getEndTs());
+
+ ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+
+ final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+ final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+ Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+ @Override
+ public void onSuccess(@Nullable List<Long> partitions) {
+ TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
+ deletePartitionAsync(cursor, resultFuture);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+ }
+ }, readResultsProcessingExecutor);
+ return resultFuture;
+ }
+
+ private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+ if (!cursor.hasNextPartition()) {
+ resultFuture.set(null);
+ } else {
+ PreparedStatement proto = getDeletePartitionStmt();
+ BoundStatement stmt = proto.bind();
+ stmt.setString(0, cursor.getEntityType());
+ stmt.setUUID(1, cursor.getEntityId());
+ stmt.setLong(2, cursor.getNextPartition());
+ stmt.setString(3, cursor.getKey());
+
+ Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(@Nullable ResultSet result) {
+ deletePartitionAsync(cursor, resultFuture);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
+ }
+ }, readResultsProcessingExecutor);
+ }
+ }
+
+ private PreparedStatement getDeletePartitionStmt() {
+ if (deletePartitionStmt == null) {
+ deletePartitionStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_PARTITIONS_CF +
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM);
+ }
+ return deletePartitionStmt;
+ }
+
private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
List<TsKvEntry> entries = new ArrayList<>(rows.size());
if (!rows.isEmpty()) {