thingsboard-aplcache
Changes
common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java 14(+14 -0)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java 65(+41 -24)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttGatewayAdaptor.java 49(+49 -0)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java 36(+36 -0)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java 4(+2 -2)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 205(+137 -68)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java 7(+5 -2)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java 11(+5 -6)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java 4(+2 -2)
Details
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 8321c04..89debed 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -42,6 +42,12 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
this.authService = authService;
}
+ public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device) {
+ this(processor, authService);
+ this.device = device;
+ }
+
+
public boolean login(DeviceCredentialsFilter credentials) {
DeviceAuthResult result = authService.process(credentials);
if (result.isSuccess()) {
@@ -56,6 +62,14 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
}
}
+ public DeviceAuthService getAuthService() {
+ return authService;
+ }
+
+ public SessionMsgProcessor getProcessor() {
+ return processor;
+ }
+
public Device getDevice() {
return device;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
index ad4c338..8d780b6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
@@ -22,10 +22,14 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
+import java.util.Optional;
+
public interface DeviceService {
Device findDeviceById(DeviceId deviceId);
+ Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name);
+
Device saveDevice(Device device);
Device assignDeviceToCustomer(DeviceId deviceId, CustomerId customerId);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
index 3a3c018..681188e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
@@ -38,6 +38,7 @@ import org.thingsboard.server.dao.service.PaginatedRemover;
import org.thingsboard.server.dao.tenant.TenantDao;
import java.util.List;
+import java.util.Optional;
import static org.thingsboard.server.dao.DaoUtil.convertDataList;
import static org.thingsboard.server.dao.DaoUtil.getData;
@@ -70,6 +71,18 @@ public class DeviceServiceImpl implements DeviceService {
}
@Override
+ public Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name) {
+ log.trace("Executing findDeviceByTenantIdAndName [{}][{}]", tenantId, name);
+ validateId(tenantId, "Incorrect tenantId " + tenantId);
+ Optional<DeviceEntity> deviceEntityOpt = deviceDao.findDevicesByTenantIdAndName(tenantId.getId(), name);
+ if (deviceEntityOpt.isPresent()) {
+ return Optional.of(getData(deviceEntityOpt.get()));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
public Device saveDevice(Device device) {
log.trace("Executing saveDevice [{}]", device);
deviceValidator.validate(device);
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index c2cd86e..ae49cd5 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -15,23 +15,31 @@
*/
package org.thingsboard.server.transport.mqtt.adaptors;
-import com.google.gson.*;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
import org.thingsboard.server.common.msg.session.*;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.transport.mqtt.MqttTopics;
+import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
-import org.thingsboard.server.transport.mqtt.session.MqttSessionCtx;
import java.nio.charset.Charset;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
/**
* @author Andrew Shvayka
@@ -45,7 +53,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
@Override
- public AdaptorToSessionActorMsg convertToActorMsg(MqttSessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException {
+ public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException {
FromDeviceMsg msg;
switch (type) {
case POST_TELEMETRY_REQUEST:
@@ -83,7 +91,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
@Override
- public Optional<MqttMessage> convertToAdaptorMsg(MqttSessionCtx ctx, SessionActorToAdaptorMsg sessionMsg) throws AdaptorException {
+ public Optional<MqttMessage> convertToAdaptorMsg(DeviceSessionCtx ctx, SessionActorToAdaptorMsg sessionMsg) throws AdaptorException {
MqttMessage result = null;
ToDeviceMsg msg = sessionMsg.getMsg();
switch (msg.getMsgType()) {
@@ -100,7 +108,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
GetAttributesResponse response = (GetAttributesResponse) msg;
if (response.isSuccess()) {
result = createMqttPublishMsg(ctx,
- MqttTransportHandler.ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
+ MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
response.getData().get(), true);
} else {
throw new AdaptorException(response.getError().get());
@@ -115,16 +123,16 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
break;
case ATTRIBUTES_UPDATE_NOTIFICATION:
AttributesUpdateNotification notification = (AttributesUpdateNotification) msg;
- result = createMqttPublishMsg(ctx, MqttTransportHandler.ATTRIBUTES_TOPIC, notification.getData(), false);
+ result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, notification.getData(), false);
break;
case TO_DEVICE_RPC_REQUEST:
ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;
- result = createMqttPublishMsg(ctx, MqttTransportHandler.RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(),
+ result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(),
rpcRequest);
break;
case TO_SERVER_RPC_RESPONSE:
ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
- result = createMqttPublishMsg(ctx, MqttTransportHandler.RPC_REQUESTS_TOPIC + rpcResponse.getRequestId(),
+ result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcResponse.getRequestId(),
rpcResponse);
break;
case RULE_ENGINE_ERROR:
@@ -135,19 +143,19 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
return Optional.ofNullable(result);
}
- private MqttPublishMessage createMqttPublishMsg(MqttSessionCtx ctx, String topic, AttributesKVMsg msg, boolean asMap) {
+ private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, AttributesKVMsg msg, boolean asMap) {
return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, asMap));
}
- private MqttPublishMessage createMqttPublishMsg(MqttSessionCtx ctx, String topic, ToDeviceRpcRequestMsg msg) {
+ private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToDeviceRpcRequestMsg msg) {
return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false));
}
- private MqttPublishMessage createMqttPublishMsg(MqttSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) {
+ private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) {
return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg));
}
- private MqttPublishMessage createMqttPublishMsg(MqttSessionCtx ctx, String topic, JsonElement json) {
+ private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
@@ -156,10 +164,10 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
return new MqttPublishMessage(mqttFixedHeader, header, payload);
}
- private FromDeviceMsg convertToGetAttributesRequest(MqttSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+ private FromDeviceMsg convertToGetAttributesRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
try {
- Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
+ Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
String payload = inbound.payload().toString(UTF8);
JsonElement requestBody = new JsonParser().parse(payload);
Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
@@ -175,10 +183,10 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
- private FromDeviceMsg convertToRpcCommandResponse(MqttSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+ private FromDeviceMsg convertToRpcCommandResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
try {
- Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.RPC_RESPONSE_TOPIC.length()));
+ Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));
String payload = inbound.payload().toString(UTF8);
return new ToDeviceRpcResponseMsg(
requestId,
@@ -199,7 +207,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
private UpdateAttributesRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
- String payload = validatePayload(ctx, inbound.payload());
+ String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId());
} catch (IllegalStateException | JsonSyntaxException ex) {
@@ -208,7 +216,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
- String payload = validatePayload(ctx, inbound.payload());
+ String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId());
} catch (IllegalStateException | JsonSyntaxException ex) {
@@ -216,22 +224,31 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
}
}
- private FromDeviceMsg convertToServerRpcRequest(MqttSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+ private FromDeviceMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
String topicName = inbound.variableHeader().topicName();
- String payload = validatePayload(ctx, inbound.payload());
+ String payload = validatePayload(ctx.getSessionId(), inbound.payload());
try {
- Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.RPC_REQUESTS_TOPIC.length()));
+ Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
} catch (IllegalStateException | JsonSyntaxException ex) {
throw new AdaptorException(ex);
}
}
- private String validatePayload(SessionContext ctx, ByteBuf payloadData) throws AdaptorException {
+ public static JsonElement validateJsonPayload(SessionId sessionId, ByteBuf payloadData) throws AdaptorException {
+ String payload = validatePayload(sessionId, payloadData);
+ try {
+ return new JsonParser().parse(payload);
+ } catch (JsonSyntaxException ex) {
+ throw new AdaptorException(ex);
+ }
+ }
+
+ public static String validatePayload(SessionId sessionId, ByteBuf payloadData) throws AdaptorException {
try {
String payload = payloadData.toString(UTF8);
if (payload == null) {
- log.warn("[{}] Payload is empty!", ctx.getSessionId());
+ log.warn("[{}] Payload is empty!", sessionId);
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
}
return payload;
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttGatewayAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttGatewayAdaptor.java
new file mode 100644
index 0000000..02f29be
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttGatewayAdaptor.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.adaptors;
+
+import com.google.gson.Gson;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
+import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
+
+import java.nio.charset.Charset;
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+public class JsonMqttGatewayAdaptor implements MqttGatewayAdaptor {
+
+ private static final Gson GSON = new Gson();
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+ private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
+
+ @Override
+ public AdaptorToSessionActorMsg convertToActorMsg(GatewaySessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException {
+ return null;
+ }
+
+ @Override
+ public Optional<MqttMessage> convertToAdaptorMsg(GatewaySessionCtx ctx, SessionActorToAdaptorMsg msg) throws AdaptorException {
+ return null;
+ }
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java
new file mode 100644
index 0000000..5641af5
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.adaptors;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
+import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+public interface MqttGatewayAdaptor {
+
+ AdaptorToSessionActorMsg convertToActorMsg(GatewaySessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException;
+
+ Optional<MqttMessage> convertToAdaptorMsg(GatewaySessionCtx ctx, SessionActorToAdaptorMsg msg) throws AdaptorException;
+
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index 0de11e4..7f8e1d7 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -17,10 +17,10 @@ package org.thingsboard.server.transport.mqtt.adaptors;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.thingsboard.server.common.transport.TransportAdaptor;
-import org.thingsboard.server.transport.mqtt.session.MqttSessionCtx;
+import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
/**
* @author Andrew Shvayka
*/
-public interface MqttTransportAdaptor extends TransportAdaptor<MqttSessionCtx, MqttMessage, MqttMessage> {
+public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> {
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java
new file mode 100644
index 0000000..4f91e1a
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+public class MqttTopics {
+
+ public static final String BASE_DEVICE_API_TOPIC = "v1/devices/me";
+ public static final String DEVICE_RPC_RESPONSE_TOPIC = BASE_DEVICE_API_TOPIC + "/rpc/response/";
+ public static final String DEVICE_RPC_RESPONSE_SUB_TOPIC = DEVICE_RPC_RESPONSE_TOPIC + "+";
+ public static final String DEVICE_RPC_REQUESTS_TOPIC = BASE_DEVICE_API_TOPIC + "/rpc/request/";
+ public static final String DEVICE_RPC_REQUESTS_SUB_TOPIC = DEVICE_RPC_REQUESTS_TOPIC + "+";
+ public static final String DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC + "/attributes/response/";
+ public static final String DEVICE_ATTRIBUTES_RESPONSES_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + "+";
+ public static final String DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC + "/attributes/request/";
+ public static final String DEVICE_TELEMETRY_TOPIC = BASE_DEVICE_API_TOPIC + "/telemetry";
+ public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes";
+
+ public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway";
+ public static final String GATEWAY_CONNECT_TOPIC = "v1/gateway/connect";
+ public static final String GATEWAY_DISCONNECT_TOPIC = "v1/gateway/disconnect";
+ public static final String GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes";
+ public static final String GATEWAY_TELEMETRY_TOPIC = "v1/gateway/telemetry";
+
+
+ private MqttTopics() {
+ }
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 30b37e3..e744955 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -15,26 +15,33 @@
*/
package org.thingsboard.server.transport.mqtt;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.gson.JsonElement;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
import org.thingsboard.server.common.data.security.DeviceX509Credentials;
import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
-import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.EncryptionUtil;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
-import org.thingsboard.server.transport.mqtt.session.MqttSessionCtx;
+import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
+import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.util.SslUtil;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -42,35 +49,38 @@ import javax.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
+import static io.netty.handler.codec.mqtt.MqttMessageType.*;
+import static io.netty.handler.codec.mqtt.MqttQoS.*;
+import static org.thingsboard.server.common.msg.session.MsgType.*;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.*;
+
/**
* @author Andrew Shvayka
*/
@Slf4j
public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
- public static final MqttQoS MAX_SUPPORTED_QOS_LVL = MqttQoS.AT_LEAST_ONCE;
- public static final String BASE_TOPIC = "v1/devices/me";
- public static final String ATTRIBUTES_TOPIC = BASE_TOPIC + "/attributes";
- public static final String TELEMETRY_TOPIC = BASE_TOPIC + "/telemetry";
- public static final String ATTRIBUTES_REQUEST_TOPIC_PREFIX = BASE_TOPIC + "/attributes/request/";
- public static final String ATTRIBUTES_RESPONSE_TOPIC_PREFIX = BASE_TOPIC + "/attributes/response/";
- public static final String ATTRIBUTES_RESPONSES_TOPIC = ATTRIBUTES_RESPONSE_TOPIC_PREFIX + "+";
- public static final String RPC_REQUESTS_TOPIC = BASE_TOPIC + "/rpc/request/";
- public static final String RPC_REQUESTS_SUB_TOPIC = RPC_REQUESTS_TOPIC + "+";
- public static final String RPC_RESPONSE_TOPIC = BASE_TOPIC + "/rpc/response/";
- public static final String RPC_RESPONSE_SUB_TOPIC = RPC_RESPONSE_TOPIC + "+";
- private final MqttSessionCtx sessionCtx;
+ public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
+
+ private final DeviceSessionCtx deviceSessionCtx;
private final String sessionId;
private final MqttTransportAdaptor adaptor;
private final SessionMsgProcessor processor;
+ private final DeviceService deviceService;
+ private final DeviceAuthService authService;
private final SslHandler sslHandler;
+ private volatile boolean connected;
+ private volatile GatewaySessionCtx gatewaySessionCtx;
- public MqttTransportHandler(SessionMsgProcessor processor, DeviceAuthService authService,
+ public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
MqttTransportAdaptor adaptor, SslHandler sslHandler) {
this.processor = processor;
+ this.deviceService = deviceService;
+ this.authService = authService;
this.adaptor = adaptor;
- this.sessionCtx = new MqttSessionCtx(processor, authService, adaptor);
- this.sessionId = sessionCtx.getSessionId().toUidStr();
+ this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
+ this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
this.sslHandler = sslHandler;
}
@@ -83,7 +93,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
- sessionCtx.setChannel(ctx);
+ deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
@@ -98,36 +108,68 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
- ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)));
+ if (checkConnected(ctx)) {
+ ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+ }
break;
case DISCONNECT:
- processDisconnect(ctx);
+ if (checkConnected(ctx)) {
+ processDisconnect(ctx);
+ }
break;
}
}
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
+ if (!checkConnected(ctx)) {
+ return;
+ }
String topicName = mqttMsg.variableHeader().topicName();
int msgId = mqttMsg.variableHeader().messageId();
log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+
+ if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
+ AdaptorToSessionActorMsg msg = null;
+ if (gatewaySessionCtx != null) {
+ try {
+ if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
+ gatewaySessionCtx.connect(getDeviceName(mqttMsg));
+ } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) {
+ gatewaySessionCtx.disconnect(getDeviceName(mqttMsg));
+ }
+ } catch (RuntimeException | AdaptorException e) {
+ log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
+ }
+ }
+ } else {
+ processDevicePublish(ctx, mqttMsg, topicName, msgId);
+ }
+ }
+
+ private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
+ JsonElement json = JsonMqttAdaptor.validateJsonPayload(deviceSessionCtx.getSessionId(), mqttMsg.payload());
+ return json.getAsJsonObject().get("device").getAsString();
+ }
+
+ private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
AdaptorToSessionActorMsg msg = null;
try {
- if (topicName.equals(ATTRIBUTES_TOPIC)) {
- msg = adaptor.convertToActorMsg(sessionCtx, MsgType.POST_ATTRIBUTES_REQUEST, mqttMsg);
- } else if (topicName.equals(TELEMETRY_TOPIC)) {
- msg = adaptor.convertToActorMsg(sessionCtx, MsgType.POST_TELEMETRY_REQUEST, mqttMsg);
- } else if (topicName.startsWith(ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
- msg = adaptor.convertToActorMsg(sessionCtx, MsgType.GET_ATTRIBUTES_REQUEST, mqttMsg);
+ if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
+ msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
+ } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
+ msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
+ } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
+ msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
- } else if (topicName.startsWith(RPC_RESPONSE_TOPIC)) {
- msg = adaptor.convertToActorMsg(sessionCtx, MsgType.TO_DEVICE_RPC_RESPONSE, mqttMsg);
+ } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
+ msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
- } else if (topicName.startsWith(RPC_REQUESTS_TOPIC)) {
- msg = adaptor.convertToActorMsg(sessionCtx, MsgType.TO_SERVER_RPC_REQUEST, mqttMsg);
+ } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
+ msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
if (msgId >= 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
@@ -135,60 +177,65 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
}
-
if (msg != null) {
- processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
+ processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
} else {
- log.warn("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
+ log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close();
}
}
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
- log.info("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+ if (!checkConnected(ctx)) {
+ return;
+ }
+ log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
List<Integer> grantedQoSList = new ArrayList<>();
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topicName = subscription.topicName();
//TODO: handle this qos level.
MqttQoS reqQoS = subscription.qualityOfService();
try {
- if (topicName.equals(ATTRIBUTES_TOPIC)) {
- AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(sessionCtx, MsgType.SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
+ if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
+ AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+ processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
grantedQoSList.add(getMinSupportedQos(reqQoS));
- } else if (topicName.equals(RPC_REQUESTS_SUB_TOPIC)) {
- AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(sessionCtx, MsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
+ } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
+ AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+ processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
grantedQoSList.add(getMinSupportedQos(reqQoS));
- } else if (topicName.equals(RPC_RESPONSE_SUB_TOPIC)) {
+ } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
grantedQoSList.add(getMinSupportedQos(reqQoS));
- } else if (topicName.equals(ATTRIBUTES_RESPONSES_TOPIC)) {
- sessionCtx.setAllowAttributeResponses();
+ } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
+ deviceSessionCtx.setAllowAttributeResponses();
grantedQoSList.add(getMinSupportedQos(reqQoS));
} else {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
- grantedQoSList.add(MqttQoS.FAILURE.value());
+ grantedQoSList.add(FAILURE.value());
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
- grantedQoSList.add(MqttQoS.FAILURE.value());
+ grantedQoSList.add(FAILURE.value());
}
}
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
- log.info("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+ if (!checkConnected(ctx)) {
+ return;
+ }
+ log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
for (String topicName : mqttMsg.payload().topics()) {
try {
- if (topicName.equals(ATTRIBUTES_TOPIC)) {
- AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(sessionCtx, MsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
- } else if (topicName.equals(RPC_REQUESTS_SUB_TOPIC)) {
- AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(sessionCtx, MsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
- processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
- } else if (topicName.equals(ATTRIBUTES_RESPONSES_TOPIC)) {
- sessionCtx.setDisallowAttributeResponses();
+ if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
+ AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+ processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
+ AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+ processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+ } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
+ deviceSessionCtx.setDisallowAttributeResponses();
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
@@ -199,7 +246,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private MqttMessage createUnSubAckMessage(int msgId) {
MqttFixedHeader mqttFixedHeader =
- new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+ new MqttFixedHeader(SUBACK, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
}
@@ -217,13 +264,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
String userName = msg.payload().userName();
if (StringUtils.isEmpty(userName)) {
- ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close();
- } else if (sessionCtx.login(new DeviceTokenCredentials(msg.payload().userName()))) {
- ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED));
- } else {
- ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED));
+ } else if (!deviceSessionCtx.login(new DeviceTokenCredentials(msg.payload().userName()))) {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
+ } else {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+ connected = true;
+ checkGatewaySession();
}
}
@@ -231,14 +280,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
String strCert = SslUtil.getX509CertificateString(cert);
String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
- if (sessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
- ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED));
+ if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+ connected = true;
+ checkGatewaySession();
} else {
- ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED));
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
}
} catch (Exception e) {
- ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED));
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
}
}
@@ -262,7 +313,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) {
MqttFixedHeader mqttFixedHeader =
- new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
+ new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(returnCode, true);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
@@ -281,7 +332,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
MqttFixedHeader mqttFixedHeader =
- new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+ new MqttFixedHeader(SUBACK, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList);
return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
@@ -293,14 +344,32 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public static MqttPubAckMessage createMqttPubAckMsg(int requestId) {
MqttFixedHeader mqttFixedHeader =
- new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+ new MqttFixedHeader(PUBACK, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMsgIdVariableHeader =
MqttMessageIdVariableHeader.from(requestId);
return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
}
+ private boolean checkConnected(ChannelHandlerContext ctx) {
+ if (connected) {
+ return true;
+ } else {
+ log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
+ ctx.close();
+ return false;
+ }
+ }
+
+ private void checkGatewaySession() {
+ Device device = deviceSessionCtx.getDevice();
+ JsonNode gatewayNode = device.getAdditionalInfo().get("gateway");
+ if (gatewayNode != null && gatewayNode.asBoolean()) {
+ gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, device);
+ }
+ }
+
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
- processor.process(SessionCloseMsg.onError(sessionCtx.getSessionId()));
+ processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId()));
}
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index 323ed1e..9444cdd 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -29,6 +29,7 @@ import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import javax.net.ssl.SSLException;
@@ -40,13 +41,15 @@ import java.security.cert.CertificateException;
public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> {
private final SessionMsgProcessor processor;
+ private final DeviceService deviceService;
private final DeviceAuthService authService;
private final MqttTransportAdaptor adaptor;
private final MqttSslHandlerProvider sslHandlerProvider;
- public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor,
+ public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, MqttTransportAdaptor adaptor,
MqttSslHandlerProvider sslHandlerProvider) {
this.processor = processor;
+ this.deviceService = deviceService;
this.authService = authService;
this.adaptor = adaptor;
this.sslHandlerProvider = sslHandlerProvider;
@@ -63,7 +66,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
pipeline.addLast("decoder", new MqttDecoder());
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
- MqttTransportHandler handler = new MqttTransportHandler(processor, authService, adaptor, sslHandler);
+ MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, adaptor, sslHandler);
pipeline.addLast(handler);
ch.closeFuture().addListener(handler);
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index 9b969fc..1543400 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -17,14 +17,11 @@ package org.thingsboard.server.transport.mqtt;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -32,12 +29,11 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLEngine;
-import java.util.concurrent.Executor;
/**
* @author Andrew Shvayka
@@ -56,6 +52,9 @@ public class MqttTransportService {
private SessionMsgProcessor processor;
@Autowired(required = false)
+ private DeviceService deviceService;
+
+ @Autowired(required = false)
private DeviceAuthService authService;
@Autowired(required = false)
@@ -87,7 +86,7 @@ public class MqttTransportService {
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.TRACE))
- .childHandler(new MqttTransportServerInitializer(processor, authService, adaptor, sslHandlerProvider));
+ .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider));
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
new file mode 100644
index 0000000..fef155f
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.session;
+
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
+import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
+import org.thingsboard.server.common.msg.session.SessionType;
+import org.thingsboard.server.common.msg.session.ex.SessionException;
+import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
+
+ private final MqttSessionId sessionId;
+ private volatile boolean closed;
+
+ public GatewayDeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, Device device) {
+ super(processor, authService, device);
+ this.sessionId = new MqttSessionId();
+ }
+
+ @Override
+ public SessionId getSessionId() {
+ return sessionId;
+ }
+
+ @Override
+ public SessionType getSessionType() {
+ return SessionType.ASYNC;
+ }
+
+ @Override
+ public void onMsg(SessionActorToAdaptorMsg msg) throws SessionException {
+
+ }
+
+ @Override
+ public void onMsg(SessionCtrlMsg msg) throws SessionException {
+
+ }
+
+ @Override
+ public void onError(SessionException e) {
+
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void setClosed(boolean closed) {
+ this.closed = closed;
+ }
+
+ @Override
+ public long getTimeout() {
+ return 0;
+ }
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
new file mode 100644
index 0000000..54336d9
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.session;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceService;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+public class GatewaySessionCtx {
+
+ private final Device gateway;
+ private final SessionMsgProcessor processor;
+ private final DeviceService deviceService;
+ private final DeviceAuthService authService;
+ private final Map<String, GatewayDeviceSessionCtx> devices;
+
+ public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, Device gateway) {
+ this.processor = processor;
+ this.deviceService = deviceService;
+ this.authService = authService;
+ this.gateway = gateway;
+ this.devices = new HashMap<>();
+ }
+
+ public void connect(String deviceName) {
+ checkDeviceName(deviceName);
+ Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
+ Device device = deviceOpt.orElseGet(() -> {
+ Device newDevice = new Device();
+ newDevice.setTenantId(gateway.getTenantId());
+ return deviceService.saveDevice(newDevice);
+ });
+ devices.put(deviceName, new GatewayDeviceSessionCtx(processor, authService, device));
+ }
+
+ public void disconnect(String deviceName) {
+ checkDeviceName(deviceName);
+ devices.remove(deviceName);
+ }
+
+ private void checkDeviceName(String deviceName) {
+ if (StringUtils.isEmpty(deviceName)) {
+ throw new RuntimeException();
+ }
+ }
+
+}