thingsboard-aplcache

Merge pull request #1139 from ShvaykaD/develop/2.1.2 Fix

10/10/2018 1:42:14 PM

Details

diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
index f3e29dc..74ecdb2 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java
@@ -16,6 +16,7 @@
 package org.thingsboard.server.mqtt.rpc;
 
 import com.datastax.driver.core.utils.UUIDs;
+import io.netty.handler.codec.mqtt.MqttQoS;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -23,19 +24,19 @@ import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.Tenant;
 import org.thingsboard.server.common.data.User;
 import org.thingsboard.server.common.data.security.Authority;
 import org.thingsboard.server.common.data.security.DeviceCredentials;
 import org.thingsboard.server.controller.AbstractControllerTest;
+import org.thingsboard.server.mqtt.telemetry.AbstractMqttTelemetryIntegrationTest;
 import org.thingsboard.server.service.security.AccessValidator;
 
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -101,13 +102,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(accessToken);
         client.connect(options).waitForCompletion();
-        client.subscribe("v1/devices/me/rpc/request/+", 1);
-        client.setCallback(new TestMqttCallback(client));
+
+        TestMqttCallback callback = new TestMqttCallback(client);
+        client.setCallback(callback);
+        CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value());
 
         String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
         String deviceId = savedDevice.getId().getId().toString();
         String result = doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk());
         Assert.assertTrue(StringUtils.isEmpty(result));
+        latch.await(3, TimeUnit.SECONDS);
+        assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
     }
 
     @Test
@@ -204,11 +211,16 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
     private static class TestMqttCallback implements MqttCallback {
 
         private final MqttAsyncClient client;
+        private Integer qoS;
 
         TestMqttCallback(MqttAsyncClient client) {
             this.client = client;
         }
 
+        int getQoS() {
+            return qoS;
+        }
+
         @Override
         public void connectionLost(Throwable throwable) {
         }
@@ -219,6 +231,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
             MqttMessage message = new MqttMessage();
             String responseTopic = requestTopic.replace("request", "response");
             message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8"));
+            qoS = mqttMessage.getQos();
             client.publish(responseTopic, message);
         }
 
diff --git a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
index d02d07d..df9601e 100644
--- a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
+++ b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java
@@ -15,10 +15,9 @@
  */
 package org.thingsboard.server.mqtt.telemetry;
 
+import io.netty.handler.codec.mqtt.MqttQoS;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.*;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -30,9 +29,12 @@ import org.thingsboard.server.dao.service.DaoNoSqlTest;
 
 import java.net.URI;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 /**
  * @author Valerii Sosliuk
@@ -94,4 +96,60 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
         assertEquals("3.0", values.get("key3").get(0).get("value"));
         assertEquals("4", values.get("key4").get(0).get("value"));
     }
+
+    @Test
+    public void testMqttQoSLevel() throws Exception {
+        String clientId = MqttAsyncClient.generateClientId();
+        MqttAsyncClient client = new MqttAsyncClient(MQTT_URL, clientId);
+
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setUserName(accessToken);
+        client.connect(options).waitForCompletion(3000);
+        TestMqttCallback callback = new TestMqttCallback(client);
+        client.setCallback(callback);
+        CountDownLatch latch = new CountDownLatch(1);
+        latch.countDown();
+        client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value());
+        String payload = "{\"key\":\"value\"}";
+        String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
+        latch.await(3, TimeUnit.SECONDS);
+        assertEquals(payload, callback.getPayload());
+        assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
+    }
+
+    private static class TestMqttCallback implements MqttCallback {
+
+        private final MqttAsyncClient client;
+        private Integer qoS;
+        private String payload;
+
+        String getPayload() {
+            return payload;
+        }
+
+        TestMqttCallback(MqttAsyncClient client) {
+            this.client = client;
+        }
+
+        int getQoS() {
+            return qoS;
+        }
+
+        @Override
+        public void connectionLost(Throwable throwable) {
+        }
+
+        @Override
+        public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
+            payload = new String(mqttMessage.getPayload());
+            qoS = mqttMessage.getQos();
+        }
+
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+
+        }
+    }
+
+
 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopicMatcher.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopicMatcher.java
new file mode 100644
index 0000000..b4a2028
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopicMatcher.java
@@ -0,0 +1,55 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt;
+
+import java.util.regex.Pattern;
+
+public class MqttTopicMatcher {
+
+    private final String topic;
+    private final Pattern topicRegex;
+
+    MqttTopicMatcher(String topic) {
+        if(topic == null){
+            throw new NullPointerException("topic");
+        }
+        this.topic = topic;
+        this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$");
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public boolean matches(String topic){
+        return this.topicRegex.matcher(topic).matches();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        MqttTopicMatcher that = (MqttTopicMatcher) o;
+
+        return topic.equals(that.topic);
+    }
+
+    @Override
+    public int hashCode() {
+        return topic.hashCode();
+    }
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index ed0c8c6..3c8365e 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
 
 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
 import static io.netty.handler.codec.mqtt.MqttMessageType.*;
@@ -78,7 +79,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private final RelationService relationService;
     private final QuotaService quotaService;
     private final SslHandler sslHandler;
-    private final ConcurrentMap<String, Integer> mqttQoSMap;
+    private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
 
     private volatile boolean connected;
     private volatile InetSocketAddress address;
@@ -278,7 +279,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
         grantedQoSList.add(getMinSupportedQos(reqQoS));
-        mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
+        mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS));
     }
 
     private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
@@ -287,7 +288,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         }
         log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
         for (String topicName : mqttMsg.payload().topics()) {
-            mqttQoSMap.remove(topicName);
+            mqttQoSMap.remove(new MqttTopicMatcher(topicName));
             try {
                 switch (topicName) {
                     case DEVICE_ATTRIBUTES_TOPIC: {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index 3dbb3ef..256ed11 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -16,7 +16,10 @@
 package org.thingsboard.server.transport.mqtt.session;
 
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
@@ -27,10 +30,9 @@ import org.thingsboard.server.common.msg.session.ex.SessionException;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
-import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
+import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 
-import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -46,7 +48,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
     private volatile boolean allowAttributeResponses;
     private AtomicInteger msgIdSeq = new AtomicInteger(0);
 
-    public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) {
+    public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
         super(processor, authService, mqttQoSMap);
         this.adaptor = adaptor;
         this.sessionId = new MqttSessionId();
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
index e5be5c7..bbc9c08 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
@@ -33,6 +33,7 @@ import org.thingsboard.server.common.msg.session.*;
 import org.thingsboard.server.common.msg.session.ex.SessionException;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
+import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
 import org.thingsboard.server.transport.mqtt.MqttTopics;
 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
 
@@ -58,7 +59,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
     private volatile boolean closed;
     private AtomicInteger msgIdSeq = new AtomicInteger(0);
 
-    public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
+    public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
         super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap);
         this.parent = parent;
         this.sessionId = new MqttSessionId();
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 98ad6d2..b9291b7 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.dao.device.DeviceService;
 import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
 
@@ -64,7 +65,7 @@ public class GatewaySessionCtx {
     private final DeviceAuthService authService;
     private final RelationService relationService;
     private final Map<String, GatewayDeviceSessionCtx> devices;
-    private final ConcurrentMap<String, Integer> mqttQoSMap;
+    private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
     private ChannelHandlerContext channel;
 
     public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java
index f085064..0752331 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java
@@ -20,35 +20,42 @@ import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
+import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 /**
  * Created by ashvayka on 30.08.18.
  */
 public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
 
-    private final ConcurrentMap<String, Integer> mqttQoSMap;
+    private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
 
-    public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<String, Integer> mqttQoSMap) {
+    public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
         super(processor, authService);
         this.mqttQoSMap = mqttQoSMap;
     }
 
-    public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
+    public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
         super(processor, authService, device);
         this.mqttQoSMap = mqttQoSMap;
     }
 
-    public ConcurrentMap<String, Integer> getMqttQoSMap() {
+    public ConcurrentMap<MqttTopicMatcher, Integer> getMqttQoSMap() {
         return mqttQoSMap;
     }
 
     public MqttQoS getQoSForTopic(String topic) {
-        Integer qos = mqttQoSMap.get(topic);
-        if (qos != null) {
-            return MqttQoS.valueOf(qos);
+        List<Integer> qosList = mqttQoSMap.entrySet()
+                .stream()
+                .filter(entry -> entry.getKey().matches(topic))
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+        if (!qosList.isEmpty()) {
+            return MqttQoS.valueOf(qosList.get(0));
         } else {
             return MqttQoS.AT_LEAST_ONCE;
         }