thingsboard-memoizeit

TB-66: Implementation

6/25/2017 9:11:00 AM

Details

diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 36868c5..9c4dd40 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -37,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.dao.EncryptionUtil;
 import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
@@ -67,14 +68,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private final SessionMsgProcessor processor;
     private final DeviceService deviceService;
     private final DeviceAuthService authService;
+    private final RelationService relationService;
     private final SslHandler sslHandler;
     private volatile boolean connected;
     private volatile GatewaySessionCtx gatewaySessionCtx;
 
-    public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
+    public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
                                 MqttTransportAdaptor adaptor, SslHandler sslHandler) {
         this.processor = processor;
         this.deviceService = deviceService;
+        this.relationService = relationService;
         this.authService = authService;
         this.adaptor = adaptor;
         this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
@@ -371,7 +374,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         if (infoNode != null) {
             JsonNode gatewayNode = infoNode.get("gateway");
             if (gatewayNode != null && gatewayNode.asBoolean()) {
-                gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx);
+                gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
             }
         }
     }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index af109f6..1469290 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 
 import javax.net.ssl.SSLException;
@@ -45,14 +46,17 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
     private final SessionMsgProcessor processor;
     private final DeviceService deviceService;
     private final DeviceAuthService authService;
+    private final RelationService relationService;
     private final MqttTransportAdaptor adaptor;
     private final MqttSslHandlerProvider sslHandlerProvider;
 
-    public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, MqttTransportAdaptor adaptor,
+    public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
+                                          MqttTransportAdaptor adaptor,
                                           MqttSslHandlerProvider sslHandlerProvider) {
         this.processor = processor;
         this.deviceService = deviceService;
         this.authService = authService;
+        this.relationService = relationService;
         this.adaptor = adaptor;
         this.sslHandlerProvider = sslHandlerProvider;
     }
@@ -68,7 +72,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
         pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
         pipeline.addLast("encoder", MqttEncoder.INSTANCE);
 
-        MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, adaptor, sslHandler);
+        MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, adaptor, sslHandler);
         pipeline.addLast(handler);
         ch.closeFuture().addListener(handler);
     }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index 8710809..179dad5 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -29,6 +29,7 @@ import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 
 import javax.annotation.PostConstruct;
@@ -57,6 +58,9 @@ public class MqttTransportService {
     private DeviceAuthService authService;
 
     @Autowired(required = false)
+    private RelationService relationService;
+
+    @Autowired(required = false)
     private MqttSslHandlerProvider sslHandlerProvider;
 
     @Value("${mqtt.bind_address}")
@@ -95,7 +99,7 @@ public class MqttTransportService {
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
-                .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider));
+                .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, adaptor, sslHandlerProvider));
 
         serverChannel = b.bind(host, port).sync().channel();
         log.info("Mqtt transport started!");
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index d6a953a..1977344 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -27,6 +27,7 @@ import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.relation.EntityRelation;
 import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
 import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
@@ -36,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
 
@@ -58,28 +60,34 @@ public class GatewaySessionCtx {
     private final SessionMsgProcessor processor;
     private final DeviceService deviceService;
     private final DeviceAuthService authService;
+    private final RelationService relationService;
     private final Map<String, GatewayDeviceSessionCtx> devices;
     private ChannelHandlerContext channel;
 
-    public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) {
+    public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
         this.processor = processor;
         this.deviceService = deviceService;
         this.authService = authService;
+        this.relationService = relationService;
         this.gateway = gatewaySessionCtx.getDevice();
         this.gatewaySessionId = gatewaySessionCtx.getSessionId();
         this.devices = new HashMap<>();
     }
 
     public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
-        String deviceName = checkDeviceName(getDeviceName(msg));
+        JsonElement json = getJson(msg);
+        String deviceName = checkDeviceName(getDeviceName(json));
+        String deviceType = getDeviceType(json);
         if (!devices.containsKey(deviceName)) {
             Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
             Device device = deviceOpt.orElseGet(() -> {
                 Device newDevice = new Device();
                 newDevice.setTenantId(gateway.getTenantId());
                 newDevice.setName(deviceName);
-                newDevice.setType("default");
-                return deviceService.saveDevice(newDevice);
+                newDevice.setType(deviceType);
+                newDevice = deviceService.saveDevice(newDevice);
+                relationService.saveRelation(new EntityRelation(gateway.getId(), newDevice.getId(), "Created"));
+                return newDevice;
             });
             GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
             devices.put(deviceName, ctx);
@@ -91,7 +99,7 @@ public class GatewaySessionCtx {
     }
 
     public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
-        String deviceName = checkDeviceName(getDeviceName(msg));
+        String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
         GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
         if (deviceSessionCtx != null) {
             processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
@@ -211,11 +219,19 @@ public class GatewaySessionCtx {
         }
     }
 
-    private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
-        JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+    private String getDeviceName(JsonElement json) throws AdaptorException {
         return json.getAsJsonObject().get("device").getAsString();
     }
 
+    private String getDeviceType(JsonElement json) throws AdaptorException {
+        JsonElement type = json.getAsJsonObject().get("type");
+        return type == null ? "default" : type.getAsString();
+    }
+
+    private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
+        return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+    }
+
     protected SessionMsgProcessor getProcessor() {
         return processor;
     }
@@ -229,7 +245,9 @@ public class GatewaySessionCtx {
     }
 
     private void ack(MqttPublishMessage msg) {
-        writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
+        if(msg.variableHeader().messageId() > 0) {
+            writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
+        }
     }
 
     protected void writeAndFlush(MqttMessage mqttMessage) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java
index adda344..590c289 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java
@@ -18,9 +18,7 @@ package org.thingsboard.server.transport.mqtt.util;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.util.Base64Utils;
 import org.thingsboard.server.dao.EncryptionUtil;
-import sun.misc.BASE64Encoder;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.security.cert.CertificateEncodingException;
 import java.security.cert.X509Certificate;