thingsboard-developers
Changes
application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java 28(+18 -10)
common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java 5(+4 -1)
Details
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 0c7b5f4..6e9b2ef 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -33,7 +33,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
@@ -118,17 +119,24 @@ public class RemoteTransportApiService implements TransportApiService {
public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception {
if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
- //TODO: Make async and enable caching
- DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(msg.getToken());
- if (credentials != null && credentials.getCredentialsType() == DeviceCredentialsType.ACCESS_TOKEN) {
- return getDeviceInfo(credentials.getDeviceId());
- } else {
- return getEmptyTransportApiResponseFuture();
- }
+ return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
+ } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
+ ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
+ return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
}
return getEmptyTransportApiResponseFuture();
}
+ private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
+ //TODO: Make async and enable caching
+ DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
+ if (credentials != null && credentials.getCredentialsType() == credentialsType) {
+ return getDeviceInfo(credentials.getDeviceId());
+ } else {
+ return getEmptyTransportApiResponseFuture();
+ }
+ }
+
private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
if (device == null) {
@@ -146,7 +154,7 @@ public class RemoteTransportApiService implements TransportApiService {
.setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
.build();
return TransportApiResponseMsg.newBuilder()
- .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build();
+ .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build();
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to lookup device by id", deviceId, e);
return getEmptyTransportApiResponse();
@@ -160,6 +168,6 @@ public class RemoteTransportApiService implements TransportApiService {
private TransportApiResponseMsg getEmptyTransportApiResponse() {
return TransportApiResponseMsg.newBuilder()
- .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.getDefaultInstance()).build();
+ .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
}
}
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index e72a626..84d34e1 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -23,7 +23,10 @@ import org.thingsboard.server.gen.transport.TransportProtos;
public interface TransportService {
void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
- TransportServiceCallback<TransportProtos.ValidateDeviceTokenResponseMsg> callback);
+ TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
+
+ void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg,
+ TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index 029bb14..e78f873 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -93,7 +93,11 @@ message ValidateDeviceTokenRequestMsg {
string token = 1;
}
-message ValidateDeviceTokenResponseMsg {
+message ValidateDeviceX509CertRequestMsg {
+ string hash = 1;
+}
+
+message ValidateDeviceCredentialsResponseMsg {
DeviceInfoProto deviceInfo = 1;
}
@@ -106,8 +110,9 @@ message TransportToRuleEngineMsg {
message TransportApiRequestMsg {
ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
+ ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2;
}
message TransportApiResponseMsg {
- ValidateDeviceTokenResponseMsg validateTokenResponseMsg = 1;
+ ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
}
\ No newline at end of file
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index a318b30..23bc4cc 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -380,9 +380,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.close();
} else {
transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
- new TransportServiceCallback<ValidateDeviceTokenResponseMsg>() {
+ new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
@Override
- public void onSuccess(ValidateDeviceTokenResponseMsg msg) {
+ public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
if (!msg.hasDeviceInfo()) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
@@ -404,32 +404,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
- protected SessionEventMsg getSessionEventMsg(SessionEvent event) {
- return SessionEventMsg.newBuilder()
- .setSessionInfo(sessionInfo)
- .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
- .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
- .setEvent(event).build();
- }
-
private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
-// try {
-// String strCert = SslUtil.getX509CertificateString(cert);
-// String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
-// if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
-// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
-// connected = true;
-// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
-// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
-// checkGatewaySession();
-// } else {
-// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-// ctx.close();
-// }
-// } catch (Exception e) {
-// ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-// ctx.close();
-// }
+ try {
+ String strCert = SslUtil.getX509CertificateString(cert);
+ String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
+ transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
+ new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
+ @Override
+ public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
+ if (!msg.hasDeviceInfo()) {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+ ctx.close();
+ } else {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+ deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
+ transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
+ checkGatewaySession();
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
+ ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
+ ctx.close();
+ }
+ });
+ } catch (Exception e) {
+ ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+ ctx.close();
+ }
}
private X509Certificate getX509Certificate() {
@@ -519,6 +523,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
+ private SessionEventMsg getSessionEventMsg(SessionEvent event) {
+ return SessionEventMsg.newBuilder()
+ .setSessionInfo(sessionInfo)
+ .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
+ .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
+ .setEvent(event).build();
+ }
+
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (deviceSessionCtx.isConnected()) {
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index f5fd690..4d945de 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -104,8 +104,16 @@ public class MqttTransportService implements TransportService {
}
@Override
- public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceTokenResponseMsg> callback) {
- AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
+ public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+ AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(),
+ TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
+ response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
+ }
+
+ @Override
+ public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+ AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(),
+ TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
}