thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 15(+2 -13)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java 4(+1 -3)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java 50(+9 -41)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java 42(+2 -40)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index fbd0cce..9474a62 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -151,20 +151,9 @@ public final class PluginProcessingContext implements PluginContext {
}
@Override
- public List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query) {
+ public void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback) {
validate(deviceId);
- try {
- return pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query).get();
- } catch (Exception e) {
- log.error("TODO", e);
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback) {
- validate(deviceId);
- ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query);
+ ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries);
Futures.addCallback(future, getCallback(callback, v -> v), executor);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 5cf71fc..81bbdf9 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -96,7 +96,21 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
}
@Override
- public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
+ public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, List<TsKvQuery> queries) {
+ List<ListenableFuture<List<TsKvEntry>>> futures = queries.stream().map(query -> findAllAsync(entityType, entityId, query)).collect(Collectors.toList());
+ return Futures.transform(Futures.allAsList(futures), new Function<List<List<TsKvEntry>>, List<TsKvEntry>>() {
+ @Nullable
+ @Override
+ public List<TsKvEntry> apply(@Nullable List<List<TsKvEntry>> results) {
+ List<TsKvEntry> result = new ArrayList<TsKvEntry>();
+ results.forEach(r -> result.addAll(r));
+ return result;
+ }
+ }, readResultsProcessingExecutor);
+ }
+
+
+ private ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(entityType, entityId, query);
} else {
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
index 1d8c3df..f27ed6e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
@@ -18,6 +18,7 @@ package org.thingsboard.server.dao.timeseries;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
+import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -32,6 +33,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.service.Validator;
+import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Instant;
@@ -40,6 +42,7 @@ import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@@ -56,10 +59,10 @@ public class BaseTimeseriesService implements TimeseriesService {
private TimeseriesDao timeseriesDao;
@Override
- public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
+ public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, List<TsKvQuery> queries) {
validate(entityType, entityId);
- validate(query);
- return timeseriesDao.findAllAsync(entityType, entityId.getId(), query);
+ queries.forEach(query -> validate(query));
+ return timeseriesDao.findAllAsync(entityType, entityId.getId(), queries);
}
@Override
@@ -132,7 +135,7 @@ public class BaseTimeseriesService implements TimeseriesService {
throw new IncorrectParameterException("TsKvQuery can't be null");
} else if (isBlank(query.getKey())) {
throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
- } else if (query.getAggregation() == null){
+ } else if (query.getAggregation() == null) {
throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
index 7a6eed7..177003d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java
@@ -33,9 +33,7 @@ public interface TimeseriesDao {
long toPartitionTs(long ts);
- ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query);
-
-// List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
+ ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, List<TsKvQuery> queries);
ResultSetFuture findLatest(String entityType, UUID entityId, String key);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
index 1bafdea..cd53e94 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
@@ -33,7 +33,7 @@ import java.util.Set;
*/
public interface TimeseriesService {
- ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query);
+ ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, List<TsKvQuery> queries);
ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys);
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
index b5f27a1..c2c5587 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java
@@ -82,9 +82,7 @@ public interface PluginContext {
void saveTsData(DeviceId deviceId, List<TsKvEntry> entry, PluginCallback<Void> callback);
- List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query);
-
- void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback);
+ void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback);
void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback);
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java
index 14d7aa1..9f06895 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java
@@ -15,9 +15,16 @@
*/
package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
/**
* @author Andrew Shvayka
*/
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
public class GetHistoryCmd implements TelemetryPluginCmd {
private int cmdId;
@@ -25,46 +32,7 @@ public class GetHistoryCmd implements TelemetryPluginCmd {
private String keys;
private long startTs;
private long endTs;
+ private int limit;
+ private String agg;
- @Override
- public int getCmdId() {
- return cmdId;
- }
-
- @Override
- public void setCmdId(int cmdId) {
- this.cmdId = cmdId;
- }
-
- public String getDeviceId() {
- return deviceId;
- }
-
- public void setDeviceId(String deviceId) {
- this.deviceId = deviceId;
- }
-
- public String getKeys() {
- return keys;
- }
-
- public void setKeys(String keys) {
- this.keys = keys;
- }
-
- public long getStartTs() {
- return startTs;
- }
-
- public void setStartTs(long startTs) {
- this.startTs = startTs;
- }
-
- public long getEndTs() {
- return endTs;
- }
-
- public void setEndTs(long endTs) {
- this.endTs = endTs;
- }
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
index 718f23a..3574eae 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
@@ -16,11 +16,13 @@
package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
import lombok.AllArgsConstructor;
+import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
@NoArgsConstructor
@AllArgsConstructor
+@Data
public abstract class SubscriptionCmd implements TelemetryPluginCmd {
private int cmdId;
@@ -31,46 +33,6 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
public abstract SubscriptionType getType();
- public int getCmdId() {
- return cmdId;
- }
-
- public void setCmdId(int cmdId) {
- this.cmdId = cmdId;
- }
-
- public String getDeviceId() {
- return deviceId;
- }
-
- public void setDeviceId(String deviceId) {
- this.deviceId = deviceId;
- }
-
- public String getKeys() {
- return keys;
- }
-
- public void setTags(String tags) {
- this.keys = tags;
- }
-
- public boolean isUnsubscribe() {
- return unsubscribe;
- }
-
- public void setUnsubscribe(boolean unsubscribe) {
- this.unsubscribe = unsubscribe;
- }
-
- public String getScope() {
- return scope;
- }
-
- public void setKeys(String keys) {
- this.keys = keys;
- }
-
@Override
public String toString() {
return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
index 4f24a00..f4eacf5 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
@@ -15,6 +15,8 @@
*/
package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
@@ -22,17 +24,13 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
* @author Andrew Shvayka
*/
@NoArgsConstructor
+@AllArgsConstructor
+@Data
public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
private long timeWindow;
-
- public long getTimeWindow() {
- return timeWindow;
- }
-
- public void setTimeWindow(long timeWindow) {
- this.timeWindow = timeWindow;
- }
+ private int limit;
+ private String agg;
@Override
public SubscriptionType getType() {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 5d6e68e..739bedf 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -158,40 +158,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), cmd.getDeviceId());
long endTs = System.currentTimeMillis();
startTs = endTs - cmd.getTimeWindow();
- for (String key : keys) {
- TsKvQuery query = new BaseTsKvQuery(key, startTs, endTs);
- data.addAll(ctx.loadTimeseries(deviceId, query));
- }
- sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
- Map<String, Long> subState = new HashMap<>(keys.size());
- keys.forEach(key -> subState.put(key, startTs));
- data.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
- subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
+ List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
+ ctx.loadTimeseries(deviceId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
} else {
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
startTs = System.currentTimeMillis();
log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), cmd.getDeviceId());
- ctx.loadLatestTimeseries(deviceId, keys, new PluginCallback<List<TsKvEntry>>() {
- @Override
- public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
- sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
-
- Map<String, Long> subState = new HashMap<>(keys.size());
- keys.forEach(key -> subState.put(key, startTs));
- data.forEach(v -> subState.put(v.getKey(), v.getTs()));
- SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
- subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
- }
-
- @Override
- public void onFailure(PluginContext ctx, Exception e) {
- SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
- "Failed to fetch data!");
- sendWsMsg(ctx, sessionRef, update);
- }
- });
+ ctx.loadLatestTimeseries(deviceId, keys, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
}
} else {
ctx.loadLatestTimeseries(deviceId, new PluginCallback<List<TsKvEntry>>() {
@@ -216,6 +190,28 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
}
}
+ private PluginCallback<List<TsKvEntry>> getSubscriptionCallback(final PluginWebsocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final DeviceId deviceId, final long startTs, final List<String> keys) {
+ return new PluginCallback<List<TsKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
+ sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+
+ Map<String, Long> subState = new HashMap<>(keys.size());
+ keys.forEach(key -> subState.put(key, startTs));
+ data.forEach(v -> subState.put(v.getKey(), v.getTs()));
+ SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
+ subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ "Failed to fetch data!");
+ sendWsMsg(ctx, sessionRef, update);
+ }
+ };
+ }
+
private void handleWsHistoryCmd(PluginContext ctx, PluginWebsocketSessionRef sessionRef, GetHistoryCmd cmd) {
String sessionId = sessionRef.getSessionId();
WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
@@ -246,12 +242,19 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
return;
}
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
- List<TsKvEntry> data = new ArrayList<>();
- for (String key : keys) {
- TsKvQuery query = new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs());
- data.addAll(ctx.loadTimeseries(deviceId, query));
- }
- sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+ List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
+ ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
+ @Override
+ public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
+ sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
+ }
+
+ @Override
+ public void onFailure(PluginContext ctx, Exception e) {
+ sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
+ "Failed to fetch data!"));
+ }
+ });
}
private boolean validateSessionMetadata(PluginContext ctx, PluginWebsocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {