diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 185b7a8..091b45d 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -51,6 +51,8 @@ import javax.security.cert.X509Certificate;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttMessageType.*;
@@ -79,6 +81,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private volatile InetSocketAddress address;
private volatile GatewaySessionCtx gatewaySessionCtx;
+ private Map<String,MqttQoS> mqttQoSMap = new ConcurrentHashMap<>();
+
public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
this.processor = processor;
@@ -228,6 +232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
String topicName = subscription.topicName();
//TODO: handle this qos level.
MqttQoS reqQoS = subscription.qualityOfService();
+ mqttQoSMap.put(topicName, reqQoS);
try {
if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
@@ -244,6 +249,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
grantedQoSList.add(getMinSupportedQos(reqQoS));
} else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
grantedQoSList.add(getMinSupportedQos(reqQoS));
+ }else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
+ grantedQoSList.add(getMinSupportedQos(reqQoS));
} else {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
grantedQoSList.add(FAILURE.value());
@@ -262,6 +269,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
for (String topicName : mqttMsg.payload().topics()) {
+ mqttQoSMap.remove(topicName);
try {
if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);