thingsboard-aplcache

Improve Jpa Timeseries DAO. Fixed SQL Warning Code: -1003.

1/31/2019 3:38:03 PM

Details

diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
index 348f21b..fe10286 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
@@ -49,35 +49,52 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
 @IdClass(TsKvCompositeKey.class)
 public final class TsKvEntity implements ToData<TsKvEntry> {
 
-    public TsKvEntity() {
-    }
+    private static final String SUM = "SUM";
+    private static final String AVG = "AVG";
+    private static final String MIN = "MIN";
+    private static final String MAX = "MAX";
 
-    public TsKvEntity(Long longSumValue, Double doubleSumValue, Long longCountValue, Long doubleCountValue) {
-        double sum = 0.0;
-        if (longSumValue != null) {
-            sum += longSumValue;
-        }
-        if (doubleSumValue != null) {
-            sum += doubleSumValue;
-        }
-        this.doubleValue = sum / (longCountValue + doubleCountValue);
+    public TsKvEntity() {
     }
 
-    public TsKvEntity(Long sumLongValue, Double sumDoubleValue) {
-        if (sumDoubleValue != null) {
-            this.doubleValue = sumDoubleValue + (sumLongValue != null ? sumLongValue.doubleValue() : 0.0);
-        } else {
-            this.longValue = sumLongValue;
-        }
+    public TsKvEntity(String strValue) {
+        this.strValue = strValue;
     }
 
-    public TsKvEntity(String strValue, Long longValue, Double doubleValue, boolean max) {
-        this.strValue = strValue;
-        if (longValue != null && doubleValue != null) {
-            this.doubleValue = max ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue());
-        } else {
-            this.longValue = longValue;
-            this.doubleValue = doubleValue;
+    public TsKvEntity(Long longValue, Double doubleValue, Long longCountValue, Long doubleCountValue, String aggType) {
+        switch (aggType) {
+            case AVG:
+                double sum = 0.0;
+                if (longValue != null) {
+                    sum += longValue;
+                }
+                if (doubleValue != null) {
+                    sum += doubleValue;
+                }
+                long totalCount = longCountValue + doubleCountValue;
+                if (totalCount > 0) {
+                    this.doubleValue = sum / (longCountValue + doubleCountValue);
+                } else {
+                    this.doubleValue = 0.0;
+                }
+                break;
+            case SUM:
+                if (doubleCountValue > 0) {
+                    this.doubleValue = doubleValue + (longValue != null ? longValue.doubleValue() : 0.0);
+                } else {
+                    this.longValue = longValue;
+                }
+                break;
+            case MIN:
+            case MAX:
+                if (longCountValue > 0 && doubleCountValue > 0) {
+                    this.doubleValue = MAX.equals(aggType) ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue());
+                } else if (doubleCountValue > 0) {
+                    this.doubleValue = doubleValue;
+                } else if (longCountValue > 0) {
+                    this.longValue = longValue;
+                }
+                break;
         }
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index 3120148..a04944e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -161,52 +161,62 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
     }
 
     private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
-        CompletableFuture<TsKvEntity> entity;
+        List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
         String entityIdStr = fromTimeUUID(entityId.getId());
         switch (aggregation) {
             case AVG:
-                entity = tsKvRepository.findAvg(
+                entitiesFutures.add(tsKvRepository.findAvg(
                         entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
-                        endTs);
+                        endTs));
 
                 break;
             case MAX:
-                entity = tsKvRepository.findMax(
+                entitiesFutures.add(tsKvRepository.findStringMax(
                         entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
-                        endTs);
+                        endTs));
+                entitiesFutures.add(tsKvRepository.findNumericMax(
+                        entityIdStr,
+                        entityId.getEntityType(),
+                        key,
+                        startTs,
+                        endTs));
 
                 break;
             case MIN:
-                entity = tsKvRepository.findMin(
+                entitiesFutures.add(tsKvRepository.findStringMin(
                         entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
-                        endTs);
-
+                        endTs));
+                entitiesFutures.add(tsKvRepository.findNumericMin(
+                        entityIdStr,
+                        entityId.getEntityType(),
+                        key,
+                        startTs,
+                        endTs));
                 break;
             case SUM:
-                entity = tsKvRepository.findSum(
+                entitiesFutures.add(tsKvRepository.findSum(
                         entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
-                        endTs);
-
+                        endTs));
                 break;
             case COUNT:
-                entity = tsKvRepository.findCount(
+                entitiesFutures.add(tsKvRepository.findCount(
                         entityIdStr,
                         entityId.getEntityType(),
                         key,
                         startTs,
-                        endTs);
+                        endTs));
 
                 break;
             default:
@@ -214,11 +224,27 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
         }
 
         SettableFuture<TsKvEntity> listenableFuture = SettableFuture.create();
-        entity.whenComplete((tsKvEntity, throwable) -> {
+
+
+        CompletableFuture<List<TsKvEntity>> entities =
+                CompletableFuture.allOf(entitiesFutures.toArray(new CompletableFuture[entitiesFutures.size()]))
+                .thenApply(v -> entitiesFutures.stream()
+                        .map(CompletableFuture::join)
+                        .collect(Collectors.toList()));
+
+
+        entities.whenComplete((tsKvEntities, throwable) -> {
             if (throwable != null) {
                 listenableFuture.setException(throwable);
             } else {
-                listenableFuture.set(tsKvEntity);
+                TsKvEntity result = null;
+                for (TsKvEntity entity : tsKvEntities) {
+                    if (entity.isNotEmpty()) {
+                        result = entity;
+                        break;
+                    }
+                }
+                listenableFuture.set(result);
             }
         });
         return Futures.transform(listenableFuture, new Function<TsKvEntity, Optional<TsKvEntry>>() {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index 92bdf7e..79aa71b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -35,7 +35,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
 
     @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
             "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
-            "AND tskv.ts > :startTs AND tskv.ts < :endTs")
+            "AND tskv.ts > :startTs AND tskv.ts <= :endTs")
     List<TsKvEntity> findAllWithLimit(@Param("entityId") String entityId,
                                       @Param("entityType") EntityType entityType,
                                       @Param("entityKey") String key,
@@ -55,29 +55,63 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
                 @Param("endTs") long endTs);
 
     @Async
-    @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue), true) FROM TsKvEntity tskv " +
+    @Query("SELECT new TsKvEntity(MAX(tskv.strValue)) FROM TsKvEntity tskv " +
+            "WHERE tskv.strValue IS NOT NULL " +
+            "AND tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
+    CompletableFuture<TsKvEntity> findStringMax(@Param("entityId") String entityId,
+                                                @Param("entityType") EntityType entityType,
+                                                @Param("entityKey") String entityKey,
+                                                @Param("startTs") long startTs,
+                                                @Param("endTs") long endTs);
+
+    @Async
+    @Query("SELECT new TsKvEntity(MAX(COALESCE(tskv.longValue, -9223372036854775807)), " +
+            "MAX(COALESCE(tskv.doubleValue, -1.79769E+308)), " +
+            "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
+            "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
+            "'MAX') FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
-            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
-    CompletableFuture<TsKvEntity> findMax(@Param("entityId") String entityId,
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
+    CompletableFuture<TsKvEntity> findNumericMax(@Param("entityId") String entityId,
+                                          @Param("entityType") EntityType entityType,
+                                          @Param("entityKey") String entityKey,
+                                          @Param("startTs") long startTs,
+                                          @Param("endTs") long endTs);
+
+
+    @Async
+    @Query("SELECT new TsKvEntity(MIN(tskv.strValue)) FROM TsKvEntity tskv " +
+            "WHERE tskv.strValue IS NOT NULL " +
+            "AND tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
+    CompletableFuture<TsKvEntity> findStringMin(@Param("entityId") String entityId,
                                           @Param("entityType") EntityType entityType,
                                           @Param("entityKey") String entityKey,
                                           @Param("startTs") long startTs,
                                           @Param("endTs") long endTs);
 
     @Async
-    @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue), false) FROM TsKvEntity tskv " +
+    @Query("SELECT new TsKvEntity(MIN(COALESCE(tskv.longValue, 9223372036854775807)), " +
+            "MIN(COALESCE(tskv.doubleValue, 1.79769E+308)), " +
+            "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
+            "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
+            "'MIN') FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
-            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
-    CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
+    CompletableFuture<TsKvEntity> findNumericMin(@Param("entityId") String entityId,
                                           @Param("entityType") EntityType entityType,
                                           @Param("entityKey") String entityKey,
                                           @Param("startTs") long startTs,
                                           @Param("endTs") long endTs);
 
     @Async
-    @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
+    @Query("SELECT new TsKvEntity(SUM(CASE WHEN tskv.booleanValue IS NULL THEN 0 ELSE 1 END), " +
+            "SUM(CASE WHEN tskv.strValue IS NULL THEN 0 ELSE 1 END), " +
+            "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
+            "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
-            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
     CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
                                             @Param("entityType") EntityType entityType,
                                             @Param("entityKey") String entityKey,
@@ -85,23 +119,31 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
                                             @Param("endTs") long endTs);
 
     @Async
-    @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
+    @Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " +
+            "SUM(COALESCE(tskv.doubleValue, 0.0)), " +
+            "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
+            "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
+            "'AVG') FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
-            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
     CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
                                           @Param("entityType") EntityType entityType,
                                           @Param("entityKey") String entityKey,
                                           @Param("startTs") long startTs,
                                           @Param("endTs") long endTs);
 
-
     @Async
-    @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " +
+    @Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " +
+            "SUM(COALESCE(tskv.doubleValue, 0.0)), " +
+            "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
+            "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
+            "'SUM') FROM TsKvEntity tskv " +
             "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
-            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
     CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
                                           @Param("entityType") EntityType entityType,
                                           @Param("entityKey") String entityKey,
                                           @Param("startTs") long startTs,
                                           @Param("endTs") long endTs);
+
 }
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index 7378bcd..c2af9d6 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -17,10 +17,7 @@ package org.thingsboard.server.dao.service.timeseries;
 
 import com.datastax.driver.core.utils.UUIDs;
 import lombok.extern.slf4j.Slf4j;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.thingsboard.server.common.data.EntityView;
 import org.thingsboard.server.common.data.Tenant;
 import org.thingsboard.server.common.data.id.DeviceId;
@@ -280,6 +277,66 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
 
         assertEquals(50000, list.get(2).getTs());
         assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
+
+
+        entries.add(save(deviceId, 65000, "A1"));
+        entries.add(save(deviceId, 75000, "A2"));
+        entries.add(save(deviceId, 85000, "B1"));
+        entries.add(save(deviceId, 95000, "B2"));
+        entries.add(save(deviceId, 105000, "C1"));
+        entries.add(save(deviceId, 115000, "C2"));
+
+        list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 60000,
+                120000, 20000, 3, Aggregation.NONE))).get();
+        assertEquals(3, list.size());
+        assertEquals(115000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of("C2"), list.get(0).getStrValue());
+
+        assertEquals(105000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of("C1"), list.get(1).getStrValue());
+
+        assertEquals(95000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of("B2"), list.get(2).getStrValue());
+
+
+        list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 60000,
+                120000, 20000, 3, Aggregation.MIN))).get();
+
+        assertEquals(3, list.size());
+        assertEquals(70000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of("A1"), list.get(0).getStrValue());
+
+        assertEquals(90000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of("B1"), list.get(1).getStrValue());
+
+        assertEquals(110000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of("C1"), list.get(2).getStrValue());
+
+        list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 60000,
+                120000, 20000, 3, Aggregation.MAX))).get();
+
+        assertEquals(3, list.size());
+        assertEquals(70000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of("A2"), list.get(0).getStrValue());
+
+        assertEquals(90000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of("B2"), list.get(1).getStrValue());
+
+        assertEquals(110000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of("C2"), list.get(2).getStrValue());
+
+        list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 60000,
+                120000, 20000, 3, Aggregation.COUNT))).get();
+
+        assertEquals(3, list.size());
+        assertEquals(70000, list.get(0).getTs());
+        assertEquals(java.util.Optional.of(2L), list.get(0).getLongValue());
+
+        assertEquals(90000, list.get(1).getTs());
+        assertEquals(java.util.Optional.of(2L), list.get(1).getLongValue());
+
+        assertEquals(110000, list.get(2).getTs());
+        assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
     }
 
     @Test
@@ -385,6 +442,12 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
         return entry;
     }
 
+    private TsKvEntry save(DeviceId deviceId, long ts, String value) throws Exception {
+        TsKvEntry entry = new BasicTsKvEntry(ts, new StringDataEntry(LONG_KEY, value));
+        tsService.save(tenantId, deviceId, entry).get();
+        return entry;
+    }
+
 
     private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
         tsService.save(tenantId, deviceId, toTsEntry(ts, stringKvEntry)).get();