diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 7c77242..6e0657d 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -35,18 +35,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
-import org.thingsboard.server.common.data.kv.AttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
-import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
-import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
-import org.thingsboard.server.common.data.kv.BooleanDataEntry;
-import org.thingsboard.server.common.data.kv.DataType;
-import org.thingsboard.server.common.data.kv.DoubleDataEntry;
-import org.thingsboard.server.common.data.kv.KvEntry;
-import org.thingsboard.server.common.data.kv.LongDataEntry;
-import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
-import org.thingsboard.server.common.data.kv.StringDataEntry;
-import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.attributes.AttributesService;
@@ -381,7 +370,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) {
EntityId entityId = subscription.getEntityId();
- log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
+ log.trace("[{}] Registering remote subscription [{}] for entity [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
registerSubscription(sessionId, entityId, subscription);
if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
final Map<String, Long> keyStates = subscription.getKeyStates();
@@ -401,17 +390,22 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
long curTs = System.currentTimeMillis();
List<ReadTsKvQuery> queries = new ArrayList<>();
subscription.getKeyStates().entrySet().forEach(e -> {
- queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
+ if (curTs > e.getValue()) {
+ queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs, 0, 1000, Aggregation.NONE));
+ } else {
+ log.debug("[{}] Invalid subscription [{}], entityId [{}] curTs [{}]", sessionId, subscription, entityId, curTs);
+ }
});
-
- DonAsynchron.withCallback(tsService.findAll(entityId, queries),
- missedUpdates -> {
- if (missedUpdates != null && !missedUpdates.isEmpty()) {
- tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
- }
- },
- e -> log.error("Failed to fetch missed updates.", e),
- tsCallBackExecutor);
+ if (!queries.isEmpty()) {
+ DonAsynchron.withCallback(tsService.findAll(entityId, queries),
+ missedUpdates -> {
+ if (missedUpdates != null && !missedUpdates.isEmpty()) {
+ tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
+ }
+ },
+ e -> log.error("Failed to fetch missed updates.", e),
+ tsCallBackExecutor);
+ }
}
}