thingsboard-memoizeit

Get rid of cassandra specific classes in timeseries dao and

6/14/2017 5:58:45 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index bcef69b..0e2ffcb 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -16,9 +16,6 @@
 package org.thingsboard.server.actors.plugin;
 
 import akka.actor.ActorRef;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -159,7 +156,7 @@ public final class PluginProcessingContext implements PluginContext {
     @Override
     public void saveTsData(final EntityId entityId, final TsKvEntry entry, final PluginCallback<Void> callback) {
         validate(entityId, new ValidationCallback(callback, ctx -> {
-            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entry);
+            ListenableFuture<List<Void>> rsListFuture = pluginCtx.tsService.save(entityId, entry);
             Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
         }));
     }
@@ -172,7 +169,7 @@ public final class PluginProcessingContext implements PluginContext {
     @Override
     public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, long ttl, final PluginCallback<Void> callback) {
         validate(entityId, new ValidationCallback(callback, ctx -> {
-            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl);
+            ListenableFuture<List<Void>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl);
             Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
         }));
     }
@@ -189,26 +186,16 @@ public final class PluginProcessingContext implements PluginContext {
     @Override
     public void loadLatestTimeseries(final EntityId entityId, final PluginCallback<List<TsKvEntry>> callback) {
         validate(entityId, new ValidationCallback(callback, ctx -> {
-            ResultSetFuture future = pluginCtx.tsService.findAllLatest(entityId);
-            Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor);
+            ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAllLatest(entityId);
+            Futures.addCallback(future, getCallback(callback, v -> v), executor);
         }));
     }
 
     @Override
     public void loadLatestTimeseries(final EntityId entityId, final Collection<String> keys, final PluginCallback<List<TsKvEntry>> callback) {
         validate(entityId, new ValidationCallback(callback, ctx -> {
-            ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.findLatest(entityId, keys);
-            Futures.addCallback(rsListFuture, getListCallback(callback, rsList ->
-            {
-                List<TsKvEntry> result = new ArrayList<>();
-                for (ResultSet rs : rsList) {
-                    Row row = rs.one();
-                    if (row != null) {
-                        result.add(pluginCtx.tsService.convertResultToTsKvEntry(row));
-                    }
-                }
-                return result;
-            }), executor);
+            ListenableFuture<List<TsKvEntry>> rsListFuture = pluginCtx.tsService.findLatest(entityId, keys);
+            Futures.addCallback(rsListFuture, getCallback(callback, v -> v), executor);
         }));
     }
 
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 208d9ab..b443dcd 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -153,7 +153,7 @@ cassandra:
     # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
     ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
     # Specify max data points per request
-    min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:100}"
+    min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:1000}"
 
 # Actor system parameters
 actors:
diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
index 2f6fe73..af76971 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java
@@ -188,7 +188,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
                                 QueryBuilder.asc(ModelConstants.RELATION_TO_TYPE_PROPERTY)
                 ),
                 pageLink, ModelConstants.RELATION_TO_ID_PROPERTY);
-        return getFuture(executeAsyncRead(query), rs -> getEntityRelations(rs));
+        return getFuture(executeAsyncRead(query), this::getEntityRelations);
     }
 
     private PreparedStatement getSaveStmt() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index ae7573e..9e53cc6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -15,8 +15,6 @@
  */
 package org.thingsboard.server.dao.sql.timeseries;
 
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -44,37 +42,28 @@ public class JpaTimeseriesDao implements TimeseriesDao {
     }
 
     @Override
-    public ResultSetFuture findLatest(EntityId entityId, String key) {
+    public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
         return null;
     }
 
     @Override
-    public ResultSetFuture findAllLatest(EntityId entityId) {
+    public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
         return null;
     }
 
     @Override
-    public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
+    public ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
         return null;
     }
 
     @Override
-    public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
+    public ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl) {
         return null;
     }
 
     @Override
-    public ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
+    public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
         return null;
     }
 
-    @Override
-    public TsKvEntry convertResultToTsKvEntry(Row row) {
-        return null;
-    }
-
-    @Override
-    public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
-        return null;
-    }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 8a1f629..c4a71b3 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -15,9 +15,6 @@
  */
 package org.thingsboard.server.dao.timeseries;
 
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +29,6 @@ import org.thingsboard.server.dao.service.Validator;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
@@ -56,42 +52,35 @@ public class BaseTimeseriesService implements TimeseriesService {
     }
 
     @Override
-    public ListenableFuture<List<ResultSet>> findLatest(EntityId entityId, Collection<String> keys) {
+    public ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys) {
         validate(entityId);
-        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(keys.size());
+        List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size());
         keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key));
         keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityId, key)));
         return Futures.allAsList(futures);
     }
 
     @Override
-    public ResultSetFuture findAllLatest(EntityId entityId) {
+    public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
         validate(entityId);
         return timeseriesDao.findAllLatest(entityId);
     }
 
     @Override
-    public ListenableFuture<List<ResultSet>> save(EntityId entityId, TsKvEntry tsKvEntry) {
+    public ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry) {
         validate(entityId);
         if (tsKvEntry == null) {
             throw new IncorrectParameterException("Key value entry can't be null");
         }
         long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
-
-        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
+        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
         saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L);
         return Futures.allAsList(futures);
     }
 
     @Override
-    public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries) {
-        return save(entityId, tsKvEntries, 0L);
-    }
-
-    @Override
-    public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
-        validate(entityId);
-        List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
+    public ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
+        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
         for (TsKvEntry tsKvEntry : tsKvEntries) {
             if (tsKvEntry == null) {
                 throw new IncorrectParameterException("Key value entry can't be null");
@@ -102,18 +91,7 @@ public class BaseTimeseriesService implements TimeseriesService {
         return Futures.allAsList(futures);
     }
 
-
-    @Override
-    public TsKvEntry convertResultToTsKvEntry(Row row) {
-        return timeseriesDao.convertResultToTsKvEntry(row);
-    }
-
-    @Override
-    public List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs) {
-        return timeseriesDao.convertResultToTsKvEntryList(rs.all());
-    }
-
-    private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
+    private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
         futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl));
         futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
         futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl));
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 4c81eed..21e01e6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -51,12 +51,11 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
  */
 @Component
 @Slf4j
-@ConditionalOnProperty(prefix = "cassandra", value = "enabled", havingValue = "true", matchIfMissing = false)
+@ConditionalOnProperty(prefix = "cassandra", value = "enabled", havingValue = "true")
 public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implements TimeseriesDao {
 
-    //@Value("${cassandra.query.min_aggregation_step_ms}")
-    //TODO:
-    private int minAggregationStepMs = 1000;
+    @Value("${cassandra.query.min_aggregation_step_ms}")
+    private int minAggregationStepMs;
 
     @Value("${cassandra.query.ts_key_value_partitioning}")
     private String partitioning;
@@ -103,9 +102,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
             @Nullable
             @Override
             public List<TsKvEntry> apply(@Nullable List<List<TsKvEntry>> results) {
-                List<TsKvEntry> result = new ArrayList<TsKvEntry>();
-                results.forEach(r -> result.addAll(r));
-                return result;
+                if (results == null || results.isEmpty()) {
+                    return null;
+                }
+                return results.stream()
+                        .flatMap(List::stream)
+                        .collect(Collectors.toList());
             }
         }, readResultsProcessingExecutor);
     }
@@ -238,26 +240,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     }
 
     @Override
-    public ResultSetFuture findLatest(EntityId entityId, String key) {
+    public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
         BoundStatement stmt = getFindLatestStmt().bind();
         stmt.setString(0, entityId.getEntityType().name());
         stmt.setUUID(1, entityId.getId());
         stmt.setString(2, key);
         log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId());
-        return executeAsyncRead(stmt);
+        return getFuture(executeAsyncRead(stmt), rs -> convertResultToTsKvEntry(rs.one()));
     }
 
     @Override
-    public ResultSetFuture findAllLatest(EntityId entityId) {
+    public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
         BoundStatement stmt = getFindAllLatestStmt().bind();
         stmt.setString(0, entityId.getEntityType().name());
         stmt.setUUID(1, entityId.getId());
         log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId());
-        return executeAsyncRead(stmt);
+        return getFuture(executeAsyncRead(stmt), rs -> convertResultToTsKvEntryList(rs.all()));
     }
 
     @Override
-    public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
+    public ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
         DataType type = tsKvEntry.getDataType();
         BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
         stmt.setString(0, entityId.getEntityType().name())
@@ -269,11 +271,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         if (ttl > 0) {
             stmt.setInt(6, (int) ttl);
         }
-        return executeAsyncWrite(stmt);
+        return getFuture(executeAsyncWrite(stmt), rs -> null);
     }
 
     @Override
-    public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
+    public ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl) {
         log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
         BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
         stmt = stmt.setString(0, entityId.getEntityType().name())
@@ -283,11 +285,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         if (ttl > 0) {
             stmt.setInt(4, (int) ttl);
         }
-        return executeAsyncWrite(stmt);
+        return getFuture(executeAsyncWrite(stmt), rs -> null);
     }
 
     @Override
-    public ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
+    public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
         DataType type = tsKvEntry.getDataType();
         BoundStatement stmt = getLatestStmt(type).bind()
                 .setString(0, entityId.getEntityType().name())
@@ -295,25 +297,18 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
                 .setString(2, tsKvEntry.getKey())
                 .setLong(3, tsKvEntry.getTs());
         addValue(tsKvEntry, stmt, 4);
-        return executeAsyncWrite(stmt);
+        return getFuture(executeAsyncWrite(stmt), rs -> null);
     }
 
-    @Override
-    public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
+    private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
         List<TsKvEntry> entries = new ArrayList<>(rows.size());
         if (!rows.isEmpty()) {
-            rows.forEach(row -> {
-                TsKvEntry kvEntry = convertResultToTsKvEntry(row);
-                if (kvEntry != null) {
-                    entries.add(kvEntry);
-                }
-            });
+            rows.forEach(row -> entries.add(convertResultToTsKvEntry(row)));
         }
         return entries;
     }
 
-    @Override
-    public TsKvEntry convertResultToTsKvEntry(Row row) {
+    private TsKvEntry convertResultToTsKvEntry(Row row) {
         String key = row.getString(ModelConstants.KEY_COLUMN);
         long ts = row.getLong(ModelConstants.TS_COLUMN);
         return new BasicTsKvEntry(ts, toKvEntry(row, key));
@@ -490,7 +485,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         return findAllLatestStmt;
     }
 
-    public static String getColumnName(DataType type) {
+    private static String getColumnName(DataType type) {
         switch (type) {
             case BOOLEAN:
                 return ModelConstants.BOOLEAN_VALUE_COLUMN;
@@ -505,7 +500,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         }
     }
 
-    public static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) {
+    private static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) {
         switch (kvEntry.getDataType()) {
             case BOOLEAN:
                 stmt.setBool(column, kvEntry.getBooleanValue().get().booleanValue());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 08a61f6..8f1b003 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -15,8 +15,6 @@
  */
 package org.thingsboard.server.dao.timeseries;
 
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
@@ -33,18 +31,13 @@ public interface TimeseriesDao {
 
     ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries);
 
-    ResultSetFuture findLatest(EntityId entityId, String key);
+    ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key);
 
-    ResultSetFuture findAllLatest(EntityId entityId);
+    ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId);
 
-    ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
+    ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
 
-    ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl);
-
-    ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
-
-    TsKvEntry convertResultToTsKvEntry(Row row);
-
-    List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows);
+    ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl);
 
+    ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index 10fed26..e6feebb 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -15,12 +15,8 @@
  */
 package org.thingsboard.server.dao.timeseries;
 
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.thingsboard.server.common.data.id.EntityId;
-import org.thingsboard.server.common.data.id.UUIDBased;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.data.kv.TsKvQuery;
 
@@ -34,18 +30,11 @@ public interface TimeseriesService {
 
     ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries);
 
-    ListenableFuture<List<ResultSet>> findLatest(EntityId entityId, Collection<String> keys);
+    ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys);
 
-    ResultSetFuture findAllLatest(EntityId entityId);
+    ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId);
 
-    ListenableFuture<List<ResultSet>> save(EntityId entityId, TsKvEntry tsKvEntry);
-
-    ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry);
-
-    ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
-
-    TsKvEntry convertResultToTsKvEntry(Row row);
-
-    List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs);
+    ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
 
+    ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
 }
diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
index 779aa55..793b503 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TimeseriesServiceTest.java
@@ -15,13 +15,10 @@
  */
 package org.thingsboard.server.dao.timeseries;
 
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.utils.UUIDs;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Assert;
 import org.junit.Test;
-import org.thingsboard.server.common.data.DataConstants;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.dao.service.AbstractServiceTest;
@@ -62,8 +59,7 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
         saveEntries(deviceId, TS - 1);
         saveEntries(deviceId, TS);
 
-        ResultSetFuture rsFuture = tsService.findAllLatest(deviceId);
-        List<TsKvEntry> tsList = tsService.convertResultSetToTsKvEntryList(rsFuture.get());
+        List<TsKvEntry> tsList = tsService.findAllLatest(deviceId).get();
 
         assertNotNull(tsList);
         assertEquals(4, tsList.size());
@@ -91,9 +87,9 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
         saveEntries(deviceId, TS - 1);
         saveEntries(deviceId, TS);
 
-        List<ResultSet> rs = tsService.findLatest(deviceId, Collections.singleton(STRING_KEY)).get();
-        Assert.assertEquals(1, rs.size());
-        Assert.assertEquals(toTsEntry(TS, stringKvEntry), tsService.convertResultToTsKvEntry(rs.get(0).one()));
+        List<TsKvEntry> entries = tsService.findLatest(deviceId, Collections.singleton(STRING_KEY)).get();
+        Assert.assertEquals(1, entries.size());
+        Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0));
     }
 
     @Test
diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties
index a4eb3d0..e2bf050 100644
--- a/dao/src/test/resources/cassandra-test.properties
+++ b/dao/src/test/resources/cassandra-test.properties
@@ -48,4 +48,4 @@ cassandra.query.ts_key_value_partitioning=HOURS
 
 cassandra.query.max_limit_per_request=1000
 
-cassandra.query.min_aggregation_step_ms=100
\ No newline at end of file
+cassandra.query.min_aggregation_step_ms=1000
\ No newline at end of file