thingsboard-aplcache

Details

diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index ed9bf77..856ca8e 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.device.DeviceSearchQuery;
 import org.thingsboard.server.common.data.id.CustomerId;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.kv.Aggregation;
 import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 import org.thingsboard.server.common.data.kv.LongDataEntry;
@@ -413,8 +414,10 @@ public class DeviceController extends BaseController {
 
             Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
 
+            long startTs = 1519561033000L;
+            long endTs = 1528260633000L;
             timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test",
-                    1519139033000L, 1524668633000L))).get();
+                    startTs, endTs, endTs - startTs, 0, Aggregation.NONE, "DESC", true))).get();
 
         } catch (Exception e) {
             throw handleException(e);
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
index 51d4ad2..55d2797 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
@@ -27,8 +27,10 @@ public class BaseTsKvQuery implements TsKvQuery {
     private final int limit;
     private final Aggregation aggregation;
     private final String orderBy;
+    private final Boolean rewriteLatestIfDeleted;
 
-    public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy) {
+    public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy,
+                         boolean rewriteLatestIfDeleted) {
         this.key = key;
         this.startTs = startTs;
         this.endTs = endTs;
@@ -36,10 +38,11 @@ public class BaseTsKvQuery implements TsKvQuery {
         this.limit = limit;
         this.aggregation = aggregation;
         this.orderBy = orderBy;
+        this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
     }
 
     public BaseTsKvQuery(String key, long startTs, long endTs) {
-        this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
+        this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC", false);
     }
 
 }
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
index 9b907c3..825df6c 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
@@ -30,4 +30,6 @@ public interface TsKvQuery {
     Aggregation getAggregation();
 
     String getOrderBy();
+
+    Boolean getRewriteLatestIfDeleted();
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 423e90e..3e4e2bd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -134,7 +134,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
             while (stepTs < query.getEndTs()) {
                 long startTs = stepTs;
                 long endTs = stepTs + step;
-                TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
+                TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy(), false);
                 futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
                 stepTs = endTs;
             }
@@ -400,15 +400,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
                     return Futures.immediateFuture(false);
                 }, readResultsProcessingExecutor);
 
-
-        ListenableFuture<Void> savedLatestFuture = Futures.transform(booleanFuture,
-                (AsyncFunction<Boolean, Void>) isRemove -> {
-                    if (isRemove) {
-                        return getNewLatestEntryFuture(entityId, query);
-                    }
-                    return Futures.immediateFuture(null);
-                }, readResultsProcessingExecutor);
-
         ListenableFuture<Void> removedLatestFuture = Futures.transform(booleanFuture,
                 (AsyncFunction<Boolean, Void>) isRemove -> {
                     if (isRemove) {
@@ -416,15 +407,27 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
                     }
                     return Futures.immediateFuture(null);
                 }, readResultsProcessingExecutor);
-        return Futures.transform(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
-                (AsyncFunction<List<Void>, Void>) list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
+
+        if (query.getRewriteLatestIfDeleted()) {
+            ListenableFuture<Void> savedLatestFuture = Futures.transform(booleanFuture,
+                    (AsyncFunction<Boolean, Void>) isRemove -> {
+                        if (isRemove) {
+                            return getNewLatestEntryFuture(entityId, query);
+                        }
+                        return Futures.immediateFuture(null);
+                    }, readResultsProcessingExecutor);
+
+            return Futures.transform(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
+                    (AsyncFunction<List<Void>, Void>) list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
+        }
+        return removedLatestFuture;
     }
 
     private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
         long startTs = 0;
         long endTs = query.getStartTs() - 1;
         TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
-                Aggregation.NONE, DESC_ORDER);
+                Aggregation.NONE, DESC_ORDER, false);
         ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
 
         return Futures.transform(future, (AsyncFunction<List<TsKvEntry>, Void>) entryList -> {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index 181a197..a8d022b 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -127,7 +127,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         entries.add(save(deviceId, 55000, 600));
 
         List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get();
+                60000, 20000, 3, Aggregation.NONE, DESC_ORDER, false))).get();
         assertEquals(3, list.size());
         assertEquals(55000, list.get(0).getTs());
         assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
@@ -139,7 +139,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get();
+                60000, 20000, 3, Aggregation.AVG, DESC_ORDER, false))).get();
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
         assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
@@ -151,7 +151,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get();
+                60000, 20000, 3, Aggregation.SUM, DESC_ORDER, false))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -164,7 +164,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get();
+                60000, 20000, 3, Aggregation.MIN, DESC_ORDER, false))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -177,7 +177,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get();
+                60000, 20000, 3, Aggregation.MAX, DESC_ORDER, false))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
@@ -190,7 +190,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
 
         list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
-                60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get();
+                60000, 20000, 3, Aggregation.COUNT, DESC_ORDER, false))).get();
 
         assertEquals(3, list.size());
         assertEquals(10000, list.get(0).getTs());
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index 0c7e387..bb0813f 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -28,7 +28,6 @@ import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
-import org.thingsboard.server.common.data.id.UUIDBased;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@@ -138,9 +137,10 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 
             // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
             Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) :
-                                                                              Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
+                    Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
 
-            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC"))
+            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(),
+                    interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC", false))
                     .collect(Collectors.toList());
             ctx.loadTimeseries(entityId, queries, getTsKvListCallback(msg));
         } else {
@@ -218,7 +218,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
     }
 
     private boolean handleHttpPostAttributes(PluginContext ctx, PluginRestMsg msg, RestRequest request,
-                                          EntityId entityId, String scope) throws ServletException, IOException {
+                                             EntityId entityId, String scope) throws ServletException, IOException {
         if (DataConstants.SERVER_SCOPE.equals(scope) ||
                 DataConstants.SHARED_SCOPE.equals(scope)) {
             JsonNode jsonNode;
@@ -274,7 +274,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                 }
             }
         });
-        return  attributes;
+        return attributes;
     }
 
     private void handleHttpPostTimeseries(PluginContext ctx, PluginRestMsg msg, RestRequest request, EntityId entityId, long ttl) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index bf75c5d..c024b67 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -217,7 +217,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
             log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
             startTs = cmd.getStartTs();
             long endTs = cmd.getStartTs() + cmd.getTimeWindow();
-            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY)).collect(Collectors.toList());
+            List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
+                    getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY, false)).collect(Collectors.toList());
             ctx.loadTimeseries(entityId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys));
         } else {
             List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
@@ -301,7 +302,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
         }
         EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
-        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY))
+        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(),
+                cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY, false))
                 .collect(Collectors.toList());
         ctx.loadTimeseries(entityId, queries, new PluginCallback<List<TsKvEntry>>() {
             @Override