thingsboard-developers
Changes
common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java 56(+14 -42)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java 9(+6 -3)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java 11(+11 -0)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java 26(+26 -0)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 575(+323 -252)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java 6(+2 -4)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java 4(+4 -0)
transport/mqtt-transport/pom.xml 88(+88 -0)
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java 48(+48 -0)
transport/pom.xml 1(+1 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
index 751bde6..c072311 100644
--- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
+++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 2e7b412..2826945 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.transport.session;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceCredentialsFilter;
@@ -23,56 +24,27 @@ import org.thingsboard.server.common.msg.session.SessionContext;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.Optional;
/**
* @author Andrew Shvayka
*/
-@Slf4j
+@Data
public abstract class DeviceAwareSessionContext implements SessionContext {
- protected final DeviceAuthService authService;
- protected final SessionMsgProcessor processor;
+ private volatile TransportProtos.DeviceInfoProto deviceInfo;
- protected volatile Device device;
-
- public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService) {
- this.processor = processor;
- this.authService = authService;
- }
-
- public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device) {
- this(processor, authService);
- this.device = device;
- }
-
-
- public boolean login(DeviceCredentialsFilter credentials) {
- DeviceAuthResult result = authService.process(credentials);
- if (result.isSuccess()) {
- Optional<Device> deviceOpt = authService.findDeviceById(result.getDeviceId());
- if (deviceOpt.isPresent()) {
- device = deviceOpt.get();
- }
- return true;
- } else {
- log.debug("Can't find device using credentials [{}] due to {}", credentials, result.getErrorMsg());
- return false;
- }
- }
-
- public DeviceAuthService getAuthService() {
- return authService;
+ public long getDeviceIdMSB() {
+ return deviceInfo.getDeviceIdMSB();
}
- public SessionMsgProcessor getProcessor() {
- return processor;
+ public long getDeviceIdLSB() {
+ return deviceInfo.getDeviceIdLSB();
}
- public Device getDevice() {
- return device;
+ public boolean isConnected() {
+ return deviceInfo != null;
}
-
-
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index 9ff19b7..ff0debc 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -7,10 +7,13 @@ import org.thingsboard.server.gen.transport.TransportProtos;
*/
public interface TransportService {
- void process(TransportProtos.SessionEventMsg msg);
+ void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
+ TransportServiceCallback<TransportProtos.ValidateDeviceTokenResponseMsg> callback);
- void process(TransportProtos.PostTelemetryMsg msg);
+ void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
- void process(TransportProtos.PostAttributeMsg msg);
+ void process(TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
+
+ void process(TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
new file mode 100644
index 0000000..bfc7ac8
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
@@ -0,0 +1,11 @@
+package org.thingsboard.server.common.transport;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+public interface TransportServiceCallback<T> {
+
+ void onSuccess(T msg);
+ void onError(Exception e);
+
+}
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index d77e79b..65d1468 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -46,6 +46,14 @@ message TsKvListProto {
repeated KeyValueProto kv = 2;
}
+message DeviceInfoProto {
+ int64 deviceIdMSB = 1;
+ int64 deviceIdLSB = 2;
+ string deviceName = 3;
+ string deviceType = 4;
+ string additionalInfo = 5;
+}
+
/**
* Messages that use Data Structures;
*/
@@ -53,6 +61,7 @@ message SessionEventMsg {
SessionInfoProto sessionInfo = 1;
int64 deviceIdMSB = 2;
int64 deviceIdLSB = 3;
+ SessionEvent event = 4;
}
message PostTelemetryMsg {
@@ -76,4 +85,13 @@ message GetAttributeResponseMsg {
repeated TsKvListProto clientAttributeList = 2;
repeated TsKvListProto sharedAttributeList = 3;
repeated string deletedAttributeKeys = 4;
-}
\ No newline at end of file
+}
+
+message ValidateDeviceTokenRequestMsg {
+ string token = 1;
+}
+
+message ValidateDeviceTokenResponseMsg {
+ DeviceInfoProto deviceInfo = 1;
+}
+
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index 43e2a34..cc40ee0 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -1,22 +1,33 @@
package org.thingsboard.server.transport.mqtt;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.ssl.SslHandler;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
+import javax.annotation.PostConstruct;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
/**
* Created by ashvayka on 04.10.18.
*/
+@Slf4j
@Component
@Data
public class MqttTransportContext {
+ private final ObjectMapper mapper = new ObjectMapper();
+
@Autowired
@Lazy
private TransportService transportService;
@@ -33,6 +44,21 @@ public class MqttTransportContext {
@Value("${mqtt.netty.max_payload_size}")
private Integer maxPayloadSize;
+ @Value("${cluster.node_id:#{null}}")
+ private String nodeId;
+
private SslHandler sslHandler;
+ @PostConstruct
+ public void init() {
+ if (StringUtils.isEmpty(nodeId)) {
+ try {
+ nodeId = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ nodeId = RandomStringUtils.randomAlphabetic(10);
+ }
+ }
+ log.info("Current NodeId: {}", nodeId);
+ }
+
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index ed0c8c6..fd02d3a 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,9 +18,21 @@ package org.thingsboard.server.transport.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@@ -34,13 +46,12 @@ import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
-import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
-import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.QuotaService;
import org.thingsboard.server.dao.EncryptionUtil;
-import org.thingsboard.server.dao.device.DeviceService;
-import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
@@ -48,18 +59,51 @@ import org.thingsboard.server.transport.mqtt.util.SslUtil;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
-import static io.netty.handler.codec.mqtt.MqttMessageType.*;
-import static io.netty.handler.codec.mqtt.MqttQoS.*;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.*;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.*;
+import org.thingsboard.server.gen.transport.TransportProtos.*;
+
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
+import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
+import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
+import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
+import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
+import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.GET_ATTRIBUTES_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_ATTRIBUTES_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_TELEMETRY_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_DEVICE_RPC_RESPONSE;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_SERVER_RPC_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.BASE_GATEWAY_API_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_TELEMETRY_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_CONNECT_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_DISCONNECT_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_RPC_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
/**
* @author Andrew Shvayka
@@ -69,33 +113,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
- private final DeviceSessionCtx deviceSessionCtx;
- private final String sessionId;
+ private final UUID sessionId;
+ private final MqttTransportContext context;
private final MqttTransportAdaptor adaptor;
- private final SessionMsgProcessor processor;
- private final DeviceService deviceService;
- private final DeviceAuthService authService;
- private final RelationService relationService;
+ private final TransportService transportService;
private final QuotaService quotaService;
private final SslHandler sslHandler;
private final ConcurrentMap<String, Integer> mqttQoSMap;
- private volatile boolean connected;
+ private final SessionInfoProto sessionInfo;
+
private volatile InetSocketAddress address;
+ private volatile DeviceSessionCtx deviceSessionCtx;
private volatile GatewaySessionCtx gatewaySessionCtx;
- public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
- MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
- this.processor = processor;
- this.deviceService = deviceService;
- this.relationService = relationService;
- this.authService = authService;
- this.adaptor = adaptor;
+ public MqttTransportHandler(MqttTransportContext context) {
+ this.sessionId = UUID.randomUUID();
+ this.context = context;
+ this.transportService = context.getTransportService();
+ this.adaptor = context.getAdaptor();
+ this.quotaService = context.getQuotaService();
+ this.sslHandler = context.getSslHandler();
this.mqttQoSMap = new ConcurrentHashMap<>();
- this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor, mqttQoSMap);
- this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
- this.sslHandler = sslHandler;
- this.quotaService = quotaService;
+ this.sessionInfo = SessionInfoProto.newBuilder()
+ .setNodeId(context.getNodeId())
+ .setSessionIdMSB(sessionId.getMostSignificantBits())
+ .setSessionIdLSB(sessionId.getLeastSignificantBits())
+ .build();
+ this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap);
}
@Override
@@ -127,196 +172,196 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
- case PUBLISH:
- processPublish(ctx, (MqttPublishMessage) msg);
- break;
- case SUBSCRIBE:
- processSubscribe(ctx, (MqttSubscribeMessage) msg);
- break;
- case UNSUBSCRIBE:
- processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
- break;
- case PINGREQ:
- if (checkConnected(ctx)) {
- ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
- }
- break;
- case DISCONNECT:
- if (checkConnected(ctx)) {
- processDisconnect(ctx);
- }
- break;
+// case PUBLISH:
+// processPublish(ctx, (MqttPublishMessage) msg);
+// break;
+// case SUBSCRIBE:
+// processSubscribe(ctx, (MqttSubscribeMessage) msg);
+// break;
+// case UNSUBSCRIBE:
+// processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
+// break;
+// case PINGREQ:
+// if (checkConnected(ctx)) {
+// ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+// }
+// break;
+// case DISCONNECT:
+// if (checkConnected(ctx)) {
+// processDisconnect(ctx);
+// }
+// break;
default:
break;
}
}
- private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
- if (!checkConnected(ctx)) {
- return;
- }
- String topicName = mqttMsg.variableHeader().topicName();
- int msgId = mqttMsg.variableHeader().messageId();
- log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
-
- if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
- if (gatewaySessionCtx != null) {
- gatewaySessionCtx.setChannel(ctx);
- handleMqttPublishMsg(topicName, msgId, mqttMsg);
- }
- } else {
- processDevicePublish(ctx, mqttMsg, topicName, msgId);
- }
- }
-
- private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
- try {
- switch (topicName) {
- case GATEWAY_TELEMETRY_TOPIC:
- gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
- break;
- case GATEWAY_ATTRIBUTES_TOPIC:
- gatewaySessionCtx.onDeviceAttributes(mqttMsg);
- break;
- case GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
- gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
- break;
- case GATEWAY_RPC_TOPIC:
- gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
- break;
- case GATEWAY_CONNECT_TOPIC:
- gatewaySessionCtx.onDeviceConnect(mqttMsg);
- break;
- case GATEWAY_DISCONNECT_TOPIC:
- gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
- break;
- }
- } catch (RuntimeException | AdaptorException e) {
- log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
- }
- }
-
- private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
- AdaptorToSessionActorMsg msg = null;
- try {
- if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
- msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
- } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
- msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
- } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
- msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
- if (msgId >= 0) {
- ctx.writeAndFlush(createMqttPubAckMsg(msgId));
- }
- } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
- msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
- if (msgId >= 0) {
- ctx.writeAndFlush(createMqttPubAckMsg(msgId));
- }
- } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
- msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
- if (msgId >= 0) {
- ctx.writeAndFlush(createMqttPubAckMsg(msgId));
- }
- }
- } catch (AdaptorException e) {
- log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
- }
- if (msg != null) {
- processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
- } else {
- log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
- ctx.close();
- }
- }
-
- private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
- if (!checkConnected(ctx)) {
- return;
- }
- log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
- List<Integer> grantedQoSList = new ArrayList<>();
- for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
- String topic = subscription.topicName();
- MqttQoS reqQoS = subscription.qualityOfService();
- try {
- switch (topic) {
- case DEVICE_ATTRIBUTES_TOPIC: {
- AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
- processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
- registerSubQoS(topic, grantedQoSList, reqQoS);
- break;
- }
- case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
- AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
- processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
- registerSubQoS(topic, grantedQoSList, reqQoS);
- break;
- }
- case DEVICE_RPC_RESPONSE_SUB_TOPIC:
- case GATEWAY_ATTRIBUTES_TOPIC:
- case GATEWAY_RPC_TOPIC:
- registerSubQoS(topic, grantedQoSList, reqQoS);
- break;
- case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
- deviceSessionCtx.setAllowAttributeResponses();
- registerSubQoS(topic, grantedQoSList, reqQoS);
- break;
- default:
- log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
- grantedQoSList.add(FAILURE.value());
- break;
- }
- } catch (AdaptorException e) {
- log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
- grantedQoSList.add(FAILURE.value());
- }
- }
- ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
- }
-
- private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
- grantedQoSList.add(getMinSupportedQos(reqQoS));
- mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
- }
-
- private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
- if (!checkConnected(ctx)) {
- return;
- }
- log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
- for (String topicName : mqttMsg.payload().topics()) {
- mqttQoSMap.remove(topicName);
- try {
- switch (topicName) {
- case DEVICE_ATTRIBUTES_TOPIC: {
- AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
- processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
- break;
- }
- case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
- AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
- processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
- break;
- }
- case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
- deviceSessionCtx.setDisallowAttributeResponses();
- break;
- }
- } catch (AdaptorException e) {
- log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
- }
- }
- ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
- }
-
- private MqttMessage createUnSubAckMessage(int msgId) {
- MqttFixedHeader mqttFixedHeader =
- new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
- MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
- return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
- }
+// private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
+// if (!checkConnected(ctx)) {
+// return;
+// }
+// String topicName = mqttMsg.variableHeader().topicName();
+// int msgId = mqttMsg.variableHeader().packetId();
+// log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+//
+// if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
+// if (gatewaySessionCtx != null) {
+// gatewaySessionCtx.setChannel(ctx);
+// handleMqttPublishMsg(topicName, msgId, mqttMsg);
+// }
+// } else {
+// processDevicePublish(ctx, mqttMsg, topicName, msgId);
+// }
+// }
+//
+// private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
+// try {
+// switch (topicName) {
+// case GATEWAY_TELEMETRY_TOPIC:
+// gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
+// break;
+// case GATEWAY_ATTRIBUTES_TOPIC:
+// gatewaySessionCtx.onDeviceAttributes(mqttMsg);
+// break;
+// case GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
+// gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
+// break;
+// case GATEWAY_RPC_TOPIC:
+// gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
+// break;
+// case GATEWAY_CONNECT_TOPIC:
+// gatewaySessionCtx.onDeviceConnect(mqttMsg);
+// break;
+// case GATEWAY_DISCONNECT_TOPIC:
+// gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
+// break;
+// }
+// } catch (RuntimeException | AdaptorException e) {
+// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
+// }
+// }
+//
+// private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
+// AdaptorToSessionActorMsg msg = null;
+// try {
+// if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
+// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
+// } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
+// msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
+// } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
+// msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
+// if (msgId >= 0) {
+// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
+// }
+// } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
+// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
+// if (msgId >= 0) {
+// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
+// }
+// } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
+// msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
+// if (msgId >= 0) {
+// ctx.writeAndFlush(createMqttPubAckMsg(msgId));
+// }
+// }
+// } catch (AdaptorException e) {
+// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
+// }
+// if (msg != null) {
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+// } else {
+// log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
+// ctx.close();
+// }
+// }
+//
+// private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
+// if (!checkConnected(ctx)) {
+// return;
+// }
+// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+// List<Integer> grantedQoSList = new ArrayList<>();
+// for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
+// String topic = subscription.topicName();
+// MqttQoS reqQoS = subscription.qualityOfService();
+// try {
+// switch (topic) {
+// case DEVICE_ATTRIBUTES_TOPIC: {
+// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+// registerSubQoS(topic, grantedQoSList, reqQoS);
+// break;
+// }
+// case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+// registerSubQoS(topic, grantedQoSList, reqQoS);
+// break;
+// }
+// case DEVICE_RPC_RESPONSE_SUB_TOPIC:
+// case GATEWAY_ATTRIBUTES_TOPIC:
+// case GATEWAY_RPC_TOPIC:
+// registerSubQoS(topic, grantedQoSList, reqQoS);
+// break;
+// case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+// deviceSessionCtx.setAllowAttributeResponses();
+// registerSubQoS(topic, grantedQoSList, reqQoS);
+// break;
+// default:
+// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+// grantedQoSList.add(FAILURE.value());
+// break;
+// }
+// } catch (AdaptorException e) {
+// log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+// grantedQoSList.add(FAILURE.value());
+// }
+// }
+// ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
+// }
+//
+// private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
+// grantedQoSList.add(getMinSupportedQos(reqQoS));
+// mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
+// }
+//
+// private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
+// if (!checkConnected(ctx)) {
+// return;
+// }
+// log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+// for (String topicName : mqttMsg.payload().topics()) {
+// mqttQoSMap.remove(topicName);
+// try {
+// switch (topicName) {
+// case DEVICE_ATTRIBUTES_TOPIC: {
+// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+// break;
+// }
+// case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+// AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+// break;
+// }
+// case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+// deviceSessionCtx.setDisallowAttributeResponses();
+// break;
+// }
+// } catch (AdaptorException e) {
+// log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
+// }
+// }
+// ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
+// }
+//
+// private MqttMessage createUnSubAckMessage(int msgId) {
+// MqttFixedHeader mqttFixedHeader =
+// new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
+// MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
+// return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
+// }
private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
@@ -333,36 +378,58 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (StringUtils.isEmpty(userName)) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close();
- } else if (!deviceSessionCtx.login(new DeviceTokenCredentials(msg.payload().userName()))) {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
- ctx.close();
} else {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
- connected = true;
- processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
- new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
- checkGatewaySession();
+ transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
+ new TransportServiceCallback<ValidateDeviceTokenResponseMsg>() {
+ @Override
+ public void onSuccess(ValidateDeviceTokenResponseMsg msg) {
+ if (!msg.hasDeviceInfo()) {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+ ctx.close();
+ } else {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+ deviceSessionCtx.setDeviceInfo(deviceSessionCtx.getDeviceInfo());
+ transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
+ checkGatewaySession();
+ }
+ }
+
+ @Override
+ public void onError(Exception e) {
+ log.trace("[{}] Failed to process credentials: {}", address, userName, e);
+ ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
+ ctx.close();
+ }
+ });
}
}
+ protected SessionEventMsg getSessionEventMsg(SessionEvent event) {
+ return SessionEventMsg.newBuilder()
+ .setSessionInfo(sessionInfo)
+ .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
+ .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
+ .setEvent(event).build();
+ }
+
private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
- try {
- String strCert = SslUtil.getX509CertificateString(cert);
- String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
- if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
- connected = true;
- processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
- new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
- checkGatewaySession();
- } else {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
- ctx.close();
- }
- } catch (Exception e) {
- ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
- ctx.close();
- }
+// try {
+// String strCert = SslUtil.getX509CertificateString(cert);
+// String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
+// if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
+// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+// connected = true;
+// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
+// checkGatewaySession();
+// } else {
+// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+// ctx.close();
+// }
+// } catch (Exception e) {
+// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+// ctx.close();
+// }
}
private X509Certificate getX509Certificate() {
@@ -380,8 +447,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDisconnect(ChannelHandlerContext ctx) {
ctx.close();
- if (connected) {
- processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
+ if (deviceSessionCtx.isConnected()) {
+ transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
if (gatewaySessionCtx != null) {
gatewaySessionCtx.onGatewayDisconnect();
}
@@ -428,7 +495,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private boolean checkConnected(ChannelHandlerContext ctx) {
- if (connected) {
+ if (deviceSessionCtx.isConnected()) {
return true;
} else {
log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
@@ -438,18 +505,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void checkGatewaySession() {
- Device device = deviceSessionCtx.getDevice();
- JsonNode infoNode = device.getAdditionalInfo();
- if (infoNode != null) {
- JsonNode gatewayNode = infoNode.get("gateway");
- if (gatewayNode != null && gatewayNode.asBoolean()) {
- gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
+ DeviceInfoProto device = deviceSessionCtx.getDeviceInfo();
+ try {
+ JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
+ if (infoNode != null) {
+ JsonNode gatewayNode = infoNode.get("gateway");
+ if (gatewayNode != null && gatewayNode.asBoolean()) {
+ gatewaySessionCtx = new GatewaySessionCtx(deviceSessionCtx);
+ }
}
+ } catch (IOException e) {
+ log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
}
}
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
- processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId()));
+ transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
}
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index 3dbb3ef..3f29a82 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -40,15 +40,13 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
- private final MqttTransportAdaptor adaptor;
private final MqttSessionId sessionId;
private ChannelHandlerContext channel;
private volatile boolean allowAttributeResponses;
private AtomicInteger msgIdSeq = new AtomicInteger(0);
- public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) {
- super(processor, authService, mqttQoSMap);
- this.adaptor = adaptor;
+ public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
+ super(null, null, null);
this.sessionId = new MqttSessionId();
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 98ad6d2..dec8602 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -78,6 +78,10 @@ public class GatewaySessionCtx {
this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
}
+ public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) {
+
+ }
+
public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
JsonElement json = getJson(msg);
String deviceName = checkDeviceName(getDeviceName(json));
transport/mqtt-transport/pom.xml 88(+88 -0)
diff --git a/transport/mqtt-transport/pom.xml b/transport/mqtt-transport/pom.xml
new file mode 100644
index 0000000..8d8b24d
--- /dev/null
+++ b/transport/mqtt-transport/pom.xml
@@ -0,0 +1,88 @@
+<!--
+
+ Copyright © 2016-2018 The Thingsboard Authors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.thingsboard</groupId>
+ <version>2.2.0-SNAPSHOT</version>
+ <artifactId>transport</artifactId>
+ </parent>
+ <groupId>org.thingsboard.transport</groupId>
+ <artifactId>mqtt-transport</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Thingsboard MQTT Transport Service</name>
+ <url>https://thingsboard.io</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <main.dir>${basedir}/../..</main.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.thingsboard.common</groupId>
+ <artifactId>transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
new file mode 100644
index 0000000..3c54938
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
@@ -0,0 +1,48 @@
+package org.thingsboard.server.mqtt; /**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import springfox.documentation.swagger2.annotations.EnableSwagger2;
+
+import java.util.Arrays;
+
+@SpringBootConfiguration
+@EnableAsync
+@EnableScheduling
+@ComponentScan({"org.thingsboard.server"})
+public class ThingsboardMqttTransportApplication {
+
+ private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
+ private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport";
+
+ public static void main(String[] args) {
+ SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
+ }
+
+ private static String[] updateArguments(String[] args) {
+ if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
+ String[] modifiedArgs = new String[args.length + 1];
+ System.arraycopy(args, 0, modifiedArgs, 0, args.length);
+ modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
+ return modifiedArgs;
+ }
+ return args;
+ }
+}
diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
new file mode 100644
index 0000000..c12f25e
--- /dev/null
+++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
@@ -0,0 +1,57 @@
+#
+# Copyright © 2016-2018 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spring.main.web-application-type: none
+
+# MQTT server parameters
+mqtt:
+ # Enable/disable mqtt transport protocol.
+ enabled: "${MQTT_ENABLED:true}"
+ bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
+ bind_port: "${MQTT_BIND_PORT:1883}"
+ adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
+ timeout: "${MQTT_TIMEOUT:10000}"
+ netty:
+ leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
+ boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
+ worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
+ max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
+ # MQTT SSL configuration
+ ssl:
+ # Enable/disable SSL support
+ enabled: "${MQTT_SSL_ENABLED:false}"
+ # SSL protocol: See http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#SSLContext
+ protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}"
+ # Path to the key store that holds the SSL certificate
+ key_store: "${MQTT_SSL_KEY_STORE:mqttserver.jks}"
+ # Password used to access the key store
+ key_store_password: "${MQTT_SSL_KEY_STORE_PASSWORD:server_ks_password}"
+ # Password used to access the key
+ key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
+ # Type of the key store
+ key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
+
+kafka:
+ enabled: true
+ bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
+ acks: "${TB_KAFKA_ACKS:all}"
+ retries: "${TB_KAFKA_RETRIES:1}"
+ batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
+ linger.ms: "${TB_KAFKA_LINGER_MS:1}"
+ buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
+ topic:
+ telemetry: "${TB_TELEMETRY_TOPIC:tb.transport.telemetry}"
+ requests: "${TB_TELEMETRY_TOPIC:tb.transport.requests}"
\ No newline at end of file
transport/pom.xml 1(+1 -0)
diff --git a/transport/pom.xml b/transport/pom.xml
index 95b0112..a01c7ac 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -38,6 +38,7 @@
<module>http</module>
<module>coap</module>
<module>mqtt</module>
+ <module>mqtt-transport</module>
</modules>
<dependencies>