thingsboard-memoizeit
Changes
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java 7(+5 -2)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java 8(+6 -2)
transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java 6(+5 -1)
Details
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 36868c5..9c4dd40 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -37,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.EncryptionUtil;
import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
@@ -67,14 +68,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private final SessionMsgProcessor processor;
private final DeviceService deviceService;
private final DeviceAuthService authService;
+ private final RelationService relationService;
private final SslHandler sslHandler;
private volatile boolean connected;
private volatile GatewaySessionCtx gatewaySessionCtx;
- public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
+ public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
MqttTransportAdaptor adaptor, SslHandler sslHandler) {
this.processor = processor;
this.deviceService = deviceService;
+ this.relationService = relationService;
this.authService = authService;
this.adaptor = adaptor;
this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
@@ -371,7 +374,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (infoNode != null) {
JsonNode gatewayNode = infoNode.get("gateway");
if (gatewayNode != null && gatewayNode.asBoolean()) {
- gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx);
+ gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
}
}
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index af109f6..1469290 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import javax.net.ssl.SSLException;
@@ -45,14 +46,17 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
private final SessionMsgProcessor processor;
private final DeviceService deviceService;
private final DeviceAuthService authService;
+ private final RelationService relationService;
private final MqttTransportAdaptor adaptor;
private final MqttSslHandlerProvider sslHandlerProvider;
- public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, MqttTransportAdaptor adaptor,
+ public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
+ MqttTransportAdaptor adaptor,
MqttSslHandlerProvider sslHandlerProvider) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
+ this.relationService = relationService;
this.adaptor = adaptor;
this.sslHandlerProvider = sslHandlerProvider;
}
@@ -68,7 +72,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
- MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, adaptor, sslHandler);
+ MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, relationService, adaptor, sslHandler);
pipeline.addLast(handler);
ch.closeFuture().addListener(handler);
}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index 8710809..179dad5 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -29,6 +29,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import javax.annotation.PostConstruct;
@@ -57,6 +58,9 @@ public class MqttTransportService {
private DeviceAuthService authService;
@Autowired(required = false)
+ private RelationService relationService;
+
+ @Autowired(required = false)
private MqttSslHandlerProvider sslHandlerProvider;
@Value("${mqtt.bind_address}")
@@ -95,7 +99,7 @@ public class MqttTransportService {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
- .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider));
+ .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, adaptor, sslHandlerProvider));
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index d6a953a..1977344 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -27,6 +27,7 @@ import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
@@ -36,6 +37,7 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
@@ -58,28 +60,34 @@ public class GatewaySessionCtx {
private final SessionMsgProcessor processor;
private final DeviceService deviceService;
private final DeviceAuthService authService;
+ private final RelationService relationService;
private final Map<String, GatewayDeviceSessionCtx> devices;
private ChannelHandlerContext channel;
- public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) {
+ public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
+ this.relationService = relationService;
this.gateway = gatewaySessionCtx.getDevice();
this.gatewaySessionId = gatewaySessionCtx.getSessionId();
this.devices = new HashMap<>();
}
public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
- String deviceName = checkDeviceName(getDeviceName(msg));
+ JsonElement json = getJson(msg);
+ String deviceName = checkDeviceName(getDeviceName(json));
+ String deviceType = getDeviceType(json);
if (!devices.containsKey(deviceName)) {
Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
Device device = deviceOpt.orElseGet(() -> {
Device newDevice = new Device();
newDevice.setTenantId(gateway.getTenantId());
newDevice.setName(deviceName);
- newDevice.setType("default");
- return deviceService.saveDevice(newDevice);
+ newDevice.setType(deviceType);
+ newDevice = deviceService.saveDevice(newDevice);
+ relationService.saveRelation(new EntityRelation(gateway.getId(), newDevice.getId(), "Created"));
+ return newDevice;
});
GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
devices.put(deviceName, ctx);
@@ -91,7 +99,7 @@ public class GatewaySessionCtx {
}
public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
- String deviceName = checkDeviceName(getDeviceName(msg));
+ String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
if (deviceSessionCtx != null) {
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
@@ -211,11 +219,19 @@ public class GatewaySessionCtx {
}
}
- private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
- JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+ private String getDeviceName(JsonElement json) throws AdaptorException {
return json.getAsJsonObject().get("device").getAsString();
}
+ private String getDeviceType(JsonElement json) throws AdaptorException {
+ JsonElement type = json.getAsJsonObject().get("type");
+ return type == null ? "default" : type.getAsString();
+ }
+
+ private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
+ return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+ }
+
protected SessionMsgProcessor getProcessor() {
return processor;
}
@@ -229,7 +245,9 @@ public class GatewaySessionCtx {
}
private void ack(MqttPublishMessage msg) {
- writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
+ if(msg.variableHeader().messageId() > 0) {
+ writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
+ }
}
protected void writeAndFlush(MqttMessage mqttMessage) {
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java
index adda344..590c289 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/SslUtil.java
@@ -18,9 +18,7 @@ package org.thingsboard.server.transport.mqtt.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Base64Utils;
import org.thingsboard.server.dao.EncryptionUtil;
-import sun.misc.BASE64Encoder;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.cert.CertificateEncodingException;
import java.security.cert.X509Certificate;