Details
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
index 2ea58af..c7ba821 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.dao.model.sql;
import lombok.Data;
+import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.dao.model.ToData;
@@ -32,8 +33,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable {
@Id
+ @Enumerated(EnumType.STRING)
@Column(name = ENTITY_TYPE_COLUMN)
- private String entityType;
+ private EntityType entityType;
@Id
@Column(name = ENTITY_ID_COLUMN)
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java
index 48fc75c..57df750 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvCompositeKey.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.dao.model.sql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.thingsboard.server.common.data.EntityType;
import javax.persistence.Transient;
import java.io.Serializable;
@@ -31,7 +32,7 @@ public class TsKvCompositeKey implements Serializable{
@Transient
private static final long serialVersionUID = -4089175869616037523L;
- private String entityType;
+ private EntityType entityType;
private UUID entityId;
private String key;
private long ts;
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
index 603da6f..356f3bd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvEntity.java
@@ -16,7 +16,8 @@
package org.thingsboard.server.dao.model.sql;
import lombok.Data;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.dao.model.ToData;
import javax.persistence.*;
@@ -30,9 +31,41 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
@IdClass(TsKvCompositeKey.class)
public final class TsKvEntity implements ToData<TsKvEntry> {
+ public TsKvEntity() {
+ }
+
+ public TsKvEntity(Double avgLongValue, Double avgDoubleValue) {
+ this.longValue = avgLongValue.longValue();
+ this.doubleValue = avgDoubleValue;
+ }
+
+ public TsKvEntity(Long sumLongValue, Double sumDoubleValue) {
+ this.longValue = sumLongValue;
+ this.doubleValue = sumDoubleValue;
+ }
+
+ public TsKvEntity(String strValue, Long longValue, Double doubleValue) {
+ this.strValue = strValue;
+ this.longValue = longValue;
+ this.doubleValue = doubleValue;
+ }
+
+ public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) {
+ if (booleanValueCount != 0) {
+ this.longValue = booleanValueCount;
+ } else if (strValueCount != 0) {
+ this.longValue = strValueCount;
+ } else if (longValueCount != 0) {
+ this.longValue = longValueCount;
+ } else if (doubleValueCount != 0) {
+ this.longValue = doubleValueCount;
+ }
+ }
+
@Id
+ @Enumerated(EnumType.STRING)
@Column(name = ENTITY_TYPE_COLUMN)
- private String entityType;
+ private EntityType entityType;
@Id
@Column(name = ENTITY_ID_COLUMN)
@@ -60,6 +93,16 @@ public final class TsKvEntity implements ToData<TsKvEntry> {
@Override
public TsKvEntry toData() {
- return null;
+ KvEntry kvEntry = null;
+ if (strValue != null) {
+ kvEntry = new StringDataEntry(key, strValue);
+ } else if (longValue != null) {
+ kvEntry = new LongDataEntry(key, longValue);
+ } else if (doubleValue != null) {
+ kvEntry = new DoubleDataEntry(key, doubleValue);
+ } else if (booleanValue != null) {
+ kvEntry = new BooleanDataEntry(key, booleanValue);
+ }
+ return new BasicTsKvEntry(ts, kvEntry);
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java
index ab6006f..f7bf3b1 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestCompositeKey.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.dao.model.sql;
import lombok.*;
+import org.thingsboard.server.common.data.EntityType;
import javax.persistence.Transient;
import java.io.Serializable;
@@ -29,7 +30,7 @@ public class TsKvLatestCompositeKey implements Serializable{
@Transient
private static final long serialVersionUID = -4089175869616037523L;
- private String entityType;
+ private EntityType entityType;
private UUID entityId;
private String key;
}
\ No newline at end of file
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java
index b9a4543..47a5e00 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/TsKvLatestEntity.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.dao.model.sql;
import lombok.Data;
+import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.dao.model.ToData;
@@ -31,8 +32,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
public final class TsKvLatestEntity implements ToData<TsKvEntry> {
@Id
+ @Enumerated(EnumType.STRING)
@Column(name = ENTITY_TYPE_COLUMN)
- private String entityType;
+ private EntityType entityType;
@Id
@Column(name = ENTITY_ID_COLUMN)
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
index cf69004..c77d339 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
@@ -83,7 +83,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
@Override
public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
AttributeKvEntity entity = new AttributeKvEntity();
- entity.setEntityType(entityId.getEntityType().name());
+ entity.setEntityType(entityId.getEntityType());
entity.setEntityId(entityId.getId());
entity.setAttributeType(attributeType);
entity.setAttributeKey(attribute.getKey());
@@ -104,7 +104,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
.stream()
.map(key -> {
AttributeKvEntity entityToDelete = new AttributeKvEntity();
- entityToDelete.setEntityType(entityId.getEntityType().name());
+ entityToDelete.setEntityType(entityId.getEntityType());
entityToDelete.setEntityId(entityId.getId());
entityToDelete.setAttributeType(attributeType);
entityToDelete.setAttributeKey(key);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
index bc056e1..7b64a4f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
@@ -35,6 +36,7 @@ import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -73,19 +75,93 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(entityId, query);
} else {
- return service.submit(() -> null);
+ long stepTs = query.getStartTs();
+ List<ListenableFuture<TsKvEntry>> futures = new ArrayList<>();
+ while (stepTs < query.getEndTs()) {
+ long startTs = stepTs;
+ long endTs = stepTs + query.getInterval();
+ long ts = startTs + (endTs - startTs) / 2;
+ futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
+ stepTs = endTs;
+ }
+ return Futures.allAsList(futures);
+ }
+ }
+
+ private ListenableFuture<TsKvEntry> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
+ TsKvEntity entity;
+ switch (aggregation) {
+ case AVG:
+ entity = tsKvRepository.findAvg(
+ entityId.getId(),
+ entityId.getEntityType(),
+ key,
+ startTs,
+ endTs);
+
+ break;
+ case MAX:
+ entity = tsKvRepository.findMax(
+ entityId.getId(),
+ entityId.getEntityType(),
+ key,
+ startTs,
+ endTs);
+
+ break;
+ case MIN:
+ entity = tsKvRepository.findMin(
+ entityId.getId(),
+ entityId.getEntityType(),
+ key,
+ startTs,
+ endTs);
+
+ break;
+ case SUM:
+ entity = tsKvRepository.findSum(
+ entityId.getId(),
+ entityId.getEntityType(),
+ key,
+ startTs,
+ endTs);
+
+ break;
+ case COUNT:
+ entity = tsKvRepository.findCount(
+ entityId.getId(),
+ entityId.getEntityType(),
+ key,
+ startTs,
+ endTs);
+
+ break;
+ default:
+ entity = null;
+ }
+ if (entity != null){
+ entity.setTs(ts);
}
+ return service.submit(() -> DaoUtil.getData(entity));
}
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
- return service.submit(() -> null);
+ return service.submit(() ->
+ DaoUtil.convertDataList(
+ tsKvRepository.findAllWithLimit(
+ entityId.getId(),
+ entityId.getEntityType(),
+ query.getKey(),
+ query.getStartTs(),
+ query.getEndTs(),
+ new PageRequest(0, query.getLimit()))));
}
@Override
public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
TsKvLatestCompositeKey compositeKey =
new TsKvLatestCompositeKey(
- entityId.getEntityType().name(),
+ entityId.getEntityType(),
entityId.getId(),
key);
return service.submit(() ->
@@ -104,7 +180,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
@Override
public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
TsKvEntity entity = new TsKvEntity();
- entity.setEntityType(entityId.getEntityType().name());
+ entity.setEntityType(entityId.getEntityType());
entity.setEntityId(entityId.getId());
entity.setTs(tsKvEntry.getTs());
entity.setKey(tsKvEntry.getKey());
@@ -126,7 +202,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
@Override
public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
- latestEntity.setEntityType(entityId.getEntityType().name());
+ latestEntity.setEntityType(entityId.getEntityType());
latestEntity.setEntityId(entityId.getId());
latestEntity.setTs(tsKvEntry.getTs());
latestEntity.setKey(tsKvEntry.getKey());
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index 786184e..a3f59cd 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -15,11 +15,76 @@
*/
package org.thingsboard.server.dao.sql.timeseries;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.annotation.SqlDao;
import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
import org.thingsboard.server.dao.model.sql.TsKvEntity;
+import java.util.List;
+import java.util.UUID;
+
@SqlDao
public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
+
+ @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
+ "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
+ "AND tskv.ts > :startTs AND tskv.ts < :endTs ORDER BY tskv.ts DESC")
+ List<TsKvEntity> findAllWithLimit(@Param("entityId") UUID entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String key,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs,
+ Pageable pageable);
+
+ @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
+ "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+ "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ TsKvEntity findMax(@Param("entityId") UUID entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
+
+ @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " +
+ "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+ "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ TsKvEntity findMin(@Param("entityId") UUID entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
+
+
+ @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
+ "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+ "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ TsKvEntity findCount(@Param("entityId") UUID entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
+
+
+ @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
+ "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+ "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ TsKvEntity findAvg(@Param("entityId") UUID entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
+
+
+ @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " +
+ "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
+ "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
+ TsKvEntity findSum(@Param("entityId") UUID entityId,
+ @Param("entityType") EntityType entityType,
+ @Param("entityKey") String entityKey,
+ @Param("startTs") long startTs,
+ @Param("endTs") long endTs);
}