Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index bcef69b..0e2ffcb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -16,9 +16,6 @@
package org.thingsboard.server.actors.plugin;
import akka.actor.ActorRef;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -159,7 +156,7 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void saveTsData(final EntityId entityId, final TsKvEntry entry, final PluginCallback<Void> callback) {
validate(entityId, new ValidationCallback(callback, ctx -> {
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entry);
+ ListenableFuture<List<Void>> rsListFuture = pluginCtx.tsService.save(entityId, entry);
Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
}));
}
@@ -172,7 +169,7 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, long ttl, final PluginCallback<Void> callback) {
validate(entityId, new ValidationCallback(callback, ctx -> {
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl);
+ ListenableFuture<List<Void>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl);
Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
}));
}
@@ -189,26 +186,16 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void loadLatestTimeseries(final EntityId entityId, final PluginCallback<List<TsKvEntry>> callback) {
validate(entityId, new ValidationCallback(callback, ctx -> {
- ResultSetFuture future = pluginCtx.tsService.findAllLatest(entityId);
- Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor);
+ ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAllLatest(entityId);
+ Futures.addCallback(future, getCallback(callback, v -> v), executor);
}));
}
@Override
public void loadLatestTimeseries(final EntityId entityId, final Collection<String> keys, final PluginCallback<List<TsKvEntry>> callback) {
validate(entityId, new ValidationCallback(callback, ctx -> {
- ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.findLatest(entityId, keys);
- Futures.addCallback(rsListFuture, getListCallback(callback, rsList ->
- {
- List<TsKvEntry> result = new ArrayList<>();
- for (ResultSet rs : rsList) {
- Row row = rs.one();
- if (row != null) {
- result.add(pluginCtx.tsService.convertResultToTsKvEntry(row));
- }
- }
- return result;
- }), executor);
+ ListenableFuture<List<TsKvEntry>> rsListFuture = pluginCtx.tsService.findLatest(entityId, keys);
+ Futures.addCallback(rsListFuture, getCallback(callback, v -> v), executor);
}));
}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 208d9ab..b443dcd 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -153,7 +153,7 @@ cassandra:
# Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
# Specify max data points per request
- min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:100}"
+ min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:1000}"
# Actor system parameters
actors:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
index 2f6fe73..af76971 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
@@ -188,7 +188,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
QueryBuilder.asc(ModelConstants.RELATION_TO_TYPE_PROPERTY)
),
pageLink, ModelConstants.RELATION_TO_ID_PROPERTY);
- return getFuture(executeAsyncRead(query), rs -> getEntityRelations(rs));
+ return getFuture(executeAsyncRead(query), this::getEntityRelations);
}
private PreparedStatement getSaveStmt() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index ae7573e..9e53cc6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -15,8 +15,6 @@
*/
package org.thingsboard.server.dao.sql.timeseries;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -44,37 +42,28 @@ public class JpaTimeseriesDao implements TimeseriesDao {
}
@Override
- public ResultSetFuture findLatest(EntityId entityId, String key) {
+ public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
return null;
}
@Override
- public ResultSetFuture findAllLatest(EntityId entityId) {
+ public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
return null;
}
@Override
- public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
+ public ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
return null;
}
@Override
- public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
+ public ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl) {
return null;
}
@Override
- public ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
+ public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
return null;
}
- @Override
- public TsKvEntry convertResultToTsKvEntry(Row row) {
- return null;
- }
-
- @Override
- public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
- return null;
- }
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 8a1f629..c4a71b3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -15,9 +15,6 @@
*/
package org.thingsboard.server.dao.timeseries;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +29,6 @@ import org.thingsboard.server.dao.service.Validator;
import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -56,42 +52,35 @@ public class BaseTimeseriesService implements TimeseriesService {
}
@Override
- public ListenableFuture<List<ResultSet>> findLatest(EntityId entityId, Collection<String> keys) {
+ public ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys) {
validate(entityId);
- List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(keys.size());
+ List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size());
keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key));
keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityId, key)));
return Futures.allAsList(futures);
}
@Override
- public ResultSetFuture findAllLatest(EntityId entityId) {
+ public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
validate(entityId);
return timeseriesDao.findAllLatest(entityId);
}
@Override
- public ListenableFuture<List<ResultSet>> save(EntityId entityId, TsKvEntry tsKvEntry) {
+ public ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry) {
validate(entityId);
if (tsKvEntry == null) {
throw new IncorrectParameterException("Key value entry can't be null");
}
long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
-
- List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
+ List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L);
return Futures.allAsList(futures);
}
@Override
- public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries) {
- return save(entityId, tsKvEntries, 0L);
- }
-
- @Override
- public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
- validate(entityId);
- List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
+ public ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
+ List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
for (TsKvEntry tsKvEntry : tsKvEntries) {
if (tsKvEntry == null) {
throw new IncorrectParameterException("Key value entry can't be null");
@@ -102,18 +91,7 @@ public class BaseTimeseriesService implements TimeseriesService {
return Futures.allAsList(futures);
}
-
- @Override
- public TsKvEntry convertResultToTsKvEntry(Row row) {
- return timeseriesDao.convertResultToTsKvEntry(row);
- }
-
- @Override
- public List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs) {
- return timeseriesDao.convertResultToTsKvEntryList(rs.all());
- }
-
- private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
+ private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl));
futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 4c81eed..21e01e6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -51,12 +51,11 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
*/
@Component
@Slf4j
-@ConditionalOnProperty(prefix = "cassandra", value = "enabled", havingValue = "true", matchIfMissing = false)
+@ConditionalOnProperty(prefix = "cassandra", value = "enabled", havingValue = "true")
public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implements TimeseriesDao {
- //@Value("${cassandra.query.min_aggregation_step_ms}")
- //TODO:
- private int minAggregationStepMs = 1000;
+ @Value("${cassandra.query.min_aggregation_step_ms}")
+ private int minAggregationStepMs;
@Value("${cassandra.query.ts_key_value_partitioning}")
private String partitioning;
@@ -103,9 +102,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
@Nullable
@Override
public List<TsKvEntry> apply(@Nullable List<List<TsKvEntry>> results) {
- List<TsKvEntry> result = new ArrayList<TsKvEntry>();
- results.forEach(r -> result.addAll(r));
- return result;
+ if (results == null || results.isEmpty()) {
+ return null;
+ }
+ return results.stream()
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
}
}, readResultsProcessingExecutor);
}
@@ -238,26 +240,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
@Override
- public ResultSetFuture findLatest(EntityId entityId, String key) {
+ public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
BoundStatement stmt = getFindLatestStmt().bind();
stmt.setString(0, entityId.getEntityType().name());
stmt.setUUID(1, entityId.getId());
stmt.setString(2, key);
log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId());
- return executeAsyncRead(stmt);
+ return getFuture(executeAsyncRead(stmt), rs -> convertResultToTsKvEntry(rs.one()));
}
@Override
- public ResultSetFuture findAllLatest(EntityId entityId) {
+ public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
BoundStatement stmt = getFindAllLatestStmt().bind();
stmt.setString(0, entityId.getEntityType().name());
stmt.setUUID(1, entityId.getId());
log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId());
- return executeAsyncRead(stmt);
+ return getFuture(executeAsyncRead(stmt), rs -> convertResultToTsKvEntryList(rs.all()));
}
@Override
- public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
+ public ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
DataType type = tsKvEntry.getDataType();
BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
stmt.setString(0, entityId.getEntityType().name())
@@ -269,11 +271,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
if (ttl > 0) {
stmt.setInt(6, (int) ttl);
}
- return executeAsyncWrite(stmt);
+ return getFuture(executeAsyncWrite(stmt), rs -> null);
}
@Override
- public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
+ public ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl) {
log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
stmt = stmt.setString(0, entityId.getEntityType().name())
@@ -283,11 +285,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
if (ttl > 0) {
stmt.setInt(4, (int) ttl);
}
- return executeAsyncWrite(stmt);
+ return getFuture(executeAsyncWrite(stmt), rs -> null);
}
@Override
- public ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
+ public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
DataType type = tsKvEntry.getDataType();
BoundStatement stmt = getLatestStmt(type).bind()
.setString(0, entityId.getEntityType().name())
@@ -295,25 +297,18 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
.setString(2, tsKvEntry.getKey())
.setLong(3, tsKvEntry.getTs());
addValue(tsKvEntry, stmt, 4);
- return executeAsyncWrite(stmt);
+ return getFuture(executeAsyncWrite(stmt), rs -> null);
}
- @Override
- public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
+ private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
List<TsKvEntry> entries = new ArrayList<>(rows.size());
if (!rows.isEmpty()) {
- rows.forEach(row -> {
- TsKvEntry kvEntry = convertResultToTsKvEntry(row);
- if (kvEntry != null) {
- entries.add(kvEntry);
- }
- });
+ rows.forEach(row -> entries.add(convertResultToTsKvEntry(row)));
}
return entries;
}
- @Override
- public TsKvEntry convertResultToTsKvEntry(Row row) {
+ private TsKvEntry convertResultToTsKvEntry(Row row) {
String key = row.getString(ModelConstants.KEY_COLUMN);
long ts = row.getLong(ModelConstants.TS_COLUMN);
return new BasicTsKvEntry(ts, toKvEntry(row, key));
@@ -490,7 +485,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
return findAllLatestStmt;
}
- public static String getColumnName(DataType type) {
+ private static String getColumnName(DataType type) {
switch (type) {
case BOOLEAN:
return ModelConstants.BOOLEAN_VALUE_COLUMN;
@@ -505,7 +500,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
}
}
- public static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) {
+ private static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) {
switch (kvEntry.getDataType()) {
case BOOLEAN:
stmt.setBool(column, kvEntry.getBooleanValue().get().booleanValue());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 08a61f6..8f1b003 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -15,8 +15,6 @@
*/
package org.thingsboard.server.dao.timeseries;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@@ -33,18 +31,13 @@ public interface TimeseriesDao {
ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries);
- ResultSetFuture findLatest(EntityId entityId, String key);
+ ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key);
- ResultSetFuture findAllLatest(EntityId entityId);
+ ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId);
- ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
+ ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
- ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl);
-
- ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
-
- TsKvEntry convertResultToTsKvEntry(Row row);
-
- List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows);
+ ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl);
+ ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index 10fed26..e6feebb 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -15,12 +15,8 @@
*/
package org.thingsboard.server.dao.timeseries;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvQuery;
@@ -34,18 +30,11 @@ public interface TimeseriesService {
ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries);
- ListenableFuture<List<ResultSet>> findLatest(EntityId entityId, Collection<String> keys);
+ ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys);
- ResultSetFuture findAllLatest(EntityId entityId);
+ ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId);
- ListenableFuture<List<ResultSet>> save(EntityId entityId, TsKvEntry tsKvEntry);
-
- ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry);
-
- ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
-
- TsKvEntry convertResultToTsKvEntry(Row row);
-
- List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs);
+ ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
+ ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
}
diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
index 779aa55..793b503 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
@@ -15,13 +15,10 @@
*/
package org.thingsboard.server.dao.timeseries;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.utils.UUIDs;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
-import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.dao.service.AbstractServiceTest;
@@ -62,8 +59,7 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
saveEntries(deviceId, TS - 1);
saveEntries(deviceId, TS);
- ResultSetFuture rsFuture = tsService.findAllLatest(deviceId);
- List<TsKvEntry> tsList = tsService.convertResultSetToTsKvEntryList(rsFuture.get());
+ List<TsKvEntry> tsList = tsService.findAllLatest(deviceId).get();
assertNotNull(tsList);
assertEquals(4, tsList.size());
@@ -91,9 +87,9 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
saveEntries(deviceId, TS - 1);
saveEntries(deviceId, TS);
- List<ResultSet> rs = tsService.findLatest(deviceId, Collections.singleton(STRING_KEY)).get();
- Assert.assertEquals(1, rs.size());
- Assert.assertEquals(toTsEntry(TS, stringKvEntry), tsService.convertResultToTsKvEntry(rs.get(0).one()));
+ List<TsKvEntry> entries = tsService.findLatest(deviceId, Collections.singleton(STRING_KEY)).get();
+ Assert.assertEquals(1, entries.size());
+ Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0));
}
@Test
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index a4eb3d0..e2bf050 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -48,4 +48,4 @@ cassandra.query.ts_key_value_partitioning=HOURS
cassandra.query.max_limit_per_request=1000
-cassandra.query.min_aggregation_step_ms=100
\ No newline at end of file
+cassandra.query.min_aggregation_step_ms=1000
\ No newline at end of file