Details
diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
index b077ca3..ff2ed25 100644
--- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
@@ -28,13 +28,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.StringUtils;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.ResponseBody;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.data.DataConstants;
@@ -45,19 +39,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.UUIDBased;
-import org.thingsboard.server.common.data.kv.Aggregation;
-import org.thingsboard.server.common.data.kv.AttributeKey;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.BooleanDataEntry;
-import org.thingsboard.server.common.data.kv.DoubleDataEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.data.kv.LongDataEntry;
-import org.thingsboard.server.common.data.kv.StringDataEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@@ -74,14 +56,7 @@ import org.thingsboard.server.service.telemetry.exception.UncheckedApiException;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -201,7 +176,7 @@ public class TelemetryController extends BaseController {
(result, entityId) -> {
// If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
- List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg, "DESC", false))
+ List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg, "DESC"))
.collect(Collectors.toList());
Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
index c9d9898..0e8a0d4 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
@@ -28,22 +28,12 @@ import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
-import org.thingsboard.server.common.data.kv.Aggregation;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.ValidationResult;
-import org.thingsboard.server.service.telemetry.cmd.AttributesSubscriptionCmd;
-import org.thingsboard.server.service.telemetry.cmd.GetHistoryCmd;
-import org.thingsboard.server.service.telemetry.cmd.SubscriptionCmd;
-import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmd;
-import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
-import org.thingsboard.server.service.telemetry.cmd.TimeseriesSubscriptionCmd;
+import org.thingsboard.server.service.telemetry.cmd.*;
import org.thingsboard.server.service.telemetry.exception.UnauthorizedException;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
@@ -53,14 +43,7 @@ import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -251,7 +234,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
- List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC", false))
+ List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC"))
.collect(Collectors.toList());
FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
@@ -338,7 +321,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
startTs = cmd.getStartTs();
long endTs = cmd.getStartTs() + cmd.getTimeWindow();
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
- getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC", false)).collect(Collectors.toList());
+ getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC")).collect(Collectors.toList());
final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java
new file mode 100644
index 0000000..f0c8de2
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.kv;
+
+import lombok.Data;
+
+@Data
+public class BaseDeleteTsKvQuery extends BaseQuery implements DeleteTsKvQuery {
+
+ private final Boolean rewriteLatestIfDeleted;
+
+ public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted) {
+ super(key, startTs, endTs);
+ this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
+ }
+
+ public BaseDeleteTsKvQuery(String key, long startTs, long endTs) {
+ this(key, startTs, endTs, false);
+ }
+
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseQuery.java
new file mode 100644
index 0000000..b7e7139
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseQuery.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.kv;
+
+import lombok.Data;
+
+@Data
+public class BaseQuery implements Query {
+
+ private final String key;
+ private final long startTs;
+ private final long endTs;
+
+ public BaseQuery(String key, long startTs, long endTs) {
+ this.key = key;
+ this.startTs = startTs;
+ this.endTs = endTs;
+ }
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
index 55d2797..873e1a2 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
@@ -18,31 +18,24 @@ package org.thingsboard.server.common.data.kv;
import lombok.Data;
@Data
-public class BaseTsKvQuery implements TsKvQuery {
+public class BaseTsKvQuery extends BaseQuery implements TsKvQuery {
- private final String key;
- private final long startTs;
- private final long endTs;
private final long interval;
private final int limit;
private final Aggregation aggregation;
private final String orderBy;
- private final Boolean rewriteLatestIfDeleted;
- public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy,
- boolean rewriteLatestIfDeleted) {
- this.key = key;
- this.startTs = startTs;
- this.endTs = endTs;
+ public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation,
+ String orderBy) {
+ super(key, startTs, endTs);
this.interval = interval;
this.limit = limit;
this.aggregation = aggregation;
this.orderBy = orderBy;
- this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
}
public BaseTsKvQuery(String key, long startTs, long endTs) {
- this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC", false);
+ this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
}
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java
new file mode 100644
index 0000000..a0cd919
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.kv;
+
+public interface DeleteTsKvQuery extends Query {
+
+ Boolean getRewriteLatestIfDeleted();
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Query.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Query.java
new file mode 100644
index 0000000..9450604
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Query.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.kv;
+
+public interface Query {
+
+ String getKey();
+
+ long getStartTs();
+
+ long getEndTs();
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
index 825df6c..9fe1136 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
@@ -15,13 +15,7 @@
*/
package org.thingsboard.server.common.data.kv;
-public interface TsKvQuery {
-
- String getKey();
-
- long getStartTs();
-
- long getEndTs();
+public interface TsKvQuery extends Query {
long getInterval();
@@ -31,5 +25,4 @@ public interface TsKvQuery {
String getOrderBy();
- Boolean getRewriteLatestIfDeleted();
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 84b4374..56609a7 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -17,11 +17,7 @@ package org.thingsboard.server.dao.sql.timeseries;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -29,11 +25,7 @@ import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.UUIDConverter;
import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.kv.Aggregation;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.StringDataEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
-import org.thingsboard.server.common.data.kv.TsKvQuery;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sql.TsKvEntity;
import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
@@ -307,7 +299,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
}
@Override
- public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
+ public ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query) {
return service.submit(() -> {
tsKvRepository.delete(
fromTimeUUID(entityId.getId()),
@@ -320,7 +312,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
}
@Override
- public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
+ public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
latestEntity.setEntityType(entityId.getEntityType());
latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
@@ -332,7 +324,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
}
@Override
- public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
+ public ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query) {
return service.submit(() -> null);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index f035fd8..8695200 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
@@ -48,7 +49,7 @@ public class BaseTimeseriesService implements TimeseriesService {
@Override
public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries) {
validate(entityId);
- queries.forEach(query -> validate(query));
+ queries.forEach(BaseTimeseriesService::validate);
return timeseriesDao.findAllAsync(entityId, queries);
}
@@ -97,17 +98,17 @@ public class BaseTimeseriesService implements TimeseriesService {
}
@Override
- public ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> tsKvQueries) {
+ public ListenableFuture<List<Void>> remove(EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
validate(entityId);
- tsKvQueries.forEach(BaseTimeseriesService::validate);
- List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvQueries.size() * DELETES_PER_ENTRY);
- for (TsKvQuery tsKvQuery : tsKvQueries) {
+ deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
+ List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
+ for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
deleteAndRegisterFutures(futures, entityId, tsKvQuery);
}
return Futures.allAsList(futures);
}
- private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvQuery query) {
+ private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
futures.add(timeseriesDao.remove(entityId, query));
futures.add(timeseriesDao.removeLatest(entityId, query));
futures.add(timeseriesDao.removePartition(entityId, query));
@@ -126,4 +127,12 @@ public class BaseTimeseriesService implements TimeseriesService {
throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
}
}
+
+ private static void validate(DeleteTsKvQuery query) {
+ if (query == null) {
+ throw new IncorrectParameterException("DeleteTsKvQuery can't be null");
+ } else if (isBlank(query.getKey())) {
+ throw new IncorrectParameterException("Incorrect DeleteTsKvQuery. Key can't be empty");
+ }
+ }
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index ce5b5b3..7be0e37 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -138,7 +138,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
while (stepTs < query.getEndTs()) {
long startTs = stepTs;
long endTs = stepTs + step;
- TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy(), false);
+ TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
stepTs = endTs;
}
@@ -346,7 +346,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
@Override
- public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
+ public ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query) {
long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs());
@@ -358,7 +358,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
@Override
public void onSuccess(@Nullable List<Long> partitions) {
- TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
+ QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
deleteAsync(cursor, resultFuture);
}
@@ -370,7 +370,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return resultFuture;
}
- private void deleteAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+ private void deleteAsync(final QueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
if (!cursor.hasNextPartition()) {
resultFuture.set(null);
} else {
@@ -411,7 +411,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
@Override
- public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
+ public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
@@ -445,11 +445,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return removedLatestFuture;
}
- private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
+ private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
long startTs = 0;
long endTs = query.getStartTs() - 1;
TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
- Aggregation.NONE, DESC_ORDER, false);
+ Aggregation.NONE, DESC_ORDER);
ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
return Futures.transformAsync(future, entryList -> {
@@ -472,7 +472,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
@Override
- public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
+ public ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query) {
long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs());
if (minPartition == maxPartition) {
@@ -494,7 +494,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
for (int i = index; i < partitions.size() - 1; i++) {
partitionsToDelete.add(partitions.get(i));
}
- TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
+ QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
deletePartitionAsync(cursor, resultFuture);
}
@@ -507,7 +507,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
}
- private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
+ private void deletePartitionAsync(final QueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
if (!cursor.hasNextPartition()) {
resultFuture.set(null);
} else {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java
new file mode 100644
index 0000000..bcd6975
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java
@@ -0,0 +1,60 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.dao.timeseries;
+
+import lombok.Getter;
+import org.thingsboard.server.common.data.kv.Query;
+
+import java.util.List;
+import java.util.UUID;
+
+public class QueryCursor {
+
+ @Getter
+ protected final String entityType;
+ @Getter
+ protected final UUID entityId;
+ @Getter
+ protected final String key;
+ @Getter
+ private final long startTs;
+ @Getter
+ private final long endTs;
+
+ final List<Long> partitions;
+ private int partitionIndex;
+
+ public QueryCursor(String entityType, UUID entityId, Query baseQuery, List<Long> partitions) {
+ this.entityType = entityType;
+ this.entityId = entityId;
+ this.key = baseQuery.getKey();
+ this.startTs = baseQuery.getStartTs();
+ this.endTs = baseQuery.getEndTs();
+ this.partitions = partitions;
+ this.partitionIndex = partitions.size() - 1;
+ }
+
+ public boolean hasNextPartition() {
+ return partitionIndex >= 0;
+ }
+
+ public long getNextPartition() {
+ long partition = partitions.get(partitionIndex);
+ partitionIndex--;
+ return partition;
+ }
+
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 62dbd50..7f05ff2 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
@@ -39,9 +40,9 @@ public interface TimeseriesDao {
ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
- ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query);
+ ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query);
- ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query);
+ ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query);
- ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query);
+ ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index a149191..345d852 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
@@ -38,5 +39,5 @@ public interface TimeseriesService {
ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
- ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> queries);
+ ListenableFuture<List<Void>> remove(EntityId entityId, List<DeleteTsKvQuery> queries);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
index c4925ee..2d93f5d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java
@@ -28,18 +28,8 @@ import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.D
/**
* Created by ashvayka on 21.02.17.
*/
-public class TsKvQueryCursor {
- @Getter
- private final String entityType;
- @Getter
- private final UUID entityId;
- @Getter
- private final String key;
- @Getter
- private final long startTs;
- @Getter
- private final long endTs;
- private final List<Long> partitions;
+public class TsKvQueryCursor extends QueryCursor {
+
@Getter
private final List<TsKvEntry> data;
@Getter
@@ -49,18 +39,14 @@ public class TsKvQueryCursor {
private int currentLimit;
public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
- this.entityType = entityType;
- this.entityId = entityId;
- this.key = baseQuery.getKey();
- this.startTs = baseQuery.getStartTs();
- this.endTs = baseQuery.getEndTs();
- this.partitions = partitions;
+ super(entityType, entityId, baseQuery, partitions);
this.orderBy = baseQuery.getOrderBy();
this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
this.data = new ArrayList<>();
this.currentLimit = baseQuery.getLimit();
}
+ @Override
public boolean hasNextPartition() {
return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1;
}
@@ -69,6 +55,7 @@ public class TsKvQueryCursor {
return currentLimit <= 0;
}
+ @Override
public long getNextPartition() {
long partition = partitions.get(partitionIndex);
if (isDesc()) {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index 0d54b20..68b3fa1 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -20,15 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.server.common.data.id.DeviceId;
-import org.thingsboard.server.common.data.kv.Aggregation;
-import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.BooleanDataEntry;
-import org.thingsboard.server.common.data.kv.DoubleDataEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.data.kv.LongDataEntry;
-import org.thingsboard.server.common.data.kv.StringDataEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import java.util.ArrayList;
@@ -111,12 +103,10 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
saveEntries(deviceId, 40000);
tsService.remove(deviceId, Collections.singletonList(
- new BaseTsKvQuery(STRING_KEY, 15000, 45000, 10000, 0, Aggregation.NONE, DESC_ORDER,
- false))).get();
+ new BaseDeleteTsKvQuery(STRING_KEY, 15000, 45000))).get();
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
- new BaseTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE, DESC_ORDER,
- false))).get();
+ new BaseTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE, DESC_ORDER))).get();
Assert.assertEquals(1, list.size());
List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
@@ -138,7 +128,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
entries.add(save(deviceId, 55000, 600));
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.NONE, DESC_ORDER, false))).get();
+ 60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get();
assertEquals(3, list.size());
assertEquals(55000, list.get(0).getTs());
assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
@@ -150,7 +140,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.AVG, DESC_ORDER, false))).get();
+ 60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
@@ -162,7 +152,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.SUM, DESC_ORDER, false))).get();
+ 60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -175,7 +165,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.MIN, DESC_ORDER, false))).get();
+ 60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -188,7 +178,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.MAX, DESC_ORDER, false))).get();
+ 60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
@@ -201,7 +191,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
- 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER, false))).get();
+ 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());