thingsboard-memoizeit
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 29(+23 -6)
application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java 5(+2 -3)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java 26(+26 -0)
extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java 32(+19 -13)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java 2(+1 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index f9c4e6f..da2c620 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -58,6 +58,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -85,14 +86,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.rpcPendingMap = new HashMap<>();
- refreshAttributes();
+ initAttributes();
}
- private void refreshAttributes() {
+ private void initAttributes() {
this.deviceAttributes = new DeviceAttributes(fetchAttributes(DataConstants.CLIENT_SCOPE),
fetchAttributes(DataConstants.SERVER_SCOPE), fetchAttributes(DataConstants.SHARED_SCOPE));
}
+ private void refreshAttributes(DeviceAttributesEventNotificationMsg msg) {
+ if (this.deviceAttributes != null) {
+ if (msg.isDeleted()) {
+ msg.getDeletedKeys().forEach(key -> deviceAttributes.remove(key));
+ } else {
+ deviceAttributes.update(msg.getScope(), msg.getValues());
+ }
+ }
+ }
+
void processRpcRequest(ActorContext context, ToDeviceRpcRequestPluginMsg msg) {
ToDeviceRpcRequest request = msg.getMsg();
ToDeviceRpcRequestBody body = request.getBody();
@@ -196,8 +207,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
//TODO: improve this procedure to fetch only changed attributes and support attributes deletion
- refreshAttributes();
- Set<AttributeKey> keys = msg.getKeys();
+ refreshAttributes(msg);
+ Set<AttributeKey> keys = msg.getDeletedKeys();
if (attributeSubscriptions.size() > 0) {
ToDeviceMsg notification = null;
if (msg.isDeleted()) {
@@ -359,8 +370,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
}
}
- private List<AttributeKvEntry> fetchAttributes(String attributeType) {
- return systemContext.getAttributesService().findAll(this.deviceId, attributeType);
+ private List<AttributeKvEntry> fetchAttributes(String scope) {
+ try {
+ //TODO: replace this with async operation. Happens only during actor creation, but is still criticla for performance,
+ return systemContext.getAttributesService().findAll(this.deviceId, scope).get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.warning("[{}] Failed to fetch attributes for scope: {}", deviceId, scope);
+ throw new RuntimeException(e);
+ }
}
public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index b102226..bea51db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -302,9 +302,8 @@ public final class PluginProcessingContext implements PluginContext {
@Override
public void getDevice(DeviceId deviceId, PluginCallback<Device> callback) {
- //TODO: add caching here with async api.
- Device device = pluginCtx.deviceService.findDeviceById(deviceId);
- pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, device), ActorRef.noSender());
+ ListenableFuture<Device> deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(deviceId);
+ Futures.addCallback(deviceFuture, getCallback(callback, v -> v));
}
@Override
diff --git a/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java b/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java
index f6cb4e3..2940a62 100644
--- a/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java
+++ b/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
import java.util.*;
+import com.google.common.util.concurrent.Futures;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@@ -226,7 +227,9 @@ public class DefaultActorServiceTest {
when(pluginMock.getConfiguration()).thenReturn(pluginAdditionalInfo);
when(pluginMock.getClazz()).thenReturn(TelemetryStoragePlugin.class.getName());
- when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Collections.emptyList());
+ when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
+ when(attributesService.findAll(deviceId, DataConstants.SHARED_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
+ when(attributesService.findAll(deviceId, DataConstants.SERVER_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
initActorSystem();
Thread.sleep(1000);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
index 262d15d..fd50f4d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
@@ -68,7 +68,7 @@ public class BaseAttributesDao extends AbstractAsyncDao implements AttributesDao
.and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey));
log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey);
return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends Optional<AttributeKvEntry>>) input ->
- Optional.of(convertResultToAttributesKvEntry(attributeKey, input.one()))
+ Optional.ofNullable(convertResultToAttributesKvEntry(attributeKey, input.one()))
, readResultsProcessingExecutor);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 42fede4..19a260f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -133,8 +133,8 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
}
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(String entityType, UUID entityId, TsKvQuery query) {
- long minPartition = query.getStartTs();
- long maxPartition = query.getEndTs();
+ long minPartition = toPartitionTs(query.getStartTs());
+ long maxPartition = toPartitionTs(query.getEndTs());
ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java
index 8a43e32..8628d0c 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java
@@ -15,6 +15,8 @@
*/
package org.thingsboard.server.extensions.api.device;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import java.util.*;
@@ -65,4 +67,28 @@ public class DeviceAttributes {
public Optional<AttributeKvEntry> getServerPublicAttribute(String attribute) {
return Optional.ofNullable(serverPublicAttributesMap.get(attribute));
}
+
+ public void remove(AttributeKey key) {
+ Map<String, AttributeKvEntry> map = getMapByScope(key.getScope());
+ if (map != null) {
+ map.remove(key);
+ }
+ }
+
+ public void update(String scope, List<AttributeKvEntry> values) {
+ Map<String, AttributeKvEntry> map = getMapByScope(scope);
+ values.forEach(v -> map.put(v.getKey(), v));
+ }
+
+ private Map<String, AttributeKvEntry> getMapByScope(String scope) {
+ Map<String, AttributeKvEntry> map = null;
+ if (scope.equalsIgnoreCase(DataConstants.CLIENT_SCOPE)) {
+ map = clientSideAttributesMap;
+ } else if (scope.equalsIgnoreCase(DataConstants.SHARED_SCOPE)) {
+ map = serverPublicAttributesMap;
+ } else if (scope.equalsIgnoreCase(DataConstants.SERVER_SCOPE)) {
+ map = serverPrivateAttributesMap;
+ }
+ return map;
+ }
}
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java
index 4ff72ee..f25f4f8 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java
@@ -15,37 +15,43 @@
*/
package org.thingsboard.server.extensions.api.device;
+import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKey;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import java.util.List;
import java.util.Set;
/**
* @author Andrew Shvayka
*/
@ToString
+@AllArgsConstructor
public class DeviceAttributesEventNotificationMsg implements ToDeviceActorNotificationMsg {
- @Getter private final TenantId tenantId;
- @Getter private final DeviceId deviceId;
- @Getter private final Set<AttributeKey> keys;
- @Getter private final boolean deleted;
+ @Getter
+ private final TenantId tenantId;
+ @Getter
+ private final DeviceId deviceId;
+ @Getter
+ private final Set<AttributeKey> deletedKeys;
+ @Getter
+ private final String scope;
+ @Getter
+ private final List<AttributeKvEntry> values;
+ @Getter
+ private final boolean deleted;
- public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
- return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, false);
+ public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> values) {
+ return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, scope, values, false);
}
public static DeviceAttributesEventNotificationMsg onDelete(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
- return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, true);
+ return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, null, null, true);
}
- private DeviceAttributesEventNotificationMsg(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys, boolean deleted) {
- this.tenantId = tenantId;
- this.deviceId = deviceId;
- this.keys = keys;
- this.deleted = deleted;
- }
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index 2bd17aa..78fa4ad 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -93,7 +93,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
List<String> keys = Arrays.asList(keysStr.split(","));
- List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.get(), agg)).collect(Collectors.toList());
+ List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
@Override
public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index fbfacd3..4bc7ae0 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -48,6 +48,8 @@ import java.util.stream.Collectors;
public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
+ public static final int DEFAULT_LIMIT = 100;
+ public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
private final SubscriptionManager subscriptionManager;
@@ -187,13 +189,11 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
if (keysOptional.isPresent()) {
long startTs;
if (cmd.getTimeWindow() > 0) {
- List<TsKvEntry> data = new ArrayList<>();
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), cmd.getDeviceId());
long endTs = System.currentTimeMillis();
startTs = endTs - cmd.getTimeWindow();
-
- List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
+ List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
} else {
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
@@ -277,7 +277,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
}
DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
- List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
+ List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
@Override
public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
@@ -299,6 +299,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
});
}
+ private static Aggregation getAggregation(String agg) {
+ return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg);
+ }
+
+ private int getLimit(int limit) {
+ return limit == 0 ? DEFAULT_LIMIT : limit;
+ }
+
private boolean validateSessionMetadata(PluginContext ctx, PluginWebsocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
if (sessionMD == null) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 2badd3a..854ad8f 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -91,7 +91,9 @@ public class GatewaySessionCtx {
public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
String deviceName = checkDeviceName(getDeviceName(msg));
GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
- deviceSessionCtx.setClosed(true);
+ if (deviceSessionCtx != null) {
+ deviceSessionCtx.setClosed(true);
+ }
ack(msg);
}