thingsboard-developers

Initial structure

10/4/2018 1:18:26 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
index 751bde6..c072311 100644
--- a/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
+++ b/application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 2e7b412..2826945 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,7 @@
  */
 package org.thingsboard.server.common.transport.session;
 
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.security.DeviceCredentialsFilter;
@@ -23,56 +24,27 @@ import org.thingsboard.server.common.msg.session.SessionContext;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.gen.transport.TransportProtos;
 
 import java.util.Optional;
 
 /**
  * @author Andrew Shvayka
  */
-@Slf4j
+@Data
 public abstract class DeviceAwareSessionContext implements SessionContext {
 
-    protected final DeviceAuthService authService;
-    protected final SessionMsgProcessor processor;
+    private volatile TransportProtos.DeviceInfoProto deviceInfo;
 
-    protected volatile Device device;
-
-    public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService) {
-        this.processor = processor;
-        this.authService = authService;
-    }
-
-    public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device) {
-        this(processor, authService);
-        this.device = device;
-    }
-
-
-    public boolean login(DeviceCredentialsFilter credentials) {
-        DeviceAuthResult result = authService.process(credentials);
-        if (result.isSuccess()) {
-            Optional<Device> deviceOpt = authService.findDeviceById(result.getDeviceId());
-            if (deviceOpt.isPresent()) {
-                device = deviceOpt.get();
-            }
-            return true;
-        } else {
-            log.debug("Can't find device using credentials [{}] due to {}", credentials, result.getErrorMsg());
-            return false;
-        }
-    }
-
-    public DeviceAuthService getAuthService() {
-        return authService;
+    public long getDeviceIdMSB() {
+        return deviceInfo.getDeviceIdMSB();
     }
 
-    public SessionMsgProcessor getProcessor() {
-        return processor;
+    public long getDeviceIdLSB() {
+        return deviceInfo.getDeviceIdLSB();
     }
 
-    public Device getDevice() {
-        return device;
+    public boolean isConnected() {
+        return deviceInfo != null;
     }
-
-
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index 9ff19b7..ff0debc 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -7,10 +7,13 @@ import org.thingsboard.server.gen.transport.TransportProtos;
  */
 public interface TransportService {
 
-    void process(TransportProtos.SessionEventMsg msg);
+    void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
+                 TransportServiceCallback<TransportProtos.ValidateDeviceTokenResponseMsg> callback);
 
-    void process(TransportProtos.PostTelemetryMsg msg);
+    void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
 
-    void process(TransportProtos.PostAttributeMsg msg);
+    void process(TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
+
+    void process(TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
 
 }
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
new file mode 100644
index 0000000..bfc7ac8
--- /dev/null
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java
@@ -0,0 +1,11 @@
+package org.thingsboard.server.common.transport;
+
+/**
+ * Created by ashvayka on 04.10.18.
+ */
+public interface TransportServiceCallback<T> {
+
+    void onSuccess(T msg);
+    void onError(Exception e);
+
+}
diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto
index d77e79b..65d1468 100644
--- a/common/transport/src/main/proto/transport.proto
+++ b/common/transport/src/main/proto/transport.proto
@@ -46,6 +46,14 @@ message TsKvListProto {
   repeated KeyValueProto kv = 2;
 }
 
+message DeviceInfoProto {
+  int64 deviceIdMSB = 1;
+  int64 deviceIdLSB = 2;
+  string deviceName = 3;
+  string deviceType = 4;
+  string additionalInfo = 5;
+}
+
 /**
  * Messages that use Data Structures;
  */
@@ -53,6 +61,7 @@ message SessionEventMsg {
   SessionInfoProto sessionInfo = 1;
   int64 deviceIdMSB = 2;
   int64 deviceIdLSB = 3;
+  SessionEvent event = 4;
 }
 
 message PostTelemetryMsg {
@@ -76,4 +85,13 @@ message GetAttributeResponseMsg {
   repeated TsKvListProto clientAttributeList = 2;
   repeated TsKvListProto sharedAttributeList = 3;
   repeated string deletedAttributeKeys = 4;
-}
\ No newline at end of file
+}
+
+message ValidateDeviceTokenRequestMsg {
+  string token = 1;
+}
+
+message ValidateDeviceTokenResponseMsg {
+  DeviceInfoProto deviceInfo = 1;
+}
+
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index 43e2a34..cc40ee0 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -1,22 +1,33 @@
 package org.thingsboard.server.transport.mqtt;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.handler.ssl.SslHandler;
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 
+import javax.annotation.PostConstruct;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 /**
  * Created by ashvayka on 04.10.18.
  */
+@Slf4j
 @Component
 @Data
 public class MqttTransportContext {
 
+    private final ObjectMapper mapper = new ObjectMapper();
+
     @Autowired
     @Lazy
     private TransportService transportService;
@@ -33,6 +44,21 @@ public class MqttTransportContext {
     @Value("${mqtt.netty.max_payload_size}")
     private Integer maxPayloadSize;
 
+    @Value("${cluster.node_id:#{null}}")
+    private String nodeId;
+
     private SslHandler sslHandler;
 
+    @PostConstruct
+    public void init() {
+        if (StringUtils.isEmpty(nodeId)) {
+            try {
+                nodeId = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                nodeId = RandomStringUtils.randomAlphabetic(10);
+            }
+        }
+        log.info("Current NodeId: {}", nodeId);
+    }
+
 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index ed0c8c6..fd02d3a 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,9 +18,21 @@ package org.thingsboard.server.transport.mqtt;
 import com.fasterxml.jackson.databind.JsonNode;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -34,13 +46,12 @@ import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
 import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
-import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.common.transport.TransportServiceCallback;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
-import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.common.transport.quota.QuotaService;
 import org.thingsboard.server.dao.EncryptionUtil;
-import org.thingsboard.server.dao.device.DeviceService;
-import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
 import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
@@ -48,18 +59,51 @@ import org.thingsboard.server.transport.mqtt.util.SslUtil;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.security.cert.X509Certificate;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
-import static io.netty.handler.codec.mqtt.MqttMessageType.*;
-import static io.netty.handler.codec.mqtt.MqttQoS.*;
-import static org.thingsboard.server.common.msg.session.SessionMsgType.*;
-import static org.thingsboard.server.transport.mqtt.MqttTopics.*;
+import org.thingsboard.server.gen.transport.TransportProtos.*;
+
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
+import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
+import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
+import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
+import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
+import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
+import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.GET_ATTRIBUTES_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_ATTRIBUTES_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_TELEMETRY_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_DEVICE_RPC_RESPONSE;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_SERVER_RPC_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST;
+import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.BASE_GATEWAY_API_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_TELEMETRY_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_CONNECT_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_DISCONNECT_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_RPC_TOPIC;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
 
 /**
  * @author Andrew Shvayka
@@ -69,33 +113,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
 
-    private final DeviceSessionCtx deviceSessionCtx;
-    private final String sessionId;
+    private final UUID sessionId;
+    private final MqttTransportContext context;
     private final MqttTransportAdaptor adaptor;
-    private final SessionMsgProcessor processor;
-    private final DeviceService deviceService;
-    private final DeviceAuthService authService;
-    private final RelationService relationService;
+    private final TransportService transportService;
     private final QuotaService quotaService;
     private final SslHandler sslHandler;
     private final ConcurrentMap<String, Integer> mqttQoSMap;
 
-    private volatile boolean connected;
+    private final SessionInfoProto sessionInfo;
+
     private volatile InetSocketAddress address;
+    private volatile DeviceSessionCtx deviceSessionCtx;
     private volatile GatewaySessionCtx gatewaySessionCtx;
 
-    public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
-                                MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
-        this.processor = processor;
-        this.deviceService = deviceService;
-        this.relationService = relationService;
-        this.authService = authService;
-        this.adaptor = adaptor;
+    public MqttTransportHandler(MqttTransportContext context) {
+        this.sessionId = UUID.randomUUID();
+        this.context = context;
+        this.transportService = context.getTransportService();
+        this.adaptor = context.getAdaptor();
+        this.quotaService = context.getQuotaService();
+        this.sslHandler = context.getSslHandler();
         this.mqttQoSMap = new ConcurrentHashMap<>();
-        this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor, mqttQoSMap);
-        this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
-        this.sslHandler = sslHandler;
-        this.quotaService = quotaService;
+        this.sessionInfo = SessionInfoProto.newBuilder()
+                .setNodeId(context.getNodeId())
+                .setSessionIdMSB(sessionId.getMostSignificantBits())
+                .setSessionIdLSB(sessionId.getLeastSignificantBits())
+                .build();
+        this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap);
     }
 
     @Override
@@ -127,196 +172,196 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             case CONNECT:
                 processConnect(ctx, (MqttConnectMessage) msg);
                 break;
-            case PUBLISH:
-                processPublish(ctx, (MqttPublishMessage) msg);
-                break;
-            case SUBSCRIBE:
-                processSubscribe(ctx, (MqttSubscribeMessage) msg);
-                break;
-            case UNSUBSCRIBE:
-                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
-                break;
-            case PINGREQ:
-                if (checkConnected(ctx)) {
-                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
-                }
-                break;
-            case DISCONNECT:
-                if (checkConnected(ctx)) {
-                    processDisconnect(ctx);
-                }
-                break;
+//            case PUBLISH:
+//                processPublish(ctx, (MqttPublishMessage) msg);
+//                break;
+//            case SUBSCRIBE:
+//                processSubscribe(ctx, (MqttSubscribeMessage) msg);
+//                break;
+//            case UNSUBSCRIBE:
+//                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
+//                break;
+//            case PINGREQ:
+//                if (checkConnected(ctx)) {
+//                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+//                }
+//                break;
+//            case DISCONNECT:
+//                if (checkConnected(ctx)) {
+//                    processDisconnect(ctx);
+//                }
+//                break;
             default:
                 break;
         }
 
     }
 
-    private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
-        if (!checkConnected(ctx)) {
-            return;
-        }
-        String topicName = mqttMsg.variableHeader().topicName();
-        int msgId = mqttMsg.variableHeader().messageId();
-        log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
-
-        if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
-            if (gatewaySessionCtx != null) {
-                gatewaySessionCtx.setChannel(ctx);
-                handleMqttPublishMsg(topicName, msgId, mqttMsg);
-            }
-        } else {
-            processDevicePublish(ctx, mqttMsg, topicName, msgId);
-        }
-    }
-
-    private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
-        try {
-            switch (topicName) {
-                case GATEWAY_TELEMETRY_TOPIC:
-                    gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
-                    break;
-                case GATEWAY_ATTRIBUTES_TOPIC:
-                    gatewaySessionCtx.onDeviceAttributes(mqttMsg);
-                    break;
-                case GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
-                    gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
-                    break;
-                case GATEWAY_RPC_TOPIC:
-                    gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
-                    break;
-                case GATEWAY_CONNECT_TOPIC:
-                    gatewaySessionCtx.onDeviceConnect(mqttMsg);
-                    break;
-                case GATEWAY_DISCONNECT_TOPIC:
-                    gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
-                    break;
-            }
-        } catch (RuntimeException | AdaptorException e) {
-            log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
-        }
-    }
-
-    private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
-        AdaptorToSessionActorMsg msg = null;
-        try {
-            if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
-                msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
-            } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
-                msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
-            } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
-                msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
-                if (msgId >= 0) {
-                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-                }
-            } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
-                msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
-                if (msgId >= 0) {
-                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-                }
-            } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
-                msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
-                if (msgId >= 0) {
-                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
-                }
-            }
-        } catch (AdaptorException e) {
-            log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
-        }
-        if (msg != null) {
-            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-        } else {
-            log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
-            ctx.close();
-        }
-    }
-
-    private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
-        if (!checkConnected(ctx)) {
-            return;
-        }
-        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
-        List<Integer> grantedQoSList = new ArrayList<>();
-        for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
-            String topic = subscription.topicName();
-            MqttQoS reqQoS = subscription.qualityOfService();
-            try {
-                switch (topic) {
-                    case DEVICE_ATTRIBUTES_TOPIC: {
-                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-                        registerSubQoS(topic, grantedQoSList, reqQoS);
-                        break;
-                    }
-                    case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
-                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-                        registerSubQoS(topic, grantedQoSList, reqQoS);
-                        break;
-                    }
-                    case DEVICE_RPC_RESPONSE_SUB_TOPIC:
-                    case GATEWAY_ATTRIBUTES_TOPIC:
-                    case GATEWAY_RPC_TOPIC:
-                        registerSubQoS(topic, grantedQoSList, reqQoS);
-                        break;
-                    case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
-                        deviceSessionCtx.setAllowAttributeResponses();
-                        registerSubQoS(topic, grantedQoSList, reqQoS);
-                        break;
-                    default:
-                        log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
-                        grantedQoSList.add(FAILURE.value());
-                        break;
-                }
-            } catch (AdaptorException e) {
-                log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
-                grantedQoSList.add(FAILURE.value());
-            }
-        }
-        ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
-    }
-
-    private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
-        grantedQoSList.add(getMinSupportedQos(reqQoS));
-        mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
-    }
-
-    private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
-        if (!checkConnected(ctx)) {
-            return;
-        }
-        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
-        for (String topicName : mqttMsg.payload().topics()) {
-            mqttQoSMap.remove(topicName);
-            try {
-                switch (topicName) {
-                    case DEVICE_ATTRIBUTES_TOPIC: {
-                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-                        break;
-                    }
-                    case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
-                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
-                        break;
-                    }
-                    case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
-                        deviceSessionCtx.setDisallowAttributeResponses();
-                        break;
-                }
-            } catch (AdaptorException e) {
-                log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
-            }
-        }
-        ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
-    }
-
-    private MqttMessage createUnSubAckMessage(int msgId) {
-        MqttFixedHeader mqttFixedHeader =
-                new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
-        MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
-        return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
-    }
+//    private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
+//        if (!checkConnected(ctx)) {
+//            return;
+//        }
+//        String topicName = mqttMsg.variableHeader().topicName();
+//        int msgId = mqttMsg.variableHeader().packetId();
+//        log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+//
+//        if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
+//            if (gatewaySessionCtx != null) {
+//                gatewaySessionCtx.setChannel(ctx);
+//                handleMqttPublishMsg(topicName, msgId, mqttMsg);
+//            }
+//        } else {
+//            processDevicePublish(ctx, mqttMsg, topicName, msgId);
+//        }
+//    }
+//
+//    private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
+//        try {
+//            switch (topicName) {
+//                case GATEWAY_TELEMETRY_TOPIC:
+//                    gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
+//                    break;
+//                case GATEWAY_ATTRIBUTES_TOPIC:
+//                    gatewaySessionCtx.onDeviceAttributes(mqttMsg);
+//                    break;
+//                case GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
+//                    gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
+//                    break;
+//                case GATEWAY_RPC_TOPIC:
+//                    gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
+//                    break;
+//                case GATEWAY_CONNECT_TOPIC:
+//                    gatewaySessionCtx.onDeviceConnect(mqttMsg);
+//                    break;
+//                case GATEWAY_DISCONNECT_TOPIC:
+//                    gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
+//                    break;
+//            }
+//        } catch (RuntimeException | AdaptorException e) {
+//            log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
+//        }
+//    }
+//
+//    private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
+//        AdaptorToSessionActorMsg msg = null;
+//        try {
+//            if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
+//                msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
+//            } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
+//                msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
+//            } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
+//                msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
+//                if (msgId >= 0) {
+//                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
+//                }
+//            } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
+//                msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
+//                if (msgId >= 0) {
+//                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
+//                }
+//            } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
+//                msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
+//                if (msgId >= 0) {
+//                    ctx.writeAndFlush(createMqttPubAckMsg(msgId));
+//                }
+//            }
+//        } catch (AdaptorException e) {
+//            log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
+//        }
+//        if (msg != null) {
+//            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+//        } else {
+//            log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
+//            ctx.close();
+//        }
+//    }
+//
+//    private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
+//        if (!checkConnected(ctx)) {
+//            return;
+//        }
+//        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+//        List<Integer> grantedQoSList = new ArrayList<>();
+//        for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
+//            String topic = subscription.topicName();
+//            MqttQoS reqQoS = subscription.qualityOfService();
+//            try {
+//                switch (topic) {
+//                    case DEVICE_ATTRIBUTES_TOPIC: {
+//                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+//                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+//                        registerSubQoS(topic, grantedQoSList, reqQoS);
+//                        break;
+//                    }
+//                    case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+//                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+//                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+//                        registerSubQoS(topic, grantedQoSList, reqQoS);
+//                        break;
+//                    }
+//                    case DEVICE_RPC_RESPONSE_SUB_TOPIC:
+//                    case GATEWAY_ATTRIBUTES_TOPIC:
+//                    case GATEWAY_RPC_TOPIC:
+//                        registerSubQoS(topic, grantedQoSList, reqQoS);
+//                        break;
+//                    case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+//                        deviceSessionCtx.setAllowAttributeResponses();
+//                        registerSubQoS(topic, grantedQoSList, reqQoS);
+//                        break;
+//                    default:
+//                        log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+//                        grantedQoSList.add(FAILURE.value());
+//                        break;
+//                }
+//            } catch (AdaptorException e) {
+//                log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
+//                grantedQoSList.add(FAILURE.value());
+//            }
+//        }
+//        ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
+//    }
+//
+//    private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
+//        grantedQoSList.add(getMinSupportedQos(reqQoS));
+//        mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
+//    }
+//
+//    private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
+//        if (!checkConnected(ctx)) {
+//            return;
+//        }
+//        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+//        for (String topicName : mqttMsg.payload().topics()) {
+//            mqttQoSMap.remove(topicName);
+//            try {
+//                switch (topicName) {
+//                    case DEVICE_ATTRIBUTES_TOPIC: {
+//                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+//                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+//                        break;
+//                    }
+//                    case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
+//                        AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+//                        processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
+//                        break;
+//                    }
+//                    case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
+//                        deviceSessionCtx.setDisallowAttributeResponses();
+//                        break;
+//                }
+//            } catch (AdaptorException e) {
+//                log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
+//            }
+//        }
+//        ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
+//    }
+//
+//    private MqttMessage createUnSubAckMessage(int msgId) {
+//        MqttFixedHeader mqttFixedHeader =
+//                new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
+//        MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
+//        return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
+//    }
 
     private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
         log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
@@ -333,36 +378,58 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         if (StringUtils.isEmpty(userName)) {
             ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
             ctx.close();
-        } else if (!deviceSessionCtx.login(new DeviceTokenCredentials(msg.payload().userName()))) {
-            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-            ctx.close();
         } else {
-            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
-            connected = true;
-            processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
-                    new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
-            checkGatewaySession();
+            transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
+                    new TransportServiceCallback<ValidateDeviceTokenResponseMsg>() {
+                        @Override
+                        public void onSuccess(ValidateDeviceTokenResponseMsg msg) {
+                            if (!msg.hasDeviceInfo()) {
+                                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+                                ctx.close();
+                            } else {
+                                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+                                deviceSessionCtx.setDeviceInfo(deviceSessionCtx.getDeviceInfo());
+                                transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
+                                checkGatewaySession();
+                            }
+                        }
+
+                        @Override
+                        public void onError(Exception e) {
+                            log.trace("[{}] Failed to process credentials: {}", address, userName, e);
+                            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
+                            ctx.close();
+                        }
+                    });
         }
     }
 
+    protected SessionEventMsg getSessionEventMsg(SessionEvent event) {
+        return SessionEventMsg.newBuilder()
+                .setSessionInfo(sessionInfo)
+                .setDeviceIdMSB(deviceSessionCtx.getDeviceIdMSB())
+                .setDeviceIdLSB(deviceSessionCtx.getDeviceIdLSB())
+                .setEvent(event).build();
+    }
+
     private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
-        try {
-            String strCert = SslUtil.getX509CertificateString(cert);
-            String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
-            if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
-                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
-                connected = true;
-                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
-                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
-                checkGatewaySession();
-            } else {
-                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-                ctx.close();
-            }
-        } catch (Exception e) {
-            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
-            ctx.close();
-        }
+//        try {
+//            String strCert = SslUtil.getX509CertificateString(cert);
+//            String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
+//            if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
+//                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+//                connected = true;
+//                processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
+//                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new SessionOpenMsg())));
+//                checkGatewaySession();
+//            } else {
+//                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+//                ctx.close();
+//            }
+//        } catch (Exception e) {
+//            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
+//            ctx.close();
+//        }
     }
 
     private X509Certificate getX509Certificate() {
@@ -380,8 +447,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private void processDisconnect(ChannelHandlerContext ctx) {
         ctx.close();
-        if (connected) {
-            processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
+        if (deviceSessionCtx.isConnected()) {
+            transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
             if (gatewaySessionCtx != null) {
                 gatewaySessionCtx.onGatewayDisconnect();
             }
@@ -428,7 +495,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     }
 
     private boolean checkConnected(ChannelHandlerContext ctx) {
-        if (connected) {
+        if (deviceSessionCtx.isConnected()) {
             return true;
         } else {
             log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
@@ -438,18 +505,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     }
 
     private void checkGatewaySession() {
-        Device device = deviceSessionCtx.getDevice();
-        JsonNode infoNode = device.getAdditionalInfo();
-        if (infoNode != null) {
-            JsonNode gatewayNode = infoNode.get("gateway");
-            if (gatewayNode != null && gatewayNode.asBoolean()) {
-                gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx);
+        DeviceInfoProto device = deviceSessionCtx.getDeviceInfo();
+        try {
+            JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
+            if (infoNode != null) {
+                JsonNode gatewayNode = infoNode.get("gateway");
+                if (gatewayNode != null && gatewayNode.asBoolean()) {
+                    gatewaySessionCtx = new GatewaySessionCtx(deviceSessionCtx);
+                }
             }
+        } catch (IOException e) {
+            log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
         }
     }
 
     @Override
     public void operationComplete(Future<? super Void> future) throws Exception {
-        processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId()));
+        transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
     }
 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index 3dbb3ef..3f29a82 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -40,15 +40,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Slf4j
 public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
 
-    private final MqttTransportAdaptor adaptor;
     private final MqttSessionId sessionId;
     private ChannelHandlerContext channel;
     private volatile boolean allowAttributeResponses;
     private AtomicInteger msgIdSeq = new AtomicInteger(0);
 
-    public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) {
-        super(processor, authService, mqttQoSMap);
-        this.adaptor = adaptor;
+    public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) {
+        super(null, null, null);
         this.sessionId = new MqttSessionId();
     }
 
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 98ad6d2..dec8602 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -78,6 +78,10 @@ public class GatewaySessionCtx {
         this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
     }
 
+    public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) {
+
+    }
+
     public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
         JsonElement json = getJson(msg);
         String deviceName = checkDeviceName(getDeviceName(json));
diff --git a/transport/mqtt-transport/pom.xml b/transport/mqtt-transport/pom.xml
new file mode 100644
index 0000000..8d8b24d
--- /dev/null
+++ b/transport/mqtt-transport/pom.xml
@@ -0,0 +1,88 @@
+<!--
+
+    Copyright © 2016-2018 The Thingsboard Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.thingsboard</groupId>
+        <version>2.2.0-SNAPSHOT</version>
+        <artifactId>transport</artifactId>
+    </parent>
+    <groupId>org.thingsboard.transport</groupId>
+    <artifactId>mqtt-transport</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Thingsboard MQTT Transport Service</name>
+    <url>https://thingsboard.io</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <main.dir>${basedir}/../..</main.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.thingsboard.common</groupId>
+            <artifactId>transport</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
new file mode 100644
index 0000000..3c54938
--- /dev/null
+++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
@@ -0,0 +1,48 @@
+package org.thingsboard.server.mqtt; /**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import springfox.documentation.swagger2.annotations.EnableSwagger2;
+
+import java.util.Arrays;
+
+@SpringBootConfiguration
+@EnableAsync
+@EnableScheduling
+@ComponentScan({"org.thingsboard.server"})
+public class ThingsboardMqttTransportApplication {
+
+    private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
+    private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport";
+
+    public static void main(String[] args) {
+        SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
+    }
+
+    private static String[] updateArguments(String[] args) {
+        if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
+            String[] modifiedArgs = new String[args.length + 1];
+            System.arraycopy(args, 0, modifiedArgs, 0, args.length);
+            modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
+            return modifiedArgs;
+        }
+        return args;
+    }
+}
diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
new file mode 100644
index 0000000..c12f25e
--- /dev/null
+++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml
@@ -0,0 +1,57 @@
+#
+# Copyright © 2016-2018 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spring.main.web-application-type: none
+
+# MQTT server parameters
+mqtt:
+  # Enable/disable mqtt transport protocol.
+  enabled: "${MQTT_ENABLED:true}"
+  bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
+  bind_port: "${MQTT_BIND_PORT:1883}"
+  adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
+  timeout: "${MQTT_TIMEOUT:10000}"
+  netty:
+    leak_detector_level: "${NETTY_LEASK_DETECTOR_LVL:DISABLED}"
+    boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
+    worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
+    max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
+  # MQTT SSL configuration
+  ssl:
+    # Enable/disable SSL support
+    enabled: "${MQTT_SSL_ENABLED:false}"
+    # SSL protocol: See http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#SSLContext
+    protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}"
+    # Path to the key store that holds the SSL certificate
+    key_store: "${MQTT_SSL_KEY_STORE:mqttserver.jks}"
+    # Password used to access the key store
+    key_store_password: "${MQTT_SSL_KEY_STORE_PASSWORD:server_ks_password}"
+    # Password used to access the key
+    key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
+    # Type of the key store
+    key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
+
+kafka:
+  enabled: true
+  bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
+  acks: "${TB_KAFKA_ACKS:all}"
+  retries: "${TB_KAFKA_RETRIES:1}"
+  batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
+  linger.ms: "${TB_KAFKA_LINGER_MS:1}"
+  buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
+  topic:
+    telemetry: "${TB_TELEMETRY_TOPIC:tb.transport.telemetry}"
+    requests: "${TB_TELEMETRY_TOPIC:tb.transport.requests}"
\ No newline at end of file
diff --git a/transport/pom.xml b/transport/pom.xml
index 95b0112..a01c7ac 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -38,6 +38,7 @@
         <module>http</module>
         <module>coap</module>
         <module>mqtt</module>
+        <module>mqtt-transport</module>
     </modules>
 
     <dependencies>