diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
index 296d173..92bdf7e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java
@@ -55,7 +55,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Param("endTs") long endTs);
@Async
- @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
+ @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue), true) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findMax(@Param("entityId") String entityId,
@@ -65,7 +65,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Param("endTs") long endTs);
@Async
- @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " +
+ @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue), false) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
@@ -85,7 +85,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Param("endTs") long endTs);
@Async
- @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
+ @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
index b5ebb10..ea1e8f1 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
@@ -98,6 +98,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
curLValue = getLongValue(row);
}
if (doubleCount > 0) {
+ aggResult.hasDouble = true;
aggResult.dataType = DataType.DOUBLE;
curCount += doubleCount;
curDValue = getDoubleValue(row);
@@ -222,17 +223,25 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
if (aggResult.count == 0 || (aggResult.dataType == DataType.DOUBLE && aggResult.dValue == null) || (aggResult.dataType == DataType.LONG && aggResult.lValue == null)) {
return Optional.empty();
} else if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) {
- double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L);
- return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count))));
+ if(aggregation == Aggregation.AVG || aggResult.hasDouble) {
+ double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L);
+ return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count))));
+ } else {
+ return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? aggResult.lValue : (aggResult.lValue / aggResult.count))));
+ }
}
return Optional.empty();
}
private Optional<TsKvEntry> processMinOrMaxResult(AggregationResult aggResult) {
if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) {
- double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE);
- double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE);
- return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL))));
+ if(aggResult.hasDouble) {
+ double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE);
+ double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE);
+ return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL))));
+ } else {
+ return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggResult.lValue)));
+ }
} else if (aggResult.dataType == DataType.STRING) {
return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, aggResult.sValue)));
} else {
@@ -247,5 +256,6 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
Double dValue = null;
Long lValue = null;
long count = 0;
+ boolean hasDouble = false;
}
}
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
index b409dea..7378bcd 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java
@@ -221,13 +221,13 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
60000, 20000, 3, Aggregation.AVG))).get();
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
- assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
+ assertEquals(java.util.Optional.of(150.0), list.get(0).getDoubleValue());
assertEquals(30000, list.get(1).getTs());
- assertEquals(java.util.Optional.of(350L), list.get(1).getLongValue());
+ assertEquals(java.util.Optional.of(350.0), list.get(1).getDoubleValue());
assertEquals(50000, list.get(2).getTs());
- assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
+ assertEquals(java.util.Optional.of(550.0), list.get(2).getDoubleValue());
list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.SUM))).get();
@@ -282,12 +282,110 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
}
+ @Test
+ public void testFindDeviceLongAndDoubleTsData() throws Exception {
+ DeviceId deviceId = new DeviceId(UUIDs.timeBased());
+ List<TsKvEntry> entries = new ArrayList<>();
+
+ entries.add(save(deviceId, 5000, 100));
+ entries.add(save(deviceId, 15000, 200.0));
+
+ entries.add(save(deviceId, 25000, 300));
+ entries.add(save(deviceId, 35000, 400.0));
+
+ entries.add(save(deviceId, 45000, 500));
+ entries.add(save(deviceId, 55000, 600.0));
+
+ List<TsKvEntry> list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
+ 60000, 20000, 3, Aggregation.NONE))).get();
+ assertEquals(3, list.size());
+ assertEquals(55000, list.get(0).getTs());
+ assertEquals(java.util.Optional.of(600.0), list.get(0).getDoubleValue());
+
+ assertEquals(45000, list.get(1).getTs());
+ assertEquals(java.util.Optional.of(500L), list.get(1).getLongValue());
+
+ assertEquals(35000, list.get(2).getTs());
+ assertEquals(java.util.Optional.of(400.0), list.get(2).getDoubleValue());
+
+ list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
+ 60000, 20000, 3, Aggregation.AVG))).get();
+ assertEquals(3, list.size());
+ assertEquals(10000, list.get(0).getTs());
+ assertEquals(java.util.Optional.of(150.0), list.get(0).getDoubleValue());
+
+ assertEquals(30000, list.get(1).getTs());
+ assertEquals(java.util.Optional.of(350.0), list.get(1).getDoubleValue());
+
+ assertEquals(50000, list.get(2).getTs());
+ assertEquals(java.util.Optional.of(550.0), list.get(2).getDoubleValue());
+
+ list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
+ 60000, 20000, 3, Aggregation.SUM))).get();
+
+ assertEquals(3, list.size());
+ assertEquals(10000, list.get(0).getTs());
+ assertEquals(java.util.Optional.of(300.0), list.get(0).getDoubleValue());
+
+ assertEquals(30000, list.get(1).getTs());
+ assertEquals(java.util.Optional.of(700.0), list.get(1).getDoubleValue());
+
+ assertEquals(50000, list.get(2).getTs());
+ assertEquals(java.util.Optional.of(1100.0), list.get(2).getDoubleValue());
+
+ list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
+ 60000, 20000, 3, Aggregation.MIN))).get();
+
+ assertEquals(3, list.size());
+ assertEquals(10000, list.get(0).getTs());
+ assertEquals(java.util.Optional.of(100.0), list.get(0).getDoubleValue());
+
+ assertEquals(30000, list.get(1).getTs());
+ assertEquals(java.util.Optional.of(300.0), list.get(1).getDoubleValue());
+
+ assertEquals(50000, list.get(2).getTs());
+ assertEquals(java.util.Optional.of(500.0), list.get(2).getDoubleValue());
+
+ list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
+ 60000, 20000, 3, Aggregation.MAX))).get();
+
+ assertEquals(3, list.size());
+ assertEquals(10000, list.get(0).getTs());
+ assertEquals(java.util.Optional.of(200.0), list.get(0).getDoubleValue());
+
+ assertEquals(30000, list.get(1).getTs());
+ assertEquals(java.util.Optional.of(400.0), list.get(1).getDoubleValue());
+
+ assertEquals(50000, list.get(2).getTs());
+ assertEquals(java.util.Optional.of(600.0), list.get(2).getDoubleValue());
+
+ list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
+ 60000, 20000, 3, Aggregation.COUNT))).get();
+
+ assertEquals(3, list.size());
+ assertEquals(10000, list.get(0).getTs());
+ assertEquals(java.util.Optional.of(2L), list.get(0).getLongValue());
+
+ assertEquals(30000, list.get(1).getTs());
+ assertEquals(java.util.Optional.of(2L), list.get(1).getLongValue());
+
+ assertEquals(50000, list.get(2).getTs());
+ assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
+ }
+
private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception {
TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value));
tsService.save(tenantId, deviceId, entry).get();
return entry;
}
+ private TsKvEntry save(DeviceId deviceId, long ts, double value) throws Exception {
+ TsKvEntry entry = new BasicTsKvEntry(ts, new DoubleDataEntry(LONG_KEY, value));
+ tsService.save(tenantId, deviceId, entry).get();
+ return entry;
+ }
+
+
private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
tsService.save(tenantId, deviceId, toTsEntry(ts, stringKvEntry)).get();
tsService.save(tenantId, deviceId, toTsEntry(ts, longKvEntry)).get();