thingsboard-developers

Changes

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
index 9275847..416a4d6 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultMsgQueueService.java
@@ -49,7 +49,7 @@ public class DefaultMsgQueueService implements MsgQueueService {
     @Autowired
     private MsgQueue msgQueue;
 
-    @Autowired
+    @Autowired(required = false)
     private TenantQuotaService quotaService;
 
     private ScheduledExecutorService cleanupExecutor;
@@ -74,7 +74,7 @@ public class DefaultMsgQueueService implements MsgQueueService {
 
     @Override
     public ListenableFuture<Void> put(TenantId tenantId, TbMsg msg, UUID nodeId, long clusterPartition) {
-        if(quotaService.isQuotaExceeded(tenantId.getId().toString())) {
+        if(quotaService != null && quotaService.isQuotaExceeded(tenantId.getId().toString())) {
             log.warn("Tenant TbMsg Quota exceeded for [{}:{}] . Reject", tenantId.getId());
             return Futures.immediateFailedFuture(new RuntimeException("Tenant TbMsg Quota exceeded"));
         }
diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
index 37a6211..b334ff1 100644
--- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java
@@ -103,9 +103,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
         responseBuilder.autoCommit(true);
         responseBuilder.autoCommitIntervalMs(autoCommitInterval);
         responseBuilder.decoder(new RemoteJsResponseDecoder());
-        responseBuilder.requestIdExtractor((response) -> {
-            return new UUID(response.getRequestIdMSB(), response.getRequestIdLSB());
-        });
+        responseBuilder.requestIdExtractor((response) -> new UUID(response.getRequestIdMSB(), response.getRequestIdLSB()));
 
         TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
                 <JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> builder = TbKafkaRequestTemplate.builder();
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 7831b34..0c7b5f4 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -1,14 +1,165 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.transport;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.security.DeviceCredentials;
+import org.thingsboard.server.common.data.security.DeviceCredentialsType;
+import org.thingsboard.server.dao.device.DeviceCredentialsService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenResponseMsg;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Created by ashvayka on 05.10.18.
  */
 @Slf4j
 @Service
-@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
+@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
 public class RemoteTransportApiService implements TransportApiService {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    @Value("${transport.remote.transport_api.requests_topic}")
+    private String transportApiRequestsTopic;
+    @Value("${transport.remote.transport_api.responses_topic}")
+    private String transportApiResponsesTopic;
+    @Value("${transport.remote.transport_api.max_pending_requests}")
+    private int maxPendingRequests;
+    @Value("${transport.remote.transport_api.request_timeout}")
+    private long requestTimeout;
+    @Value("${transport.remote.transport_api.request_poll_interval}")
+    private int responsePollDuration;
+    @Value("${transport.remote.transport_api.request_auto_commit_interval}")
+    private int autoCommitInterval;
+
+    @Autowired
+    private TbKafkaSettings kafkaSettings;
+
+    @Autowired
+    private DiscoveryService discoveryService;
+
+    @Autowired
+    private DeviceService deviceService;
+
+    @Autowired
+    private DeviceCredentialsService deviceCredentialsService;
+
+    private ExecutorService transportCallbackExecutor;
+
+    private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
+
+    @PostConstruct
+    public void init() {
+        this.transportCallbackExecutor = Executors.newCachedThreadPool();
+
+        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
+        responseBuilder.settings(kafkaSettings);
+        responseBuilder.defaultTopic(transportApiResponsesTopic);
+        responseBuilder.encoder(new TransportApiResponseEncoder());
+
+        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaConsumerTemplate.builder();
+        requestBuilder.settings(kafkaSettings);
+        requestBuilder.topic(transportApiRequestsTopic);
+        requestBuilder.clientId(discoveryService.getNodeId());
+        requestBuilder.groupId("tb-node");
+        requestBuilder.autoCommit(true);
+        requestBuilder.autoCommitIntervalMs(autoCommitInterval);
+        requestBuilder.decoder(new TransportApiRequestDecoder());
+
+        TbKafkaResponseTemplate.TbKafkaResponseTemplateBuilder
+                <TransportApiRequestMsg, TransportApiResponseMsg> builder = TbKafkaResponseTemplate.builder();
+        builder.requestTemplate(requestBuilder.build());
+        builder.responseTemplate(responseBuilder.build());
+        builder.maxPendingRequests(maxPendingRequests);
+        builder.requestTimeout(requestTimeout);
+        builder.pollInterval(responsePollDuration);
+        builder.executor(transportCallbackExecutor);
+        builder.handler(this);
+        transportApiTemplate = builder.build();
+        transportApiTemplate.init();
+    }
+
+    @Override
+    public ListenableFuture<TransportApiResponseMsg> handle(TransportApiRequestMsg transportApiRequestMsg) throws Exception {
+        if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
+            ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
+            //TODO: Make async and enable caching
+            DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(msg.getToken());
+            if (credentials != null && credentials.getCredentialsType() == DeviceCredentialsType.ACCESS_TOKEN) {
+                return getDeviceInfo(credentials.getDeviceId());
+            } else {
+                return getEmptyTransportApiResponseFuture();
+            }
+        }
+        return getEmptyTransportApiResponseFuture();
+    }
+
+    private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
+        return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
+            if (device == null) {
+                log.trace("[{}] Failed to lookup device by id", deviceId);
+                return getEmptyTransportApiResponse();
+            }
+            try {
+                DeviceInfoProto deviceInfoProto = DeviceInfoProto.newBuilder()
+                        .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
+                        .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
+                        .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
+                        .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
+                        .setDeviceName(device.getName())
+                        .setDeviceType(device.getType())
+                        .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
+                        .build();
+                return TransportApiResponseMsg.newBuilder()
+                        .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build();
+            } catch (JsonProcessingException e) {
+                log.warn("[{}] Failed to lookup device by id", deviceId, e);
+                return getEmptyTransportApiResponse();
+            }
+        });
+    }
+
+    private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
+        return Futures.immediateFuture(getEmptyTransportApiResponse());
+    }
+
+    private TransportApiResponseMsg getEmptyTransportApiResponse() {
+        return TransportApiResponseMsg.newBuilder()
+                .setValidateTokenResponseMsg(ValidateDeviceTokenResponseMsg.getDefaultInstance()).build();
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java
new file mode 100644
index 0000000..c54f1a0
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiRequestDecoder.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.kafka.TbKafkaDecoder;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class TransportApiRequestDecoder implements TbKafkaDecoder<TransportApiRequestMsg> {
+    @Override
+    public TransportApiRequestMsg decode(byte[] data) throws IOException {
+        return TransportApiRequestMsg.parseFrom(data);
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java
new file mode 100644
index 0000000..175a92d
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiResponseEncoder.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.transport;
+
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class TransportApiResponseEncoder implements TbKafkaEncoder<TransportApiResponseMsg> {
+    @Override
+    public byte[] encode(TransportApiResponseMsg value) {
+        return value.toByteArray();
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
index 28a769a..cc6f5b9 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
@@ -1,7 +1,25 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.service.transport;
 
+import org.thingsboard.server.gen.transport.TransportProtos;
+import org.thingsboard.server.kafka.TbKafkaHandler;
+
 /**
  * Created by ashvayka on 05.10.18.
  */
-public interface TransportApiService {
+public interface TransportApiService extends TbKafkaHandler<TransportProtos.TransportApiRequestMsg, TransportProtos.TransportApiResponseMsg> {
 }
diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
index c072311..751bde6 100644
--- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
+++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index f35df67..c58f3a1 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -84,7 +84,7 @@ http:
 # MQTT server parameters
 mqtt:
   # Enable/disable mqtt transport protocol.
-  enabled: "${MQTT_ENABLED:true}"
+  enabled: "${MQTT_ENABLED:false}"
   bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
   bind_port: "${MQTT_BIND_PORT:1883}"
   adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
@@ -149,7 +149,7 @@ quota:
       # Interval for scheduled task that cleans expired records. TTL is used for expiring
       cleanPeriodMs: "${QUOTA_TENANT_CLEAN_PERIOD_MS:300000}"
       # Enable Host API Limits
-      enabled: "${QUOTA_TENANT_ENABLED:false}"
+      enabled: "${QUOTA_TENANT_ENABLED:true}"
       # Array of whitelist tenants
       whitelist: "${QUOTA_TENANT_WHITELIST:}"
       # Array of blacklist tenants
@@ -450,3 +450,16 @@ js:
     response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
     # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
     max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
+
+transport:
+  remote:
+    enabled: "${REMOTE_TRANSPORT_ENABLED:true}"
+    transport_api:
+      requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
+      responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
+      max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
+      request_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
+      request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
+      request_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:1000}"
+    rule_engine:
+      topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
\ No newline at end of file
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java
index 8d851bc..386bfc5 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/AbstractTbKafkaTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
index 66d53c3..a64d28e 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
@@ -1,12 +1,27 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.kafka;
 
-import java.util.function.Consumer;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Created by ashvayka on 05.10.18.
  */
 public interface TbKafkaHandler<Request, Response> {
 
-    void handle(Request request, Consumer<Response> onSuccess, Consumer<Throwable> onFailure);
+    ListenableFuture<Response> handle(Request request) throws Exception;
 
 }
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 2611e94..1e109d2 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -101,6 +101,10 @@ public class TBKafkaProducerTemplate<T> {
         return send(this.defaultTopic, key, value, timestamp, headers);
     }
 
+    public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers) {
+        return send(topic, key, value, null, headers);
+    }
+
     public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers) {
         byte[] data = encoder.encode(value);
         ProducerRecord<String, byte[]> record;
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
index 1f53387..0dbf45d 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,13 +21,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -40,12 +42,14 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
     private final TBKafkaProducerTemplate<Response> responseTemplate;
     private final TbKafkaHandler<Request, Response> handler;
     private final ConcurrentMap<UUID, String> pendingRequests;
-    private final ExecutorService executor;
+    private final ExecutorService loopExecutor;
+    private final ScheduledExecutorService timeoutExecutor;
+    private final ExecutorService callbackExecutor;
     private final int maxPendingRequests;
+    private final long requestTimeout;
 
     private final long pollInterval;
     private volatile boolean stopped = false;
-    //TODO:
     private final AtomicInteger pendingRequestCount = new AtomicInteger();
 
     @Builder
@@ -53,6 +57,7 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
                                    TBKafkaProducerTemplate<Response> responseTemplate,
                                    TbKafkaHandler<Request, Response> handler,
                                    long pollInterval,
+                                   long requestTimeout,
                                    int maxPendingRequests,
                                    ExecutorService executor) {
         this.requestTemplate = requestTemplate;
@@ -61,18 +66,24 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
         this.pendingRequests = new ConcurrentHashMap<>();
         this.maxPendingRequests = maxPendingRequests;
         this.pollInterval = pollInterval;
-        this.executor = executor;
+        this.requestTimeout = requestTimeout;
+        this.callbackExecutor = executor;
+        this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+        this.loopExecutor = Executors.newSingleThreadExecutor();
     }
 
     public void init() {
         this.responseTemplate.init();
         requestTemplate.subscribe();
-        executor.submit(() -> {
+        loopExecutor.submit(() -> {
             while (!stopped) {
-                if(pendingRequestCount.get() > maxPendingRequests){
-
+                while (pendingRequestCount.get() >= maxPendingRequests) {
+                    try {
+                        Thread.sleep(pollInterval);
+                    } catch (InterruptedException e) {
+                        log.trace("Failed to wait until the server has capacity to handle new requests", e);
+                    }
                 }
-                //TODO: we need to protect from reading too much requests.
                 ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
                 requests.forEach(request -> {
                     Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
@@ -92,12 +103,27 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
                     }
                     String responseTopic = bytesToString(responseTopicHeader.value());
                     try {
+                        pendingRequestCount.getAndIncrement();
                         Request decodedRequest = requestTemplate.decode(request);
-                        executor.submit(() -> handler.handle(decodedRequest,
-                                response -> reply(requestId, responseTopic, response),
-                                e -> log.error("[{}] Failed to process the request: {}", requestId, request, e)));
+                        AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest),
+                                response -> {
+                                    pendingRequestCount.decrementAndGet();
+                                    reply(requestId, responseTopic, response);
+                                },
+                                e -> {
+                                    pendingRequestCount.decrementAndGet();
+                                    if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
+                                        log.warn("[{}] Timedout to process the request: {}", requestId, request, e);
+                                    } else {
+                                        log.trace("[{}] Failed to process the request: {}", requestId, request, e);
+                                    }
+                                },
+                                requestTimeout,
+                                timeoutExecutor,
+                                callbackExecutor);
                     } catch (Throwable e) {
-                        log.error("[{}] Failed to process the request: {}", requestId, request, e);
+                        pendingRequestCount.decrementAndGet();
+                        log.warn("[{}] Failed to process the request: {}", requestId, request, e);
                     }
                 });
             }
@@ -106,10 +132,16 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
 
     public void stop() {
         stopped = true;
+        if (timeoutExecutor != null) {
+            timeoutExecutor.shutdownNow();
+        }
+        if (loopExecutor != null) {
+            loopExecutor.shutdownNow();
+        }
     }
 
     private void reply(UUID requestId, String topic, Response response) {
-        responseTemplate.send(topic, response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))));
+        responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))));
     }
 
 }
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index f0cc3c7..029bb14 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -47,11 +47,13 @@ message TsKvListProto {
 }
 
 message DeviceInfoProto {
-  int64 deviceIdMSB = 1;
-  int64 deviceIdLSB = 2;
-  string deviceName = 3;
-  string deviceType = 4;
-  string additionalInfo = 5;
+  int64 tenantIdMSB = 1;
+  int64 tenantIdLSB = 2;
+  int64 deviceIdMSB = 3;
+  int64 deviceIdLSB = 4;
+  string deviceName = 5;
+  string deviceType = 6;
+  string additionalInfo = 7;
 }
 
 /**
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
index 8a596d7..0853124 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
@@ -157,10 +157,10 @@ public class CoapTransportResource extends CoapResource {
 
         CoapSessionCtx ctx = new CoapSessionCtx(exchange, adaptor, processor, authService, timeout);
 
-        if (!ctx.login(credentials.get())) {
-            exchange.respond(ResponseCode.UNAUTHORIZED);
-            return Optional.empty();
-        }
+//        if (!ctx.login(credentials.get())) {
+//            exchange.respond(ResponseCode.UNAUTHORIZED);
+//            return Optional.empty();
+//        }
 
         AdaptorToSessionActorMsg msg;
         try {
@@ -190,7 +190,7 @@ public class CoapTransportResource extends CoapResource {
                     throw new IllegalArgumentException("Unsupported msg type: " + type);
             }
             log.trace("Processing msg: {}", msg);
-            processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
+//            processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
         } catch (AdaptorException e) {
             log.debug("Failed to decode payload {}", e);
             exchange.respond(ResponseCode.BAD_REQUEST, e.getMessage());
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
index 7a703b2..c43a4de 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
@@ -49,7 +49,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
     private final AtomicInteger seqNumber = new AtomicInteger(2);
 
     public CoapSessionCtx(CoapExchange exchange, CoapTransportAdaptor adaptor, SessionMsgProcessor processor, DeviceAuthService authService, long timeout) {
-        super(processor, authService);
+        super();
         Request request = exchange.advanced().getRequest();
         this.token = request.getTokenString();
         this.sessionId = new CoapSessionId(request.getSource().getHostAddress(), request.getSourcePort(), this.token);
@@ -112,7 +112,7 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
 
     public void close() {
         log.info("[{}] Closing processing context. Timeout: {}", sessionId, exchange.advanced().isTimedOut());
-        processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId));
+//        processor.process(exchange.advanced().isTimedOut() ? SessionCloseMsg.onTimeout(sessionId) : SessionCloseMsg.onError(sessionId));
     }
 
     @Override
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index d26d076..add89b1 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -73,20 +73,20 @@ public class DeviceApiController {
         if (quotaExceeded(httpRequest, responseWriter)) {
             return responseWriter;
         }
-        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-            GetAttributesRequest request;
-            if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
-                request = new BasicGetAttributesRequest(0);
-            } else {
-                Set<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null;
-                Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
-                request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
-            }
-            process(ctx, request);
-        } else {
-            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-        }
+//        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
+//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+//            GetAttributesRequest request;
+//            if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
+//                request = new BasicGetAttributesRequest(0);
+//            } else {
+//                Set<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null;
+//                Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
+//                request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
+//            }
+//            process(ctx, request);
+//        } else {
+//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+//        }
 
         return responseWriter;
     }
@@ -98,16 +98,16 @@ public class DeviceApiController {
         if (quotaExceeded(request, responseWriter)) {
             return responseWriter;
         }
-        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-            try {
-                process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
-            } catch (IllegalStateException | JsonSyntaxException ex) {
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-            }
-        } else {
-            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-        }
+//        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
+//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+//            try {
+//                process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
+//            } catch (IllegalStateException | JsonSyntaxException ex) {
+//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+//            }
+//        } else {
+//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+//        }
         return responseWriter;
     }
 
@@ -119,15 +119,15 @@ public class DeviceApiController {
             return responseWriter;
         }
         HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-            try {
-                process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
-            } catch (IllegalStateException | JsonSyntaxException ex) {
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-            }
-        } else {
-            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-        }
+//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+//            try {
+//                process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
+//            } catch (IllegalStateException | JsonSyntaxException ex) {
+//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+//            }
+//        } else {
+//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+//        }
         return responseWriter;
     }
 
@@ -147,17 +147,17 @@ public class DeviceApiController {
         if (quotaExceeded(request, responseWriter)) {
             return responseWriter;
         }
-        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-            try {
-                JsonObject response = new JsonParser().parse(json).getAsJsonObject();
-                process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
-            } catch (IllegalStateException | JsonSyntaxException ex) {
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-            }
-        } else {
-            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-        }
+//        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
+//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+//            try {
+//                JsonObject response = new JsonParser().parse(json).getAsJsonObject();
+//                process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
+//            } catch (IllegalStateException | JsonSyntaxException ex) {
+//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+//            }
+//        } else {
+//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+//        }
         return responseWriter;
     }
 
@@ -169,18 +169,18 @@ public class DeviceApiController {
             return responseWriter;
         }
         HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-            try {
-                JsonObject request = new JsonParser().parse(json).getAsJsonObject();
-                process(ctx, new ToServerRpcRequestMsg(0,
-                        request.get("method").getAsString(),
-                        request.get("params").toString()));
-            } catch (IllegalStateException | JsonSyntaxException ex) {
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-            }
-        } else {
-            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-        }
+//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+//            try {
+//                JsonObject request = new JsonParser().parse(json).getAsJsonObject();
+//                process(ctx, new ToServerRpcRequestMsg(0,
+//                        request.get("method").getAsString(),
+//                        request.get("params").toString()));
+//            } catch (IllegalStateException | JsonSyntaxException ex) {
+//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+//            }
+//        } else {
+//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+//        }
         return responseWriter;
     }
 
@@ -197,16 +197,16 @@ public class DeviceApiController {
         if (quotaExceeded(httpRequest, responseWriter)) {
             return responseWriter;
         }
-        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
-        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-            try {
-                process(ctx, msg);
-            } catch (IllegalStateException | JsonSyntaxException ex) {
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-            }
-        } else {
-            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-        }
+//        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
+//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
+//            try {
+//                process(ctx, msg);
+//            } catch (IllegalStateException | JsonSyntaxException ex) {
+//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+//            }
+//        } else {
+//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
+//        }
         return responseWriter;
     }
 
@@ -220,7 +220,7 @@ public class DeviceApiController {
 
     private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
         AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
-        processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
+//        processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
     }
 
     private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
index 4732785..e503409 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
@@ -43,7 +43,7 @@ public class HttpSessionCtx extends DeviceAwareSessionContext {
     private final DeferredResult<ResponseEntity> responseWriter;
 
     public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult<ResponseEntity> responseWriter, long timeout) {
-        super(processor, authService);
+        super();
         this.sessionId = new HttpSessionId();
         this.responseWriter = responseWriter;
         this.timeout = timeout;
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index cc40ee0..b500a31 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.transport.mqtt;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 8c64e7e..a318b30 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -388,7 +388,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                                 ctx.close();
                             } else {
                                 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
-                                deviceSessionCtx.setDeviceInfo(deviceSessionCtx.getDeviceInfo());
+                                deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
                                 transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
                                 checkGatewaySession();
                             }
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
index a1641f1..f5fd690 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.mqtt.service;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -5,6 +20,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.kafka.AsyncCallbackTemplate;
 import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
 import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
 import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
@@ -23,19 +39,19 @@ import java.util.concurrent.Executors;
 @Service
 public class MqttTransportService implements TransportService {
 
-    @Value("${kafka.rule-engine.topic}")
+    @Value("${kafka.rule_engine.topic}")
     private String ruleEngineTopic;
-    @Value("${kafka.transport-api.requests-topic}")
+    @Value("${kafka.transport_api.requests_topic}")
     private String transportApiRequestsTopic;
-    @Value("${kafka.transport-api.responses-topic}")
+    @Value("${kafka.transport_api.responses_topic}")
     private String transportApiResponsesTopic;
-    @Value("${kafka.transport-api.max_pending_requests}")
+    @Value("${kafka.transport_api.max_pending_requests}")
     private long maxPendingRequests;
-    @Value("${kafka.transport-api.max_requests_timeout}")
+    @Value("${kafka.transport_api.max_requests_timeout}")
     private long maxRequestsTimeout;
-    @Value("${kafka.transport-api.response_poll_interval}")
+    @Value("${kafka.transport_api.response_poll_interval}")
     private int responsePollDuration;
-    @Value("${kafka.transport-api.response_auto_commit_interval}")
+    @Value("${kafka.transport_api.response_auto_commit_interval}")
     private int autoCommitInterval;
 
     @Autowired
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
index f931db6..8ae4d17 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.mqtt.service;
 
 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
index 22e1647..66e64b5 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.mqtt.service;
 
 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
index a2a0d54..707fb4b 100644
--- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
@@ -73,13 +73,12 @@ kafka:
   batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
   linger.ms: "${TB_KAFKA_LINGER_MS:1}"
   buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
-  transport-api:
-    requests-topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
-    responses-topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
+  transport_api:
+    requests_topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
+    responses_topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
     max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
     max_requests_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
     response_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
     response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
-    # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
-  rule-engine:
+  rule_engine:
     topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"