thingsboard-memoizeit

Implementation and test fixes

2/22/2017 8:53:56 AM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index f9c4e6f..da2c620 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -85,14 +86,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         this.attributeSubscriptions = new HashMap<>();
         this.rpcSubscriptions = new HashMap<>();
         this.rpcPendingMap = new HashMap<>();
-        refreshAttributes();
+        initAttributes();
     }
 
-    private void refreshAttributes() {
+    private void initAttributes() {
         this.deviceAttributes = new DeviceAttributes(fetchAttributes(DataConstants.CLIENT_SCOPE),
                 fetchAttributes(DataConstants.SERVER_SCOPE), fetchAttributes(DataConstants.SHARED_SCOPE));
     }
 
+    private void refreshAttributes(DeviceAttributesEventNotificationMsg msg) {
+        if (this.deviceAttributes != null) {
+            if (msg.isDeleted()) {
+                msg.getDeletedKeys().forEach(key -> deviceAttributes.remove(key));
+            } else {
+                deviceAttributes.update(msg.getScope(), msg.getValues());
+            }
+        }
+    }
+
     void processRpcRequest(ActorContext context, ToDeviceRpcRequestPluginMsg msg) {
         ToDeviceRpcRequest request = msg.getMsg();
         ToDeviceRpcRequestBody body = request.getBody();
@@ -196,8 +207,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 
     void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
         //TODO: improve this procedure to fetch only changed attributes and support attributes deletion
-        refreshAttributes();
-        Set<AttributeKey> keys = msg.getKeys();
+        refreshAttributes(msg);
+        Set<AttributeKey> keys = msg.getDeletedKeys();
         if (attributeSubscriptions.size() > 0) {
             ToDeviceMsg notification = null;
             if (msg.isDeleted()) {
@@ -359,8 +370,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    private List<AttributeKvEntry> fetchAttributes(String attributeType) {
-        return systemContext.getAttributesService().findAll(this.deviceId, attributeType);
+    private List<AttributeKvEntry> fetchAttributes(String scope) {
+        try {
+            //TODO: replace this with async operation. Happens only during actor creation, but is still criticla for performance,
+            return systemContext.getAttributesService().findAll(this.deviceId, scope).get();
+        } catch (InterruptedException | ExecutionException e) {
+            logger.warning("[{}] Failed to fetch attributes for scope: {}", deviceId, scope);
+            throw new RuntimeException(e);
+        }
     }
 
     public void processCredentialsUpdate(ActorContext context, DeviceCredentialsUpdateNotificationMsg msg) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
index b102226..bea51db 100644
--- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java
@@ -302,9 +302,8 @@ public final class PluginProcessingContext implements PluginContext {
 
     @Override
     public void getDevice(DeviceId deviceId, PluginCallback<Device> callback) {
-        //TODO: add caching here with async api.
-        Device device = pluginCtx.deviceService.findDeviceById(deviceId);
-        pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, device), ActorRef.noSender());
+        ListenableFuture<Device> deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(deviceId);
+        Futures.addCallback(deviceFuture, getCallback(callback, v -> v));
     }
 
     @Override
diff --git a/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java b/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java
index f6cb4e3..2940a62 100644
--- a/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java
+++ b/application/src/test/java/org/thingsboard/server/actors/DefaultActorServiceTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.*;
 
+import com.google.common.util.concurrent.Futures;
 import org.thingsboard.server.actors.service.DefaultActorService;
 import org.thingsboard.server.common.data.id.*;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
@@ -226,7 +227,9 @@ public class DefaultActorServiceTest {
         when(pluginMock.getConfiguration()).thenReturn(pluginAdditionalInfo);
         when(pluginMock.getClazz()).thenReturn(TelemetryStoragePlugin.class.getName());
 
-        when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Collections.emptyList());
+        when(attributesService.findAll(deviceId, DataConstants.CLIENT_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
+        when(attributesService.findAll(deviceId, DataConstants.SHARED_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
+        when(attributesService.findAll(deviceId, DataConstants.SERVER_SCOPE)).thenReturn(Futures.immediateFuture(Collections.emptyList()));
 
         initActorSystem();
         Thread.sleep(1000);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
index 262d15d..fd50f4d 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesDao.java
@@ -68,7 +68,7 @@ public class BaseAttributesDao extends AbstractAsyncDao implements AttributesDao
                 .and(eq(ATTRIBUTE_KEY_COLUMN, attributeKey));
         log.trace("Generated query [{}] for entityId {} and key {}", select, entityId, attributeKey);
         return Futures.transform(executeAsyncRead(select), (Function<? super ResultSet, ? extends Optional<AttributeKvEntry>>) input ->
-                        Optional.of(convertResultToAttributesKvEntry(attributeKey, input.one()))
+                        Optional.ofNullable(convertResultToAttributesKvEntry(attributeKey, input.one()))
                 , readResultsProcessingExecutor);
     }
 
diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
index 42fede4..19a260f 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java
@@ -133,8 +133,8 @@ public class BaseTimeseriesDao extends AbstractAsyncDao implements TimeseriesDao
     }
 
     private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(String entityType, UUID entityId, TsKvQuery query) {
-        long minPartition = query.getStartTs();
-        long maxPartition = query.getEndTs();
+        long minPartition = toPartitionTs(query.getStartTs());
+        long maxPartition = toPartitionTs(query.getEndTs());
 
         ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
 
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java
index 8a43e32..8628d0c 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributes.java
@@ -15,6 +15,8 @@
  */
 package org.thingsboard.server.extensions.api.device;
 
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.kv.AttributeKey;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 
 import java.util.*;
@@ -65,4 +67,28 @@ public class DeviceAttributes {
     public Optional<AttributeKvEntry> getServerPublicAttribute(String attribute) {
         return Optional.ofNullable(serverPublicAttributesMap.get(attribute));
     }
+
+    public void remove(AttributeKey key) {
+        Map<String, AttributeKvEntry> map = getMapByScope(key.getScope());
+        if (map != null) {
+            map.remove(key);
+        }
+    }
+
+    public void update(String scope, List<AttributeKvEntry> values) {
+        Map<String, AttributeKvEntry> map = getMapByScope(scope);
+        values.forEach(v -> map.put(v.getKey(), v));
+    }
+
+    private Map<String, AttributeKvEntry> getMapByScope(String scope) {
+        Map<String, AttributeKvEntry> map = null;
+        if (scope.equalsIgnoreCase(DataConstants.CLIENT_SCOPE)) {
+            map = clientSideAttributesMap;
+        } else if (scope.equalsIgnoreCase(DataConstants.SHARED_SCOPE)) {
+            map = serverPublicAttributesMap;
+        } else if (scope.equalsIgnoreCase(DataConstants.SERVER_SCOPE)) {
+            map = serverPrivateAttributesMap;
+        }
+        return map;
+    }
 }
diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java
index 4ff72ee..f25f4f8 100644
--- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java
+++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/device/DeviceAttributesEventNotificationMsg.java
@@ -15,37 +15,43 @@
  */
 package org.thingsboard.server.extensions.api.device;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.ToString;
 import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.kv.AttributeKey;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 
+import java.util.List;
 import java.util.Set;
 
 /**
  * @author Andrew Shvayka
  */
 @ToString
+@AllArgsConstructor
 public class DeviceAttributesEventNotificationMsg implements ToDeviceActorNotificationMsg {
 
-    @Getter private final TenantId tenantId;
-    @Getter private final DeviceId deviceId;
-    @Getter private final Set<AttributeKey> keys;
-    @Getter private final boolean deleted;
+    @Getter
+    private final TenantId tenantId;
+    @Getter
+    private final DeviceId deviceId;
+    @Getter
+    private final Set<AttributeKey> deletedKeys;
+    @Getter
+    private final String scope;
+    @Getter
+    private final List<AttributeKvEntry> values;
+    @Getter
+    private final boolean deleted;
 
-    public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
-        return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, false);
+    public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, String scope, List<AttributeKvEntry> values) {
+        return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, scope, values, false);
     }
 
     public static DeviceAttributesEventNotificationMsg onDelete(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
-        return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, true);
+        return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, null, null, true);
     }
 
-    private DeviceAttributesEventNotificationMsg(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys, boolean deleted) {
-        this.tenantId = tenantId;
-        this.deviceId = deviceId;
-        this.keys = keys;
-        this.deleted = deleted;
-    }
 }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index 2bd17aa..78fa4ad 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -93,7 +93,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                     Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
 
                     List<String> keys = Arrays.asList(keysStr.split(","));
-                    List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.get(), agg)).collect(Collectors.toList());
+                    List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)).collect(Collectors.toList());
                     ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
                         @Override
                         public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index fbfacd3..4bc7ae0 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -48,6 +48,8 @@ import java.util.stream.Collectors;
 public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 
     private static final int UNKNOWN_SUBSCRIPTION_ID = 0;
+    public static final int DEFAULT_LIMIT = 100;
+    public static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE;
 
     private final SubscriptionManager subscriptionManager;
 
@@ -187,13 +189,11 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 if (keysOptional.isPresent()) {
                     long startTs;
                     if (cmd.getTimeWindow() > 0) {
-                        List<TsKvEntry> data = new ArrayList<>();
                         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
                         log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), cmd.getDeviceId());
                         long endTs = System.currentTimeMillis();
                         startTs = endTs - cmd.getTimeWindow();
-
-                        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
+                        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
                         ctx.loadTimeseries(deviceId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
                     } else {
                         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
@@ -277,7 +277,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
         }
         DeviceId deviceId = DeviceId.fromString(cmd.getDeviceId());
         List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
-        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
+        List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
         ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
             @Override
             public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
@@ -299,6 +299,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
         });
     }
 
+    private static Aggregation getAggregation(String agg) {
+        return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg);
+    }
+
+    private int getLimit(int limit) {
+        return limit == 0 ? DEFAULT_LIMIT : limit;
+    }
+
     private boolean validateSessionMetadata(PluginContext ctx, PluginWebsocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
         WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
         if (sessionMD == null) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 2badd3a..854ad8f 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -91,7 +91,9 @@ public class GatewaySessionCtx {
     public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
         String deviceName = checkDeviceName(getDeviceName(msg));
         GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
-        deviceSessionCtx.setClosed(true);
+        if (deviceSessionCtx != null) {
+            deviceSessionCtx.setClosed(true);
+        }
         ack(msg);
     }