thingsboard-aplcache

refactoring mqtt

8/30/2018 3:46:41 AM

Details

diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 185b7a8..091b45d 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -51,6 +51,8 @@ import javax.security.cert.X509Certificate;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
 import static io.netty.handler.codec.mqtt.MqttMessageType.*;
@@ -79,6 +81,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private volatile InetSocketAddress address;
     private volatile GatewaySessionCtx gatewaySessionCtx;
 
+    private Map<String,MqttQoS> mqttQoSMap = new ConcurrentHashMap<>();
+
     public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
                                 MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
         this.processor = processor;
@@ -228,6 +232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             String topicName = subscription.topicName();
             //TODO: handle this qos level.
             MqttQoS reqQoS = subscription.qualityOfService();
+            mqttQoSMap.put(topicName, reqQoS);
             try {
                 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
@@ -244,6 +249,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
                 } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
+                }else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
+                    grantedQoSList.add(getMinSupportedQos(reqQoS));
                 } else {
                     log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
                     grantedQoSList.add(FAILURE.value());
@@ -262,6 +269,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         }
         log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
         for (String topicName : mqttMsg.payload().topics()) {
+            mqttQoSMap.remove(topicName);
             try {
                 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                     AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);