thingsboard-developers

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 0c7b5f4..6e9b2ef 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -33,7 +33,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
-import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
 import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
 import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
 import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
@@ -118,17 +119,24 @@ public class RemoteTransportApiService implements TransportApiService {
     public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception {
         if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
             ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
-            //TODO: Make async and enable caching
-            DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(msg.getToken());
-            if (credentials != null && credentials.getCredentialsType() == DeviceCredentialsType.ACCESS_TOKEN) {
-                return getDeviceInfo(credentials.getDeviceId());
-            } else {
-                return getEmptyTransportApiResponseFuture();
-            }
+            return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
+        } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
+            ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
+            return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
         }
         return getEmptyTransportApiResponseFuture();
     }
 
+    private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
+        //TODO: Make async and enable caching
+        DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
+        if (credentials != null && credentials.getCredentialsType() == credentialsType) {
+            return getDeviceInfo(credentials.getDeviceId());
+        } else {
+            return getEmptyTransportApiResponseFuture();
+        }
+    }
+
     private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
         return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
             if (device == null) {
@@ -146,7 +154,7 @@ public class RemoteTransportApiService implements TransportApiService {
                         .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
                         .build();
                 return TransportApiResponseMsg.newBuilder()
-                        .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build();
+                        .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build();
             } catch (JsonProcessingException e) {
                 log.warn("[{}] Failed to lookup device by id", deviceId, e);
                 return getEmptyTransportApiResponse();
@@ -160,6 +168,6 @@ public class RemoteTransportApiService implements TransportApiService {
 
     private TransportApiResponseMsg getEmptyTransportApiResponse() {
         return TransportApiResponseMsg.newBuilder()
-                .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.getDefaultInstance()).build();
+                .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
     }
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index e72a626..84d34e1 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -23,7 +23,10 @@ import org.thingsboard.server.gen.transport.TransportProtos;
 public interface TransportService {
 
     void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
-                 TransportServiceCallback<TransportProtos.ValidateDeviceTokenResponseMsg> callback);
+                 TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
+
+    void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg,
+                 TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
 
     void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
 
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index 029bb14..e78f873 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -93,7 +93,11 @@ message ValidateDeviceTokenRequestMsg {
   string token = 1;
 }
 
-message ValidateDeviceTokenResponseMsg {
+message ValidateDeviceX509CertRequestMsg {
+  string hash = 1;
+}
+
+message ValidateDeviceCredentialsResponseMsg {
   DeviceInfoProto deviceInfo = 1;
 }
 
@@ -106,8 +110,9 @@ message TransportToRuleEngineMsg {
 
 message TransportApiRequestMsg {
    ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
+   ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2;
 }
 
 message TransportApiResponseMsg {
-   ValidateDeviceTokenResponseMsg validateTokenResponseMsg = 1;
+   ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
 }
\ No newline at end of file
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index a318b30..23bc4cc 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -380,9 +380,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             ctx.close();
         } else {
             transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
-                    new TransportServiceCallback<ValidateDeviceTokenResponseMsg>() {
+                    new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                         @Override
-                        public void onSuccess(ValidateDeviceTokenResponseMsg msg) {
+                        public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                             if (!msg.hasDeviceInfo()) {
                                 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
                                 ctx.close();
@@ -404,32 +404,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         }
     }
 
-    protected SessionEventMsg getSessionEventMsg(SessionEvent event) {
-        return SessionEventMsg.newBuilder()
-                .setSessionInfo(sessionInfo)
-                .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
-                .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
-                .setEvent(event).build();
-    }
-
     private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
-//        try {
-//            String strCert = SslUtil.getX509CertificateString(cert);
-//            String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
-//            if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
-//                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
-//                connected = true;
-//                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
-//                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
-//                checkGatewaySession();
-//            } else {
-//                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-//                ctx.close();
-//            }
-//        } catch (Exception e) {
-//            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-//            ctx.close();
-//        }
+        try {
+            String strCert = SslUtil.getX509CertificateString(cert);
+            String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
+            transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
+                    new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
+                        @Override
+                        public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
+                            if (!msg.hasDeviceInfo()) {
+                                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+                                ctx.close();
+                            } else {
+                                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+                                deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
+                                transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
+                                checkGatewaySession();
+                            }
+                        }
+
+                        @Override
+                        public void onError(Throwable e) {
+                            log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
+                            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
+                            ctx.close();
+                        }
+                    });
+        } catch (Exception e) {
+            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+            ctx.close();
+        }
     }
 
     private X509Certificate getX509Certificate() {
@@ -519,6 +523,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         }
     }
 
+    private SessionEventMsg getSessionEventMsg(SessionEvent event) {
+        return SessionEventMsg.newBuilder()
+                .setSessionInfo(sessionInfo)
+                .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
+                .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
+                .setEvent(event).build();
+    }
+
     @Override
     public void operationComplete(Future<? super Void> future) throws Exception {
         if (deviceSessionCtx.isConnected()) {
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index f5fd690..4d945de 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -104,8 +104,16 @@ public class MqttTransportService implements TransportService {
     }
 
     @Override
-    public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceTokenResponseMsg> callback) {
-        AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
+    public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+        AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(),
+                TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
+                response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
+    }
+
+    @Override
+    public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+        AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getHash(),
+                TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
                 response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
     }