thingsboard-aplcache

Details

diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
index f099eec..2362f58 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
 
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.kv.*;
 
 import javax.annotation.Nullable;
@@ -26,6 +27,7 @@ import java.util.Optional;
 /**
  * Created by ashvayka on 20.02.17.
  */
+@Slf4j
 public class AggregatePartitionsFunction implements com.google.common.base.Function<List<ResultSet>, Optional<TsKvEntry>> {
 
     private static final int LONG_CNT_POS = 0;
@@ -50,111 +52,118 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
     @Nullable
     @Override
     public Optional<TsKvEntry> apply(@Nullable List<ResultSet> rsList) {
-        if (rsList == null || rsList.size() == 0) {
-            return Optional.empty();
-        }
-        long count = 0;
-        DataType dataType = null;
-
-        Boolean bValue = null;
-        String sValue = null;
-        Double dValue = null;
-        Long lValue = null;
-
-        for (ResultSet rs : rsList) {
-            for (Row row : rs.all()) {
-                long curCount;
-
-                Long curLValue = null;
-                Double curDValue = null;
-                Boolean curBValue = null;
-                String curSValue = null;
-
-                long longCount = row.getLong(LONG_CNT_POS);
-                long doubleCount = row.getLong(DOUBLE_CNT_POS);
-                long boolCount = row.getLong(BOOL_CNT_POS);
-                long strCount = row.getLong(STR_CNT_POS);
-
-                if (longCount > 0) {
-                    dataType = DataType.LONG;
-                    curCount = longCount;
-                    curLValue = getLongValue(row);
-                } else if (doubleCount > 0) {
-                    dataType = DataType.DOUBLE;
-                    curCount = doubleCount;
-                    curDValue = getDoubleValue(row);
-                } else if (boolCount > 0) {
-                    dataType = DataType.BOOLEAN;
-                    curCount = boolCount;
-                    curBValue = getBooleanValue(row);
-                } else if (strCount > 0) {
-                    dataType = DataType.STRING;
-                    curCount = strCount;
-                    curSValue = getStringValue(row);
-                } else {
-                    continue;
-                }
-
-                if (aggregation == Aggregation.COUNT) {
-                    count += curCount;
-                } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
-                    count += curCount;
-                    if (curDValue != null) {
-                        dValue = dValue == null ? curDValue : dValue + curDValue;
-                    } else if (curLValue != null) {
-                        lValue = lValue == null ? curLValue : lValue + curLValue;
+        try {
+            log.trace("[{}][{}][{}] Going to aggregate data", key, ts, aggregation);
+            if (rsList == null || rsList.size() == 0) {
+                return Optional.empty();
+            }
+            long count = 0;
+            DataType dataType = null;
+
+            Boolean bValue = null;
+            String sValue = null;
+            Double dValue = null;
+            Long lValue = null;
+
+            for (ResultSet rs : rsList) {
+                for (Row row : rs.all()) {
+                    long curCount;
+
+                    Long curLValue = null;
+                    Double curDValue = null;
+                    Boolean curBValue = null;
+                    String curSValue = null;
+
+                    long longCount = row.getLong(LONG_CNT_POS);
+                    long doubleCount = row.getLong(DOUBLE_CNT_POS);
+                    long boolCount = row.getLong(BOOL_CNT_POS);
+                    long strCount = row.getLong(STR_CNT_POS);
+
+                    if (longCount > 0) {
+                        dataType = DataType.LONG;
+                        curCount = longCount;
+                        curLValue = getLongValue(row);
+                    } else if (doubleCount > 0) {
+                        dataType = DataType.DOUBLE;
+                        curCount = doubleCount;
+                        curDValue = getDoubleValue(row);
+                    } else if (boolCount > 0) {
+                        dataType = DataType.BOOLEAN;
+                        curCount = boolCount;
+                        curBValue = getBooleanValue(row);
+                    } else if (strCount > 0) {
+                        dataType = DataType.STRING;
+                        curCount = strCount;
+                        curSValue = getStringValue(row);
+                    } else {
+                        continue;
                     }
-                } else if (aggregation == Aggregation.MIN) {
-                    if (curDValue != null) {
-                        dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
-                    } else if (curLValue != null) {
-                        lValue = lValue == null ? curLValue : Math.min(lValue, curLValue);
-                    } else if (curBValue != null) {
-                        bValue = bValue == null ? curBValue : bValue && curBValue;
-                    } else if (curSValue != null) {
-                        if (sValue == null || curSValue.compareTo(sValue) < 0) {
-                            sValue = curSValue;
+
+                    if (aggregation == Aggregation.COUNT) {
+                        count += curCount;
+                    } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
+                        count += curCount;
+                        if (curDValue != null) {
+                            dValue = dValue == null ? curDValue : dValue + curDValue;
+                        } else if (curLValue != null) {
+                            lValue = lValue == null ? curLValue : lValue + curLValue;
                         }
-                    }
-                } else if (aggregation == Aggregation.MAX) {
-                    if (curDValue != null) {
-                        dValue = dValue == null ? curDValue : Math.max(dValue, curDValue);
-                    } else if (curLValue != null) {
-                        lValue = lValue == null ? curLValue : Math.max(lValue, curLValue);
-                    } else if (curBValue != null) {
-                        bValue = bValue == null ? curBValue : bValue || curBValue;
-                    } else if (curSValue != null) {
-                        if (sValue == null || curSValue.compareTo(sValue) > 0) {
-                            sValue = curSValue;
+                    } else if (aggregation == Aggregation.MIN) {
+                        if (curDValue != null) {
+                            dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
+                        } else if (curLValue != null) {
+                            lValue = lValue == null ? curLValue : Math.min(lValue, curLValue);
+                        } else if (curBValue != null) {
+                            bValue = bValue == null ? curBValue : bValue && curBValue;
+                        } else if (curSValue != null) {
+                            if (sValue == null || curSValue.compareTo(sValue) < 0) {
+                                sValue = curSValue;
+                            }
+                        }
+                    } else if (aggregation == Aggregation.MAX) {
+                        if (curDValue != null) {
+                            dValue = dValue == null ? curDValue : Math.max(dValue, curDValue);
+                        } else if (curLValue != null) {
+                            lValue = lValue == null ? curLValue : Math.max(lValue, curLValue);
+                        } else if (curBValue != null) {
+                            bValue = bValue == null ? curBValue : bValue || curBValue;
+                        } else if (curSValue != null) {
+                            if (sValue == null || curSValue.compareTo(sValue) > 0) {
+                                sValue = curSValue;
+                            }
                         }
                     }
                 }
             }
-        }
-        if (dataType == null) {
-            return Optional.empty();
-        } else if (aggregation == Aggregation.COUNT) {
-            return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count)));
-        } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
-            if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) {
+            if (dataType == null) {
                 return Optional.empty();
-            } else if (dataType == DataType.DOUBLE) {
-                return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count))));
-            } else if (dataType == DataType.LONG) {
-                return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count))));
-            }
-        } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
-            if (dataType == DataType.DOUBLE) {
-                return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue)));
-            } else if (dataType == DataType.LONG) {
-                return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue)));
-            } else if (dataType == DataType.STRING) {
-                return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue)));
-            } else {
-                return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue)));
+            } else if (aggregation == Aggregation.COUNT) {
+                return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, (long) count)));
+            } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
+                if (count == 0 || (dataType == DataType.DOUBLE && dValue == null) || (dataType == DataType.LONG && lValue == null)) {
+                    return Optional.empty();
+                } else if (dataType == DataType.DOUBLE) {
+                    return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? dValue : (dValue / count))));
+                } else if (dataType == DataType.LONG) {
+                    return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? lValue : (lValue / count))));
+                }
+            } else if (aggregation == Aggregation.MIN || aggregation == Aggregation.MAX) {
+                if (dataType == DataType.DOUBLE) {
+                    return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, dValue)));
+                } else if (dataType == DataType.LONG) {
+                    return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, lValue)));
+                } else if (dataType == DataType.STRING) {
+                    return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, sValue)));
+                } else {
+                    return Optional.of(new BasicTsKvEntry(ts, new BooleanDataEntry(key, bValue)));
+                }
             }
+            log.trace("[{}][{}][{}] Aggregated data is empty.", key, ts, aggregation);
+            return Optional.empty();
+        }catch (Exception e){
+            log.error("[{}][{}][{}] Failed to aggregate data", key, ts, aggregation, e);
+            return Optional.empty();
         }
-        return null;
     }
 
     private Boolean getBooleanValue(Row row) {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index c7584fe..cbf27fa 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -215,6 +215,7 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
                 PreparedStatement proto = getFetchStmt(aggregation);
                 List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
                 for (Long partition : partitions) {
+                    log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityType, entityId);
                     BoundStatement stmt = proto.bind();
                     stmt.setString(0, entityType);
                     stmt.setUUID(1, entityId);