thingsboard-developers

Some refactoring

10/5/2018 12:52:59 PM

Changes

pom.xml 2(+1 -1)

transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java 280(+0 -280)

Details

diff --git a/application/pom.xml b/application/pom.xml
index c4766f2..bff96ec 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -70,7 +70,7 @@
         </dependency>
         <dependency>
             <groupId>org.thingsboard.transport</groupId>
-            <artifactId>mqtt</artifactId>
+            <artifactId>mqtt-common</artifactId>
         </dependency>
         <dependency>
             <groupId>org.thingsboard</groupId>
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
index 3972264..3adb1c3 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java
@@ -43,7 +43,8 @@ public class TBKafkaConsumerTemplate<T> {
     private final String topic;
 
     @Builder
-    private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, TbKafkaRequestIdExtractor<T> requestIdExtractor,
+    private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
+                                    TbKafkaRequestIdExtractor<T> requestIdExtractor,
                                     String clientId, String groupId, String topic,
                                     boolean autoCommit, int autoCommitIntervalMs) {
         Properties props = settings.toProps();
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
new file mode 100644
index 0000000..66d53c3
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaHandler.java
@@ -0,0 +1,12 @@
+package org.thingsboard.server.kafka;
+
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public interface TbKafkaHandler<Request, Response> {
+
+    void handle(Request request, Consumer<Response> onSuccess, Consumer<Throwable> onFailure);
+
+}
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
index 7e24ad0..2611e94 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,13 +27,12 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.header.Header;
 
-import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
-import java.util.function.BiConsumer;
 
 /**
  * Created by ashvayka on 24.09.18.
@@ -48,7 +47,7 @@ public class TBKafkaProducerTemplate<T> {
     private TbKafkaEnricher<T> enricher = ((value, responseTopic, requestId) -> value);
 
     private final TbKafkaPartitioner<T> partitioner;
-    private List<PartitionInfo> partitionInfoList;
+    private ConcurrentMap<String, List<PartitionInfo>> partitionInfoMap;
     @Getter
     private final String defaultTopic;
 
@@ -78,11 +77,16 @@ public class TBKafkaProducerTemplate<T> {
             log.trace("Failed to create topic: {}", e.getMessage(), e);
         }
         //Maybe this should not be cached, but we don't plan to change size of partitions
-        this.partitionInfoList = producer.partitionsFor(defaultTopic);
+        this.partitionInfoMap = new ConcurrentHashMap<>();
+        this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic));
     }
 
-    public T enrich(T value, String responseTopic, UUID requestId) {
-        return enricher.enrich(value, responseTopic, requestId);
+    T enrich(T value, String responseTopic, UUID requestId) {
+        if (enricher != null) {
+            return enricher.enrich(value, responseTopic, requestId);
+        } else {
+            return value;
+        }
     }
 
     public Future<RecordMetadata> send(String key, T value) {
@@ -101,7 +105,7 @@ public class TBKafkaProducerTemplate<T> {
         byte[] data = encoder.encode(value);
         ProducerRecord<String, byte[]> record;
         Integer partition = getPartition(topic, key, value, data);
-        record = new ProducerRecord<>(this.defaultTopic, partition, timestamp, key, data, headers);
+        record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers);
         return producer.send(record);
     }
 
@@ -109,7 +113,7 @@ public class TBKafkaProducerTemplate<T> {
         if (partitioner == null) {
             return null;
         } else {
-            return partitioner.partition(this.defaultTopic, key, value, data, partitionInfoList);
+            return partitioner.partition(topic, key, value, data, partitionInfoMap.computeIfAbsent(topic, producer::partitionsFor));
         }
     }
 }
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
index 8a0f529..30b20e7 100644
--- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java
@@ -93,13 +93,12 @@ public class TbKafkaRequestTemplate<Request, Response> {
                 ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
                 responses.forEach(response -> {
                     Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
-                    Response decocedResponse = null;
+                    Response decodedResponse = null;
                     UUID requestId = null;
                     if (requestIdHeader == null) {
                         try {
-                            decocedResponse = responseTemplate.decode(response);
-                            requestId = responseTemplate.extractRequestId(decocedResponse);
-
+                            decodedResponse = responseTemplate.decode(response);
+                            requestId = responseTemplate.extractRequestId(decodedResponse);
                         } catch (IOException e) {
                             log.error("Failed to decode response", e);
                         }
@@ -107,17 +106,17 @@ public class TbKafkaRequestTemplate<Request, Response> {
                         requestId = bytesToUuid(requestIdHeader.value());
                     }
                     if (requestId == null) {
-                        log.error("[{}] Missing requestId in header and response", response);
+                        log.error("[{}] Missing requestId in header and body", response);
                     } else {
                         ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
                         if (expectedResponse == null) {
                             log.trace("[{}] Invalid or stale request", requestId);
                         } else {
                             try {
-                                if (decocedResponse == null) {
-                                    decocedResponse = responseTemplate.decode(response);
+                                if (decodedResponse == null) {
+                                    decodedResponse = responseTemplate.decode(response);
                                 }
-                                expectedResponse.future.set(decocedResponse);
+                                expectedResponse.future.set(decodedResponse);
                             } catch (IOException e) {
                                 expectedResponse.future.setException(e);
                             }
diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
new file mode 100644
index 0000000..536c4b9
--- /dev/null
+++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.kafka;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Created by ashvayka on 25.09.18.
+ */
+@Slf4j
+public class TbKafkaResponseTemplate<Request, Response> {
+
+    private final TBKafkaConsumerTemplate<Request> requestTemplate;
+    private final TBKafkaProducerTemplate<Response> responseTemplate;
+    private final TbKafkaHandler<Request, Response> handler;
+    private final ConcurrentMap<UUID, String> pendingRequests;
+    private final ExecutorService executor;
+    private final long maxPendingRequests;
+
+    private final long pollInterval;
+    private volatile boolean stopped = false;
+
+    @Builder
+    public TbKafkaResponseTemplate(TBKafkaConsumerTemplate<Request> requestTemplate,
+                                   TBKafkaProducerTemplate<Response> responseTemplate,
+                                   TbKafkaHandler<Request, Response> handler,
+                                   long pollInterval,
+                                   long maxPendingRequests,
+                                   ExecutorService executor) {
+        this.requestTemplate = requestTemplate;
+        this.responseTemplate = responseTemplate;
+        this.handler = handler;
+        this.pendingRequests = new ConcurrentHashMap<>();
+        this.maxPendingRequests = maxPendingRequests;
+        this.pollInterval = pollInterval;
+        this.executor = executor;
+    }
+
+    public void init() {
+        this.responseTemplate.init();
+        requestTemplate.subscribe();
+        executor.submit(() -> {
+            long nextCleanupMs = 0L;
+            while (!stopped) {
+                ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
+                requests.forEach(request -> {
+                    Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
+                    if (requestIdHeader == null) {
+                        log.error("[{}] Missing requestId in header", request);
+                        return;
+                    }
+                    UUID requestId = bytesToUuid(requestIdHeader.value());
+                    if (requestId == null) {
+                        log.error("[{}] Missing requestId in header and body", request);
+                        return;
+                    }
+                    Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
+                    if (responseTopicHeader == null) {
+                        log.error("[{}] Missing response topic in header", request);
+                        return;
+                    }
+                    String responseTopic = bytesToUuid(responseTopicHeader.value());
+                    if (requestId == null) {
+                        log.error("[{}] Missing requestId in header and body", request);
+                        return;
+                    }
+
+                    Request decodedRequest = null;
+                    String responseTopic = null;
+
+                    try {
+                        if (decodedRequest == null) {
+                            decodedRequest = requestTemplate.decode(request);
+                        }
+                        executor.submit(() -> {
+                            handler.handle(decodedRequest, );
+                        });
+                    } catch (IOException e) {
+                        expectedRequest.future.setException(e);
+                    }
+
+                });
+            }
+        });
+    }
+
+    public void stop() {
+        stopped = true;
+    }
+
+    public ListenableFuture<Response> post(String key, Request request) {
+        if (tickSize > maxPendingRequests) {
+            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
+        }
+        UUID requestId = UUID.randomUUID();
+        List<Header> headers = new ArrayList<>(2);
+        headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
+        headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
+        SettableFuture<Response> future = SettableFuture.create();
+        pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
+        request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
+        requestTemplate.send(key, request, headers);
+        return future;
+    }
+
+    private byte[] uuidToBytes(UUID uuid) {
+        ByteBuffer buf = ByteBuffer.allocate(16);
+        buf.putLong(uuid.getMostSignificantBits());
+        buf.putLong(uuid.getLeastSignificantBits());
+        return buf.array();
+    }
+
+    private static UUID bytesToUuid(byte[] bytes) {
+        ByteBuffer bb = ByteBuffer.wrap(bytes);
+        long firstLong = bb.getLong();
+        long secondLong = bb.getLong();
+        return new UUID(firstLong, secondLong);
+    }
+
+    private byte[] stringToBytes(String string) {
+        return string.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private String bytesToString(byte[] data) {
+        return new String(data, StandardCharsets.UTF_8);
+    }
+
+    private static class ResponseMetaData<T> {
+        private final long expTime;
+        private final SettableFuture<T> future;
+
+        ResponseMetaData(long ts, SettableFuture<T> future) {
+            this.expTime = ts;
+            this.future = future;
+        }
+    }
+
+}
diff --git a/common/transport/pom.xml b/common/transport/pom.xml
index b8a7ab5..46efbf4 100644
--- a/common/transport/pom.xml
+++ b/common/transport/pom.xml
@@ -79,6 +79,11 @@
             <artifactId>spring-context</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java
index c481170..47b4278 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryCleaner.java
@@ -16,10 +16,12 @@
 package org.thingsboard.server.common.transport.quota.tenant;
 
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
 
 @Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
 public class TenantIntervalRegistryCleaner extends IntervalRegistryCleaner {
 
     public TenantIntervalRegistryCleaner(TenantMsgsIntervalRegistry intervalRegistry,
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java
index c56f457..3bdcf93 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantIntervalRegistryLogger.java
@@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport.quota.tenant;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
 
@@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
 public class TenantIntervalRegistryLogger extends IntervalRegistryLogger {
 
     private final long logIntervalMin;
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java
index 6e8402c..3402e62 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantMsgsIntervalRegistry.java
@@ -16,10 +16,12 @@
 package org.thingsboard.server.common.transport.quota.tenant;
 
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry;
 
 @Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
 public class TenantMsgsIntervalRegistry extends KeyBasedIntervalRegistry {
 
     public TenantMsgsIntervalRegistry(@Value("${quota.rule.tenant.intervalMs}") long intervalDurationMs,
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java
index a68860a..875a666 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantQuotaService.java
@@ -16,10 +16,12 @@
 package org.thingsboard.server.common.transport.quota.tenant;
 
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.transport.quota.AbstractQuotaService;
 
 @Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
 public class TenantQuotaService extends AbstractQuotaService {
 
     public TenantQuotaService(TenantMsgsIntervalRegistry requestRegistry, TenantRequestLimitPolicy requestsPolicy,
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java
index cc32c81..c5dfc40 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/quota/tenant/TenantRequestLimitPolicy.java
@@ -16,10 +16,12 @@
 package org.thingsboard.server.common.transport.quota.tenant;
 
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 import org.thingsboard.server.common.transport.quota.RequestLimitPolicy;
 
 @Component
+@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
 public class TenantRequestLimitPolicy extends RequestLimitPolicy {
 
     public TenantRequestLimitPolicy(@Value("${quota.rule.tenant.limit}") long limit) {
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 2826945..7b2c05e 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index ff0debc..e72a626 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.common.transport;
 
 import org.thingsboard.server.gen.transport.TransportProtos;
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
index bfc7ac8..5443f7e 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.common.transport;
 
 /**
@@ -6,6 +21,6 @@ package org.thingsboard.server.common.transport;
 public interface TransportServiceCallback<T> {
 
     void onSuccess(T msg);
-    void onError(Exception e);
+    void onError(Throwable e);
 
 }
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index 65d1468..f0cc3c7 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -95,3 +95,17 @@ message ValidateDeviceTokenResponseMsg {
   DeviceInfoProto deviceInfo = 1;
 }
 
+/**
+ * Main messages;
+ */
+message TransportToRuleEngineMsg {
+
+}
+
+message TransportApiRequestMsg {
+   ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
+}
+
+message TransportApiResponseMsg {
+   ValidateDeviceTokenResponseMsg validateTokenResponseMsg = 1;
+}
\ No newline at end of file
diff --git a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
index 20f8a55..37441e9 100644
--- a/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
+++ b/common/transport/src/test/java/org/thingsboard/server/common/transport/quota/HostRequestsQuotaServiceTest.java
@@ -17,7 +17,11 @@ package org.thingsboard.server.common.transport.quota;
 
 import org.junit.Before;
 import org.junit.Test;
-import org.thingsboard.server.common.transport.quota.host.*;
+import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryCleaner;
+import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryLogger;
+import org.thingsboard.server.common.transport.quota.host.HostRequestIntervalRegistry;
+import org.thingsboard.server.common.transport.quota.host.HostRequestLimitPolicy;
+import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

pom.xml 2(+1 -1)

diff --git a/pom.xml b/pom.xml
index d6db123..62fda36 100755
--- a/pom.xml
+++ b/pom.xml
@@ -366,7 +366,7 @@
             </dependency>
             <dependency>
                 <groupId>org.thingsboard.transport</groupId>
-                <artifactId>mqtt</artifactId>
+                <artifactId>mqtt-common</artifactId>
                 <version>${project.version}</version>
             </dependency>
             <dependency>
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
new file mode 100644
index 0000000..fafbc4e
--- /dev/null
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -0,0 +1,284 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.session;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonNull;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.data.relation.EntityRelation;
+import org.thingsboard.server.common.msg.core.*;
+import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
+import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
+import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
+import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
+import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+@Slf4j
+public class GatewaySessionCtx {
+
+    private static final String DEFAULT_DEVICE_TYPE = "default";
+    public static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
+    public static final String DEVICE_PROPERTY = "device";
+//    private final Device gateway;
+//    private final SessionId gatewaySessionId;
+//    private final SessionMsgProcessor processor;
+//    private final DeviceService deviceService;
+//    private final DeviceAuthService authService;
+//    private final RelationService relationService;
+//    private final Map<String, GatewayDeviceSessionCtx> devices;
+//    private final ConcurrentMap<String, Integer> mqttQoSMap;
+    private ChannelHandlerContext channel;
+
+//    public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
+//        this.processor = processor;
+//        this.deviceService = deviceService;
+//        this.authService = authService;
+//        this.relationService = relationService;
+//        this.gateway = gatewaySessionCtx.getDevice();
+//        this.gatewaySessionId = gatewaySessionCtx.getSessionId();
+//        this.devices = new HashMap<>();
+//        this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
+//    }
+
+    public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) {
+
+    }
+
+    public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
+        JsonElement json = getJson(msg);
+        String deviceName = checkDeviceName(getDeviceName(json));
+        String deviceType = getDeviceType(json);
+        onDeviceConnect(deviceName, deviceType);
+        ack(msg);
+    }
+
+    private void onDeviceConnect(String deviceName, String deviceType) {
+//        if (!devices.containsKey(deviceName)) {
+//            Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
+//            if (device == null) {
+//                device = new Device();
+//                device.setTenantId(gateway.getTenantId());
+//                device.setName(deviceName);
+//                device.setType(deviceType);
+//                device.setCustomerId(gateway.getCustomerId());
+//                device = deviceService.saveDevice(device);
+//                relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
+//                processor.onDeviceAdded(device);
+//            }
+//            GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap);
+//            devices.put(deviceName, ctx);
+//            log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
+//            processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
+//            processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
+//        }
+    }
+
+    public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
+//        String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
+//        GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
+//        if (deviceSessionCtx != null) {
+//            processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
+//            deviceSessionCtx.setClosed(true);
+//            log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName);
+//        } else {
+//            log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName);
+//        }
+//        ack(msg);
+    }
+
+    public void onGatewayDisconnect() {
+//        devices.forEach((k, v) -> {
+//            processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
+//        });
+    }
+
+    public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
+//        JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+//        int requestId = mqttMsg.variableHeader().messageId();
+//        if (json.isJsonObject()) {
+//            JsonObject jsonObj = json.getAsJsonObject();
+//            for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
+//                String deviceName = checkDeviceConnected(deviceEntry.getKey());
+//                if (!deviceEntry.getValue().isJsonArray()) {
+//                    throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+//                }
+//                BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
+//                JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();
+//                for (JsonElement element : deviceData) {
+//                    JsonConverter.parseWithTs(request, element.getAsJsonObject());
+//                }
+//                GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+//                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+//                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+//            }
+//        } else {
+//            throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+//        }
+    }
+
+    public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {
+//        JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+//        if (json.isJsonObject()) {
+//            JsonObject jsonObj = json.getAsJsonObject();
+//            String deviceName = checkDeviceConnected(jsonObj.get(DEVICE_PROPERTY).getAsString());
+//            Integer requestId = jsonObj.get("id").getAsInt();
+//            String data = jsonObj.get("data").toString();
+//            GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+//            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+//                    new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
+//            ack(mqttMsg);
+//        } else {
+//            throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+//        }
+    }
+
+    public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
+//        JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+//        int requestId = mqttMsg.variableHeader().messageId();
+//        if (json.isJsonObject()) {
+//            JsonObject jsonObj = json.getAsJsonObject();
+//            for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
+//                String deviceName = checkDeviceConnected(deviceEntry.getKey());
+//                if (!deviceEntry.getValue().isJsonObject()) {
+//                    throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+//                }
+//                long ts = System.currentTimeMillis();
+//                BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId);
+//                JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
+//                request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
+//                GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+//                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+//                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+//            }
+//        } else {
+//            throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+//        }
+    }
+
+    public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException {
+//        JsonElement json = validateJsonPayload(gatewaySessionId, msg.payload());
+//        if (json.isJsonObject()) {
+//            JsonObject jsonObj = json.getAsJsonObject();
+//            int requestId = jsonObj.get("id").getAsInt();
+//            String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
+//            boolean clientScope = jsonObj.get("client").getAsBoolean();
+//            Set<String> keys;
+//            if (jsonObj.has("key")) {
+//                keys = Collections.singleton(jsonObj.get("key").getAsString());
+//            } else {
+//                JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
+//                keys = new HashSet<>();
+//                for (JsonElement keyObj : keysArray) {
+//                    keys.add(keyObj.getAsString());
+//                }
+//            }
+//
+//            BasicGetAttributesRequest request;
+//            if (clientScope) {
+//                request = new BasicGetAttributesRequest(requestId, keys, null);
+//            } else {
+//                request = new BasicGetAttributesRequest(requestId, null, keys);
+//            }
+//            GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+//            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+//                    new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+//            ack(msg);
+//        } else {
+//            throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
+//        }
+    }
+
+    private String checkDeviceConnected(String deviceName) {
+//        if (!devices.containsKey(deviceName)) {
+//            log.debug("[{}] Missing device [{}] for the gateway session", gatewaySessionId, deviceName);
+//            onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
+//        }
+//        return deviceName;
+        return null;
+    }
+
+    private String checkDeviceName(String deviceName) {
+        if (StringUtils.isEmpty(deviceName)) {
+            throw new RuntimeException("Device name is empty!");
+        } else {
+            return deviceName;
+        }
+    }
+
+    private String getDeviceName(JsonElement json) throws AdaptorException {
+        return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
+    }
+
+    private String getDeviceType(JsonElement json) throws AdaptorException {
+        JsonElement type = json.getAsJsonObject().get("type");
+        return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
+    }
+
+    private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
+//        return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+        return null;
+    }
+
+    protected SessionMsgProcessor getProcessor() {
+//        return processor;
+        return null;
+    }
+
+    DeviceAuthService getAuthService() {
+//        return authService;
+        return null;
+    }
+
+    public void setChannel(ChannelHandlerContext channel) {
+        this.channel = channel;
+    }
+
+    private void ack(MqttPublishMessage msg) {
+        if (msg.variableHeader().messageId() > 0) {
+            writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
+        }
+    }
+
+    void writeAndFlush(MqttMessage mqttMessage) {
+        channel.writeAndFlush(mqttMessage);
+    }
+
+}
diff --git a/transport/mqtt-transport/pom.xml b/transport/mqtt-transport/pom.xml
index 8d8b24d..012aabd 100644
--- a/transport/mqtt-transport/pom.xml
+++ b/transport/mqtt-transport/pom.xml
@@ -41,32 +41,12 @@
             <artifactId>transport</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
+            <groupId>org.thingsboard.transport</groupId>
+            <artifactId>mqtt-common</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-context</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
+            <groupId>org.thingsboard.common</groupId>
+            <artifactId>queue</artifactId>
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java
new file mode 100644
index 0000000..493c0c8
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/AsyncCallbackTemplate.java
@@ -0,0 +1,45 @@
+package org.thingsboard.server.mqtt.service;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class AsyncCallbackTemplate {
+
+    public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
+                                        Consumer<Throwable> onFailure) {
+        withCallback(future, onSuccess, onFailure, null);
+    }
+
+    public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
+                                        Consumer<Throwable> onFailure, Executor executor) {
+        FutureCallback<T> callback = new FutureCallback<T>() {
+            @Override
+            public void onSuccess(@Nullable T result) {
+                try {
+                    onSuccess.accept(result);
+                } catch (Throwable th) {
+                    onFailure(th);
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                onFailure.accept(t);
+            }
+        };
+        if (executor != null) {
+            Futures.addCallback(future, callback, executor);
+        } else {
+            Futures.addCallback(future, callback);
+        }
+    }
+
+}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
new file mode 100644
index 0000000..a1641f1
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java
@@ -0,0 +1,110 @@
+package org.thingsboard.server.mqtt.service;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
+import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
+import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
+import org.thingsboard.server.gen.transport.TransportProtos.*;
+import org.thingsboard.server.kafka.TbKafkaSettings;
+import org.thingsboard.server.transport.mqtt.MqttTransportContext;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+@Service
+public class MqttTransportService implements TransportService {
+
+    @Value("${kafka.rule-engine.topic}")
+    private String ruleEngineTopic;
+    @Value("${kafka.transport-api.requests-topic}")
+    private String transportApiRequestsTopic;
+    @Value("${kafka.transport-api.responses-topic}")
+    private String transportApiResponsesTopic;
+    @Value("${kafka.transport-api.max_pending_requests}")
+    private long maxPendingRequests;
+    @Value("${kafka.transport-api.max_requests_timeout}")
+    private long maxRequestsTimeout;
+    @Value("${kafka.transport-api.response_poll_interval}")
+    private int responsePollDuration;
+    @Value("${kafka.transport-api.response_auto_commit_interval}")
+    private int autoCommitInterval;
+
+    @Autowired
+    private TbKafkaSettings kafkaSettings;
+    //We use this to get the node id. We should replace this with a component that provides the node id.
+    @Autowired
+    private MqttTransportContext transportContext;
+
+    private ExecutorService transportCallbackExecutor;
+
+    private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
+
+    @PostConstruct
+    public void init() {
+        this.transportCallbackExecutor = Executors.newCachedThreadPool();
+
+        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder();
+        requestBuilder.settings(kafkaSettings);
+        requestBuilder.defaultTopic(transportApiRequestsTopic);
+        requestBuilder.encoder(new TransportApiRequestEncoder());
+
+        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder();
+        responseBuilder.settings(kafkaSettings);
+        responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
+        responseBuilder.clientId(transportContext.getNodeId());
+        responseBuilder.groupId("transport-node");
+        responseBuilder.autoCommit(true);
+        responseBuilder.autoCommitIntervalMs(autoCommitInterval);
+        responseBuilder.decoder(new TransportApiResponseDecoder());
+
+        TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
+                <TransportApiRequestMsg, TransportApiResponseMsg> builder = TbKafkaRequestTemplate.builder();
+        builder.requestTemplate(requestBuilder.build());
+        builder.responseTemplate(responseBuilder.build());
+        builder.maxPendingRequests(maxPendingRequests);
+        builder.maxRequestTimeout(maxRequestsTimeout);
+        builder.pollInterval(responsePollDuration);
+        transportApiTemplate = builder.build();
+        transportApiTemplate.init();
+    }
+
+    @PreDestroy
+    public void destroy() {
+        if (transportApiTemplate != null) {
+            transportApiTemplate.stop();
+        }
+        if (transportCallbackExecutor != null) {
+            transportCallbackExecutor.shutdownNow();
+        }
+    }
+
+    @Override
+    public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceTokenResponseMsg> callback) {
+        AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
+                response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
+    }
+
+    @Override
+    public void process(SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+
+    }
+
+    @Override
+    public void process(PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+
+    }
+
+    @Override
+    public void process(PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+
+    }
+}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
new file mode 100644
index 0000000..f931db6
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiRequestEncoder.java
@@ -0,0 +1,14 @@
+package org.thingsboard.server.mqtt.service;
+
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
+import org.thingsboard.server.kafka.TbKafkaEncoder;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class TransportApiRequestEncoder implements TbKafkaEncoder<TransportApiRequestMsg> {
+    @Override
+    public byte[] encode(TransportApiRequestMsg value) {
+        return value.toByteArray();
+    }
+}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
new file mode 100644
index 0000000..22e1647
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/TransportApiResponseDecoder.java
@@ -0,0 +1,16 @@
+package org.thingsboard.server.mqtt.service;
+
+import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
+import org.thingsboard.server.kafka.TbKafkaDecoder;
+
+import java.io.IOException;
+
+/**
+ * Created by ashvayka on 05.10.18.
+ */
+public class TransportApiResponseDecoder implements TbKafkaDecoder<TransportApiResponseMsg> {
+    @Override
+    public TransportApiResponseMsg decode(byte[] data) throws IOException {
+        return TransportApiResponseMsg.parseFrom(data);
+    }
+}
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
index 3c54938..4740342 100644
--- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
@@ -19,14 +19,13 @@ import org.springframework.boot.SpringBootConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
-import springfox.documentation.swagger2.annotations.EnableSwagger2;
 
 import java.util.Arrays;
 
 @SpringBootConfiguration
 @EnableAsync
 @EnableScheduling
-@ComponentScan({"org.thingsboard.server"})
+@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"})
 public class ThingsboardMqttTransportApplication {
 
     private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
index c12f25e..a2a0d54 100644
--- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
@@ -44,6 +44,27 @@ mqtt:
     # Type of the key store
     key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
 
+#Quota parameters
+quota:
+  host:
+    # Max allowed number of API requests in interval for single host
+    limit: "${QUOTA_HOST_LIMIT:10000}"
+    # Interval duration
+    intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
+    # Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs
+    ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
+    # Interval for scheduled task that cleans expired records. TTL is used for expiring
+    cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
+    # Enable Host API Limits
+    enabled: "${QUOTA_HOST_ENABLED:false}"
+    # Array of whitelist hosts
+    whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
+    # Array of blacklist hosts
+    blacklist: "${QUOTA_HOST_BLACKLIST:}"
+    log:
+      topSize: 10
+      intervalMin: 2
+
 kafka:
   enabled: true
   bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
@@ -52,6 +73,13 @@ kafka:
   batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
   linger.ms: "${TB_KAFKA_LINGER_MS:1}"
   buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
-  topic:
-    telemetry: "${TB_TELEMETRY_TOPIC:tb.transport.telemetry}"
-    requests: "${TB_TELEMETRY_TOPIC:tb.transport.requests}"
\ No newline at end of file
+  transport-api:
+    requests-topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
+    responses-topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
+    max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
+    max_requests_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
+    response_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
+    response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
+    # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
+  rule-engine:
+    topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
diff --git a/transport/pom.xml b/transport/pom.xml
index a01c7ac..2401d3f 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -37,7 +37,7 @@
     <modules>
         <module>http</module>
         <module>coap</module>
-        <module>mqtt</module>
+        <module>mqtt-common</module>
         <module>mqtt-transport</module>
     </modules>