Details
diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java
index 9b7ffd2..2a9c073 100644
--- a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java
@@ -29,7 +29,6 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
-import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntitySubtype;
@@ -39,7 +38,6 @@ import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
-import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -47,7 +45,6 @@ import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
-import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.service.security.model.SecurityUser;
@@ -174,7 +171,7 @@ public class EntityViewController extends BaseController {
EntityView entityView = checkEntityViewId(entityViewId);
entityViewService.deleteEntityView(entityViewId);
logEntityAction(entityViewId, entityView, entityView.getCustomerId(),
- ActionType.DELETED,null, strEntityViewId);
+ ActionType.DELETED, null, strEntityViewId);
} catch (Exception e) {
logEntityAction(emptyId(EntityType.ENTITY_VIEW),
null,
@@ -185,10 +182,23 @@ public class EntityViewController extends BaseController {
}
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/tenant/entityViews", params = {"entityViewName"}, method = RequestMethod.GET)
+ @ResponseBody
+ public EntityView getTenantEntityView(
+ @RequestParam String entityViewName) throws ThingsboardException {
+ try {
+ TenantId tenantId = getCurrentUser().getTenantId();
+ return checkNotNull(entityViewService.findEntityViewByTenantIdAndName(tenantId, entityViewName));
+ } catch (Exception e) {
+ throw handleException(e);
+ }
+ }
+
+ @PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/customer/{customerId}/entityView/{entityViewId}", method = RequestMethod.POST)
@ResponseBody
public EntityView assignEntityViewToCustomer(@PathVariable(CUSTOMER_ID) String strCustomerId,
- @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException {
+ @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException {
checkParameter(CUSTOMER_ID, strCustomerId);
checkParameter(ENTITY_VIEW_ID, strEntityViewId);
try {
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index 6c37614..ef09a7a 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -49,9 +49,11 @@ import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
@@ -60,12 +62,10 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
-import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.telemetry.AttributeData;
-import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.service.telemetry.TsData;
import org.thingsboard.server.service.telemetry.exception.InvalidParametersException;
import org.thingsboard.server.service.telemetry.exception.UncheckedApiException;
@@ -250,6 +250,60 @@ public class TelemetryController extends BaseController {
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
+ @RequestMapping(value = "/{entityType}/{entityId}/timeseries/delete", method = RequestMethod.DELETE)
+ @ResponseBody
+ public DeferredResult<ResponseEntity> deleteEntityTimeseries(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
+ @RequestParam(name = "keys") String keysStr,
+ @RequestParam(name = "deleteAllDataForKeys", defaultValue = "false") boolean deleteAllDataForKeys,
+ @RequestParam(name = "startTs", required = false) Long startTs,
+ @RequestParam(name = "endTs", required = false) Long endTs,
+ @RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted) throws ThingsboardException {
+ EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
+ return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted);
+ }
+
+ private DeferredResult<ResponseEntity> deleteTimeseries(EntityId entityIdStr, String keysStr, boolean deleteAllDataForKeys,
+ Long startTs, Long endTs, boolean rewriteLatestIfDeleted) throws ThingsboardException {
+ List<String> keys = toKeysList(keysStr);
+ if (keys.isEmpty()) {
+ return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
+ }
+ SecurityUser user = getCurrentUser();
+
+ long deleteFromTs;
+ long deleteToTs;
+ if (deleteAllDataForKeys) {
+ deleteFromTs = 0L;
+ deleteToTs = System.currentTimeMillis();
+ } else {
+ deleteFromTs = startTs;
+ deleteToTs = endTs;
+ }
+
+ return accessValidator.validateEntityAndCallback(user, entityIdStr, (result, entityId) -> {
+ List<DeleteTsKvQuery> deleteTsKvQueries = new ArrayList<>();
+ for (String key : keys) {
+ deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
+ }
+
+ ListenableFuture<List<Void>> future = tsService.remove(entityId, deleteTsKvQueries);
+ Futures.addCallback(future, new FutureCallback<List<Void>>() {
+ @Override
+ public void onSuccess(@Nullable List<Void> tmp) {
+ logTimeseriesDeleted(user, entityId, keys, null);
+ result.setResult(new ResponseEntity<>(HttpStatus.OK));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logTimeseriesDeleted(user, entityId, keys, t);
+ result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+ }
+ }, executor);
+ });
+ }
+
+ @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE)
@ResponseBody
public DeferredResult<ResponseEntity> deleteEntityAttributes(@PathVariable("deviceId") String deviceIdStr,
@@ -506,6 +560,15 @@ public class TelemetryController extends BaseController {
};
}
+ private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List<String> keys, Throwable e) {
+ try {
+ logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e),
+ keys);
+ } catch (ThingsboardException te) {
+ log.warn("Failed to log timeseries delete", te);
+ }
+ }
+
private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
try {
logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e),
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
index c37d460..822387c 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
@@ -24,6 +24,7 @@ public enum ActionType {
UPDATED(false), // log entity
ATTRIBUTES_UPDATED(false), // log attributes/values
ATTRIBUTES_DELETED(false), // log attributes
+ TIMESERIES_DELETED(false), // log timeseries
RPC_CALL(false), // log method and params
CREDENTIALS_UPDATED(false), // log new credentials
ASSIGNED_TO_CUSTOMER(false), // log customer name
@@ -32,11 +33,11 @@ public enum ActionType {
SUSPENDED(false), // log string id
CREDENTIALS_READ(true), // log device id
ATTRIBUTES_READ(true), // log attributes
- RELATION_ADD_OR_UPDATE (false),
- RELATION_DELETED (false),
- RELATIONS_DELETED (false),
- ALARM_ACK (false),
- ALARM_CLEAR (false);
+ RELATION_ADD_OR_UPDATE(false),
+ RELATION_DELETED(false),
+ RELATIONS_DELETED(false),
+ ALARM_ACK(false),
+ ALARM_CLEAR(false);
private final boolean isRead;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java
index da87b44..9c82866 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java
@@ -43,6 +43,8 @@ public interface EntityViewService {
EntityView findEntityViewById(EntityViewId entityViewId);
+ EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name);
+
TextPageData<EntityView> findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink);
TextPageData<EntityView> findEntityViewByTenantIdAndType(TenantId tenantId, TextPageLink pageLink, String type);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
index 2d94cc2..9f8949d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java
@@ -29,8 +29,6 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.Caching;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Customer;
-import org.thingsboard.server.common.data.DataConstants;
-import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
@@ -40,12 +38,10 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
-import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.customer.CustomerDao;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
@@ -56,15 +52,13 @@ import org.thingsboard.server.dao.tenant.TenantDao;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.ENTITY_VIEW_CACHE;
-import static org.thingsboard.server.common.data.CacheConstants.RELATIONS_CACHE;
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
@@ -96,6 +90,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
@Caching(evict = {
@CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.entityId}"),
+ @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.name}"),
@CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.id}")})
@Override
public EntityView saveEntityView(EntityView entityView) {
@@ -137,6 +132,15 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
return entityViewDao.findById(entityViewId.getId());
}
+ @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#tenantId, #name}")
+ @Override
+ public EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name) {
+ log.trace("Executing findEntityViewByTenantIdAndName [{}][{}]", tenantId, name);
+ validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
+ Optional<EntityView> entityViewOpt = entityViewDao.findEntityViewByTenantIdAndName(tenantId.getId(), name);
+ return entityViewOpt.orElse(null);
+ }
+
@Override
public TextPageData<EntityView> findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink) {
log.trace("Executing findEntityViewsByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink);
@@ -255,6 +259,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
deleteEntityRelations(entityViewId);
EntityView entityView = entityViewDao.findById(entityViewId.getId());
cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getEntityId()));
+ cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getName()));
entityViewDao.removeById(entityViewId.getId());
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 5bd9175..04227a3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.timeseries;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -31,6 +32,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.UUIDConverter;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
+import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
@@ -41,9 +43,9 @@ import org.thingsboard.server.dao.model.sql.TsKvEntity;
import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
import org.thingsboard.server.dao.model.sql.TsKvLatestEntity;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
+import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import org.thingsboard.server.dao.timeseries.TsInsertExecutorType;
-import org.thingsboard.server.dao.util.SqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import javax.annotation.Nullable;
@@ -53,6 +55,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -64,6 +67,8 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
@SqlTsDao
public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao {
+ private static final String DESC_ORDER = "DESC";
+
@Value("${sql.ts_inserts_executor_type}")
private String insertExecutorType;
@@ -326,14 +331,72 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
@Override
public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
- TsKvLatestEntity latestEntity = new TsKvLatestEntity();
- latestEntity.setEntityType(entityId.getEntityType());
- latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
- latestEntity.setKey(query.getKey());
- return service.submit(() -> {
- tsKvLatestRepository.delete(latestEntity);
- return null;
+ ListenableFuture<TsKvEntry> latestFuture = findLatest(entityId, query.getKey());
+
+ ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
+ long ts = tsKvEntry.getTs();
+ return ts > query.getStartTs() && ts <= query.getEndTs();
+ }, service);
+
+ ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+ if (isRemove) {
+ TsKvLatestEntity latestEntity = new TsKvLatestEntity();
+ latestEntity.setEntityType(entityId.getEntityType());
+ latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
+ latestEntity.setKey(query.getKey());
+ return service.submit(() -> {
+ tsKvLatestRepository.delete(latestEntity);
+ return null;
+ });
+ }
+ return Futures.immediateFuture(null);
+ }, service);
+
+ final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+ Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ if (query.getRewriteLatestIfDeleted()) {
+ ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+ if (isRemove) {
+ return getNewLatestEntryFuture(entityId, query);
+ }
+ return Futures.immediateFuture(null);
+ }, service);
+
+ try {
+ resultFuture.set(savedLatestFuture.get());
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
+ }
+ } else {
+ resultFuture.set(null);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("[{}] Failed to process remove of the latest value", entityId, t);
+ }
});
+ return resultFuture;
+ }
+
+ private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
+ long startTs = 0;
+ long endTs = query.getStartTs() - 1;
+ ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
+ Aggregation.NONE, DESC_ORDER);
+ ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
+
+ return Futures.transformAsync(future, entryList -> {
+ if (entryList.size() == 1) {
+ return saveLatest(entityId, entryList.get(0));
+ } else {
+ log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
+ }
+ return Futures.immediateFuture(null);
+ }, service);
}
@Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index 4c743e5..296d173 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -47,7 +47,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Modifying
@Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
- "AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ "AND tskv.ts > :startTs AND tskv.ts <= :endTs")
void delete(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType,
@Param("entityKey") String key,
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 709bfd5..fdc69f9 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -48,7 +48,6 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao;
-import org.thingsboard.server.dao.util.NoSqlDao;
import org.thingsboard.server.dao.util.NoSqlTsDao;
import javax.annotation.Nullable;
@@ -62,6 +61,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -434,14 +434,14 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
- ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
+ ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> {
long ts = latestEntry.getTs();
- if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
- return Futures.immediateFuture(true);
+ if (ts > query.getStartTs() && ts <= query.getEndTs()) {
+ return true;
} else {
log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
}
- return Futures.immediateFuture(false);
+ return false;
}, readResultsProcessingExecutor);
ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
@@ -451,18 +451,34 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return Futures.immediateFuture(null);
}, readResultsProcessingExecutor);
- if (query.getRewriteLatestIfDeleted()) {
- ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
- if (isRemove) {
- return getNewLatestEntryFuture(entityId, query);
+ final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+ Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ if (query.getRewriteLatestIfDeleted()) {
+ ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
+ if (isRemove) {
+ return getNewLatestEntryFuture(entityId, query);
+ }
+ return Futures.immediateFuture(null);
+ }, readResultsProcessingExecutor);
+
+ try {
+ resultFuture.set(savedLatestFuture.get());
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
+ }
+ } else {
+ resultFuture.set(null);
}
- return Futures.immediateFuture(null);
- }, readResultsProcessingExecutor);
+ }
- return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
- list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
- }
- return removedLatestFuture;
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("[{}] Failed to process remove of the latest value", entityId, t);
+ }
+ });
+ return resultFuture;
}
private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index 88f4d84..81de40a 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -152,7 +152,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
}
@Test
- public void testDeleteDeviceTsData() throws Exception {
+ public void testDeleteDeviceTsDataWithoutOverwritingLatest() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased());
saveEntries(deviceId, 10000);
@@ -172,6 +172,26 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
}
@Test
+ public void testDeleteDeviceTsDataWithOverwritingLatest() throws Exception {
+ DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+
+ saveEntries(deviceId, 10000);
+ saveEntries(deviceId, 20000);
+ saveEntries(deviceId, 30000);
+ saveEntries(deviceId, 40000);
+
+ tsService.remove(deviceId, Collections.singletonList(
+ new BaseDeleteTsKvQuery(STRING_KEY, 25000, 45000, true))).get();
+
+ List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
+ new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get();
+ Assert.assertEquals(2, list.size());
+
+ List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
+ Assert.assertEquals(20000, latest.get(0).getTs());
+ }
+
+ @Test
public void testFindDeviceTsData() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased());
List<TsKvEntry> entries = new ArrayList<>();