thingsboard-developers
Changes
common/transport/pom.xml 13(+13 -0)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java 16(+16 -0)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java 1(+0 -1)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java 38(+38 -0)
Details
common/transport/pom.xml 13(+13 -0)
diff --git a/common/transport/pom.xml b/common/transport/pom.xml
index a349012..b8a7ab5 100644
--- a/common/transport/pom.xml
+++ b/common/transport/pom.xml
@@ -86,6 +86,19 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
new file mode 100644
index 0000000..9ff19b7
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -0,0 +1,16 @@
+package org.thingsboard.server.common.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+public interface TransportService {
+
+ void process(TransportProtos.SessionEventMsg msg);
+
+ void process(TransportProtos.PostTelemetryMsg msg);
+
+ void process(TransportProtos.PostAttributeMsg msg);
+
+}
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
new file mode 100644
index 0000000..d77e79b
--- /dev/null
+++ b/common/transport/src/main/proto/transport.proto
@@ -0,0 +1,79 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+package transport;
+
+option java_package = "org.thingsboard.server.gen.transport";
+option java_outer_classname = "TransportProtos";
+
+/**
+ * Data Structures;
+ */
+message SessionInfoProto {
+ string nodeId = 1;
+ int64 sessionIdMSB = 2;
+ int64 sessionIdLSB = 3;
+}
+
+enum SessionEvent {
+ OPEN = 0;
+ CLOSED = 1;
+}
+
+message KeyValueProto {
+ string key = 1;
+ bool bool_v = 2;
+ int64 long_v = 3;
+ double double_v = 4;
+ string string_v = 5;
+}
+
+message TsKvListProto {
+ int64 ts = 1;
+ repeated KeyValueProto kv = 2;
+}
+
+/**
+ * Messages that use Data Structures;
+ */
+message SessionEventMsg {
+ SessionInfoProto sessionInfo = 1;
+ int64 deviceIdMSB = 2;
+ int64 deviceIdLSB = 3;
+}
+
+message PostTelemetryMsg {
+ SessionInfoProto sessionInfo = 1;
+ repeated TsKvListProto tsKvList = 2;
+}
+
+message PostAttributeMsg {
+ SessionInfoProto sessionInfo = 1;
+ repeated TsKvListProto tsKvList = 2;
+}
+
+message GetAttributeRequestMsg {
+ SessionInfoProto sessionInfo = 1;
+ repeated string clientAttributeNames = 2;
+ repeated string sharedAttributeNames = 3;
+}
+
+message GetAttributeResponseMsg {
+ SessionInfoProto sessionInfo = 1;
+ repeated TsKvListProto clientAttributeList = 2;
+ repeated TsKvListProto sharedAttributeList = 3;
+ repeated string deletedAttributeKeys = 4;
+}
\ No newline at end of file
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java
index 9c458fd..70ed501 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttSslHandlerProvider.java
@@ -60,7 +60,6 @@ public class MqttSslHandlerProvider {
@Autowired
private DeviceCredentialsService deviceCredentialsService;
-
public SslHandler getSslHandler() {
try {
URL ksUrl = Resources.getResource(keyStoreFile);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
new file mode 100644
index 0000000..43e2a34
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -0,0 +1,38 @@
+package org.thingsboard.server.transport.mqtt;
+
+import io.netty.handler.ssl.SslHandler;
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
+import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+@Component
+@Data
+public class MqttTransportContext {
+
+ @Autowired
+ @Lazy
+ private TransportService transportService;
+
+ @Autowired(required = false)
+ private MqttSslHandlerProvider sslHandlerProvider;
+
+ @Autowired(required = false)
+ private HostRequestsQuotaService quotaService;
+
+ @Autowired
+ private MqttTransportAdaptor adaptor;
+
+ @Value("${mqtt.netty.max_payload_size}")
+ private Integer maxPayloadSize;
+
+ private SslHandler sslHandler;
+
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index 432966e..102a0d9 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -22,6 +22,7 @@ import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.ssl.SslHandler;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.device.DeviceService;
@@ -33,41 +34,25 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
*/
public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> {
- private final SessionMsgProcessor processor;
- private final DeviceService deviceService;
- private final DeviceAuthService authService;
- private final RelationService relationService;
- private final MqttTransportAdaptor adaptor;
- private final MqttSslHandlerProvider sslHandlerProvider;
- private final QuotaService quotaService;
- private final int maxPayloadSize;
+ private final MqttTransportContext context;
- public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
- MqttTransportAdaptor adaptor, MqttSslHandlerProvider sslHandlerProvider,
- QuotaService quotaService, int maxPayloadSize) {
- this.processor = processor;
- this.deviceService = deviceService;
- this.authService = authService;
- this.relationService = relationService;
- this.adaptor = adaptor;
- this.sslHandlerProvider = sslHandlerProvider;
- this.quotaService = quotaService;
- this.maxPayloadSize = maxPayloadSize;
+ public MqttTransportServerInitializer(MqttTransportContext context) {
+ this.context = context;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
SslHandler sslHandler = null;
- if (sslHandlerProvider != null) {
- sslHandler = sslHandlerProvider.getSslHandler();
+ if (context.getSslHandlerProvider() != null) {
+ sslHandler = context.getSslHandlerProvider().getSslHandler();
pipeline.addLast(sslHandler);
+ context.setSslHandler(sslHandler);
}
- pipeline.addLast("decoder", new MqttDecoder(maxPayloadSize));
+ pipeline.addLast("decoder", new MqttDecoder(context.getMaxPayloadSize()));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
- MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService,
- adaptor, sslHandler, quotaService);
+ MqttTransportHandler handler = new MqttTransportHandler(context);
pipeline.addLast(handler);
ch.closeFuture().addListener(handler);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index bb8d4ad..f435cc2 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -23,11 +23,14 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.dao.device.DeviceService;
@@ -48,27 +51,6 @@ public class MqttTransportService {
private static final String V1 = "v1";
private static final String DEVICE = "device";
- @Autowired(required = false)
- private ApplicationContext appContext;
-
- @Autowired(required = false)
- private SessionMsgProcessor processor;
-
- @Autowired(required = false)
- private DeviceService deviceService;
-
- @Autowired(required = false)
- private DeviceAuthService authService;
-
- @Autowired(required = false)
- private RelationService relationService;
-
- @Autowired(required = false)
- private MqttSslHandlerProvider sslHandlerProvider;
-
- @Autowired(required = false)
- private HostRequestsQuotaService quotaService;
-
@Value("${mqtt.bind_address}")
private String host;
@Value("${mqtt.bind_port}")
@@ -82,10 +64,9 @@ public class MqttTransportService {
private Integer bossGroupThreadCount;
@Value("${mqtt.netty.worker_group_thread_count}")
private Integer workerGroupThreadCount;
- @Value("${mqtt.netty.max_payload_size}")
- private Integer maxPayloadSize;
- private MqttTransportAdaptor adaptor;
+ @Autowired
+ private MqttTransportContext context;
private Channel serverChannel;
private EventLoopGroup bossGroup;
@@ -97,17 +78,12 @@ public class MqttTransportService {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport...");
- log.info("Lookup MQTT transport adaptor {}", adaptorName);
- this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName);
-
- log.info("Starting MQTT transport server");
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
- .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService,
- adaptor, sslHandlerProvider, quotaService, maxPayloadSize));
+ .childHandler(new MqttTransportServerInitializer(context));
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");