diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
index f099eec..2362f58 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.kv.*;
import javax.annotation.Nullable;
@@ -26,6 +27,7 @@ import java.util.Optional;
/**
* Created by ashvayka on 20.02.17.
*/
+@Slf4j
public class AggregatePartitionsFunction implements com.google.common.base.Function<List<ResultSet>, Optional<TsKvEntry>> {
private static final int LONG_CNT_POS = 0;
@@ -50,111 +52,118 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
@Nullable
@Override
public Optional<TsKvEntry> apply(@Nullable List<ResultSet> rsList) {
- if (rsList == null || rsList.size() == 0) {
- return Optional.empty();
- }
- long count = 0;
- DataType dataType = null;
-
- Boolean bValue = null;
- String sValue = null;
- Double dValue = null;
- Long lValue = null;
-
- for (ResultSet rs : rsList) {
- for (Row row : rs.all()) {
- long curCount;
-
- Long curLValue = null;
- Double curDValue = null;
- Boolean curBValue = null;
- String curSValue = null;
-
- long longCount = row.getLong(LONG_CNT_POS);
- long doubleCount = row.getLong(DOUBLE_CNT_POS);
- long boolCount = row.getLong(BOOL_CNT_POS);
- long strCount = row.getLong(STR_CNT_POS);
-
- if (longCount > 0) {
- dataType = DataType.LONG;
- curCount = longCount;
- curLValue = getLongValue(row);
- } else if (doubleCount > 0) {
- dataType = DataType.DOUBLE;
- curCount = doubleCount;
- curDValue = getDoubleValue(row);
- } else if (boolCount > 0) {
- dataType = DataType.BOOLEAN;
- curCount = boolCount;
- curBValue = getBooleanValue(row);
- } else if (strCount > 0) {
- dataType = DataType.STRING;
- curCount = strCount;
- curSValue = getStringValue(row);
- } else {
- continue;
- }
-
- if (aggregation == Aggregation.COUNT) {
- count += curCount;
- } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
- count += curCount;
- if (curDValue != null) {
- dValue = dValue == null ? curDValue : dValue + curDValue;
- } else if (curLValue != null) {
- lValue = lValue == null ? curLValue : lValue + curLValue;
+ try {
+ log.trace("[{}][{}][{}] Going to aggregate data", key, ts, aggregation);
+ if (rsList == null || rsList.size() == 0) {
+ return Optional.empty();
+ }
+ long count = 0;
+ DataType dataType = null;
+
+ Boolean bValue = null;
+ String sValue = null;
+ Double dValue = null;
+ Long lValue = null;
+
+ for (ResultSet rs : rsList) {
+ for (Row row : rs.all()) {
+ long curCount;
+
+ Long curLValue = null;
+ Double curDValue = null;
+ Boolean curBValue = null;
+ String curSValue = null;
+
+ long longCount = row.getLong(LONG_CNT_POS);
+ long doubleCount = row.getLong(DOUBLE_CNT_POS);
+ long boolCount = row.getLong(BOOL_CNT_POS);
+ long strCount = row.getLong(STR_CNT_POS);
+
+ if (longCount > 0) {
+ dataType = DataType.LONG;
+ curCount = longCount;
+ curLValue = getLongValue(row);
+ } else if (doubleCount > 0) {
+ dataType = DataType.DOUBLE;
+ curCount = doubleCount;
+ curDValue = getDoubleValue(row);
+ } else if (boolCount > 0) {
+ dataType = DataType.BOOLEAN;
+ curCount = boolCount;
+ curBValue = getBooleanValue(row);
+ } else if (strCount > 0) {
+ dataType = DataType.STRING;
+ curCount = strCount;
+ curSValue = getStringValue(row);
+ } else {
+ continue;
}
- } else if (aggregation == Aggregation.MIN) {
- if (curDValue != null) {
- dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
- } else if (curLValue != null) {
- lValue = lValue == null ? curLValue : Math.min(lValue, curLValue);
- } else if (curBValue != null) {
- bValue = bValue == null ? curBValue : bValue && curBValue;
- } else if (curSValue != null) {
- if (sValue == null || curSValue.compareTo(sValue) < 0) {
- sValue = curSValue;
+
+ if (aggregation == Aggregation.COUNT) {
+ count += curCount;
+ } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
+ count += curCount;
+ if (curDValue != null) {
+ dValue = dValue == null ? curDValue : dValue + curDValue;
+ } else if (curLValue != null) {
+ lValue = lValue == null ? curLValue : lValue + curLValue;
}
- }
- } else if (aggregation == Aggregation.MAX) {
- if (curDValue != null) {
- dValue = dValue == null ? curDValue : Math.max(dValue, curDValue);
- } else if (curLValue != null) {
- lValue = lValue == null ? curLValue : Math.max(lValue, curLValue);
- } else if (curBValue != null) {
- bValue = bValue == null ? curBValue : bValue || curBValue;
- } else if (curSValue != null) {
- if (sValue == null || curSValue.compareTo(sValue) > 0) {
- sValue = curSValue;
+ } else if (aggregation == Aggregation.MIN) {
+ if (curDValue != null) {
+ dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
+ } else if (curLValue != null) {
+ lValue = lValue == null ? curLValue : Math.min(lValue, curLValue);
+ } else if (curBValue != null) {
+ bValue = bValue == null ? curBValue : bValue && curBValue;
+ } else if (curSValue != null) {
+ if (sValue == null || curSValue.compareTo(sValue) < 0) {
+ sValue = curSValue;
+ }
+ }
+ } else if (aggregation == Aggregation.MAX) {
+ if (curDValue != null) {
+ dValue = dValue == null ? curDValue : Math.max(dValue, curDValue);
+ } else if (curLValue != null) {
+ lValue = lValue == null ? curLValue : Math.max(lValue, curLValue);
+ } else if (curBValue != null) {
+ bValue = bValue == null ? curBValue : bValue || curBValue;
+ } else if (curSValue != null) {
+ if (sValue == null || curSValue.compareTo(sValue) > 0) {
+ sValue = curSValue;
+ }
}
}
}
}
- }
- if (dataType == null) {
- return Optional.empty();
- } else if (aggregation == Aggregation.COUNT) {
- return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count)));
- } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
- if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) {
+ if (dataType == null) {
return Optional.empty();
- } else if (dataType == DataType.DOUBLE) {
- return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count))));
- } else if (dataType == DataType.LONG) {
- return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count))));
- }
- } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
- if (dataType == DataType.DOUBLE) {
- return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue)));
- } else if (dataType == DataType.LONG) {
- return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue)));
- } else if (dataType == DataType.STRING) {
- return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue)));
- } else {
- return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue)));
+ } else if (aggregation == Aggregation.COUNT) {
+ return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count)));
+ } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
+ if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) {
+ return Optional.empty();
+ } else if (dataType == DataType.DOUBLE) {
+ return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count))));
+ } else if (dataType == DataType.LONG) {
+ return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count))));
+ }
+ } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
+ if (dataType == DataType.DOUBLE) {
+ return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue)));
+ } else if (dataType == DataType.LONG) {
+ return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue)));
+ } else if (dataType == DataType.STRING) {
+ return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue)));
+ } else {
+ return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue)));
+ }
}
+ log.trace("[{}][{}][{}] Aggregated data is empty.", key, ts, aggregation);
+ return Optional.empty();
+ }catch (Exception e){
+ log.error("[{}][{}][{}] Failed to aggregate data", key, ts, aggregation, e);
+ return Optional.empty();
}
- return null;
}
private Boolean getBooleanValue(Row row) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index c7584fe..cbf27fa 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -215,6 +215,7 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
PreparedStatement proto = getFetchStmt(aggregation);
List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
for (Long partition : partitions) {
+ log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityType, entityId);
BoundStatement stmt = proto.bind();
stmt.setString(0, entityType);
stmt.setUUID(1, entityId);