thingsboard-memoizeit

Added get ts async implementation

6/23/2017 1:49:58 PM

Details

diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
index 2ea58af..c7ba821 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.dao.model.sql;
 
 import lombok.Data;
+import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.dao.model.ToData;
 
@@ -32,8 +33,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
 public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable {
 
     @Id
+    @Enumerated(EnumType.STRING)
     @Column(name = ENTITY_TYPE_COLUMN)
-    private String entityType;
+    private EntityType entityType;
 
     @Id
     @Column(name = ENTITY_ID_COLUMN)
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java
index 48fc75c..57df750 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.dao.model.sql;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.thingsboard.server.common.data.EntityType;
 
 import javax.persistence.Transient;
 import java.io.Serializable;
@@ -31,7 +32,7 @@ public class TsKvCompositeKey implements Serializable{
     @Transient
     private static final long serialVersionUID = -4089175869616037523L;
 
-    private String entityType;
+    private EntityType entityType;
     private UUID entityId;
     private String key;
     private long ts;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
index 603da6f..356f3bd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
@@ -16,7 +16,8 @@
 package org.thingsboard.server.dao.model.sql;
 
 import lombok.Data;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.dao.model.ToData;
 
 import javax.persistence.*;
@@ -30,9 +31,41 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
 @IdClass(TsKvCompositeKey.class)
 public final class TsKvEntity implements ToData<TsKvEntry> {
 
+    public TsKvEntity() {
+    }
+
+    public TsKvEntity(Double avgLongValue, Double avgDoubleValue) {
+        this.longValue = avgLongValue.longValue();
+        this.doubleValue = avgDoubleValue;
+    }
+
+    public TsKvEntity(Long sumLongValue, Double sumDoubleValue) {
+        this.longValue = sumLongValue;
+        this.doubleValue = sumDoubleValue;
+    }
+
+    public TsKvEntity(String strValue, Long longValue, Double doubleValue) {
+        this.strValue = strValue;
+        this.longValue = longValue;
+        this.doubleValue = doubleValue;
+    }
+
+    public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) {
+        if (booleanValueCount != 0) {
+            this.longValue = booleanValueCount;
+        } else if (strValueCount != 0) {
+            this.longValue = strValueCount;
+        } else if (longValueCount != 0) {
+            this.longValue = longValueCount;
+        } else if (doubleValueCount != 0) {
+            this.longValue = doubleValueCount;
+        }
+    }
+
     @Id
+    @Enumerated(EnumType.STRING)
     @Column(name = ENTITY_TYPE_COLUMN)
-    private String entityType;
+    private EntityType entityType;
 
     @Id
     @Column(name = ENTITY_ID_COLUMN)
@@ -60,6 +93,16 @@ public final class TsKvEntity implements ToData<TsKvEntry> {
 
     @Override
     public TsKvEntry toData() {
-        return null;
+        KvEntry kvEntry = null;
+        if (strValue != null) {
+            kvEntry = new StringDataEntry(key, strValue);
+        } else if (longValue != null) {
+            kvEntry = new LongDataEntry(key, longValue);
+        } else if (doubleValue != null) {
+            kvEntry = new DoubleDataEntry(key, doubleValue);
+        } else if (booleanValue != null) {
+            kvEntry = new BooleanDataEntry(key, booleanValue);
+        }
+        return new BasicTsKvEntry(ts, kvEntry);
     }
 }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java
index ab6006f..f7bf3b1 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.dao.model.sql;
 
 import lombok.*;
+import org.thingsboard.server.common.data.EntityType;
 
 import javax.persistence.Transient;
 import java.io.Serializable;
@@ -29,7 +30,7 @@ public class TsKvLatestCompositeKey implements Serializable{
     @Transient
     private static final long serialVersionUID = -4089175869616037523L;
 
-    private String entityType;
+    private EntityType entityType;
     private UUID entityId;
     private String key;
 }
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java
index b9a4543..47a5e00 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.dao.model.sql;
 
 import lombok.Data;
+import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.dao.model.ToData;
 
@@ -31,8 +32,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
 public final class TsKvLatestEntity implements ToData<TsKvEntry> {
 
     @Id
+    @Enumerated(EnumType.STRING)
     @Column(name = ENTITY_TYPE_COLUMN)
-    private String entityType;
+    private EntityType entityType;
 
     @Id
     @Column(name = ENTITY_ID_COLUMN)
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
index cf69004..c77d339 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
@@ -83,7 +83,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
     @Override
     public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
         AttributeKvEntity entity = new AttributeKvEntity();
-        entity.setEntityType(entityId.getEntityType().name());
+        entity.setEntityType(entityId.getEntityType());
         entity.setEntityId(entityId.getId());
         entity.setAttributeType(attributeType);
         entity.setAttributeKey(attribute.getKey());
@@ -104,7 +104,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
                 .stream()
                 .map(key -> {
                     AttributeKvEntity entityToDelete = new AttributeKvEntity();
-                    entityToDelete.setEntityType(entityId.getEntityType().name());
+                    entityToDelete.setEntityType(entityId.getEntityType());
                     entityToDelete.setEntityId(entityId.getId());
                     entityToDelete.setAttributeType(attributeType);
                     entityToDelete.setAttributeKey(key);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index bc056e1..7b64a4f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.PageRequest;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.Aggregation;
@@ -35,6 +36,7 @@ import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -73,19 +75,93 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
         if (query.getAggregation() == Aggregation.NONE) {
             return findAllAsyncWithLimit(entityId, query);
         } else {
-            return service.submit(() -> null);
+            long stepTs = query.getStartTs();
+            List<ListenableFuture<TsKvEntry>> futures = new ArrayList<>();
+            while (stepTs < query.getEndTs()) {
+                long startTs = stepTs;
+                long endTs = stepTs + query.getInterval();
+                long ts = startTs + (endTs - startTs) / 2;
+                futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
+                stepTs = endTs;
+            }
+            return Futures.allAsList(futures);
+        }
+    }
+
+    private ListenableFuture<TsKvEntry> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
+        TsKvEntity entity;
+        switch (aggregation) {
+            case AVG:
+                entity = tsKvRepository.findAvg(
+                            entityId.getId(),
+                            entityId.getEntityType(),
+                            key,
+                            startTs,
+                            endTs);
+
+                break;
+            case MAX:
+                entity = tsKvRepository.findMax(
+                        entityId.getId(),
+                        entityId.getEntityType(),
+                        key,
+                        startTs,
+                        endTs);
+
+                break;
+            case MIN:
+                entity = tsKvRepository.findMin(
+                        entityId.getId(),
+                        entityId.getEntityType(),
+                        key,
+                        startTs,
+                        endTs);
+
+                break;
+            case SUM:
+                entity = tsKvRepository.findSum(
+                        entityId.getId(),
+                        entityId.getEntityType(),
+                        key,
+                        startTs,
+                        endTs);
+
+                break;
+            case COUNT:
+                entity = tsKvRepository.findCount(
+                        entityId.getId(),
+                        entityId.getEntityType(),
+                        key,
+                        startTs,
+                        endTs);
+
+                break;
+            default:
+                entity = null;
+        }
+        if (entity != null){
+            entity.setTs(ts);
         }
+        return service.submit(() -> DaoUtil.getData(entity));
     }
 
     private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
-        return service.submit(() -> null);
+        return service.submit(() ->
+                DaoUtil.convertDataList(
+                        tsKvRepository.findAllWithLimit(
+                                entityId.getId(),
+                                entityId.getEntityType(),
+                                query.getKey(),
+                                query.getStartTs(),
+                                query.getEndTs(),
+                                new PageRequest(0, query.getLimit()))));
     }
 
     @Override
     public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
         TsKvLatestCompositeKey compositeKey =
                 new TsKvLatestCompositeKey(
-                        entityId.getEntityType().name(),
+                        entityId.getEntityType(),
                         entityId.getId(),
                         key);
         return service.submit(() ->
@@ -104,7 +180,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
     @Override
     public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
         TsKvEntity entity = new TsKvEntity();
-        entity.setEntityType(entityId.getEntityType().name());
+        entity.setEntityType(entityId.getEntityType());
         entity.setEntityId(entityId.getId());
         entity.setTs(tsKvEntry.getTs());
         entity.setKey(tsKvEntry.getKey());
@@ -126,7 +202,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
     @Override
     public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
         TsKvLatestEntity latestEntity = new TsKvLatestEntity();
-        latestEntity.setEntityType(entityId.getEntityType().name());
+        latestEntity.setEntityType(entityId.getEntityType());
         latestEntity.setEntityId(entityId.getId());
         latestEntity.setTs(tsKvEntry.getTs());
         latestEntity.setKey(tsKvEntry.getKey());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index 786184e..a3f59cd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -15,11 +15,76 @@
  */
 package org.thingsboard.server.dao.sql.timeseries;
 
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.thingsboard.server.common.data.EntityType;
 import org.thingsboard.server.dao.annotation.SqlDao;
 import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
 import org.thingsboard.server.dao.model.sql.TsKvEntity;
 
+import java.util.List;
+import java.util.UUID;
+
 @SqlDao
 public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
+
+    @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
+            "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
+            "AND tskv.ts > :startTs AND tskv.ts < :endTs ORDER BY tskv.ts DESC")
+    List<TsKvEntity> findAllWithLimit(@Param("entityId") UUID entityId,
+                                      @Param("entityType") EntityType entityType,
+                                      @Param("entityKey") String key,
+                                      @Param("startTs") long startTs,
+                                      @Param("endTs") long endTs,
+                                      Pageable pageable);
+
+    @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
+            "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+    TsKvEntity findMax(@Param("entityId") UUID entityId,
+                       @Param("entityType") EntityType entityType,
+                       @Param("entityKey") String entityKey,
+                       @Param("startTs") long startTs,
+                       @Param("endTs") long endTs);
+
+    @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " +
+            "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+    TsKvEntity findMin(@Param("entityId") UUID entityId,
+                       @Param("entityType") EntityType entityType,
+                       @Param("entityKey") String entityKey,
+                       @Param("startTs") long startTs,
+                       @Param("endTs") long endTs);
+
+
+    @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
+            "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+    TsKvEntity findCount(@Param("entityId") UUID entityId,
+                         @Param("entityType") EntityType entityType,
+                         @Param("entityKey") String entityKey,
+                         @Param("startTs") long startTs,
+                         @Param("endTs") long endTs);
+
+
+    @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
+            "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+    TsKvEntity findAvg(@Param("entityId") UUID entityId,
+                       @Param("entityType") EntityType entityType,
+                       @Param("entityKey") String entityKey,
+                       @Param("startTs") long startTs,
+                       @Param("endTs") long endTs);
+
+
+    @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " +
+            "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+            "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+    TsKvEntity findSum(@Param("entityId") UUID entityId,
+                       @Param("entityType") EntityType entityType,
+                       @Param("entityKey") String entityKey,
+                       @Param("startTs") long startTs,
+                       @Param("endTs") long endTs);
 }