thingsboard-aplcache

Attribute subscription

1/11/2018 2:08:36 PM

Details

diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
index ba5d610..874c480 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
@@ -114,7 +114,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
         }
         Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs));
         Subscription subscription = new Subscription(
-                new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap),
+                new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
                 false, msg.getServerAddress());
         subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription);
     }
@@ -127,6 +127,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
         builder.setEntityId(cmd.getEntityId().getId().toString());
         builder.setType(cmd.getType().name());
         builder.setAllKeys(cmd.isAllKeys());
+        builder.setScope(cmd.getScope());
         cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
         ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray()));
     }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 7b0e6d8..6f02c9a 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -131,7 +131,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 keys.forEach(key -> subState.put(key, 0L));
                 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState);
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
             }
 
@@ -168,7 +168,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 Map<String, Long> subState = new HashMap<>(attributesData.size());
                 attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
 
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState);
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
             }
 
@@ -234,7 +234,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
                 Map<String, Long> subState = new HashMap<>(data.size());
                 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState);
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
             }
 
@@ -262,7 +262,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 Map<String, Long> subState = new HashMap<>(keys.size());
                 keys.forEach(key -> subState.put(key, startTs));
                 data.forEach(v -> subState.put(v.getKey(), v.getTs()));
-                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState);
+                SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
                 subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
             }
 
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
index 1285cfa..360b018 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java
@@ -51,6 +51,10 @@ public class Subscription {
         return getSub().getType();
     }
 
+    public String getScope() {
+        return getSub().getScope();
+    }
+
     public boolean isAllKeys() {
         return getSub().isAllKeys();
     }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
index 5e15fda..7e0a2ba 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java
@@ -33,6 +33,7 @@ public class SubscriptionState {
     @Getter private final SubscriptionType type;
     @Getter private final boolean allKeys;
     @Getter private final Map<String, Long> keyStates;
+    @Getter private final String scope;
 
     @Override
     public boolean equals(Object o) {
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index d137e10..bad1678 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -33,6 +33,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionU
 
 import java.util.*;
 import java.util.function.Function;
+import java.util.function.Predicate;
 
 /**
  * @author Andrew Shvayka
@@ -174,9 +175,13 @@ public class SubscriptionManager {
     }
 
     public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function<Subscription, List<TsKvEntry>> f) {
+        onLocalSubscriptionUpdate(ctx, entityId, s -> type == s.getType(), f);
+    }
+
+    public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
         Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
         if (deviceSubscriptions != null) {
-            deviceSubscriptions.stream().filter(s -> type == s.getType()).forEach(s -> {
+            deviceSubscriptions.stream().filter(filter).forEach(s -> {
                 String sessionId = s.getWsSessionId();
                 List<TsKvEntry> subscriptionUpdate = f.apply(s);
                 if (!subscriptionUpdate.isEmpty()) {
@@ -206,7 +211,7 @@ public class SubscriptionManager {
     public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
         Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
         if (!serverAddress.isPresent()) {
-            onLocalSubscriptionUpdate(ctx, entityId, SubscriptionType.ATTRIBUTES, s -> {
+            onLocalSubscriptionUpdate(ctx, entityId, s -> SubscriptionType.ATTRIBUTES == s.getType() && scope.equals(s.getScope()), s -> {
                 List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
                 for (AttributeKvEntry kv : attributes) {
                     if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
diff --git a/extensions-core/src/main/proto/telemetry.proto b/extensions-core/src/main/proto/telemetry.proto
index 2bfef59..59c5c14 100644
--- a/extensions-core/src/main/proto/telemetry.proto
+++ b/extensions-core/src/main/proto/telemetry.proto
@@ -27,6 +27,7 @@ message SubscriptionProto {
   string type = 5;
   bool allKeys = 6;
   repeated SubscriptionKetStateProto keyStates = 7;
+  string scope = 8;
 }
 
 message SubscriptionUpdateProto {