thingsboard-developers

tmp commit to merge with master

10/4/2018 10:36:46 AM

Details

diff --git a/common/transport/pom.xml b/common/transport/pom.xml
index a349012..b8a7ab5 100644
--- a/common/transport/pom.xml
+++ b/common/transport/pom.xml
@@ -86,6 +86,19 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+        </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
new file mode 100644
index 0000000..9ff19b7
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -0,0 +1,16 @@
+package org.thingsboard.server.common.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+public interface TransportService {
+
+    void process(TransportProtos.SessionEventMsg msg);
+
+    void process(TransportProtos.PostTelemetryMsg msg);
+
+    void process(TransportProtos.PostAttributeMsg msg);
+
+}
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
new file mode 100644
index 0000000..d77e79b
--- /dev/null
+++ b/common/transport/src/main/proto/transport.proto
@@ -0,0 +1,79 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+package transport;
+
+option java_package = "org.thingsboard.server.gen.transport";
+option java_outer_classname = "TransportProtos";
+
+/**
+ * Data Structures;
+ */
+message SessionInfoProto {
+  string nodeId = 1;
+  int64 sessionIdMSB = 2;
+  int64 sessionIdLSB = 3;
+}
+
+enum SessionEvent {
+  OPEN = 0;
+  CLOSED = 1;
+}
+
+message KeyValueProto {
+  string key = 1;
+  bool bool_v = 2;
+  int64 long_v = 3;
+  double double_v = 4;
+  string string_v = 5;
+}
+
+message TsKvListProto {
+  int64 ts = 1;
+  repeated KeyValueProto kv = 2;
+}
+
+/**
+ * Messages that use Data Structures;
+ */
+message SessionEventMsg {
+  SessionInfoProto sessionInfo = 1;
+  int64 deviceIdMSB = 2;
+  int64 deviceIdLSB = 3;
+}
+
+message PostTelemetryMsg {
+  SessionInfoProto sessionInfo = 1;
+  repeated TsKvListProto tsKvList = 2;
+}
+
+message PostAttributeMsg {
+  SessionInfoProto sessionInfo = 1;
+  repeated TsKvListProto tsKvList = 2;
+}
+
+message GetAttributeRequestMsg {
+  SessionInfoProto sessionInfo = 1;
+  repeated string clientAttributeNames = 2;
+  repeated string sharedAttributeNames = 3;
+}
+
+message GetAttributeResponseMsg {
+  SessionInfoProto sessionInfo = 1;
+  repeated TsKvListProto clientAttributeList = 2;
+  repeated TsKvListProto sharedAttributeList = 3;
+  repeated string deletedAttributeKeys = 4;
+}
\ No newline at end of file
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java
index 9c458fd..70ed501 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java
@@ -60,7 +60,6 @@ public class MqttSslHandlerProvider {
     @Autowired
     private DeviceCredentialsService deviceCredentialsService;
 
-
     public SslHandler getSslHandler() {
         try {
             URL ksUrl = Resources.getResource(keyStoreFile);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
new file mode 100644
index 0000000..43e2a34
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -0,0 +1,38 @@
+package org.thingsboard.server.transport.mqtt;
+
+import io.netty.handler.ssl.SslHandler;
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
+import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+@Component
+@Data
+public class MqttTransportContext {
+
+    @Autowired
+    @Lazy
+    private TransportService transportService;
+
+    @Autowired(required = false)
+    private MqttSslHandlerProvider sslHandlerProvider;
+
+    @Autowired(required = false)
+    private HostRequestsQuotaService quotaService;
+
+    @Autowired
+    private MqttTransportAdaptor adaptor;
+
+    @Value("${mqtt.netty.max_payload_size}")
+    private Integer maxPayloadSize;
+
+    private SslHandler sslHandler;
+
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index 432966e..102a0d9 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttDecoder;
 import io.netty.handler.codec.mqtt.MqttEncoder;
 import io.netty.handler.ssl.SslHandler;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.common.transport.quota.QuotaService;
 import org.thingsboard.server.dao.device.DeviceService;
@@ -33,41 +34,25 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
  */
 public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> {
 
-    private final SessionMsgProcessor processor;
-    private final DeviceService deviceService;
-    private final DeviceAuthService authService;
-    private final RelationService relationService;
-    private final MqttTransportAdaptor adaptor;
-    private final MqttSslHandlerProvider sslHandlerProvider;
-    private final QuotaService quotaService;
-    private final int maxPayloadSize;
+    private final MqttTransportContext context;
 
-    public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
-                                          MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider,
-                                          QuotaService quotaService, int maxPayloadSize) {
-        this.processor = processor;
-        this.deviceService = deviceService;
-        this.authService = authService;
-        this.relationService = relationService;
-        this.adaptor = adaptor;
-        this.sslHandlerProvider = sslHandlerProvider;
-        this.quotaService = quotaService;
-        this.maxPayloadSize = maxPayloadSize;
+    public MqttTransportServerInitializer(MqttTransportContext context) {
+        this.context = context;
     }
 
     @Override
     public void initChannel(SocketChannel ch) {
         ChannelPipeline pipeline = ch.pipeline();
         SslHandler sslHandler = null;
-        if (sslHandlerProvider != null) {
-            sslHandler = sslHandlerProvider.getSslHandler();
+        if (context.getSslHandlerProvider() != null) {
+            sslHandler = context.getSslHandlerProvider().getSslHandler();
             pipeline.addLast(sslHandler);
+            context.setSslHandler(sslHandler);
         }
-        pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
+        pipeline.addLast("decoder", new MqttDecoder(context.getMaxPayloadSize()));
         pipeline.addLast("encoder", MqttEncoder.INSTANCE);
 
-        MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService,
-                adaptor, sslHandler, quotaService);
+        MqttTransportHandler handler = new MqttTransportHandler(context);
 
         pipeline.addLast(handler);
         ch.closeFuture().addListener(handler);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index bb8d4ad..f435cc2 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -23,11 +23,14 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.util.ResourceLeakDetector;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
 import org.thingsboard.server.dao.device.DeviceService;
@@ -48,27 +51,6 @@ public class MqttTransportService {
     private static final String V1 = "v1";
     private static final String DEVICE = "device";
 
-    @Autowired(required = false)
-    private ApplicationContext appContext;
-
-    @Autowired(required = false)
-    private SessionMsgProcessor processor;
-
-    @Autowired(required = false)
-    private DeviceService deviceService;
-
-    @Autowired(required = false)
-    private DeviceAuthService authService;
-
-    @Autowired(required = false)
-    private RelationService relationService;
-
-    @Autowired(required = false)
-    private MqttSslHandlerProvider sslHandlerProvider;
-
-    @Autowired(required = false)
-    private HostRequestsQuotaService quotaService;
-
     @Value("${mqtt.bind_address}")
     private String host;
     @Value("${mqtt.bind_port}")
@@ -82,10 +64,9 @@ public class MqttTransportService {
     private Integer bossGroupThreadCount;
     @Value("${mqtt.netty.worker_group_thread_count}")
     private Integer workerGroupThreadCount;
-    @Value("${mqtt.netty.max_payload_size}")
-    private Integer maxPayloadSize;
 
-    private MqttTransportAdaptor adaptor;
+    @Autowired
+    private MqttTransportContext context;
 
     private Channel serverChannel;
     private EventLoopGroup bossGroup;
@@ -97,17 +78,12 @@ public class MqttTransportService {
         ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
 
         log.info("Starting MQTT transport...");
-        log.info("Lookup MQTT transport adaptor {}", adaptorName);
-        this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
-
-        log.info("Starting MQTT transport server");
         bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
         workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
-                .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService,
-                        adaptor, sslHandlerProvider, quotaService, maxPayloadSize));
+                .childHandler(new MqttTransportServerInitializer(context));
 
         serverChannel = b.bind(host, port).sync().channel();
         log.info("Mqtt transport started!");