thingsboard-memoizeit

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 7c77242..6e0657d 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -35,18 +35,7 @@ import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.EntityIdFactory;
 import org.thingsboard.server.common.data.id.EntityViewId;
 import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.BooleanDataEntry;
-import org.thingsboard.server.common.data.kv.DataType;
-import org.thingsboard.server.common.data.kv.DoubleDataEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.data.kv.LongDataEntry;
-import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
-import org.thingsboard.server.common.data.kv.StringDataEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -381,7 +370,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
 
     private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) {
         EntityId entityId = subscription.getEntityId();
-        log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
+        log.trace("[{}] Registering remote subscription [{}] for entity [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
         registerSubscription(sessionId, entityId, subscription);
         if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
             final Map<String, Long> keyStates = subscription.getKeyStates();
@@ -401,17 +390,22 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
             long curTs = System.currentTimeMillis();
             List<ReadTsKvQuery> queries = new ArrayList<>();
             subscription.getKeyStates().entrySet().forEach(e -> {
-                queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
+                if (curTs > e.getValue()) {
+                    queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs, 0, 1000, Aggregation.NONE));
+                } else {
+                    log.debug("[{}] Invalid subscription [{}], entityId [{}] curTs [{}]", sessionId, subscription, entityId, curTs);
+                }
             });
-
-            DonAsynchron.withCallback(tsService.findAll(entityId, queries),
-                    missedUpdates -> {
-                        if (missedUpdates != null && !missedUpdates.isEmpty()) {
-                            tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
-                        }
-                    },
-                    e -> log.error("Failed to fetch missed updates.", e),
-                    tsCallBackExecutor);
+            if (!queries.isEmpty()) {
+                DonAsynchron.withCallback(tsService.findAll(entityId, queries),
+                        missedUpdates -> {
+                            if (missedUpdates != null && !missedUpdates.isEmpty()) {
+                                tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+                            }
+                        },
+                        e -> log.error("Failed to fetch missed updates.", e),
+                        tsCallBackExecutor);
+            }
         }
     }