thingsboard-aplcache

Fixes for subscription

9/18/2018 1:42:14 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 3ded882..617cd20 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -68,6 +68,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -140,24 +141,64 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
 
     @Override
     public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
+        long startTime = 0L;
+        long endTime = 0L;
         if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
             EntityView entityView = entityViewService.findEntityViewById(new EntityViewId(entityId.getId()));
             entityId = entityView.getEntityId();
+            startTime = entityView.getStartTimeMs();
+            endTime = entityView.getEndTimeMs();
+            sub = getUpdatedSubscriptionState(entityId, sub, entityView);
         }
         Optional<ServerAddress> server = routingService.resolveById(entityId);
         Subscription subscription;
         if (server.isPresent()) {
             ServerAddress address = server.get();
             log.trace("[{}] Forwarding subscription [{}] for [{}] entity [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId, address);
-            subscription = new Subscription(sub, true, address);
+            subscription = new Subscription(sub, true, address, startTime, endTime);
             tellNewSubscription(address, sessionId, subscription);
         } else {
             log.trace("[{}] Registering local subscription [{}] for [{}] entity [{}]", sessionId, sub.getSubscriptionId(), entityId.getEntityType().name(), entityId);
-            subscription = new Subscription(sub, true);
+            subscription = new Subscription(sub, true, null, startTime, endTime);
         }
         registerSubscription(sessionId, entityId, subscription);
     }
 
+    private SubscriptionState getUpdatedSubscriptionState(EntityId entityId, SubscriptionState sub, EntityView entityView) {
+        boolean allKeys;
+        Map<String, Long> keyStates;
+        if (sub.getType().equals(TelemetryFeature.TIMESERIES) && !entityView.getKeys().getTimeseries().isEmpty()) {
+            allKeys = false;
+            keyStates = sub.getKeyStates().entrySet()
+                    .stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey()))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        } else if (sub.getType().equals(TelemetryFeature.ATTRIBUTES)) {
+            if (sub.getScope().equals(DataConstants.CLIENT_SCOPE) && !entityView.getKeys().getAttributes().getCs().isEmpty()) {
+                allKeys = false;
+                keyStates = filterMap(sub, entityView.getKeys().getAttributes().getCs());
+            } else if (sub.getScope().equals(DataConstants.SERVER_SCOPE) && !entityView.getKeys().getAttributes().getSs().isEmpty()) {
+                allKeys = false;
+                keyStates = filterMap(sub, entityView.getKeys().getAttributes().getSs());
+            } else if (sub.getScope().equals(DataConstants.SERVER_SCOPE) && !entityView.getKeys().getAttributes().getSh().isEmpty()) {
+                allKeys = false;
+                keyStates = filterMap(sub, entityView.getKeys().getAttributes().getSh());
+            } else {
+                allKeys = sub.isAllKeys();
+                keyStates = sub.getKeyStates();
+            }
+        } else {
+            allKeys = sub.isAllKeys();
+            keyStates = sub.getKeyStates();
+        }
+        return new SubscriptionState(sub.getWsSessionId(), sub.getSubscriptionId(), entityId, sub.getType(), allKeys, keyStates, sub.getScope());
+    }
+
+    private Map<String, Long> filterMap(SubscriptionState sub, List<String> allowedKeys) {
+        return sub.getKeyStates().entrySet()
+                .stream().filter(entry -> allowedKeys.contains(entry.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
     @Override
     public void cleanupLocalWsSessionSubscriptions(TelemetryWebSocketSessionRef sessionRef, String sessionId) {
         cleanupLocalWsSessionSubscriptions(sessionId);
@@ -426,7 +467,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
         onLocalSubUpdate(entityId, s -> TelemetryFeature.ATTRIBUTES == s.getType() && (StringUtils.isEmpty(s.getScope()) || scope.equals(s.getScope())), s -> {
             List<TsKvEntry> subscriptionUpdate = null;
             for (AttributeKvEntry kv : attributes) {
-                if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
+                if (isInTimeRange(s, kv.getLastUpdateTs()) && (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey()))) {
                     if (subscriptionUpdate == null) {
                         subscriptionUpdate = new ArrayList<>();
                     }
@@ -441,7 +482,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
         onLocalSubUpdate(entityId, s -> TelemetryFeature.TIMESERIES == s.getType(), s -> {
             List<TsKvEntry> subscriptionUpdate = null;
             for (TsKvEntry kv : ts) {
-                if (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey()))) {
+                if (isInTimeRange(s, kv.getTs()) && (s.isAllKeys() || s.getKeyStates().containsKey((kv.getKey())))) {
                     if (subscriptionUpdate == null) {
                         subscriptionUpdate = new ArrayList<>();
                     }
@@ -452,6 +493,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
         });
     }
 
+    private boolean isInTimeRange(Subscription subscription, long kvTime) {
+        return (subscription.getStartTime() == 0 || subscription.getStartTime() <= kvTime)
+                && (subscription.getEndTime() == 0 || subscription.getEndTime() >= kvTime);
+    }
+
     private void onLocalSubUpdate(EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
         Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
         if (deviceSubscriptions != null) {
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/sub/Subscription.java b/application/src/main/java/org/thingsboard/server/service/telemetry/sub/Subscription.java
index 47c80e9..32d6243 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/sub/Subscription.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/sub/Subscription.java
@@ -33,10 +33,6 @@ public class Subscription {
     private long startTime;
     private long endTime;
 
-    public Subscription(SubscriptionState sub, boolean local) {
-        this(sub, local, null);
-    }
-
     public Subscription(SubscriptionState sub, boolean local, ServerAddress server) {
         this(sub, local, server, 0L, 0L);
     }