thingsboard-memoizeit

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java
new file mode 100644
index 0000000..5c39f8a
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.data.security.DeviceCredentials;
+import org.thingsboard.server.common.data.security.DeviceCredentialsType;
+import org.thingsboard.server.dao.device.DeviceCredentialsService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
+import org.thingsboard.server.service.executors.DbCallbackExecutorService;
+import org.thingsboard.server.service.state.DeviceStateService;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+@Slf4j
+@Service
+public class LocalTransportApiService implements TransportApiService {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    @Autowired
+    private DeviceService deviceService;
+
+    @Autowired
+    private RelationService relationService;
+
+    @Autowired
+    private DeviceCredentialsService deviceCredentialsService;
+
+    @Autowired
+    private DeviceStateService deviceStateService;
+
+    @Autowired
+    private DbCallbackExecutorService dbCallbackExecutorService;
+
+    private ReentrantLock deviceCreationLock = new ReentrantLock();
+
+    @Override
+    public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) {
+        if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
+            ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
+            return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
+        } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
+            ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
+            return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
+        } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
+            return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
+        }
+        return getEmptyTransportApiResponseFuture();
+    }
+
+    private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
+        //TODO: Make async and enable caching
+        DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
+        if (credentials != null && credentials.getCredentialsType() == credentialsType) {
+            return getDeviceInfo(credentials.getDeviceId());
+        } else {
+            return getEmptyTransportApiResponseFuture();
+        }
+    }
+
+    private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
+        DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
+        ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);
+        return Futures.transform(gatewayFuture, gateway -> {
+            deviceCreationLock.lock();
+            try {
+                Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
+                if (device == null) {
+                    device = new Device();
+                    device.setTenantId(gateway.getTenantId());
+                    device.setName(requestMsg.getDeviceName());
+                    device.setType(requestMsg.getDeviceType());
+                    device.setCustomerId(gateway.getCustomerId());
+                    device = deviceService.saveDevice(device);
+                    relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
+                    deviceStateService.onDeviceAdded(device);
+                }
+                return TransportApiResponseMsg.newBuilder()
+                        .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
+            } catch (JsonProcessingException e) {
+                log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
+                throw new RuntimeException(e);
+            } finally {
+                deviceCreationLock.unlock();
+            }
+        }, dbCallbackExecutorService);
+    }
+
+
+    private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
+        return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
+            if (device == null) {
+                log.trace("[{}] Failed to lookup device by id", deviceId);
+                return getEmptyTransportApiResponse();
+            }
+            try {
+                return TransportApiResponseMsg.newBuilder()
+                        .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
+            } catch (JsonProcessingException e) {
+                log.warn("[{}] Failed to lookup device by id", deviceId, e);
+                return getEmptyTransportApiResponse();
+            }
+        });
+    }
+
+    private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {
+        return DeviceInfoProto.newBuilder()
+                .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
+                .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
+                .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
+                .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
+                .setDeviceName(device.getName())
+                .setDeviceType(device.getType())
+                .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
+                .build();
+    }
+
+    private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
+        return Futures.immediateFuture(getEmptyTransportApiResponse());
+    }
+
+    private TransportApiResponseMsg getEmptyTransportApiResponse() {
+        return TransportApiResponseMsg.newBuilder()
+                .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
new file mode 100644
index 0000000..ba5cc07
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -0,0 +1,249 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import akka.actor.ActorRef;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.thingsboard.rule.engine.api.util.DonAsynchron;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.common.msg.cluster.ServerAddress;
+import org.thingsboard.server.common.transport.SessionMsgListener;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
+import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
+import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
+import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 12.10.18.
+ */
+@Slf4j
+@Service
+@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "local")
+public class LocalTransportService implements TransportService, RuleEngineTransportService {
+
+    private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
+
+    private ExecutorService transportCallbackExecutor;
+
+    @Autowired
+    private TransportApiService transportApiService;
+
+    @Autowired
+    private ActorSystemContext actorContext;
+
+    //TODO: completely replace this routing with the Kafka routing by partition ids.
+    @Autowired
+    private ClusterRoutingService routingService;
+    @Autowired
+    private ClusterRpcService rpcService;
+    @Autowired
+    private DataDecodingEncodingService encodingService;
+
+    @PostConstruct
+    public void init() {
+        this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+    }
+
+    @PreDestroy
+    public void destroy() {
+        if (transportCallbackExecutor != null) {
+            transportCallbackExecutor.shutdownNow();
+        }
+    }
+
+    @Override
+    public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+        DonAsynchron.withCallback(
+                transportApiService.handle(TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
+                transportApiResponseMsg -> {
+                    if (callback != null) {
+                        callback.onSuccess(transportApiResponseMsg.getValidateTokenResponseMsg());
+                    }
+                },
+                getThrowableConsumer(callback), transportCallbackExecutor);
+    }
+
+    @Override
+    public void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback) {
+        DonAsynchron.withCallback(
+                transportApiService.handle(TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()),
+                transportApiResponseMsg -> {
+                    if (callback != null) {
+                        callback.onSuccess(transportApiResponseMsg.getValidateTokenResponseMsg());
+                    }
+                },
+                getThrowableConsumer(callback), transportCallbackExecutor);
+    }
+
+    @Override
+    public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
+        DonAsynchron.withCallback(
+                transportApiService.handle(TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()),
+                transportApiResponseMsg -> {
+                    if (callback != null) {
+                        callback.onSuccess(transportApiResponseMsg.getGetOrCreateDeviceResponseMsg());
+                    }
+                },
+                getThrowableConsumer(callback), transportCallbackExecutor);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
+    }
+
+    @Override
+    public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+        forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
+    }
+
+    @Override
+    public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
+        sessions.putIfAbsent(toId(sessionInfo), listener);
+        //TODO: monitor sessions periodically: PING REQ/RESP, etc.
+    }
+
+    @Override
+    public void deregisterSession(SessionInfoProto sessionInfo) {
+        sessions.remove(toId(sessionInfo));
+    }
+
+    private UUID toId(SessionInfoProto sessionInfo) {
+        return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+    }
+
+    @Override
+    public void process(String nodeId, DeviceActorToTransportMsg msg) {
+        process(nodeId, msg, null, null);
+    }
+
+    @Override
+    public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
+        UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
+        SessionMsgListener listener = sessions.get(sessionId);
+        if (listener != null) {
+            transportCallbackExecutor.submit(() -> {
+                if (msg.hasGetAttributesResponse()) {
+                    listener.onGetAttributesResponse(msg.getGetAttributesResponse());
+                }
+                if (msg.hasAttributeUpdateNotification()) {
+                    listener.onAttributeUpdate(msg.getAttributeUpdateNotification());
+                }
+                if (msg.hasSessionCloseNotification()) {
+                    listener.onRemoteSessionCloseCommand(msg.getSessionCloseNotification());
+                }
+                if (msg.hasToDeviceRequest()) {
+                    listener.onToDeviceRpcRequest(msg.getToDeviceRequest());
+                }
+                if (msg.hasToServerResponse()) {
+                    listener.onToServerRpcResponse(msg.getToServerResponse());
+                }
+            });
+        } else {
+            //TODO: should we notify the device actor about missed session?
+            log.debug("[{}] Missing session.", sessionId);
+        }
+        if (onSuccess != null) {
+            onSuccess.run();
+        }
+    }
+
+    private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
+        TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
+        Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
+        if (address.isPresent()) {
+            rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
+        } else {
+            actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
+        }
+        if (callback != null) {
+            callback.onSuccess(null);
+        }
+    }
+
+    private <T> Consumer<Throwable> getThrowableConsumer(TransportServiceCallback<T> callback) {
+        return e -> {
+            if (callback != null) {
+                callback.onError(e);
+            }
+        };
+    }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
index aa36b9b..a0791ad 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java
@@ -55,7 +55,7 @@ import java.util.function.Consumer;
  */
 @Slf4j
 @Service
-@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
+@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote")
 public class RemoteRuleEngineTransportService implements RuleEngineTransportService {
 
     private static final ObjectMapper mapper = new ObjectMapper();
@@ -78,9 +78,6 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
     @Autowired
     private ActorSystemContext actorContext;
 
-    @Autowired
-    private ActorService actorService;
-
     //TODO: completely replace this routing with the Kafka routing by partition ids.
     @Autowired
     private ClusterRoutingService routingService;
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 66a8226..ebd35a4 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.DeviceId;
@@ -48,20 +49,22 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 import org.thingsboard.server.service.state.DeviceStateService;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Created by ashvayka on 05.10.18.
  */
 @Slf4j
-@Service
-@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
-public class RemoteTransportApiService implements TransportApiService {
-
-    private static final ObjectMapper mapper = new ObjectMapper();
+@Component
+@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote")
+public class RemoteTransportApiService {
 
     @Value("${transport.remote.transport_api.requests_topic}")
     private String transportApiRequestsTopic;
@@ -83,26 +86,15 @@ public class RemoteTransportApiService implements TransportApiService {
     private DiscoveryService discoveryService;
 
     @Autowired
-    private DeviceService deviceService;
-
-    @Autowired
-    private RelationService relationService;
-
-    @Autowired
-    private DeviceCredentialsService deviceCredentialsService;
-
-    @Autowired
-    private DeviceStateService deviceStateService;
+    private TransportApiService transportApiService;
 
     private ExecutorService transportCallbackExecutor;
 
     private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
 
-    private ReentrantLock deviceCreationLock = new ReentrantLock();
-
     @PostConstruct
     public void init() {
-        this.transportCallbackExecutor = Executors.newCachedThreadPool();
+        this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
 
         TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
         responseBuilder.settings(kafkaSettings);
@@ -126,98 +118,19 @@ public class RemoteTransportApiService implements TransportApiService {
         builder.requestTimeout(requestTimeout);
         builder.pollInterval(responsePollDuration);
         builder.executor(transportCallbackExecutor);
-        builder.handler(this);
+        builder.handler(transportApiService);
         transportApiTemplate = builder.build();
         transportApiTemplate.init();
     }
 
-    @Override
-    public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception {
-        if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
-            ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
-            return validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
-        } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
-            ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
-            return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
-        } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
-            return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
+    @PreDestroy
+    public void destroy() {
+        if (transportApiTemplate != null) {
+            transportApiTemplate.stop();
         }
-        return getEmptyTransportApiResponseFuture();
-    }
-
-    private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
-        //TODO: Make async and enable caching
-        DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
-        if (credentials != null && credentials.getCredentialsType() == credentialsType) {
-            return getDeviceInfo(credentials.getDeviceId());
-        } else {
-            return getEmptyTransportApiResponseFuture();
+        if (transportCallbackExecutor != null) {
+            transportCallbackExecutor.shutdownNow();
         }
     }
 
-    private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
-        DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
-        ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);
-        return Futures.transform(gatewayFuture, gateway -> {
-            deviceCreationLock.lock();
-            try {
-                Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
-                if (device == null) {
-                    device = new Device();
-                    device.setTenantId(gateway.getTenantId());
-                    device.setName(requestMsg.getDeviceName());
-                    device.setType(requestMsg.getDeviceType());
-                    device.setCustomerId(gateway.getCustomerId());
-                    device = deviceService.saveDevice(device);
-                    relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
-                    deviceStateService.onDeviceAdded(device);
-                }
-                return TransportApiResponseMsg.newBuilder()
-                        .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
-            } catch (JsonProcessingException e) {
-                log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
-                throw new RuntimeException(e);
-            } finally {
-                deviceCreationLock.unlock();
-            }
-        }, transportCallbackExecutor);
-    }
-
-
-    private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
-        return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
-            if (device == null) {
-                log.trace("[{}] Failed to lookup device by id", deviceId);
-                return getEmptyTransportApiResponse();
-            }
-            try {
-                return TransportApiResponseMsg.newBuilder()
-                        .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
-            } catch (JsonProcessingException e) {
-                log.warn("[{}] Failed to lookup device by id", deviceId, e);
-                return getEmptyTransportApiResponse();
-            }
-        });
-    }
-
-    private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {
-        return DeviceInfoProto.newBuilder()
-                .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
-                .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
-                .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
-                .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
-                .setDeviceName(device.getName())
-                .setDeviceType(device.getType())
-                .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
-                .build();
-    }
-
-    private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
-        return Futures.immediateFuture(getEmptyTransportApiResponse());
-    }
-
-    private TransportApiResponseMsg getEmptyTransportApiResponse() {
-        return TransportApiResponseMsg.newBuilder()
-                .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.getDefaultInstance()).build();
-    }
 }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 923976c..529ab25 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -452,8 +452,8 @@ js:
     max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
 
 transport:
+  type: "${TRANSPORT_TYPE:remote}" # local or remote
   remote:
-    enabled: "${REMOTE_TRANSPORT_ENABLED:true}"
     transport_api:
       requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
       responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
index a64d28e..9f86155 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
@@ -22,6 +22,6 @@ import com.google.common.util.concurrent.ListenableFuture;
  */
 public interface TbKafkaHandler<Request, Response> {
 
-    ListenableFuture<Response> handle(Request request) throws Exception;
+    ListenableFuture<Response> handle(Request request);
 
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index 4742b3b..173fe8a 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -48,7 +48,6 @@ public class MqttTransportContext {
     private final ObjectMapper mapper = new ObjectMapper();
 
     @Autowired
-    @Lazy
     private TransportService transportService;
 
     @Autowired(required = false)
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index b1b176d..f9427a8 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -494,7 +494,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
             ctx.close();
         } else {
-            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
             deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
             sessionInfo = SessionInfoProto.newBuilder()
                     .setNodeId(context.getNodeId())
@@ -508,6 +507,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
             transportService.registerSession(sessionInfo, this);
             checkGatewaySession();
+            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
         }
     }
 
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
index b1504b1..7ef832f 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
@@ -87,10 +87,12 @@ public class GatewaySessionHandler {
         JsonElement json = getJson(msg);
         String deviceName = checkDeviceName(getDeviceName(json));
         String deviceType = getDeviceType(json);
+        log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
         Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<GatewayDeviceSessionCtx>() {
             @Override
             public void onSuccess(@Nullable GatewayDeviceSessionCtx result) {
                 ack(msg);
+                log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
             }
 
             @Override
diff --git a/transport/mqtt-transport/src/main/resources/logback.xml b/transport/mqtt-transport/src/main/resources/logback.xml
new file mode 100644
index 0000000..18864a9
--- /dev/null
+++ b/transport/mqtt-transport/src/main/resources/logback.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+    Copyright © 2016-2018 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<!DOCTYPE configuration>
+<configuration scan="true" scanPeriod="10 seconds">
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.thingsboard.server" level="TRACE" />
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
\ No newline at end of file