thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 29(+21 -8)
dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java 175(+175 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index 92e0ec4..6490124 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@ package org.thingsboard.server.actors.plugin;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -152,7 +153,19 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query) {
validate(deviceId);
- return pluginCtx.tsService.find(DataConstants.DEVICE, deviceId, query);
+ try {
+ return pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query).get();
+ } catch (Exception e) {
+ log.error("TODO", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback) {
+ validate(deviceId);
+ ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
}
@Override
@@ -235,10 +248,10 @@ public final class PluginProcessingContext implements PluginContext {
};
}
- private <T> FutureCallback<ResultSet> getCallback(final PluginCallback<T> callback, Function<ResultSet, T> transformer) {
- return new FutureCallback<ResultSet>() {
+ private <T, R> FutureCallback<R> getCallback(final PluginCallback<T> callback, Function<R, T> transformer) {
+ return new FutureCallback<R>() {
@Override
- public void onSuccess(@Nullable ResultSet result) {
+ public void onSuccess(@Nullable R result) {
pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java
new file mode 100644
index 0000000..f8fad6c
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/Aggregation.java
@@ -0,0 +1,10 @@
+package org.thingsboard.server.common.data.kv;
+
+/**
+ * Created by ashvayka on 20.02.17.
+ */
+public enum Aggregation {
+
+ MIN, MAX, AVG, SUM, COUNT, NONE;
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
index 7bb1a3f..78c887f 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,59 +15,27 @@
*/
package org.thingsboard.server.common.data.kv;
-import java.util.Optional;
+import lombok.Data;
+@Data
public class BaseTsKvQuery implements TsKvQuery {
- private String key;
- private Optional<Long> startTs;
- private Optional<Long> endTs;
- private Optional<Integer> limit;
+ private final String key;
+ private final long startTs;
+ private final long endTs;
+ private final int limit;
+ private final Aggregation aggregation;
- public BaseTsKvQuery(String key, Optional<Long> startTs, Optional<Long> endTs, Optional<Integer> limit) {
+ public BaseTsKvQuery(String key, long startTs, long endTs, int limit, Aggregation aggregation) {
this.key = key;
this.startTs = startTs;
this.endTs = endTs;
this.limit = limit;
- }
-
- public BaseTsKvQuery(String key, Long startTs, Long endTs, Integer limit) {
- this(key, Optional.ofNullable(startTs), Optional.ofNullable(endTs), Optional.ofNullable(limit));
- }
-
- public BaseTsKvQuery(String key, Long startTs, Integer limit) {
- this(key, startTs, null, limit);
- }
-
- public BaseTsKvQuery(String key, Long startTs, Long endTs) {
- this(key, startTs, endTs, null);
- }
-
- public BaseTsKvQuery(String key, Long startTs) {
- this(key, startTs, null, null);
+ this.aggregation = aggregation;
}
- public BaseTsKvQuery(String key, Integer limit) {
- this(key, null, null, limit);
+ public BaseTsKvQuery(String key, long startTs, long endTs) {
+ this(key, startTs, endTs, 1, Aggregation.AVG);
}
- @Override
- public String getKey() {
- return key;
- }
-
- @Override
- public Optional<Long> getStartTs() {
- return startTs;
- }
-
- @Override
- public Optional<Long> getEndTs() {
- return endTs;
- }
-
- @Override
- public Optional<Integer> getLimit() {
- return limit;
- }
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
index 1303117..10a13ce 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java
@@ -21,10 +21,12 @@ public interface TsKvQuery {
String getKey();
- Optional<Long> getStartTs();
+ long getStartTs();
- Optional<Long> getEndTs();
+ long getEndTs();
- Optional<Integer> getLimit();
+ int getLimit();
+
+ Aggregation getAggregation();
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index b68fb75..0f8418a 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,14 +18,15 @@ package org.thingsboard.server.dao.model;
import java.util.UUID;
import com.datastax.driver.core.utils.UUIDs;
+import org.apache.commons.lang3.ArrayUtils;
public class ModelConstants {
private ModelConstants() {
}
-
+
public static UUID NULL_UUID = UUIDs.startOf(0);
-
+
/**
* Generic constants.
*/
@@ -38,7 +39,7 @@ public class ModelConstants {
public static final String ALIAS_PROPERTY = "alias";
public static final String SEARCH_TEXT_PROPERTY = "search_text";
public static final String ADDITIONAL_INFO_PROPERTY = "additional_info";
-
+
/**
* Cassandra user constants.
*/
@@ -50,11 +51,11 @@ public class ModelConstants {
public static final String USER_FIRST_NAME_PROPERTY = "first_name";
public static final String USER_LAST_NAME_PROPERTY = "last_name";
public static final String USER_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
-
+
public static final String USER_BY_EMAIL_COLUMN_FAMILY_NAME = "user_by_email";
public static final String USER_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "user_by_tenant_and_search_text";
public static final String USER_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "user_by_customer_and_search_text";
-
+
/**
* Cassandra user_credentials constants.
*/
@@ -64,20 +65,20 @@ public class ModelConstants {
public static final String USER_CREDENTIALS_PASSWORD_PROPERTY = "password";
public static final String USER_CREDENTIALS_ACTIVATE_TOKEN_PROPERTY = "activate_token";
public static final String USER_CREDENTIALS_RESET_TOKEN_PROPERTY = "reset_token";
-
+
public static final String USER_CREDENTIALS_BY_USER_COLUMN_FAMILY_NAME = "user_credentials_by_user";
public static final String USER_CREDENTIALS_BY_ACTIVATE_TOKEN_COLUMN_FAMILY_NAME = "user_credentials_by_activate_token";
public static final String USER_CREDENTIALS_BY_RESET_TOKEN_COLUMN_FAMILY_NAME = "user_credentials_by_reset_token";
-
+
/**
* Cassandra admin_settings constants.
*/
public static final String ADMIN_SETTINGS_COLUMN_FAMILY_NAME = "admin_settings";
public static final String ADMIN_SETTINGS_KEY_PROPERTY = "key";
public static final String ADMIN_SETTINGS_JSON_VALUE_PROPERTY = "json_value";
-
+
public static final String ADMIN_SETTINGS_BY_KEY_COLUMN_FAMILY_NAME = "admin_settings_by_key";
-
+
/**
* Cassandra contact constants.
*/
@@ -97,9 +98,9 @@ public class ModelConstants {
public static final String TENANT_TITLE_PROPERTY = TITLE_PROPERTY;
public static final String TENANT_REGION_PROPERTY = "region";
public static final String TENANT_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
-
+
public static final String TENANT_BY_REGION_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "tenant_by_region_and_search_text";
-
+
/**
* Cassandra customer constants.
*/
@@ -107,9 +108,9 @@ public class ModelConstants {
public static final String CUSTOMER_TENANT_ID_PROPERTY = TENTANT_ID_PROPERTY;
public static final String CUSTOMER_TITLE_PROPERTY = TITLE_PROPERTY;
public static final String CUSTOMER_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
-
+
public static final String CUSTOMER_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "customer_by_tenant_and_search_text";
-
+
/**
* Cassandra device constants.
*/
@@ -118,12 +119,12 @@ public class ModelConstants {
public static final String DEVICE_CUSTOMER_ID_PROPERTY = CUSTOMER_ID_PROPERTY;
public static final String DEVICE_NAME_PROPERTY = "name";
public static final String DEVICE_ADDITIONAL_INFO_PROPERTY = ADDITIONAL_INFO_PROPERTY;
-
+
public static final String DEVICE_BY_TENANT_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_tenant_and_search_text";
public static final String DEVICE_BY_CUSTOMER_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "device_by_customer_and_search_text";
public static final String DEVICE_BY_TENANT_AND_NAME_VIEW_NAME = "device_by_tenant_and_name";
-
+
/**
* Cassandra device_credentials constants.
*/
@@ -132,7 +133,7 @@ public class ModelConstants {
public static final String DEVICE_CREDENTIALS_CREDENTIALS_TYPE_PROPERTY = "credentials_type";
public static final String DEVICE_CREDENTIALS_CREDENTIALS_ID_PROPERTY = "credentials_id";
public static final String DEVICE_CREDENTIALS_CREDENTIALS_VALUE_PROPERTY = "credentials_value";
-
+
public static final String DEVICE_CREDENTIALS_BY_DEVICE_COLUMN_FAMILY_NAME = "device_credentials_by_device";
public static final String DEVICE_CREDENTIALS_BY_CREDENTIALS_ID_COLUMN_FAMILY_NAME = "device_credentials_by_credentials_id";
@@ -203,9 +204,9 @@ public class ModelConstants {
public static final String COMPONENT_DESCRIPTOR_BY_SCOPE_TYPE_AND_SEARCH_TEXT_COLUMN_FAMILY_NAME = "component_desc_by_scope_type_search_text";
public static final String COMPONENT_DESCRIPTOR_BY_ID = "component_desc_by_id";
- /**
- * Cassandra rule metadata constants.
- */
+ /**
+ * Cassandra rule metadata constants.
+ */
public static final String RULE_COLUMN_FAMILY_NAME = "rule";
public static final String RULE_TENANT_ID_PROPERTY = TENTANT_ID_PROPERTY;
public static final String RULE_NAME_PROPERTY = "name";
@@ -259,4 +260,31 @@ public class ModelConstants {
public static final String STRING_VALUE_COLUMN = "str_v";
public static final String LONG_VALUE_COLUMN = "long_v";
public static final String DOUBLE_VALUE_COLUMN = "dbl_v";
+
+ public static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)};
+
+ public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN,};
+ public static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
+ new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)});
+ public static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
+ new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)});
+ public static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
+ new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)});
+ public static final String[] AVG_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, SUM_AGGREGATION_COLUMNS);
+
+ public static String min(String s) {
+ return "min(" + s + ")";
+ }
+
+ public static String max(String s) {
+ return "max(" + s + ")";
+ }
+
+ public static String sum(String s) {
+ return "sum(" + s + ")";
+ }
+
+ public static String count(String s) {
+ return "count(" + s + ")";
+ }
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
new file mode 100644
index 0000000..9ee9022
--- /dev/null
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
@@ -0,0 +1,175 @@
+package org.thingsboard.server.dao.timeseries;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.thingsboard.server.common.data.kv.*;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 20.02.17.
+ */
+public class AggregatePartitionsFunction implements com.google.common.base.Function<List<ResultSet>, Optional<TsKvEntry>> {
+
+ private static final int LONG_CNT_POS = 0;
+ private static final int DOUBLE_CNT_POS = 1;
+ private static final int BOOL_CNT_POS = 2;
+ private static final int STR_CNT_POS = 3;
+ private static final int LONG_POS = 4;
+ private static final int DOUBLE_POS = 5;
+ private static final int BOOL_POS = 6;
+ private static final int STR_POS = 7;
+
+ private final Aggregation aggregation;
+ private final String key;
+ private final long ts;
+
+ public AggregatePartitionsFunction(Aggregation aggregation, String key, long ts) {
+ this.aggregation = aggregation;
+ this.key = key;
+ this.ts = ts;
+ }
+
+ @Nullable
+ @Override
+ public Optional<TsKvEntry> apply(@Nullable List<ResultSet> rsList) {
+ if (rsList == null || rsList.size() == 0) {
+ return Optional.empty();
+ }
+ long count = 0;
+ DataType dataType = null;
+
+ Boolean bValue = null;
+ String sValue = null;
+ Double dValue = null;
+ Long lValue = null;
+
+ for (ResultSet rs : rsList) {
+ for (Row row : rs.all()) {
+ long curCount;
+
+ Long curLValue = null;
+ Double curDValue = null;
+ Boolean curBValue = null;
+ String curSValue = null;
+
+ long longCount = row.getLong(LONG_CNT_POS);
+ long doubleCount = row.getLong(DOUBLE_CNT_POS);
+ long boolCount = row.getLong(BOOL_CNT_POS);
+ long strCount = row.getLong(STR_CNT_POS);
+
+ if (longCount > 0) {
+ dataType = DataType.LONG;
+ curCount = longCount;
+ curLValue = getLongValue(row);
+ } else if (doubleCount > 0) {
+ dataType = DataType.DOUBLE;
+ curCount = doubleCount;
+ curDValue = getDoubleValue(row);
+ } else if (boolCount > 0) {
+ dataType = DataType.BOOLEAN;
+ curCount = boolCount;
+ curBValue = getBooleanValue(row);
+ } else if (strCount > 0) {
+ dataType = DataType.STRING;
+ curCount = strCount;
+ curSValue = getStringValue(row);
+ } else {
+ continue;
+ }
+
+ if (aggregation == Aggregation.COUNT) {
+ count += curCount;
+ } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
+ count += curCount;
+ dValue = dValue == null ? curDValue : dValue + curDValue;
+ lValue = lValue == null ? curLValue : lValue + curLValue;
+ } else if (aggregation == Aggregation.MIN) {
+ if (curDValue != null) {
+ dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
+ } else if (curLValue != null) {
+ lValue = lValue == null ? curLValue : Math.min(lValue, curLValue);
+ } else if (curBValue != null) {
+ bValue = bValue == null ? curBValue : bValue && curBValue;
+ } else if (curSValue != null) {
+ if (sValue == null || curSValue.compareTo(sValue) < 0) {
+ sValue = curSValue;
+ }
+ }
+ } else if (aggregation == Aggregation.MAX) {
+ if (curDValue != null) {
+ dValue = dValue == null ? curDValue : Math.max(dValue, curDValue);
+ } else if (curLValue != null) {
+ lValue = lValue == null ? curLValue : Math.max(lValue, curLValue);
+ } else if (curBValue != null) {
+ bValue = bValue == null ? curBValue : bValue || curBValue;
+ } else if (curSValue != null) {
+ if (sValue == null || curSValue.compareTo(sValue) > 0) {
+ sValue = curSValue;
+ }
+ }
+ }
+ }
+ }
+ if (dataType == null) {
+ return Optional.empty();
+ } else if (aggregation == Aggregation.COUNT) {
+ return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count)));
+ } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
+ if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) {
+ return Optional.empty();
+ } else if (dataType == DataType.DOUBLE) {
+ return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count))));
+ } else if (dataType == DataType.LONG) {
+ return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count))));
+ }
+ } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
+ if (dataType == DataType.DOUBLE) {
+ return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue)));
+ } else if (dataType == DataType.LONG) {
+ return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue)));
+ } else if (dataType == DataType.STRING) {
+ return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue)));
+ } else {
+ return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue)));
+ }
+ }
+ return null;
+ }
+
+ private Boolean getBooleanValue(Row row) {
+ if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
+ return row.getBool(BOOL_POS);
+ } else {
+ return null;
+ }
+ }
+
+ private String getStringValue(Row row) {
+ if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
+ return row.getString(STR_POS);
+ } else {
+ return null;
+ }
+ }
+
+ private Long getLongValue(Row row) {
+ if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX
+ || aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) {
+ return row.getLong(LONG_POS);
+ } else {
+ return null;
+ }
+ }
+
+ private Double getDoubleValue(Row row) {
+ if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX
+ || aggregation == Aggregation.SUM || aggregation == Aggregation.AVG) {
+ return row.getDouble(DOUBLE_POS);
+ } else {
+ return null;
+ }
+ }
+}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 09c415c..3419729 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,10 @@ package org.thingsboard.server.dao.timeseries;
import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -26,7 +30,16 @@ import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.dao.AbstractDao;
import org.thingsboard.server.dao.model.ModelConstants;
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@@ -41,48 +54,136 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
@Value("${cassandra.query.max_limit_per_request}")
protected Integer maxLimitPerRequest;
+ @Value("${cassandra.query.read_result_processing_threads}")
+ private int readResultsProcessingThreads;
+
+ @Value("${cassandra.query.min_read_step}")
+ private int minReadStep;
+
+ @Value("${cassandra.query.ts_key_value_partitioning}")
+ private String partitioning;
+
+ private TsPartitionDate tsFormat;
+
+ private ExecutorService readResultsProcessingExecutor;
+
private PreparedStatement partitionInsertStmt;
private PreparedStatement[] latestInsertStmts;
private PreparedStatement[] saveStmts;
+ private PreparedStatement[] fetchStmts;
private PreparedStatement findLatestStmt;
private PreparedStatement findAllLatestStmt;
+ @PostConstruct
+ public void init() {
+ getFetchStmt(Aggregation.NONE);
+ readResultsProcessingExecutor = Executors.newFixedThreadPool(readResultsProcessingThreads);
+ Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
+ if (partition.isPresent()) {
+ tsFormat = partition.get();
+ } else {
+ log.warn("Incorrect configuration of partitioning {}", partitioning);
+ throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
+ }
+ }
+
+ @PreDestroy
+ public void stop() {
+ if (readResultsProcessingExecutor != null) {
+ readResultsProcessingExecutor.shutdownNow();
+ }
+ }
+
@Override
- public List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition) {
- List<Row> rows = Collections.emptyList();
- Long[] parts = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
- int partsLength = parts.length;
- if (parts != null && partsLength > 0) {
- int limit = maxLimitPerRequest;
- Optional<Integer> lim = query.getLimit();
- if (lim.isPresent() && lim.get() < maxLimitPerRequest) {
- limit = lim.get();
- }
+ public long toPartitionTs(long ts) {
+ LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
+ return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
+ }
- rows = new ArrayList<>(limit);
- int lastIdx = partsLength - 1;
- for (int i = 0; i < partsLength; i++) {
- int currentLimit;
- if (rows.size() >= limit) {
- break;
- } else {
- currentLimit = limit - rows.size();
+
+ private static String[] getFetchColumnNames(Aggregation aggregation) {
+ switch (aggregation) {
+ case NONE:
+ return ModelConstants.NONE_AGGREGATION_COLUMNS;
+ case MIN:
+ return ModelConstants.MIN_AGGREGATION_COLUMNS;
+ case MAX:
+ return ModelConstants.MAX_AGGREGATION_COLUMNS;
+ case SUM:
+ return ModelConstants.SUM_AGGREGATION_COLUMNS;
+ case COUNT:
+ return ModelConstants.COUNT_AGGREGATION_COLUMNS;
+ case AVG:
+ return ModelConstants.AVG_AGGREGATION_COLUMNS;
+ default:
+ throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!");
+ }
+ }
+
+ @Override
+ public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
+ if (query.getAggregation() == Aggregation.NONE) {
+ //TODO:
+ return null;
+ } else {
+ long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minReadStep);
+ long stepTs = query.getStartTs();
+ List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
+ while (stepTs < query.getEndTs()) {
+ long startTs = stepTs;
+ long endTs = stepTs + step;
+ TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, 1, query.getAggregation());
+ futures.add(findAndAggregateAsync(entityType, entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
+ stepTs = endTs;
+ }
+ ListenableFuture<List<Optional<TsKvEntry>>> future = Futures.allAsList(futures);
+ return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
+ @Nullable
+ @Override
+ public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> input) {
+ return input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList());
}
- Long partition = parts[i];
- Select.Where where = select().from(ModelConstants.TS_KV_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
- .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId))
- .and(eq(ModelConstants.KEY_COLUMN, query.getKey()))
- .and(eq(ModelConstants.PARTITION_COLUMN, partition));
- if (i == 0 && query.getStartTs().isPresent()) {
- where.and(QueryBuilder.gt(ModelConstants.TS_COLUMN, query.getStartTs().get()));
- } else if (i == lastIdx && query.getEndTs().isPresent()) {
- where.and(QueryBuilder.lte(ModelConstants.TS_COLUMN, query.getEndTs().get()));
+ });
+ }
+ }
+
+ private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
+ final Aggregation aggregation = query.getAggregation();
+ final long startTs = query.getStartTs();
+ final long endTs = query.getEndTs();
+ final long ts = startTs + (endTs - startTs) / 2;
+
+ ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
+ com.google.common.base.Function<ResultSet, List<Long>> toArrayFunction = rows -> rows.all().stream()
+ .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList());
+
+ ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, toArrayFunction, readResultsProcessingExecutor);
+
+ AsyncFunction<List<Long>, List<ResultSet>> fetchChunksFunction = partitions -> {
+ try {
+ PreparedStatement proto = getFetchStmt(aggregation);
+ List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
+ for (Long partition : partitions) {
+ BoundStatement stmt = proto.bind();
+ stmt.setString(0, entityType);
+ stmt.setUUID(1, entityId);
+ stmt.setString(2, query.getKey());
+ stmt.setLong(3, partition);
+ stmt.setLong(4, startTs);
+ stmt.setLong(5, endTs);
+ log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityType, entityId);
+ futures.add(executeAsyncRead(stmt));
}
- where.limit(currentLimit);
- rows.addAll(executeRead(where).all());
+ return Futures.allAsList(futures);
+ } catch (Throwable e) {
+ log.error("Failed to fetch data", e);
+ throw e;
}
- }
- return convertResultToTsKvEntryList(rows);
+ };
+
+ ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture, fetchChunksFunction, readResultsProcessingExecutor);
+
+ return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, query.getKey(), ts), readResultsProcessingExecutor);
}
@Override
@@ -190,13 +291,12 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
* Select existing partitions from the table
* <code>{@link ModelConstants#TS_KV_PARTITIONS_CF}</code> for the given entity
*/
- private Long[] fetchPartitions(String entityType, UUID entityId, String key, Optional<Long> minPartition, Optional<Long> maxPartition) {
+ private ResultSetFuture fetchPartitions(String entityType, UUID entityId, String key, long minPartition, long maxPartition) {
Select.Where select = QueryBuilder.select(ModelConstants.PARTITION_COLUMN).from(ModelConstants.TS_KV_PARTITIONS_CF).where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityType))
.and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId)).and(eq(ModelConstants.KEY_COLUMN, key));
- minPartition.ifPresent(startTs -> select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition.get())));
- maxPartition.ifPresent(endTs -> select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition.get())));
- ResultSet resultSet = executeRead(select);
- return resultSet.all().stream().map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).toArray(Long[]::new);
+ select.and(QueryBuilder.gte(ModelConstants.PARTITION_COLUMN, minPartition));
+ select.and(QueryBuilder.lte(ModelConstants.PARTITION_COLUMN, maxPartition));
+ return executeAsyncRead(select);
}
private PreparedStatement getSaveStmt(DataType dataType) {
@@ -216,6 +316,23 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
return saveStmts[dataType.ordinal()];
}
+ private PreparedStatement getFetchStmt(Aggregation aggType) {
+ if (fetchStmts == null) {
+ fetchStmts = new PreparedStatement[Aggregation.values().length];
+ for (Aggregation type : Aggregation.values()) {
+ fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
+ String.join(", ", getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
+ + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
+ + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
+ + "AND " + ModelConstants.KEY_COLUMN + " = ? "
+ + "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
+ + "AND " + ModelConstants.TS_COLUMN + " > ? "
+ + "AND " + ModelConstants.TS_COLUMN + " <= ?");
+ }
+ }
+ return fetchStmts[aggType.ordinal()];
+ }
+
private PreparedStatement getLatestStmt(DataType dataType) {
if (latestInsertStmts == null) {
latestInsertStmts = new PreparedStatement[DataType.values().length];
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 419b534..a8b4ef5 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,21 +23,23 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.UUIDBased;
+import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.service.Validator;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -50,38 +52,14 @@ public class BaseTimeseriesService implements TimeseriesService {
public static final int INSERTS_PER_ENTRY = 3;
- @Value("${cassandra.query.ts_key_value_partitioning}")
- private String partitioning;
-
@Autowired
private TimeseriesDao timeseriesDao;
- private TsPartitionDate tsFormat;
-
- @PostConstruct
- public void init() {
- Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
- if (partition.isPresent()) {
- tsFormat = partition.get();
- } else {
- log.warn("Incorrect configuration of partitioning {}", partitioning);
- throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
- }
- }
-
@Override
- public List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query) {
+ public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
validate(entityType, entityId);
validate(query);
- return timeseriesDao.find(entityType, entityId.getId(), query, toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()));
- }
-
- private Optional<Long> toPartitionTs(Optional<Long> ts) {
- if (ts.isPresent()) {
- return Optional.of(toPartitionTs(ts.get()));
- } else {
- return Optional.empty();
- }
+ return timeseriesDao.findAllAsync(entityType, entityId.getId(), query, timeseriesDao.toPartitionTs(query.getStartTs()), timeseriesDao.toPartitionTs(query.getEndTs()));
}
@Override
@@ -106,7 +84,7 @@ public class BaseTimeseriesService implements TimeseriesService {
throw new IncorrectParameterException("Key value entry can't be null");
}
UUID uid = entityId.getId();
- long partitionTs = toPartitionTs(tsKvEntry.getTs());
+ long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
@@ -122,7 +100,7 @@ public class BaseTimeseriesService implements TimeseriesService {
throw new IncorrectParameterException("Key value entry can't be null");
}
UUID uid = entityId.getId();
- long partitionTs = toPartitionTs(tsKvEntry.getTs());
+ long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
saveAndRegisterFutures(futures, entityType, tsKvEntry, uid, partitionTs);
}
return Futures.allAsList(futures);
@@ -144,14 +122,6 @@ public class BaseTimeseriesService implements TimeseriesService {
futures.add(timeseriesDao.save(entityType, uid, partitionTs, tsKvEntry));
}
- private long toPartitionTs(long ts) {
- LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
-
- LocalDateTime parititonTime = tsFormat.truncatedTo(time);
-
- return parititonTime.toInstant(ZoneOffset.UTC).toEpochMilli();
- }
-
private static void validate(String entityType, UUIDBased entityId) {
Validator.validateString(entityType, "Incorrect entityType " + entityType);
Validator.validateId(entityId, "Incorrect entityId " + entityId);
@@ -163,5 +133,6 @@ public class BaseTimeseriesService implements TimeseriesService {
} else if (isBlank(query.getKey())) {
throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
}
+ //TODO: add validation of all params
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 294f574..83b78da 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
+import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
@@ -30,7 +31,11 @@ import java.util.UUID;
*/
public interface TimeseriesDao {
- List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
+ long toPartitionTs(long ts);
+
+ ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition);
+
+// List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
ResultSetFuture findLatest(String entityType, UUID entityId, String key);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index d8b31af..1bafdea 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -19,6 +19,7 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.google.common.util.concurrent.ListenableFuture;
+import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
@@ -32,8 +33,7 @@ import java.util.Set;
*/
public interface TimeseriesService {
- //TODO: Replace this with async operation
- List<TsKvEntry> find(String entityType, UUIDBased entityId, TsKvQuery query);
+ ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query);
ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys);
diff --git a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
index b150259..ac48536 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java
@@ -25,11 +25,11 @@ import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClassnameFilters({
- "org.thingsboard.server.dao.service.*Test",
- "org.thingsboard.server.dao.kv.*Test",
- "org.thingsboard.server.dao.plugin.*Test",
- "org.thingsboard.server.dao.rule.*Test",
- "org.thingsboard.server.dao.attributes.*Test",
+// "org.thingsboard.server.dao.service.*Test",
+// "org.thingsboard.server.dao.kv.*Test",
+// "org.thingsboard.server.dao.plugin.*Test",
+// "org.thingsboard.server.dao.rule.*Test",
+// "org.thingsboard.server.dao.attributes.*Test",
"org.thingsboard.server.dao.timeseries.*Test"
})
public class DaoTestSuite {
diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
index 9e4f492..51fce6f 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
@@ -116,14 +116,36 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
entries.add(tsKvEntry);
}
log.debug("Saved all records {}", localDateTime);
- List<TsKvEntry> list = tsService.find(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
- LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli()));
+ List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
+ LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
log.debug("Fetched records {}", localDateTime);
List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
assertEquals(expected.size(), list.size());
assertEquals(expected, list);
}
+// @Test
+// public void testFindDeviceTsDataByQuery() throws Exception {
+// DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+// LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES);
+// log.debug("Start event time is {}", localDateTime);
+// List<TsKvEntry> entries = new ArrayList<>(PARTITION_MINUTES);
+//
+// for (int i = 0; i < PARTITION_MINUTES; i++) {
+// long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli();
+// BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry);
+// tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get();
+// entries.add(tsKvEntry);
+// }
+// log.debug("Saved all records {}", localDateTime);
+// List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
+// LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
+// log.debug("Fetched records {}", localDateTime);
+// List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
+// assertEquals(expected.size(), list.size());
+// assertEquals(expected, list);
+// }
+
private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
tsService.save(DataConstants.DEVICE, deviceId, toTsEntry(ts, stringKvEntry)).get();
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index 82fcbe1..0a207e7 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -2,7 +2,7 @@ cassandra.cluster_name=Thingsboard Cluster
cassandra.keyspace_name=thingsboard
-cassandra.url=127.0.0.1:9142
+cassandra.url=127.0.0.1:9042
cassandra.ssl=false
@@ -47,3 +47,7 @@ cassandra.query.default_fetch_size=2000
cassandra.query.ts_key_value_partitioning=HOURS
cassandra.query.max_limit_per_request=1000
+
+cassandra.query.read_result_processing_threads=3
+
+cassandra.query.min_read_step=100
\ No newline at end of file
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index fb6ae07..b5f27a1 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -84,6 +84,8 @@ public interface PluginContext {
List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query);
+ void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback);
+
void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback);
void loadLatestTimeseries(DeviceId deviceId, PluginCallback<List<TsKvEntry>> callback);
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index d441e81..e93f0b5 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -95,8 +95,9 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
Optional<Integer> limit = request.getIntParamValue("limit");
Map<String, List<TsData>> data = new LinkedHashMap<>();
for (String key : keys.split(",")) {
- List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
- data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
+ //TODO: refactoring
+// List<TsKvEntry> entries = ctx.loadTimeseries(deviceId, new BaseTsKvQuery(key, startTs, endTs, limit));
+// data.put(key, entries.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList()));
}
msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK));
} else if ("attributes".equals(entity)) {