thingsboard-aplcache

Details

diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index 34ae9b2..a446680 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -29,6 +29,8 @@ public class DataConstants {
     public static final String SERVER_SCOPE = "SERVER_SCOPE";
     public static final String SHARED_SCOPE = "SHARED_SCOPE";
 
+    public static final String[] ALL_SCOPES = {CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE};
+
     public static final String ALARM = "ALARM";
     public static final String ERROR = "ERROR";
     public static final String LC_EVENT = "LC_EVENT";
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index dee981a..1103635 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -29,6 +29,7 @@ import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHand
 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
 import org.thingsboard.server.extensions.api.plugins.rest.RestRequest;
 import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData;
+import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
 import org.thingsboard.server.extensions.core.plugin.telemetry.TsData;
 
 import javax.servlet.ServletException;
@@ -39,6 +40,12 @@ import java.util.stream.Collectors;
 @Slf4j
 public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
 
+    private final SubscriptionManager subscriptionManager;
+
+    public TelemetryRestMsgHandler(SubscriptionManager subscriptionManager) {
+        this.subscriptionManager = subscriptionManager;
+    }
+
     @Override
     public void handleHttpGetRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException {
         RestRequest request = msg.getRequest();
@@ -74,9 +81,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                     if (!StringUtils.isEmpty(scope)) {
                         attributes = ctx.loadAttributes(deviceId, scope);
                     } else {
-                        attributes = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE);
-                        attributes.addAll(ctx.loadAttributes(deviceId, DataConstants.SERVER_SCOPE));
-                        attributes.addAll(ctx.loadAttributes(deviceId, DataConstants.SHARED_SCOPE));
+                        attributes = new ArrayList<>();
+                        Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s)));
                     }
                     List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
                     msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
@@ -99,9 +105,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                     if (!StringUtils.isEmpty(scope)) {
                         attributes = getAttributeKvEntries(ctx, scope, deviceId, keys);
                     } else {
-                        attributes = getAttributeKvEntries(ctx, DataConstants.CLIENT_SCOPE, deviceId, keys);
-                        attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SHARED_SCOPE, deviceId, keys));
-                        attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SERVER_SCOPE, deviceId, keys));
+                        attributes = new ArrayList<>();
+                        Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys)));
                     }
                     List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
                             attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
@@ -145,6 +150,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                                 @Override
                                 public void onSuccess(PluginContext ctx, Void value) {
                                     msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
+                                    subscriptionManager.onAttributesUpdateFromServer(ctx, deviceId, scope, attributes);
                                 }
 
                                 @Override
@@ -172,7 +178,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
                 DeviceId deviceId = DeviceId.fromString(pathParams[0]);
                 String scope = pathParams[1];
                 if (DataConstants.SERVER_SCOPE.equals(scope) ||
-                        DataConstants.SHARED_SCOPE.equals(scope)) {
+                        DataConstants.SHARED_SCOPE.equals(scope) ||
+                        DataConstants.CLIENT_SCOPE.equals(scope)) {
                     String keysParam = request.getParameter("keys");
                     if (!StringUtils.isEmpty(keysParam)) {
                         String[] keys = keysParam.split(",");
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
index 06467fe..e59fa64 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
@@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.kv.*;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.extensions.api.plugins.PluginContext;
 import org.thingsboard.server.extensions.api.plugins.handlers.RpcMsgHandler;
@@ -42,9 +43,10 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
     private final SubscriptionManager subscriptionManager;
 
     private static final int SUBSCRIPTION_CLAZZ = 1;
-    private static final int SUBSCRIPTION_UPDATE_CLAZZ = 2;
-    private static final int SESSION_CLOSE_CLAZZ = 3;
-    private static final int SUBSCRIPTION_CLOSE_CLAZZ = 4;
+    private static final int ATTRIBUTES_UPDATE_CLAZZ = 2;
+    private static final int SUBSCRIPTION_UPDATE_CLAZZ = 3;
+    private static final int SESSION_CLOSE_CLAZZ = 4;
+    private static final int SUBSCRIPTION_CLOSE_CLAZZ = 5;
 
     @Override
     public void process(PluginContext ctx, RpcMsg msg) {
@@ -55,6 +57,9 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
             case SUBSCRIPTION_UPDATE_CLAZZ:
                 processRemoteSubscriptionUpdate(ctx, msg);
                 break;
+            case ATTRIBUTES_UPDATE_CLAZZ:
+                processAttributeUpdate(ctx, msg);
+                break;
             case SESSION_CLOSE_CLAZZ:
                 processSessionClose(ctx, msg);
                 break;
@@ -76,6 +81,17 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
         subscriptionManager.onRemoteSubscriptionUpdate(ctx, proto.getSessionId(), convert(proto));
     }
 
+    private void processAttributeUpdate(PluginContext ctx, RpcMsg msg) {
+        AttributeUpdateProto proto;
+        try {
+            proto = AttributeUpdateProto.parseFrom(msg.getMsgData());
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+        subscriptionManager.onAttributesUpdateFromServer(ctx, DeviceId.fromString(proto.getDeviceId()), proto.getScope(),
+                proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList()));
+    }
+
     private void processSubscriptionCmd(PluginContext ctx, RpcMsg msg) {
         SubscriptionProto proto;
         try {
@@ -167,11 +183,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
         } else {
             Map<String, List<Object>> data = new TreeMap<>();
             proto.getDataList().forEach(v -> {
-                List<Object> values = data.get(v.getKey());
-                if (values == null) {
-                    values = new ArrayList<>();
-                    data.put(v.getKey(), values);
-                }
+                List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>());
                 for (int i = 0; i < v.getTsCount(); i++) {
                     Object[] value = new Object[2];
                     value[0] = v.getTs(i);
@@ -182,4 +194,59 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
             return new SubscriptionUpdate(proto.getSubscriptionId(), data);
         }
     }
+
+    public void onAttributesUpdate(PluginContext ctx, ServerAddress address, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
+        ctx.sendPluginRpcMsg(new RpcMsg(address, ATTRIBUTES_UPDATE_CLAZZ, getAttributesUpdateProto(deviceId, scope, attributes).toByteArray()));
+    }
+
+    private AttributeUpdateProto getAttributesUpdateProto(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
+        AttributeUpdateProto.Builder builder = AttributeUpdateProto.newBuilder();
+        builder.setDeviceId(deviceId.toString());
+        builder.setScope(scope);
+        attributes.forEach(
+                attr -> {
+                    AttributeUpdateValueListProto.Builder dataBuilder = AttributeUpdateValueListProto.newBuilder();
+                    dataBuilder.setKey(attr.getKey());
+                    dataBuilder.setTs(attr.getLastUpdateTs());
+                    dataBuilder.setValueType(attr.getDataType().ordinal());
+                    switch (attr.getDataType()) {
+                        case BOOLEAN:
+                            dataBuilder.setBoolValue(attr.getBooleanValue().get());
+                            break;
+                        case LONG:
+                            dataBuilder.setLongValue(attr.getLongValue().get());
+                            break;
+                        case DOUBLE:
+                            dataBuilder.setDoubleValue(attr.getDoubleValue().get());
+                            break;
+                        case STRING:
+                            dataBuilder.setStrValue(attr.getStrValue().get());
+                            break;
+                    }
+                    builder.addData(dataBuilder.build());
+                }
+        );
+        return builder.build();
+    }
+
+    private AttributeKvEntry toAttribute(AttributeUpdateValueListProto proto) {
+        KvEntry entry = null;
+        DataType type = DataType.values()[proto.getValueType()];
+        switch (type) {
+            case BOOLEAN:
+                entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue());
+                break;
+            case LONG:
+                entry = new LongDataEntry(proto.getKey(), proto.getLongValue());
+                break;
+            case DOUBLE:
+                entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue());
+                break;
+            case STRING:
+                entry = new StringDataEntry(proto.getKey(), proto.getStrValue());
+                break;
+        }
+        return new BaseAttributeKvEntry(entry, proto.getTs());
+    }
+
 }
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 8e2d62a..849b322 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -104,7 +104,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
                 SubscriptionState sub;
                 if (keysOptional.isPresent()) {
                     List<String> keys = new ArrayList<>(keysOptional.get());
-                    List<AttributeKvEntry> data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keys);
+                    List<AttributeKvEntry> data = new ArrayList<>();
+                    Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
                     List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
                     sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
 
@@ -114,7 +115,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
 
                     sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
                 } else {
-                    List<AttributeKvEntry> data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE);
+                    List<AttributeKvEntry> data = new ArrayList<>();
+                    Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
                     List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
                     sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
 
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index 190d9ff..07c9629 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -175,6 +175,23 @@ public class SubscriptionManager {
         }
     }
 
+    public void onAttributesUpdateFromServer(PluginContext ctx, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
+        Optional<ServerAddress> serverAddress = ctx.resolve(deviceId);
+        if (!serverAddress.isPresent()) {
+            onLocalSubscriptionUpdate(ctx, deviceId, SubscriptionType.ATTRIBUTES, s -> {
+                List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
+                for (AttributeKvEntry kv : attributes) {
+                    if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
+                        subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
+                    }
+                }
+                return subscriptionUpdate;
+            });
+        } else {
+            rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), deviceId, scope, attributes);
+        }
+    }
+
     private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) {
         log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update);
         update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue()));
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java
index 63d145d..8668639 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java
@@ -43,7 +43,7 @@ public class TelemetryStoragePlugin extends AbstractPlugin<EmptyComponentConfigu
 
     public TelemetryStoragePlugin() {
         this.subscriptionManager = new SubscriptionManager();
-        this.restMsgHandler = new TelemetryRestMsgHandler();
+        this.restMsgHandler = new TelemetryRestMsgHandler(subscriptionManager);
         this.ruleMsgHandler = new TelemetryRuleMsgHandler(subscriptionManager);
         this.websocketMsgHandler = new TelemetryWebsocketMsgHandler(subscriptionManager);
         this.rpcMsgHandler = new TelemetryRpcMsgHandler(subscriptionManager);
diff --git a/extensions-core/src/main/proto/telemetry.proto b/extensions-core/src/main/proto/telemetry.proto
index 5c7d7a4..60e40f7 100644
--- a/extensions-core/src/main/proto/telemetry.proto
+++ b/extensions-core/src/main/proto/telemetry.proto
@@ -36,6 +36,12 @@ message SubscriptionUpdateProto {
     repeated SubscriptionUpdateValueListProto data = 5;
 }
 
+message AttributeUpdateProto {
+    string deviceId = 1;
+    string scope = 2;
+    repeated AttributeUpdateValueListProto data = 3;
+}
+
 message SessionCloseProto {
     string sessionId = 1;
 }
@@ -54,4 +60,14 @@ message SubscriptionUpdateValueListProto {
     string key = 1;
     repeated int64 ts = 2;
     repeated string value = 3;
+}
+
+message AttributeUpdateValueListProto {
+    string key = 1;
+    int64 ts = 2;
+    int32 valueType = 3;
+    string strValue = 4;
+    int64 longValue = 5;
+    double doubleValue = 6;
+    bool boolValue = 7;
 }
\ No newline at end of file