thingsboard-aplcache

Mqtt Sub QoS improvement

8/30/2018 4:34:09 AM

Details

diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index f0fb51e..2e7b412 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -73,4 +73,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
     public Device getDevice() {
         return device;
     }
+
+
 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index f0b29cb..c8baaf7 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -170,7 +170,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
 
     private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
         MqttFixedHeader mqttFixedHeader =
-                new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+                new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0);
         MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
         ByteBuf payload = ALLOCATOR.buffer();
         payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 091b45d..5effd2b 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -53,6 +53,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
 import static io.netty.handler.codec.mqtt.MqttMessageType.*;
@@ -77,12 +78,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private final RelationService relationService;
     private final QuotaService quotaService;
     private final SslHandler sslHandler;
+    private final ConcurrentMap<String, Integer> mqttQoSMap;
+
     private volatile boolean connected;
     private volatile InetSocketAddress address;
     private volatile GatewaySessionCtx gatewaySessionCtx;
 
-    private Map<String,MqttQoS> mqttQoSMap = new ConcurrentHashMap<>();
-
     public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
                                 MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
         this.processor = processor;
@@ -90,7 +91,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         this.relationService = relationService;
         this.authService = authService;
         this.adaptor = adaptor;
-        this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
+        this.mqttQoSMap = new ConcurrentHashMap<>();
+        this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor, mqttQoSMap);
         this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
         this.sslHandler = sslHandler;
         this.quotaService = quotaService;
@@ -170,18 +172,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
         try {
-            if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) {
-                gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
-            } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
-                gatewaySessionCtx.onDeviceAttributes(mqttMsg);
-            } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) {
-                gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
-            } else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
-                gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
-            } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
-                gatewaySessionCtx.onDeviceConnect(mqttMsg);
-            } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) {
-                gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
+            switch (topicName) {
+                case GATEWAY_TELEMETRY_TOPIC:
+                    gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
+                    break;
+                case GATEWAY_ATTRIBUTES_TOPIC:
+                    gatewaySessionCtx.onDeviceAttributes(mqttMsg);
+                    break;
+                case GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
+                    gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
+                    break;
+                case GATEWAY_RPC_TOPIC:
+                    gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
+                    break;
+                case GATEWAY_CONNECT_TOPIC:
+                    gatewaySessionCtx.onDeviceConnect(mqttMsg);
+                    break;
+                case GATEWAY_DISCONNECT_TOPIC:
+                    gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
+                    break;
             }
         } catch (RuntimeException | AdaptorException e) {
             log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
@@ -229,40 +238,53 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
         List<Integer> grantedQoSList = new ArrayList<>();
         for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
-            String topicName = subscription.topicName();
-            //TODO: handle this qos level.
+            String topic = subscription.topicName();
             MqttQoS reqQoS = subscription.qualityOfService();
-            mqttQoSMap.put(topicName, reqQoS);
             try {
-                if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
-                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-                    grantedQoSList.add(getMinSupportedQos(reqQoS));
-                } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
-                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-                    grantedQoSList.add(getMinSupportedQos(reqQoS));
-                } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
-                    grantedQoSList.add(getMinSupportedQos(reqQoS));
-                } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
-                    deviceSessionCtx.setAllowAttributeResponses();
-                    grantedQoSList.add(getMinSupportedQos(reqQoS));
-                } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
-                    grantedQoSList.add(getMinSupportedQos(reqQoS));
-                }else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
-                    grantedQoSList.add(getMinSupportedQos(reqQoS));
-                } else {
-                    log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
-                    grantedQoSList.add(FAILURE.value());
+                switch (topic) {
+                    case DEVICE_ATTRIBUTES_TOPIC: {
+                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    }
+                    case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    }
+                    case DEVICE_RPC_RESPONSE_SUB_TOPIC:
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+                        deviceSessionCtx.setAllowAttributeResponses();
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    case GATEWAY_ATTRIBUTES_TOPIC:
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    case GATEWAY_RPC_TOPIC:
+                        registerSubQoS(topic, grantedQoSList, reqQoS);
+                        break;
+                    default:
+                        log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+                        grantedQoSList.add(FAILURE.value());
+                        break;
                 }
             } catch (AdaptorException e) {
-                log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
+                log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
                 grantedQoSList.add(FAILURE.value());
             }
         }
         ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
     }
 
+    private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
+        grantedQoSList.add(getMinSupportedQos(reqQoS));
+        mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
+    }
+
     private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
         if (!checkConnected(ctx)) {
             return;
@@ -271,14 +293,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         for (String topicName : mqttMsg.payload().topics()) {
             mqttQoSMap.remove(topicName);
             try {
-                if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
-                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-                } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
-                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                    processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-                } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
-                    deviceSessionCtx.setDisallowAttributeResponses();
+                switch (topicName) {
+                    case DEVICE_ATTRIBUTES_TOPIC: {
+                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+                        break;
+                    }
+                    case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+                        break;
+                    }
+                    case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+                        deviceSessionCtx.setDisallowAttributeResponses();
+                        break;
                 }
             } catch (AdaptorException e) {
                 log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index 9367a04..3dbb3ef 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -30,13 +30,15 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @author Andrew Shvayka
  */
 @Slf4j
-public class DeviceSessionCtx extends DeviceAwareSessionContext {
+public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
 
     private final MqttTransportAdaptor adaptor;
     private final MqttSessionId sessionId;
@@ -44,8 +46,8 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext {
     private volatile boolean allowAttributeResponses;
     private AtomicInteger msgIdSeq = new AtomicInteger(0);
 
-    public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor) {
-        super(processor, authService);
+    public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) {
+        super(processor, authService, mqttQoSMap);
         this.adaptor = adaptor;
         this.sessionId = new MqttSessionId();
     }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
index dd9c921..e5be5c7 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
@@ -38,13 +38,15 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
 
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Created by ashvayka on 19.01.17.
  */
-public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
+public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
 
     private static final Gson GSON = new Gson();
     private static final Charset UTF8 = Charset.forName("UTF-8");
@@ -56,8 +58,8 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
     private volatile boolean closed;
     private AtomicInteger msgIdSeq = new AtomicInteger(0);
 
-    public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) {
-        super(parent.getProcessor(), parent.getAuthService(), device);
+    public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
+        super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap);
         this.parent = parent;
         this.sessionId = new MqttSessionId();
     }
@@ -195,7 +197,7 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
 
     private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) {
         MqttFixedHeader mqttFixedHeader =
-                new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+                new MqttFixedHeader(MqttMessageType.PUBLISH, false, getQoSForTopic(topic), false, 0);
         MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet());
         ByteBuf payload = ALLOCATOR.buffer();
         payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 69ed17f..98ad6d2 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -43,6 +43,7 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
@@ -63,6 +64,7 @@ public class GatewaySessionCtx {
     private final DeviceAuthService authService;
     private final RelationService relationService;
     private final Map<String, GatewayDeviceSessionCtx> devices;
+    private final ConcurrentMap<String, Integer> mqttQoSMap;
     private ChannelHandlerContext channel;
 
     public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
@@ -73,6 +75,7 @@ public class GatewaySessionCtx {
         this.gateway = gatewaySessionCtx.getDevice();
         this.gatewaySessionId = gatewaySessionCtx.getSessionId();
         this.devices = new HashMap<>();
+        this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
     }
 
     public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
@@ -96,7 +99,7 @@ public class GatewaySessionCtx {
                 relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
                 processor.onDeviceAdded(device);
             }
-            GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
+            GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap);
             devices.put(deviceName, ctx);
             log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
             processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java
new file mode 100644
index 0000000..f085064
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.session;
+
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Created by ashvayka on 30.08.18.
+ */
+public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
+
+    private final ConcurrentMap<String, Integer> mqttQoSMap;
+
+    public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<String, Integer> mqttQoSMap) {
+        super(processor, authService);
+        this.mqttQoSMap = mqttQoSMap;
+    }
+
+    public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
+        super(processor, authService, device);
+        this.mqttQoSMap = mqttQoSMap;
+    }
+
+    public ConcurrentMap<String, Integer> getMqttQoSMap() {
+        return mqttQoSMap;
+    }
+
+    public MqttQoS getQoSForTopic(String topic) {
+        Integer qos = mqttQoSMap.get(topic);
+        if (qos != null) {
+            return MqttQoS.valueOf(qos);
+        } else {
+            return MqttQoS.AT_LEAST_ONCE;
+        }
+    }
+
+}