thingsboard-aplcache

Aggregation Implementation

2/21/2017 8:34:57 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index fbd0cce..9474a62 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -151,20 +151,9 @@ public final class PluginProcessingContext implements PluginContext {
     }
 
     @Override
-    public List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query) {
+    public void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback) {
         validate(deviceId);
-        try {
-            return pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query).get();
-        } catch (Exception e) {
-            log.error("TODO", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback) {
-        validate(deviceId);
-        ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query);
+        ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries);
         Futures.addCallback(future, getCallback(callback, v -> v), executor);
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 5cf71fc..81bbdf9 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -96,7 +96,21 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
     }
 
     @Override
-    public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
+    public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, List<TsKvQuery> queries) {
+        List<ListenableFuture<List<TsKvEntry>>> futures = queries.stream().map(query -> findAllAsync(entityType, entityId, query)).collect(Collectors.toList());
+        return Futures.transform(Futures.allAsList(futures), new Function<List<List<TsKvEntry>>, List<TsKvEntry>>() {
+            @Nullable
+            @Override
+            public List<TsKvEntry> apply(@Nullable List<List<TsKvEntry>> results) {
+                List<TsKvEntry> result = new ArrayList<TsKvEntry>();
+                results.forEach(r -> result.addAll(r));
+                return result;
+            }
+        }, readResultsProcessingExecutor);
+    }
+
+
+    private ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
         if (query.getAggregation() == Aggregation.NONE) {
             return findAllAsyncWithLimit(entityType, entityId, query);
         } else {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 1d8c3df..f27ed6e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.dao.timeseries;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Row;
+import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,6 +33,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.dao.service.Validator;
 
+import javax.annotation.Nullable;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.time.Instant;
@@ -40,6 +42,7 @@ import java.time.ZoneOffset;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
@@ -56,10 +59,10 @@ public class BaseTimeseriesService implements TimeseriesService {
     private TimeseriesDao timeseriesDao;
 
     @Override
-    public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
+    public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, List<TsKvQuery> queries) {
         validate(entityType, entityId);
-        validate(query);
-        return timeseriesDao.findAllAsync(entityType, entityId.getId(), query);
+        queries.forEach(query -> validate(query));
+        return timeseriesDao.findAllAsync(entityType, entityId.getId(), queries);
     }
 
     @Override
@@ -132,7 +135,7 @@ public class BaseTimeseriesService implements TimeseriesService {
             throw new IncorrectParameterException("TsKvQuery can't be null");
         } else if (isBlank(query.getKey())) {
             throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
-        } else  if (query.getAggregation() == null){
+        } else if (query.getAggregation() == null) {
             throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
         }
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 7a6eed7..177003d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -33,9 +33,7 @@ public interface TimeseriesDao {
 
     long toPartitionTs(long ts);
 
-    ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query);
-
-//    List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
+    ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, List<TsKvQuery> queries);
 
     ResultSetFuture findLatest(String entityType, UUID entityId, String key);
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index 1bafdea..cd53e94 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -33,7 +33,7 @@ import java.util.Set;
  */
 public interface TimeseriesService {
 
-    ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query);
+    ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, List<TsKvQuery> queries);
 
     ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys);
 
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index b5f27a1..c2c5587 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -82,9 +82,7 @@ public interface PluginContext {
 
     void saveTsData(DeviceId deviceId, List<TsKvEntry> entry, PluginCallback<Void> callback);
 
-    List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query);
-
-    void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback);
+    void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback);
 
     void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback);
 
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java
index 14d7aa1..9f06895 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java
@@ -15,9 +15,16 @@
  */
 package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 /**
  * @author Andrew Shvayka
  */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
 public class GetHistoryCmd implements TelemetryPluginCmd {
 
     private int cmdId;
@@ -25,46 +32,7 @@ public class GetHistoryCmd implements TelemetryPluginCmd {
     private String keys;
     private long startTs;
     private long endTs;
+    private int limit;
+    private String agg;
 
-    @Override
-    public int getCmdId() {
-        return cmdId;
-    }
-
-    @Override
-    public void setCmdId(int cmdId) {
-        this.cmdId = cmdId;
-    }
-
-    public String getDeviceId() {
-        return deviceId;
-    }
-
-    public void setDeviceId(String deviceId) {
-        this.deviceId = deviceId;
-    }
-
-    public String getKeys() {
-        return keys;
-    }
-
-    public void setKeys(String keys) {
-        this.keys = keys;
-    }
-
-    public long getStartTs() {
-        return startTs;
-    }
-
-    public void setStartTs(long startTs) {
-        this.startTs = startTs;
-    }
-
-    public long getEndTs() {
-        return endTs;
-    }
-
-    public void setEndTs(long endTs) {
-        this.endTs = endTs;
-    }
 }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
index 718f23a..3574eae 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
@@ -16,11 +16,13 @@
 package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
 
 import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
 
 @NoArgsConstructor
 @AllArgsConstructor
+@Data
 public abstract class SubscriptionCmd implements TelemetryPluginCmd {
 
     private int cmdId;
@@ -31,46 +33,6 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
 
     public abstract SubscriptionType getType();
 
-    public int getCmdId() {
-        return cmdId;
-    }
-
-    public void setCmdId(int cmdId) {
-        this.cmdId = cmdId;
-    }
-
-    public String getDeviceId() {
-        return deviceId;
-    }
-
-    public void setDeviceId(String deviceId) {
-        this.deviceId = deviceId;
-    }
-
-    public String getKeys() {
-        return keys;
-    }
-
-    public void setTags(String tags) {
-        this.keys = tags;
-    }
-
-    public boolean isUnsubscribe() {
-        return unsubscribe;
-    }
-
-    public void setUnsubscribe(boolean unsubscribe) {
-        this.unsubscribe = unsubscribe;
-    }
-
-    public String getScope() {
-        return scope;
-    }
-
-    public void setKeys(String keys) {
-        this.keys = keys;
-    }
-
     @Override
     public String toString() {
         return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
index 4f24a00..f4eacf5 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
@@ -15,6 +15,8 @@
  */
 package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
 
@@ -22,17 +24,13 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
  * @author Andrew Shvayka
  */
 @NoArgsConstructor
+@AllArgsConstructor
+@Data
 public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
 
     private long timeWindow;
-
-    public long getTimeWindow() {
-        return timeWindow;
-    }
-
-    public void setTimeWindow(long timeWindow) {
-        this.timeWindow = timeWindow;
-    }
+    private int limit;
+    private String agg;
 
     @Override
     public SubscriptionType getType() {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 5d6e68e..739bedf 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -158,40 +158,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                         log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), cmd.getDeviceId());
                         long endTs = System.currentTimeMillis();
                         startTs = endTs - cmd.getTimeWindow();
-                        for (String key : keys) {
-                            TsKvQuery query = new BaseTsKvQuery(key, startTs, endTs);
-                            data.addAll(ctx.loadTimeseries(deviceId, query));
-                        }
-                        sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
 
-                        Map<String, Long> subState = new HashMap<>(keys.size());
-                        keys.forEach(key -> subState.put(key, startTs));
-                        data.forEach(v -> subState.put(v.getKey(), v.getTs()));
-                        SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
-                        subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
+                        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
+                        ctx.loadTimeseries(deviceId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
                     } else {
                         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
                         startTs = System.currentTimeMillis();
                         log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), cmd.getDeviceId());
-                        ctx.loadLatestTimeseries(deviceId, keys, new PluginCallback<List<TsKvEntry>>() {
-                            @Override
-                            public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
-                                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
-
-                                Map<String, Long> subState = new HashMap<>(keys.size());
-                                keys.forEach(key -> subState.put(key, startTs));
-                                data.forEach(v -> subState.put(v.getKey(), v.getTs()));
-                                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
-                                subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
-                            }
-
-                            @Override
-                            public void onFailure(PluginContext ctx, Exception e) {
-                                SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
-                                        "Failed to fetch data!");
-                                sendWsMsg(ctx, sessionRef, update);
-                            }
-                        });
+                        ctx.loadLatestTimeseries(deviceId, keys, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
                     }
                 } else {
                     ctx.loadLatestTimeseries(deviceId, new PluginCallback<List<TsKvEntry>>() {
@@ -216,6 +190,28 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
         }
     }
 
+    private PluginCallback<List<TsKvEntry>> getSubscriptionCallback(final PluginWebsocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final DeviceId deviceId, final long startTs, final List<String> keys) {
+        return new PluginCallback<List<TsKvEntry>>() {
+            @Override
+            public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
+                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+
+                Map<String, Long> subState = new HashMap<>(keys.size());
+                keys.forEach(key -> subState.put(key, startTs));
+                data.forEach(v -> subState.put(v.getKey(), v.getTs()));
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
+                subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
+            }
+
+            @Override
+            public void onFailure(PluginContext ctx, Exception e) {
+                SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                        "Failed to fetch data!");
+                sendWsMsg(ctx, sessionRef, update);
+            }
+        };
+    }
+
     private void handleWsHistoryCmd(PluginContext ctx, PluginWebsocketSessionRef sessionRef, GetHistoryCmd cmd) {
         String sessionId = sessionRef.getSessionId();
         WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
@@ -246,12 +242,19 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
             return;
         }
         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
-        List<TsKvEntry> data = new ArrayList<>();
-        for (String key : keys) {
-            TsKvQuery query = new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs());
-            data.addAll(ctx.loadTimeseries(deviceId, query));
-        }
-        sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
+        ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
+            @Override
+            public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
+                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+            }
+
+            @Override
+            public void onFailure(PluginContext ctx, Exception e) {
+                sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+                        "Failed to fetch data!"));
+            }
+        });
     }
 
     private boolean validateSessionMetadata(PluginContext ctx, PluginWebsocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {