thingsboard-aplcache

Merge pull request #1014 from davidgin/fixedPartitions address

8/17/2018 10:54:58 AM

Details

diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 3e62453..743a860 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -205,7 +205,7 @@ cassandra:
     read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}"
     write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
     default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
-    # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
+    # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS,INDEFINITE
     ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
     ts_key_value_ttl: "${TS_KV_TTL:0}"
     buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
index 7aa317c..c025e64 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java
@@ -54,10 +54,11 @@ import javax.annotation.PreDestroy;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Optional;
+import java.util.Collections;
 import java.util.stream.Collectors;
 
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -76,6 +77,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
     public static final String SELECT_PREFIX = "SELECT ";
     public static final String EQUALS_PARAM = " = ? ";
 
+
+    private static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
+
     @Autowired
     private Environment environment;
 
@@ -163,14 +167,25 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         }
     }
 
+    public boolean isFixedPartitioning() {
+        return tsFormat.getTruncateUnit().equals(TsPartitionDate.EPOCH_START);
+    }
+
+    private ListenableFuture<List<Long>> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
+        if (isFixedPartitioning()) { //no need to fetch partitions from DB
+            return Futures.immediateFuture(FIXED_PARTITION);
+        }
+        ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
+        return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
+    }
+
     private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
+
         long minPartition = toPartitionTs(query.getStartTs());
         long maxPartition = toPartitionTs(query.getEndTs());
 
-        ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
-
+        final ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
         final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>();
-        final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
 
         Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
             @Override
@@ -181,7 +196,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
             @Override
             public void onFailure(Throwable t) {
-                log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
+                log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()), t);
             }
         }, readResultsProcessingExecutor);
 
@@ -229,10 +244,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
         final long endTs = query.getEndTs();
         final long ts = startTs + (endTs - startTs) / 2;
 
-        ResultSetFuture partitionsFuture = fetchPartitions(entityId, key, minPartition, maxPartition);
-
-        ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
 
+        ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
         ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
                 getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
 
@@ -308,6 +321,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
 
     @Override
     public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
+        if (isFixedPartitioning()) {
+            return Futures.immediateFuture(null);
+        }
         ttl = computeTtl(ttl);
         long partition = toPartitionTs(tsKvEntryTs);
         log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java
index 4148004..93283fd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsPartitionDate.java
@@ -16,22 +16,25 @@
 package org.thingsboard.server.dao.timeseries;
 
 import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
 import java.time.temporal.TemporalUnit;
 import java.util.Optional;
 
 public enum TsPartitionDate {
 
-    MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS);
+    MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS),INDEFINITE("",ChronoUnit.FOREVER);
 
     private final String pattern;
     private final transient TemporalUnit truncateUnit;
+    public final static LocalDateTime EPOCH_START = LocalDateTime.ofEpochSecond(0,0, ZoneOffset.UTC);
 
     TsPartitionDate(String pattern, TemporalUnit truncateUnit) {
         this.pattern = pattern;
         this.truncateUnit = truncateUnit;
     }
 
+
     public String getPattern() {
         return pattern;
     }
@@ -46,6 +49,8 @@ public enum TsPartitionDate {
                 return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
             case YEARS:
                 return time.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
+            case INDEFINITE:
+                 return EPOCH_START;
             default:
                 return time.truncatedTo(truncateUnit);
         }