thingsboard-aplcache

aggregation for numeric data types should process both types

12/12/2018 3:03:17 PM

Details

diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
index ac5ee64..b5ebb10 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
@@ -79,7 +79,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
     }
 
     private void processResultSetRow(Row row, AggregationResult aggResult) {
-        long curCount;
+        long curCount = 0L;
 
         Long curLValue = null;
         Double curDValue = null;
@@ -91,14 +91,17 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
         long boolCount = row.getLong(BOOL_CNT_POS);
         long strCount = row.getLong(STR_CNT_POS);
 
-        if (longCount > 0) {
-            aggResult.dataType = DataType.LONG;
-            curCount = longCount;
-            curLValue = getLongValue(row);
-        } else if (doubleCount > 0) {
-            aggResult.dataType = DataType.DOUBLE;
-            curCount = doubleCount;
-            curDValue = getDoubleValue(row);
+        if (longCount > 0 || doubleCount > 0) {
+            if (longCount > 0) {
+                aggResult.dataType = DataType.LONG;
+                curCount += longCount;
+                curLValue = getLongValue(row);
+            }
+            if (doubleCount > 0) {
+                aggResult.dataType = DataType.DOUBLE;
+                curCount += doubleCount;
+                curDValue = getDoubleValue(row);
+            }
         } else if (boolCount > 0) {
             aggResult.dataType = DataType.BOOLEAN;
             curCount = boolCount;
@@ -126,16 +129,20 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
         aggResult.count += curCount;
         if (curDValue != null) {
             aggResult.dValue = aggResult.dValue == null ? curDValue : aggResult.dValue + curDValue;
-        } else if (curLValue != null) {
+        }
+        if (curLValue != null) {
             aggResult.lValue = aggResult.lValue == null ? curLValue : aggResult.lValue + curLValue;
         }
     }
 
     private void processMinAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue) {
-        if (curDValue != null) {
-            aggResult.dValue = aggResult.dValue == null ? curDValue : Math.min(aggResult.dValue, curDValue);
-        } else if (curLValue != null) {
-            aggResult.lValue = aggResult.lValue == null ? curLValue : Math.min(aggResult.lValue, curLValue);
+        if (curDValue != null || curLValue != null) {
+            if (curDValue != null) {
+                aggResult.dValue = aggResult.dValue == null ? curDValue : Math.min(aggResult.dValue, curDValue);
+            }
+            if (curLValue != null) {
+                aggResult.lValue = aggResult.lValue == null ? curLValue : Math.min(aggResult.lValue, curLValue);
+            }
         } else if (curBValue != null) {
             aggResult.bValue = aggResult.bValue == null ? curBValue : aggResult.bValue && curBValue;
         } else if (curSValue != null && (aggResult.sValue == null || curSValue.compareTo(aggResult.sValue) < 0)) {
@@ -144,10 +151,13 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
     }
 
     private void processMaxAggregation(AggregationResult aggResult, Long curLValue, Double curDValue, Boolean curBValue, String curSValue) {
-        if (curDValue != null) {
-            aggResult.dValue = aggResult.dValue == null ? curDValue : Math.max(aggResult.dValue, curDValue);
-        } else if (curLValue != null) {
-            aggResult.lValue = aggResult.lValue == null ? curLValue : Math.max(aggResult.lValue, curLValue);
+        if (curDValue != null || curLValue != null) {
+            if (curDValue != null) {
+                aggResult.dValue = aggResult.dValue == null ? curDValue : Math.max(aggResult.dValue, curDValue);
+            }
+            if (curLValue != null) {
+                aggResult.lValue = aggResult.lValue == null ? curLValue : Math.max(aggResult.lValue, curLValue);
+            }
         } else if (curBValue != null) {
             aggResult.bValue = aggResult.bValue == null ? curBValue : aggResult.bValue || curBValue;
         } else if (curSValue != null && (aggResult.sValue == null || curSValue.compareTo(aggResult.sValue) > 0)) {
@@ -211,20 +221,19 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
     private Optional<TsKvEntry> processAvgOrSumResult(AggregationResult aggResult) {
         if (aggResult.count == 0 || (aggResult.dataType == DataType.DOUBLE && aggResult.dValue == null) || (aggResult.dataType == DataType.LONG && aggResult.lValue == null)) {
             return Optional.empty();
-        } else if (aggResult.dataType == DataType.DOUBLE) {
-            return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? aggResult.dValue : (aggResult.dValue / aggResult.count))));
-        } else if (aggResult.dataType == DataType.LONG) {
-            return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? aggResult.lValue : (aggResult.lValue / aggResult.count))));
+        } else if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) {
+            double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L);
+            return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count))));
         }
         return Optional.empty();
     }
 
     private Optional<TsKvEntry> processMinOrMaxResult(AggregationResult aggResult) {
-        if (aggResult.dataType == DataType.DOUBLE) {
-            return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggResult.dValue)));
-        } else if (aggResult.dataType == DataType.LONG) {
-            return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggResult.lValue)));
-        } else if (aggResult.dataType == DataType.STRING) {
+        if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) {
+            double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE);
+            double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE);
+            return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL))));
+        }  else if (aggResult.dataType == DataType.STRING) {
             return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, aggResult.sValue)));
         } else {
             return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, aggResult.bValue)));