thingsboard-aplcache
Changes
common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java 14(+10 -4)
common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java 5(+3 -2)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java 4(+0 -4)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java 9(+9 -0)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java 5(+0 -5)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java 21(+14 -7)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java 83(+75 -8)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java 10(+7 -3)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java 15(+13 -2)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java 17(+17 -0)
extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java 2(+1 -1)
transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java 18(+7 -11)
transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java 25(+14 -11)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java 11(+8 -3)
ui/src/app/api/device.service.js 3(+2 -1)
Details
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index 34ae9b2..a446680 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -29,6 +29,8 @@ public class DataConstants {
public static final String SERVER_SCOPE = "SERVER_SCOPE";
public static final String SHARED_SCOPE = "SHARED_SCOPE";
+ public static final String[] ALL_SCOPES = {CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE};
+
public static final String ALARM = "ALARM";
public static final String ERROR = "ERROR";
public static final String LC_EVENT = "LC_EVENT";
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java
index a8fdc86..676cfb7 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java
@@ -18,6 +18,8 @@ package org.thingsboard.server.common.msg.core;
import lombok.ToString;
import org.thingsboard.server.common.msg.session.MsgType;
+import java.util.Collections;
+import java.util.Optional;
import java.util.Set;
@ToString
@@ -28,6 +30,10 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib
private final Set<String> clientKeys;
private final Set<String> sharedKeys;
+ public BasicGetAttributesRequest(Integer requestId) {
+ this(requestId, Collections.emptySet(), Collections.emptySet());
+ }
+
public BasicGetAttributesRequest(Integer requestId, Set<String> clientKeys, Set<String> sharedKeys) {
super(requestId);
this.clientKeys = clientKeys;
@@ -40,13 +46,13 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib
}
@Override
- public Set<String> getClientAttributeNames() {
- return clientKeys;
+ public Optional<Set<String>> getClientAttributeNames() {
+ return Optional.of(clientKeys);
}
@Override
- public Set<String> getSharedAttributeNames() {
- return sharedKeys;
+ public Optional<Set<String>> getSharedAttributeNames() {
+ return Optional.ofNullable(sharedKeys);
}
}
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java
index 49bca53..0a9e1c2 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.msg.core;
+import java.util.Optional;
import java.util.Set;
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
@@ -22,7 +23,7 @@ import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
public interface GetAttributesRequest extends FromDeviceRequestMsg {
- Set<String> getClientAttributeNames();
- Set<String> getSharedAttributeNames();
+ Optional<Set<String>> getClientAttributeNames();
+ Optional<Set<String>> getSharedAttributeNames();
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
index e7bc414..3190bbf 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/AttributesSubscriptionCmd.java
@@ -24,10 +24,6 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
@NoArgsConstructor
public class AttributesSubscriptionCmd extends SubscriptionCmd {
- public AttributesSubscriptionCmd(int cmdId, String deviceId, String keys, boolean unsubscribe) {
- super(cmdId, deviceId, keys, unsubscribe);
- }
-
@Override
public SubscriptionType getType() {
return SubscriptionType.ATTRIBUTES;
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
index 249dfa9..0d5fa48 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java
@@ -26,6 +26,7 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
private int cmdId;
private String deviceId;
private String keys;
+ private String scope;
private boolean unsubscribe;
public abstract SubscriptionType getType();
@@ -62,6 +63,14 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
this.unsubscribe = unsubscribe;
}
+ public String getScope() {
+ return scope;
+ }
+
+ public void setKeys(String keys) {
+ this.keys = keys;
+ }
+
@Override
public String toString() {
return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
index 0b0ff91..92d7259 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java
@@ -26,11 +26,6 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
private long timeWindow;
- public TimeseriesSubscriptionCmd(int cmdId, String deviceId, String keys, boolean unsubscribe, long timeWindow) {
- super(cmdId, deviceId, keys, unsubscribe);
- this.timeWindow = timeWindow;
- }
-
public long getTimeWindow() {
return timeWindow;
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
index dee981a..1103635 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java
@@ -29,6 +29,7 @@ import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHand
import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
import org.thingsboard.server.extensions.api.plugins.rest.RestRequest;
import org.thingsboard.server.extensions.core.plugin.telemetry.AttributeData;
+import org.thingsboard.server.extensions.core.plugin.telemetry.SubscriptionManager;
import org.thingsboard.server.extensions.core.plugin.telemetry.TsData;
import javax.servlet.ServletException;
@@ -39,6 +40,12 @@ import java.util.stream.Collectors;
@Slf4j
public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
+ private final SubscriptionManager subscriptionManager;
+
+ public TelemetryRestMsgHandler(SubscriptionManager subscriptionManager) {
+ this.subscriptionManager = subscriptionManager;
+ }
+
@Override
public void handleHttpGetRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException {
RestRequest request = msg.getRequest();
@@ -74,9 +81,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
if (!StringUtils.isEmpty(scope)) {
attributes = ctx.loadAttributes(deviceId, scope);
} else {
- attributes = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE);
- attributes.addAll(ctx.loadAttributes(deviceId, DataConstants.SERVER_SCOPE));
- attributes.addAll(ctx.loadAttributes(deviceId, DataConstants.SHARED_SCOPE));
+ attributes = new ArrayList<>();
+ Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s)));
}
List<String> keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList());
msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK));
@@ -99,9 +105,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
if (!StringUtils.isEmpty(scope)) {
attributes = getAttributeKvEntries(ctx, scope, deviceId, keys);
} else {
- attributes = getAttributeKvEntries(ctx, DataConstants.CLIENT_SCOPE, deviceId, keys);
- attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SHARED_SCOPE, deviceId, keys));
- attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SERVER_SCOPE, deviceId, keys));
+ attributes = new ArrayList<>();
+ Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys)));
}
List<AttributeData> values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(),
attribute.getKey(), attribute.getValue())).collect(Collectors.toList());
@@ -145,6 +150,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
@Override
public void onSuccess(PluginContext ctx, Void value) {
msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK));
+ subscriptionManager.onAttributesUpdateFromServer(ctx, deviceId, scope, attributes);
}
@Override
@@ -172,7 +178,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
DeviceId deviceId = DeviceId.fromString(pathParams[0]);
String scope = pathParams[1];
if (DataConstants.SERVER_SCOPE.equals(scope) ||
- DataConstants.SHARED_SCOPE.equals(scope)) {
+ DataConstants.SHARED_SCOPE.equals(scope) ||
+ DataConstants.CLIENT_SCOPE.equals(scope)) {
String keysParam = request.getParameter("keys");
if (!StringUtils.isEmpty(keysParam)) {
String[] keys = keysParam.split(",");
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
index 06467fe..e59fa64 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java
@@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RpcMsgHandler;
@@ -42,9 +43,10 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
private final SubscriptionManager subscriptionManager;
private static final int SUBSCRIPTION_CLAZZ = 1;
- private static final int SUBSCRIPTION_UPDATE_CLAZZ = 2;
- private static final int SESSION_CLOSE_CLAZZ = 3;
- private static final int SUBSCRIPTION_CLOSE_CLAZZ = 4;
+ private static final int ATTRIBUTES_UPDATE_CLAZZ = 2;
+ private static final int SUBSCRIPTION_UPDATE_CLAZZ = 3;
+ private static final int SESSION_CLOSE_CLAZZ = 4;
+ private static final int SUBSCRIPTION_CLOSE_CLAZZ = 5;
@Override
public void process(PluginContext ctx, RpcMsg msg) {
@@ -55,6 +57,9 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
case SUBSCRIPTION_UPDATE_CLAZZ:
processRemoteSubscriptionUpdate(ctx, msg);
break;
+ case ATTRIBUTES_UPDATE_CLAZZ:
+ processAttributeUpdate(ctx, msg);
+ break;
case SESSION_CLOSE_CLAZZ:
processSessionClose(ctx, msg);
break;
@@ -76,6 +81,17 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
subscriptionManager.onRemoteSubscriptionUpdate(ctx, proto.getSessionId(), convert(proto));
}
+ private void processAttributeUpdate(PluginContext ctx, RpcMsg msg) {
+ AttributeUpdateProto proto;
+ try {
+ proto = AttributeUpdateProto.parseFrom(msg.getMsgData());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ subscriptionManager.onAttributesUpdateFromServer(ctx, DeviceId.fromString(proto.getDeviceId()), proto.getScope(),
+ proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList()));
+ }
+
private void processSubscriptionCmd(PluginContext ctx, RpcMsg msg) {
SubscriptionProto proto;
try {
@@ -167,11 +183,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
} else {
Map<String, List<Object>> data = new TreeMap<>();
proto.getDataList().forEach(v -> {
- List<Object> values = data.get(v.getKey());
- if (values == null) {
- values = new ArrayList<>();
- data.put(v.getKey(), values);
- }
+ List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>());
for (int i = 0; i < v.getTsCount(); i++) {
Object[] value = new Object[2];
value[0] = v.getTs(i);
@@ -182,4 +194,59 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
return new SubscriptionUpdate(proto.getSubscriptionId(), data);
}
}
+
+ public void onAttributesUpdate(PluginContext ctx, ServerAddress address, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
+ ctx.sendPluginRpcMsg(new RpcMsg(address, ATTRIBUTES_UPDATE_CLAZZ, getAttributesUpdateProto(deviceId, scope, attributes).toByteArray()));
+ }
+
+ private AttributeUpdateProto getAttributesUpdateProto(DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
+ AttributeUpdateProto.Builder builder = AttributeUpdateProto.newBuilder();
+ builder.setDeviceId(deviceId.toString());
+ builder.setScope(scope);
+ attributes.forEach(
+ attr -> {
+ AttributeUpdateValueListProto.Builder dataBuilder = AttributeUpdateValueListProto.newBuilder();
+ dataBuilder.setKey(attr.getKey());
+ dataBuilder.setTs(attr.getLastUpdateTs());
+ dataBuilder.setValueType(attr.getDataType().ordinal());
+ switch (attr.getDataType()) {
+ case BOOLEAN:
+ dataBuilder.setBoolValue(attr.getBooleanValue().get());
+ break;
+ case LONG:
+ dataBuilder.setLongValue(attr.getLongValue().get());
+ break;
+ case DOUBLE:
+ dataBuilder.setDoubleValue(attr.getDoubleValue().get());
+ break;
+ case STRING:
+ dataBuilder.setStrValue(attr.getStrValue().get());
+ break;
+ }
+ builder.addData(dataBuilder.build());
+ }
+ );
+ return builder.build();
+ }
+
+ private AttributeKvEntry toAttribute(AttributeUpdateValueListProto proto) {
+ KvEntry entry = null;
+ DataType type = DataType.values()[proto.getValueType()];
+ switch (type) {
+ case BOOLEAN:
+ entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue());
+ break;
+ case LONG:
+ entry = new LongDataEntry(proto.getKey(), proto.getLongValue());
+ break;
+ case DOUBLE:
+ entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue());
+ break;
+ case STRING:
+ entry = new StringDataEntry(proto.getKey(), proto.getStrValue());
+ break;
+ }
+ return new BaseAttributeKvEntry(entry, proto.getTs());
+ }
+
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
index f69d17b..f14d25d 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java
@@ -58,10 +58,14 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
}
- private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Set<String> names) {
+ private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names) {
List<AttributeKvEntry> attributes;
- if (!names.isEmpty()) {
- attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names));
+ if (names.isPresent()) {
+ if (!names.get().isEmpty()) {
+ attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()));
+ } else {
+ attributes = ctx.loadAttributes(deviceId, scope);
+ }
} else {
attributes = Collections.emptyList();
}
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
index 8e2d62a..f268dd8 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java
@@ -104,7 +104,13 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
SubscriptionState sub;
if (keysOptional.isPresent()) {
List<String> keys = new ArrayList<>(keysOptional.get());
- List<AttributeKvEntry> data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keys);
+ List<AttributeKvEntry> data = new ArrayList<>();
+ if (StringUtils.isEmpty(cmd.getScope())) {
+ Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
+ } else {
+ data.addAll(ctx.loadAttributes(deviceId, cmd.getScope(), keys));
+ }
+
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
@@ -114,7 +120,12 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
} else {
- List<AttributeKvEntry> data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE);
+ List<AttributeKvEntry> data = new ArrayList<>();
+ if (StringUtils.isEmpty(cmd.getScope())) {
+ Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
+ } else {
+ data.addAll(ctx.loadAttributes(deviceId, cmd.getScope()));
+ }
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
index 190d9ff..07c9629 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java
@@ -175,6 +175,23 @@ public class SubscriptionManager {
}
}
+ public void onAttributesUpdateFromServer(PluginContext ctx, DeviceId deviceId, String scope, List<AttributeKvEntry> attributes) {
+ Optional<ServerAddress> serverAddress = ctx.resolve(deviceId);
+ if (!serverAddress.isPresent()) {
+ onLocalSubscriptionUpdate(ctx, deviceId, SubscriptionType.ATTRIBUTES, s -> {
+ List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
+ for (AttributeKvEntry kv : attributes) {
+ if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
+ subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv));
+ }
+ }
+ return subscriptionUpdate;
+ });
+ } else {
+ rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), deviceId, scope, attributes);
+ }
+ }
+
private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) {
log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update);
update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue()));
diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java
index 63d145d..8668639 100644
--- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java
+++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java
@@ -43,7 +43,7 @@ public class TelemetryStoragePlugin extends AbstractPlugin<EmptyComponentConfigu
public TelemetryStoragePlugin() {
this.subscriptionManager = new SubscriptionManager();
- this.restMsgHandler = new TelemetryRestMsgHandler();
+ this.restMsgHandler = new TelemetryRestMsgHandler(subscriptionManager);
this.ruleMsgHandler = new TelemetryRuleMsgHandler(subscriptionManager);
this.websocketMsgHandler = new TelemetryWebsocketMsgHandler(subscriptionManager);
this.rpcMsgHandler = new TelemetryRpcMsgHandler(subscriptionManager);
diff --git a/extensions-core/src/main/proto/telemetry.proto b/extensions-core/src/main/proto/telemetry.proto
index 5c7d7a4..60e40f7 100644
--- a/extensions-core/src/main/proto/telemetry.proto
+++ b/extensions-core/src/main/proto/telemetry.proto
@@ -36,6 +36,12 @@ message SubscriptionUpdateProto {
repeated SubscriptionUpdateValueListProto data = 5;
}
+message AttributeUpdateProto {
+ string deviceId = 1;
+ string scope = 2;
+ repeated AttributeUpdateValueListProto data = 3;
+}
+
message SessionCloseProto {
string sessionId = 1;
}
@@ -54,4 +60,14 @@ message SubscriptionUpdateValueListProto {
string key = 1;
repeated int64 ts = 2;
repeated string value = 3;
+}
+
+message AttributeUpdateValueListProto {
+ string key = 1;
+ int64 ts = 2;
+ int32 valueType = 3;
+ string strValue = 4;
+ int64 longValue = 5;
+ double doubleValue = 6;
+ bool boolValue = 7;
}
\ No newline at end of file
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
index 26a9056..a9c6086 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java
@@ -167,17 +167,13 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
private FromDeviceMsg convertToGetAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException {
List<String> queryElements = inbound.getOptions().getUriQuery();
- if (queryElements == null || queryElements.size() == 0) {
- log.warn("[{}] Query is empty!", ctx.getSessionId());
- throw new AdaptorException(new IllegalArgumentException("Query is empty!"));
- }
-
- Set<String> clientKeys = toKeys(ctx, queryElements, "clientKeys");
- Set<String> sharedKeys = toKeys(ctx, queryElements, "sharedKeys");
- if (clientKeys.isEmpty() && sharedKeys.isEmpty()) {
- throw new AdaptorException("No clientKeys and serverKeys parameters!");
+ if (queryElements != null || queryElements.size() > 0) {
+ Set<String> clientKeys = toKeys(ctx, queryElements, "clientKeys");
+ Set<String> sharedKeys = toKeys(ctx, queryElements, "sharedKeys");
+ return new BasicGetAttributesRequest(0, clientKeys, sharedKeys);
+ } else {
+ return new BasicGetAttributesRequest(0);
}
- return new BasicGetAttributesRequest(0, clientKeys, sharedKeys);
}
private Set<String> toKeys(SessionContext ctx, List<String> queryElements, String attributeName) throws AdaptorException {
@@ -191,7 +187,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
if (!StringUtils.isEmpty(keys)) {
return new HashSet<>(Arrays.asList(keys.split(",")));
} else {
- return Collections.emptySet();
+ return null;
}
}
diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
index a2d6c25..ceb6813 100644
--- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
+++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java
@@ -182,7 +182,7 @@ public class CoapServerTest {
public void testNoKeysAttributesGetRequest() {
CoapClient client = new CoapClient(getBaseTestUrl() + DEVICE1_TOKEN + "/" + FeatureType.ATTRIBUTES.name().toLowerCase() + "?data=key1,key2");
CoapResponse response = client.setTimeout(6000).get();
- Assert.assertEquals(ResponseCode.BAD_REQUEST, response.getCode());
+ Assert.assertEquals(ResponseCode.CONTENT, response.getCode());
}
@Test
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index e3e0666..e815bc5 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -38,6 +38,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -60,20 +61,22 @@ public class DeviceApiController {
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
- @RequestParam(value = "clientKeys", required = false) String clientKeys,
- @RequestParam(value = "sharedKeys", required = false) String sharedKeys) {
+ @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
+ @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
- if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
- } else {
- HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
- if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
- Set<String> clientKeySet = new HashSet<>(Arrays.asList(clientKeys.split(",")));
- Set<String> sharedKeySet = new HashSet<>(Arrays.asList(clientKeys.split(",")));
- process(ctx, new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet));
+ HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
+ if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+ GetAttributesRequest request;
+ if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
+ request = new BasicGetAttributesRequest(0);
} else {
- responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+ Set<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null;
+ Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
+ request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
}
+ process(ctx, request);
+ } else {
+ responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
return responseWriter;
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index 5af244a..e84e848 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -162,8 +162,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
String payload = inbound.payload().toString(UTF8);
JsonElement requestBody = new JsonParser().parse(payload);
- return new BasicGetAttributesRequest(requestId,
- toStringSet(requestBody, "clientKeys"), toStringSet(requestBody, "sharedKeys"));
+ Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
+ Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
+ if (clientKeys == null && sharedKeys == null) {
+ return new BasicGetAttributesRequest(requestId);
+ } else {
+ return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys);
+ }
} catch (RuntimeException e) {
log.warn("Failed to decode get attributes request", e);
throw new AdaptorException(e);
@@ -189,7 +194,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
if (element != null) {
return new HashSet<>(Arrays.asList(element.getAsString().split(",")));
} else {
- return Collections.emptySet();
+ return null;
}
}
ui/src/app/api/device.service.js 3(+2 -1)
diff --git a/ui/src/app/api/device.service.js b/ui/src/app/api/device.service.js
index cf38d12..27cf605 100644
--- a/ui/src/app/api/device.service.js
+++ b/ui/src/app/api/device.service.js
@@ -293,7 +293,8 @@ function DeviceService($http, $q, $filter, telemetryWebsocketService, types) {
var deviceAttributesSubscription = deviceAttributesSubscriptionMap[subscriptionId];
if (!deviceAttributesSubscription) {
var subscriptionCommand = {
- deviceId: deviceId
+ deviceId: deviceId,
+ scope: attributeScope
};
var type = attributeScope === types.latestTelemetry.value ?