thingsboard-aplcache

improved removing timeseries

5/30/2018 12:27:06 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index bceea54..ed9bf77 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.controller;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.web.bind.annotation.*;
@@ -23,23 +24,27 @@ import org.thingsboard.server.common.data.Customer;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.EntitySubtype;
 import org.thingsboard.server.common.data.EntityType;
-import org.thingsboard.server.common.data.audit.ActionStatus;
 import org.thingsboard.server.common.data.audit.ActionType;
 import org.thingsboard.server.common.data.device.DeviceSearchQuery;
 import org.thingsboard.server.common.data.id.CustomerId;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
+import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.LongDataEntry;
 import org.thingsboard.server.common.data.page.TextPageData;
 import org.thingsboard.server.common.data.page.TextPageLink;
 import org.thingsboard.server.common.data.security.Authority;
 import org.thingsboard.server.common.data.security.DeviceCredentials;
 import org.thingsboard.server.dao.exception.IncorrectParameterException;
 import org.thingsboard.server.dao.model.ModelConstants;
+import org.thingsboard.server.dao.timeseries.TimeseriesService;
 import org.thingsboard.server.exception.ThingsboardErrorCode;
 import org.thingsboard.server.exception.ThingsboardException;
 import org.thingsboard.server.service.security.model.SecurityUser;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -47,6 +52,9 @@ import java.util.stream.Collectors;
 @RequestMapping("/api")
 public class DeviceController extends BaseController {
 
+    @Autowired
+    protected TimeseriesService timeseriesService;
+
     public static final String DEVICE_ID = "deviceId";
 
     @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@@ -70,7 +78,7 @@ public class DeviceController extends BaseController {
             device.setTenantId(getCurrentUser().getTenantId());
             if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
                 if (device.getId() == null || device.getId().isNullUid() ||
-                    device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
+                        device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
                     throw new ThingsboardException("You don't have permission to perform this operation!",
                             ThingsboardErrorCode.PERMISSION_DENIED);
                 } else {
@@ -368,4 +376,48 @@ public class DeviceController extends BaseController {
             throw handleException(e);
         }
     }
+
+    @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+    @RequestMapping(value = "/device/testSave", method = RequestMethod.GET)
+    @ResponseBody
+    public void testSave() throws ThingsboardException {
+        try {
+            SecurityUser user = getCurrentUser();
+            TenantId tenantId = user.getTenantId();
+
+            Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
+
+            timeseriesService.save(device.getId(), new BasicTsKvEntry(1516892633000L,
+                    new LongDataEntry("test", 1L))).get();
+            timeseriesService.save(device.getId(), new BasicTsKvEntry(1519571033000L,
+                    new LongDataEntry("test", 2L))).get();
+            timeseriesService.save(device.getId(), new BasicTsKvEntry(1521990233000L,
+                    new LongDataEntry("test", 3L))).get();
+            timeseriesService.save(device.getId(), new BasicTsKvEntry(1524668633000L,
+                    new LongDataEntry("test", 4L))).get();
+            timeseriesService.save(device.getId(), new BasicTsKvEntry(1527260633000L,
+                    new LongDataEntry("test", 5L))).get();
+
+        } catch (Exception e) {
+            throw handleException(e);
+        }
+    }
+
+    @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+    @RequestMapping(value = "/device/testDelete", method = RequestMethod.GET)
+    @ResponseBody
+    public void testDelete() throws ThingsboardException {
+        try {
+            SecurityUser user = getCurrentUser();
+            TenantId tenantId = user.getTenantId();
+
+            Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
+
+            timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test",
+                    1519139033000L, 1524668633000L))).get();
+
+        } catch (Exception e) {
+            throw handleException(e);
+        }
+    }
 }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index c47fc28..aef9280 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -66,8 +66,8 @@ plugins:
 
 # JWT Token parameters
 security.jwt:
-  tokenExpirationTime: "${JWT_TOKEN_EXPIRATION_TIME:900}" # Number of seconds (15 mins)
-  refreshTokenExpTime: "${JWT_REFRESH_TOKEN_EXPIRATION_TIME:3600}" # Seconds (1 hour)
+  tokenExpirationTime: "${JWT_TOKEN_EXPIRATION_TIME:9000000}" # Number of seconds (15 mins)
+  refreshTokenExpTime: "${JWT_REFRESH_TOKEN_EXPIRATION_TIME:36000000}" # Seconds (1 hour)
   tokenIssuer: "${JWT_TOKEN_ISSUER:thingsboard.io}"
   tokenSigningKey: "${JWT_TOKEN_SIGNING_KEY:thingsboardDefaultSigningKey}"
 
@@ -133,7 +133,7 @@ quota:
     intervalMin: 2
 
 database:
-  type: "${DATABASE_TYPE:sql}" # cassandra OR sql
+  type: "${DATABASE_TYPE:cassandra}" # cassandra OR sql
 
 # Cassandra driver configuration parameters
 cassandra:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 5503f49..bef82c3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
@@ -311,6 +310,11 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
         return null;
     }
 
+    @Override
+    public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
+        return insertService.submit(() -> null);
+    }
+
     @PreDestroy
     void onDestroy() {
         if (insertService != null) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index a075885..f035fd8 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -40,7 +40,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 public class BaseTimeseriesService implements TimeseriesService {
 
     public static final int INSERTS_PER_ENTRY = 3;
-    public static final int DELETES_PER_ENTRY = 2;
+    public static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
 
     @Autowired
     private TimeseriesDao timeseriesDao;
@@ -110,6 +110,7 @@ public class BaseTimeseriesService implements TimeseriesService {
     private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvQuery query) {
         futures.add(timeseriesDao.remove(entityId, query));
         futures.add(timeseriesDao.removeLatest(entityId, query));
+        futures.add(timeseriesDao.removePartition(entityId, query));
     }
 
     private static void validate(EntityId entityId) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 7895b59..423e90e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -41,10 +41,7 @@ import javax.annotation.PreDestroy;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -82,6 +79,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     private PreparedStatement[] fetchStmtsDesc;
     private PreparedStatement findLatestStmt;
     private PreparedStatement findAllLatestStmt;
+    private PreparedStatement deleteStmt;
+    private PreparedStatement deletePartitionStmt;
 
     private boolean isInstall() {
         return environment.acceptsProfiles("install");
@@ -374,35 +373,68 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     }
 
     private PreparedStatement getDeleteStmt() {
-        return prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
-                " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
-                + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
-                + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
-                + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
-                + "AND " + ModelConstants.TS_COLUMN + " > ? "
-                + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+        if (deleteStmt == null) {
+            deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
+                    " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.TS_COLUMN + " > ? "
+                    + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+        }
+        return deleteStmt;
     }
 
     @Override
     public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
-        ListenableFuture<TsKvEntry> future = findLatest(entityId, query.getKey());
-        return Futures.transform(future, new Function<TsKvEntry, Void>() {
-            @Nullable
-            @Override
-            public Void apply(@Nullable TsKvEntry latestEntry) {
-                if (latestEntry != null) {
+        ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
+
+        ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture,
+                (AsyncFunction<TsKvEntry, Boolean>) latestEntry -> {
                     long ts = latestEntry.getTs();
                     if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
-                        deleteLatest(entityId, latestEntry.getKey());
-
-                        //TODO: save new latest entry(< query.getStartTs() - if present) to TS_KV_LATEST_CF
+                        return Futures.immediateFuture(true);
                     } else {
                         log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
                     }
-                }
-                return null;
+                    return Futures.immediateFuture(false);
+                }, readResultsProcessingExecutor);
+
+
+        ListenableFuture<Void> savedLatestFuture = Futures.transform(booleanFuture,
+                (AsyncFunction<Boolean, Void>) isRemove -> {
+                    if (isRemove) {
+                        return getNewLatestEntryFuture(entityId, query);
+                    }
+                    return Futures.immediateFuture(null);
+                }, readResultsProcessingExecutor);
+
+        ListenableFuture<Void> removedLatestFuture = Futures.transform(booleanFuture,
+                (AsyncFunction<Boolean, Void>) isRemove -> {
+                    if (isRemove) {
+                        return deleteLatest(entityId, query.getKey());
+                    }
+                    return Futures.immediateFuture(null);
+                }, readResultsProcessingExecutor);
+        return Futures.transform(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
+                (AsyncFunction<List<Void>, Void>) list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
+    }
+
+    private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
+        long startTs = 0;
+        long endTs = query.getStartTs() - 1;
+        TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
+                Aggregation.NONE, DESC_ORDER);
+        ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
+
+        return Futures.transform(future, (AsyncFunction<List<TsKvEntry>, Void>) entryList -> {
+            if (entryList.size() == 1) {
+                return saveLatest(entityId, entryList.get(0));
+            } else {
+                log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
             }
-        });
+            return Futures.immediateFuture(null);
+        }, readResultsProcessingExecutor);
     }
 
     private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
@@ -414,6 +446,67 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return getFuture(executeAsyncWrite(delete), rs -> null);
     }
 
+    @Override
+    public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
+        long minPartition = toPartitionTs(query.getStartTs());
+        long maxPartition = toPartitionTs(query.getEndTs());
+
+        ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+
+        final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
+        final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+
+        Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
+            @Override
+            public void onSuccess(@Nullable List<Long> partitions) {
+                TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
+                deletePartitionAsync(cursor, resultFuture);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+            }
+        }, readResultsProcessingExecutor);
+        return resultFuture;
+    }
+
+    private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+        if (!cursor.hasNextPartition()) {
+            resultFuture.set(null);
+        } else {
+            PreparedStatement proto = getDeletePartitionStmt();
+            BoundStatement stmt = proto.bind();
+            stmt.setString(0, cursor.getEntityType());
+            stmt.setUUID(1, cursor.getEntityId());
+            stmt.setLong(2, cursor.getNextPartition());
+            stmt.setString(3, cursor.getKey());
+
+            Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
+                @Override
+                public void onSuccess(@Nullable ResultSet result) {
+                    deletePartitionAsync(cursor, resultFuture);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
+                }
+            }, readResultsProcessingExecutor);
+        }
+    }
+
+    private PreparedStatement getDeletePartitionStmt() {
+        if (deletePartitionStmt == null) {
+            deletePartitionStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_PARTITIONS_CF +
+                    " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
+                    + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM);
+        }
+        return deletePartitionStmt;
+    }
+
     private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
         List<TsKvEntry> entries = new ArrayList<>(rows.size());
         if (!rows.isEmpty()) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 22bb166..62dbd50 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -42,4 +42,6 @@ public interface TimeseriesDao {
     ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query);
 
     ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query);
+
+    ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query);
 }