diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
index 7646131..7bed03a 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
@@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt.session;
import com.google.gson.Gson;
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
@@ -24,6 +25,7 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.mqtt.*;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
@@ -35,6 +37,7 @@ import org.thingsboard.server.transport.mqtt.MqttTopics;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import java.nio.charset.Charset;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -135,40 +138,43 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
if (responseData.isPresent()) {
AttributesKVMsg msg = responseData.get();
if (msg.getClientAttributes() != null) {
- msg.getClientAttributes().forEach(v -> addValueToJson(result, "value", v));
+ addValues(result, msg.getClientAttributes());
}
if (msg.getSharedAttributes() != null) {
- msg.getSharedAttributes().forEach(v -> addValueToJson(result, "value", v));
+ addValues(result, msg.getSharedAttributes());
}
}
return createMqttPublishMsg(topic, result);
}
+ private void addValues(JsonObject result, List<AttributeKvEntry> kvList) {
+ if (kvList.size() == 1) {
+ addValueToJson(result, "value", kvList.get(0));
+ } else {
+ JsonObject values;
+ if (result.has("values")) {
+ values = result.get("values").getAsJsonObject();
+ } else {
+ values = new JsonObject();
+ result.add("values", values);
+ }
+ kvList.forEach(value -> addValueToJson(values, value.getKey(), value));
+ }
+ }
+
private void addValueToJson(JsonObject json, String name, KvEntry entry) {
switch (entry.getDataType()) {
case BOOLEAN:
- Optional<Boolean> booleanValue = entry.getBooleanValue();
- if (booleanValue.isPresent()) {
- json.addProperty(name, booleanValue.get());
- }
+ entry.getBooleanValue().ifPresent(aBoolean -> json.addProperty(name, aBoolean));
break;
case STRING:
- Optional<String> stringValue = entry.getStrValue();
- if (stringValue.isPresent()) {
- json.addProperty(name, stringValue.get());
- }
+ entry.getStrValue().ifPresent(aString -> json.addProperty(name, aString));
break;
case DOUBLE:
- Optional<Double> doubleValue = entry.getDoubleValue();
- if (doubleValue.isPresent()) {
- json.addProperty(name, doubleValue.get());
- }
+ entry.getDoubleValue().ifPresent(aDouble -> json.addProperty(name, aDouble));
break;
case LONG:
- Optional<Long> longValue = entry.getLongValue();
- if (longValue.isPresent()) {
- json.addProperty(name, longValue.get());
- }
+ entry.getLongValue().ifPresent(aLong -> json.addProperty(name, aLong));
break;
}
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 00d1f0c..5bccf49 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -41,10 +41,7 @@ import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
@@ -193,13 +190,22 @@ public class GatewaySessionCtx {
int requestId = jsonObj.get("id").getAsInt();
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
boolean clientScope = jsonObj.get("client").getAsBoolean();
- String key = jsonObj.get("key").getAsString();
+ Set<String> keys;
+ if (jsonObj.has("key")) {
+ keys = Collections.singleton(jsonObj.get("key").getAsString());
+ } else {
+ JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
+ keys = new HashSet<>();
+ for (JsonElement keyObj : keysArray) {
+ keys.add(keyObj.getAsString());
+ }
+ }
BasicGetAttributesRequest request;
if (clientScope) {
- request = new BasicGetAttributesRequest(requestId, Collections.singleton(key), null);
+ request = new BasicGetAttributesRequest(requestId, keys, null);
} else {
- request = new BasicGetAttributesRequest(requestId, null, Collections.singleton(key));
+ request = new BasicGetAttributesRequest(requestId, null, keys);
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
@@ -251,7 +257,7 @@ public class GatewaySessionCtx {
}
private void ack(MqttPublishMessage msg) {
- if(msg.variableHeader().messageId() > 0) {
+ if (msg.variableHeader().messageId() > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
}
}