thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java 4(+2 -2)
Details
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index a6daeec..24608cb 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -76,7 +76,7 @@ public class DeviceController extends BaseController {
device.setTenantId(getCurrentUser().getTenantId());
if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
if (device.getId() == null || device.getId().isNullUid() ||
- device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
+ device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
throw new ThingsboardException("You don't have permission to perform this operation!",
ThingsboardErrorCode.PERMISSION_DENIED);
} else {
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index 10a16ec..b077ca3 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -201,7 +201,7 @@ public class TelemetryController extends BaseController {
(result, entityId) -> {
// If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
- List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg))
+ List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg, "DESC", false))
.collect(Collectors.toList());
Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index 3b2d690..c9d9898 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -251,7 +251,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
- List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
+ List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC", false))
.collect(Collectors.toList());
FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
@@ -338,7 +338,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
startTs = cmd.getStartTs();
long endTs = cmd.getStartTs() + cmd.getTimeWindow();
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
- getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
+ getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC", false)).collect(Collectors.toList());
final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
index 0afe00b..55d2797 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
@@ -26,18 +26,23 @@ public class BaseTsKvQuery implements TsKvQuery {
private final long interval;
private final int limit;
private final Aggregation aggregation;
+ private final String orderBy;
+ private final Boolean rewriteLatestIfDeleted;
- public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
+ public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy,
+ boolean rewriteLatestIfDeleted) {
this.key = key;
this.startTs = startTs;
this.endTs = endTs;
this.interval = interval;
this.limit = limit;
this.aggregation = aggregation;
+ this.orderBy = orderBy;
+ this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
}
public BaseTsKvQuery(String key, long startTs, long endTs) {
- this(key, startTs, endTs, endTs-startTs, 1, Aggregation.AVG);
+ this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC", false);
}
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
index ca9f90c..825df6c 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
@@ -29,4 +29,7 @@ public interface TsKvQuery {
Aggregation getAggregation();
+ String getOrderBy();
+
+ Boolean getRewriteLatestIfDeleted();
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index e5f145f..84b4374 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -306,6 +306,36 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
});
}
+ @Override
+ public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
+ return service.submit(() -> {
+ tsKvRepository.delete(
+ fromTimeUUID(entityId.getId()),
+ entityId.getEntityType(),
+ query.getKey(),
+ query.getStartTs(),
+ query.getEndTs());
+ return null;
+ });
+ }
+
+ @Override
+ public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
+ TsKvLatestEntity latestEntity = new TsKvLatestEntity();
+ latestEntity.setEntityType(entityId.getEntityType());
+ latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
+ latestEntity.setKey(query.getKey());
+ return service.submit(() -> {
+ tsKvLatestRepository.delete(latestEntity);
+ return null;
+ });
+ }
+
+ @Override
+ public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
+ return service.submit(() -> null);
+ }
+
@PreDestroy
void onDestroy() {
if (insertService != null) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index a1d1920..2b39d25 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -16,10 +16,12 @@
package org.thingsboard.server.dao.sql.timeseries;
import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.scheduling.annotation.Async;
+import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
import org.thingsboard.server.dao.model.sql.TsKvEntity;
@@ -41,6 +43,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Param("endTs") long endTs,
Pageable pageable);
+ @Transactional
+ @Modifying
+ @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
+ "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
+ "AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ void delete(@Param("entityId") String entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String key,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
+
@Async
@Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
@@ -56,30 +69,30 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
@Async
@Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
@Async
@@ -87,8 +100,8 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
- @Param("entityType") EntityType entityType,
- @Param("entityKey") String entityKey,
- @Param("startTs") long startTs,
- @Param("endTs") long endTs);
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index c981378..f035fd8 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -40,6 +40,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public class BaseTimeseriesService implements TimeseriesService {
public static final int INSERTS_PER_ENTRY = 3;
+ public static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
@Autowired
private TimeseriesDao timeseriesDao;
@@ -95,6 +96,23 @@ public class BaseTimeseriesService implements TimeseriesService {
futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl));
}
+ @Override
+ public ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> tsKvQueries) {
+ validate(entityId);
+ tsKvQueries.forEach(BaseTimeseriesService::validate);
+ List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvQueries.size() * DELETES_PER_ENTRY);
+ for (TsKvQuery tsKvQuery : tsKvQueries) {
+ deleteAndRegisterFutures(futures, entityId, tsKvQuery);
+ }
+ return Futures.allAsList(futures);
+ }
+
+ private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvQuery query) {
+ futures.add(timeseriesDao.remove(entityId, query));
+ futures.add(timeseriesDao.removeLatest(entityId, query));
+ futures.add(timeseriesDao.removePartition(entityId, query));
+ }
+
private static void validate(EntityId entityId) {
Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 7aa317c..197f3e0 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -15,11 +15,7 @@
*/
package org.thingsboard.server.dao.timeseries;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
+import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.base.Function;
@@ -54,10 +50,7 @@ import javax.annotation.PreDestroy;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
import java.util.stream.Collectors;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -75,6 +68,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}";
public static final String SELECT_PREFIX = "SELECT ";
public static final String EQUALS_PARAM = " = ? ";
+ public static final String ASC_ORDER = "ASC";
+ public static final String DESC_ORDER = "DESC";
@Autowired
private Environment environment;
@@ -92,9 +87,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private PreparedStatement latestInsertStmt;
private PreparedStatement[] saveStmts;
private PreparedStatement[] saveTtlStmts;
- private PreparedStatement[] fetchStmts;
+ private PreparedStatement[] fetchStmtsAsc;
+ private PreparedStatement[] fetchStmtsDesc;
private PreparedStatement findLatestStmt;
private PreparedStatement findAllLatestStmt;
+ private PreparedStatement deleteStmt;
+ private PreparedStatement deletePartitionStmt;
private boolean isInstall() {
return environment.acceptsProfiles("install");
@@ -104,7 +102,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public void init() {
super.startExecutor();
if (!isInstall()) {
- getFetchStmt(Aggregation.NONE);
+ getFetchStmt(Aggregation.NONE, DESC_ORDER);
Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
if (partition.isPresent()) {
tsFormat = partition.get();
@@ -148,7 +146,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
while (stepTs < query.getEndTs()) {
long startTs = stepTs;
long endTs = stepTs + step;
- TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation());
+ TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy(), false);
futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
stepTs = endTs;
}
@@ -197,7 +195,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
if (cursor.isFull() || !cursor.hasNextPartition()) {
resultFuture.set(cursor.getData());
} else {
- PreparedStatement proto = getFetchStmt(Aggregation.NONE);
+ PreparedStatement proto = getFetchStmt(Aggregation.NONE, cursor.getOrderBy());
BoundStatement stmt = proto.bind();
stmt.setString(0, cursor.getEntityType());
stmt.setUUID(1, cursor.getEntityId());
@@ -247,7 +245,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
return partitions -> {
try {
- PreparedStatement proto = getFetchStmt(aggregation);
+ PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER);
List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
for (Long partition : partitions) {
log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId());
@@ -347,6 +345,204 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return getFuture(executeAsyncWrite(stmt), rs -> null);
}
+ @Override
+ public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
+ long minPartition = toPartitionTs(query.getStartTs());
+ long maxPartition = toPartitionTs(query.getEndTs());
+
+ ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+
+ final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+ final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+ Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+ @Override
+ public void onSuccess(@Nullable List<Long> partitions) {
+ TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
+ deleteAsync(cursor, resultFuture);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+ }
+ }, readResultsProcessingExecutor);
+ return resultFuture;
+ }
+
+ private void deleteAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+ if (!cursor.hasNextPartition()) {
+ resultFuture.set(null);
+ } else {
+ PreparedStatement proto = getDeleteStmt();
+ BoundStatement stmt = proto.bind();
+ stmt.setString(0, cursor.getEntityType());
+ stmt.setUUID(1, cursor.getEntityId());
+ stmt.setString(2, cursor.getKey());
+ stmt.setLong(3, cursor.getNextPartition());
+ stmt.setLong(4, cursor.getStartTs());
+ stmt.setLong(5, cursor.getEndTs());
+
+ Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(@Nullable ResultSet result) {
+ deleteAsync(cursor, resultFuture);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
+ }
+ }, readResultsProcessingExecutor);
+ }
+ }
+
+ private PreparedStatement getDeleteStmt() {
+ if (deleteStmt == null) {
+ deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.TS_COLUMN + " > ? "
+ + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+ }
+ return deleteStmt;
+ }
+
+ @Override
+ public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
+ ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
+
+ ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
+ long ts = latestEntry.getTs();
+ if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
+ return Futures.immediateFuture(true);
+ } else {
+ log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
+ }
+ return Futures.immediateFuture(false);
+ }, readResultsProcessingExecutor);
+
+ ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+ if (isRemove) {
+ return deleteLatest(entityId, query.getKey());
+ }
+ return Futures.immediateFuture(null);
+ }, readResultsProcessingExecutor);
+
+ if (query.getRewriteLatestIfDeleted()) {
+ ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+ if (isRemove) {
+ return getNewLatestEntryFuture(entityId, query);
+ }
+ return Futures.immediateFuture(null);
+ }, readResultsProcessingExecutor);
+
+ return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
+ list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
+ }
+ return removedLatestFuture;
+ }
+
+ private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
+ long startTs = 0;
+ long endTs = query.getStartTs() - 1;
+ TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
+ Aggregation.NONE, DESC_ORDER, false);
+ ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
+
+ return Futures.transformAsync(future, entryList -> {
+ if (entryList.size() == 1) {
+ return saveLatest(entityId, entryList.get(0));
+ } else {
+ log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
+ }
+ return Futures.immediateFuture(null);
+ }, readResultsProcessingExecutor);
+ }
+
+ private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
+ Statement delete = QueryBuilder.delete().all().from(ModelConstants.TS_KV_LATEST_CF)
+ .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
+ .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
+ .and(eq(ModelConstants.KEY_COLUMN, key));
+ log.debug("Remove request: {}", delete.toString());
+ return getFuture(executeAsyncWrite(delete), rs -> null);
+ }
+
+ @Override
+ public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
+ long minPartition = toPartitionTs(query.getStartTs());
+ long maxPartition = toPartitionTs(query.getEndTs());
+ if (minPartition == maxPartition) {
+ return Futures.immediateFuture(null);
+ } else {
+ ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+
+ final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+ final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+ Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+ @Override
+ public void onSuccess(@Nullable List<Long> partitions) {
+ int index = 0;
+ if (minPartition != query.getStartTs()) {
+ index = 1;
+ }
+ List<Long> partitionsToDelete = new ArrayList<>();
+ for (int i = index; i < partitions.size() - 1; i++) {
+ partitionsToDelete.add(partitions.get(i));
+ }
+ TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
+ deletePartitionAsync(cursor, resultFuture);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+ }
+ }, readResultsProcessingExecutor);
+ return resultFuture;
+ }
+ }
+
+ private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+ if (!cursor.hasNextPartition()) {
+ resultFuture.set(null);
+ } else {
+ PreparedStatement proto = getDeletePartitionStmt();
+ BoundStatement stmt = proto.bind();
+ stmt.setString(0, cursor.getEntityType());
+ stmt.setUUID(1, cursor.getEntityId());
+ stmt.setLong(2, cursor.getNextPartition());
+ stmt.setString(3, cursor.getKey());
+
+ Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
+ @Override
+ public void onSuccess(@Nullable ResultSet result) {
+ deletePartitionAsync(cursor, resultFuture);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
+ }
+ }, readResultsProcessingExecutor);
+ }
+ }
+
+ private PreparedStatement getDeletePartitionStmt() {
+ if (deletePartitionStmt == null) {
+ deletePartitionStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_PARTITIONS_CF +
+ " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM);
+ }
+ return deletePartitionStmt;
+ }
+
private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
List<TsKvEntry> entries = new ArrayList<>(rows.size());
if (!rows.isEmpty()) {
@@ -442,28 +638,43 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return saveTtlStmts[dataType.ordinal()];
}
- private PreparedStatement getFetchStmt(Aggregation aggType) {
- if (fetchStmts == null) {
- fetchStmts = new PreparedStatement[Aggregation.values().length];
- for (Aggregation type : Aggregation.values()) {
- if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
- fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
- } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
- fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
- } else {
- fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
- String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
- + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
- + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
- + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
- + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
- + "AND " + ModelConstants.TS_COLUMN + " > ? "
- + "AND " + ModelConstants.TS_COLUMN + " <= ?"
- + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
+ private PreparedStatement getFetchStmt(Aggregation aggType, String orderBy) {
+ switch (orderBy) {
+ case ASC_ORDER:
+ if (fetchStmtsAsc == null) {
+ fetchStmtsAsc = initFetchStmt(orderBy);
+ }
+ return fetchStmtsAsc[aggType.ordinal()];
+ case DESC_ORDER:
+ if (fetchStmtsDesc == null) {
+ fetchStmtsDesc = initFetchStmt(orderBy);
}
+ return fetchStmtsDesc[aggType.ordinal()];
+ default:
+ throw new RuntimeException("Not supported" + orderBy + "order!");
+ }
+ }
+
+ private PreparedStatement[] initFetchStmt(String orderBy) {
+ PreparedStatement[] fetchStmts = new PreparedStatement[Aggregation.values().length];
+ for (Aggregation type : Aggregation.values()) {
+ if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
+ fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
+ } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
+ fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
+ } else {
+ fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
+ String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+ + "AND " + ModelConstants.TS_COLUMN + " > ? "
+ + "AND " + ModelConstants.TS_COLUMN + " <= ?"
+ + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
}
}
- return fetchStmts[aggType.ordinal()];
+ return fetchStmts;
}
private PreparedStatement getLatestStmt() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 1e3f4ce..62dbd50 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -38,4 +38,10 @@ public interface TimeseriesDao {
ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl);
ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
+
+ ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query);
+
+ ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query);
+
+ ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index 2cd2d8d..a149191 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -37,4 +37,6 @@ public interface TimeseriesService {
ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
+
+ ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> queries);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
index d6b6bbd..c4925ee 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.DESC_ORDER;
+
/**
* Created by ashvayka on 21.02.17.
*/
@@ -40,6 +42,8 @@ public class TsKvQueryCursor {
private final List<Long> partitions;
@Getter
private final List<TsKvEntry> data;
+ @Getter
+ private String orderBy;
private int partitionIndex;
private int currentLimit;
@@ -51,13 +55,14 @@ public class TsKvQueryCursor {
this.startTs = baseQuery.getStartTs();
this.endTs = baseQuery.getEndTs();
this.partitions = partitions;
- this.partitionIndex = partitions.size() - 1;
+ this.orderBy = baseQuery.getOrderBy();
+ this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
this.data = new ArrayList<>();
this.currentLimit = baseQuery.getLimit();
}
public boolean hasNextPartition() {
- return partitionIndex >= 0;
+ return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1;
}
public boolean isFull() {
@@ -66,7 +71,11 @@ public class TsKvQueryCursor {
public long getNextPartition() {
long partition = partitions.get(partitionIndex);
- partitionIndex--;
+ if (isDesc()) {
+ partitionIndex--;
+ } else {
+ partitionIndex++;
+ }
return partition;
}
@@ -78,4 +87,8 @@ public class TsKvQueryCursor {
currentLimit -= newData.size();
data.addAll(newData);
}
+
+ private boolean isDesc() {
+ return orderBy.equals(DESC_ORDER);
+ }
}
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index f045bf1..0d54b20 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -53,6 +53,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
private static final String BOOLEAN_KEY = "booleanKey";
private static final long TS = 42L;
+ private static final String DESC_ORDER = "DESC";
KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);
@@ -101,6 +102,28 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
}
@Test
+ public void testDeleteDeviceTsData() throws Exception {
+ DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+
+ saveEntries(deviceId, 10000);
+ saveEntries(deviceId, 20000);
+ saveEntries(deviceId, 30000);
+ saveEntries(deviceId, 40000);
+
+ tsService.remove(deviceId, Collections.singletonList(
+ new BaseTsKvQuery(STRING_KEY, 15000, 45000, 10000, 0, Aggregation.NONE, DESC_ORDER,
+ false))).get();
+
+ List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
+ new BaseTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE, DESC_ORDER,
+ false))).get();
+ Assert.assertEquals(1, list.size());
+
+ List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
+ Assert.assertEquals(null, latest.get(0).getValueAsString());
+ }
+
+ @Test
public void testFindDeviceTsData() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased());
List<TsKvEntry> entries = new ArrayList<>();
@@ -115,7 +138,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
entries.add(save(deviceId, 55000, 600));
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.NONE))).get();
+ 60000, 20000, 3, Aggregation.NONE, DESC_ORDER, false))).get();
assertEquals(3, list.size());
assertEquals(55000, list.get(0).getTs());
assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
@@ -127,7 +150,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.AVG))).get();
+ 60000, 20000, 3, Aggregation.AVG, DESC_ORDER, false))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
@@ -139,7 +162,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.SUM))).get();
+ 60000, 20000, 3, Aggregation.SUM, DESC_ORDER, false))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -152,7 +175,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.MIN))).get();
+ 60000, 20000, 3, Aggregation.MIN, DESC_ORDER, false))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -165,7 +188,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.MAX))).get();
+ 60000, 20000, 3, Aggregation.MAX, DESC_ORDER, false))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -178,7 +201,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.COUNT))).get();
+ 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER, false))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());