thingsboard-aplcache

Merge pull request #37 from thingsboard/feature/TB-33 TB-33:

1/28/2017 11:26:24 PM

Changes

Details

diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index c609414..d2cac80 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -81,12 +81,11 @@ mqtt:
     worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
 # Uncomment the following lines to enable ssl for MQTT
 #  ssl:
-#    key_store: keystore/mqttserver.jks
+#    key_store: mqttserver.jks
 #    key_store_password: server_ks_password
 #    key_password: server_key_password
 #    key_store_type: JKS
 
-
 # CoAP server parameters
 coap:
   bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
index 6d957ce..c7baaaf 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
@@ -24,6 +24,10 @@ public class SessionCloseMsg implements SessionCtrlMsg {
     private final boolean revoked;
     private final boolean timeout;
 
+    public static SessionCloseMsg onDisconnect(SessionId sessionId) {
+        return new SessionCloseMsg(sessionId, false, false);
+    }
+
     public static SessionCloseMsg onError(SessionId sessionId) {
         return new SessionCloseMsg(sessionId, false, false);
     }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java
index 4a39225..0b138df 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java
@@ -27,8 +27,6 @@ public interface SessionContext extends SessionAwareMsg {
 
     void onMsg(SessionCtrlMsg msg) throws SessionException;
 
-    void onError(SessionException e);
-
     boolean isClosed();
 
     long getTimeout();
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index 93e764e..640cce7 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -74,7 +74,7 @@ public class JsonConverter {
         }
     }
 
-    private static void parseWithTs(BasicTelemetryUploadRequest request, JsonObject jo) {
+    public static void parseWithTs(BasicTelemetryUploadRequest request, JsonObject jo) {
         long ts = jo.get("ts").getAsLong();
         JsonObject valuesObject = jo.get("values").getAsJsonObject();
         for (KvEntry entry : parseValues(valuesObject)) {
@@ -82,7 +82,7 @@ public class JsonConverter {
         }
     }
 
-    private static List<KvEntry> parseValues(JsonObject valuesObject) {
+    public static List<KvEntry> parseValues(JsonObject valuesObject) {
         List<KvEntry> result = new ArrayList<>();
         for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
             JsonElement element = valueEntry.getValue();
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
index 8321c04..89debed 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java
@@ -42,6 +42,12 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
         this.authService = authService;
     }
 
+    public DeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device) {
+        this(processor, authService);
+        this.device = device;
+    }
+
+
     public boolean login(DeviceCredentialsFilter credentials) {
         DeviceAuthResult result = authService.process(credentials);
         if (result.isSuccess()) {
@@ -56,6 +62,14 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
         }
     }
 
+    public DeviceAuthService getAuthService() {
+        return authService;
+    }
+
+    public SessionMsgProcessor getProcessor() {
+        return processor;
+    }
+
     public Device getDevice() {
         return device;
     }
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
index ad4c338..8d780b6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceService.java
@@ -22,10 +22,14 @@ import org.thingsboard.server.common.data.id.TenantId;
 import org.thingsboard.server.common.data.page.TextPageData;
 import org.thingsboard.server.common.data.page.TextPageLink;
 
+import java.util.Optional;
+
 public interface DeviceService {
     
     Device findDeviceById(DeviceId deviceId);
 
+    Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name);
+
     Device saveDevice(Device device);
 
     Device assignDeviceToCustomer(DeviceId deviceId, CustomerId customerId);
diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
index 3a3c018..681188e 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java
@@ -38,6 +38,7 @@ import org.thingsboard.server.dao.service.PaginatedRemover;
 import org.thingsboard.server.dao.tenant.TenantDao;
 
 import java.util.List;
+import java.util.Optional;
 
 import static org.thingsboard.server.dao.DaoUtil.convertDataList;
 import static org.thingsboard.server.dao.DaoUtil.getData;
@@ -70,6 +71,18 @@ public class DeviceServiceImpl implements DeviceService {
     }
 
     @Override
+    public Optional<Device> findDeviceByTenantIdAndName(TenantId tenantId, String name) {
+        log.trace("Executing findDeviceByTenantIdAndName [{}][{}]", tenantId, name);
+        validateId(tenantId, "Incorrect tenantId " + tenantId);
+        Optional<DeviceEntity> deviceEntityOpt = deviceDao.findDevicesByTenantIdAndName(tenantId.getId(), name);
+        if (deviceEntityOpt.isPresent()) {
+            return Optional.of(getData(deviceEntityOpt.get()));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
     public Device saveDevice(Device device) {
         log.trace("Executing saveDevice [{}]", device);
         deviceValidator.validate(device);
diff --git a/tools/src/main/shell/keygen.properties b/tools/src/main/shell/keygen.properties
index 9435746..8dd11f2 100644
--- a/tools/src/main/shell/keygen.properties
+++ b/tools/src/main/shell/keygen.properties
@@ -17,8 +17,8 @@
 DOMAIN_SUFFIX="$(hostname)"
 ORGANIZATIONAL_UNIT=Thingsboard
 ORGANIZATION=Thingsboard
-CITY=Piscataway
-STATE_OR_PROVINCE=NJ
+CITY=San Francisco
+STATE_OR_PROVINCE=CA
 TWO_LETTER_COUNTRY_CODE=US
 
 SERVER_KEYSTORE_PASSWORD=server_ks_password
@@ -26,10 +26,10 @@ SERVER_KEY_PASSWORD=server_key_password
 
 SERVER_KEY_ALIAS="serveralias"
 SERVER_FILE_PREFIX="mqttserver"
-SERVER_KEYSTORE_DIR="../../../../application/src/main/resources/keystore/"
+SERVER_KEYSTORE_DIR="/etc/thingsboard/conf"
 
-CLIENT_KEYSTORE_PASSWORD=client_ks_password
-CLIENT_KEY_PASSWORD=client_key_password
+CLIENT_KEYSTORE_PASSWORD=password
+CLIENT_KEY_PASSWORD=password
 
 CLIENT_KEY_ALIAS="clientalias"
 CLIENT_FILE_PREFIX="mqttclient"
diff --git a/tools/src/main/shell/server.keygen.sh b/tools/src/main/shell/server.keygen.sh
index cfeaa0c..cfa4683 100755
--- a/tools/src/main/shell/server.keygen.sh
+++ b/tools/src/main/shell/server.keygen.sh
@@ -122,25 +122,25 @@ fi
 
 if [[ $COPY = true ]]; then
     if [[ -z "$COPY_DIR" ]]; then
-        read -p  "Do you want to copy $SERVER_FILE_PREFIX.jks to server directory?[yes]" yn
-            while :
-            do
-                case $yn in
-                    [nN]|[nN][oO])
-                        break
-                        ;;
-                    [yY]|[yY][eE]|[yY][eE]|[sS]|[yY]|"")
-                        read -p "(Default: $SERVER_KEYSTORE_DIR): " dir
-                         if [[ !  -z  $dir  ]]; then
-                            DESTINATION=$dir;
-                         else
-                            DESTINATION=$SERVER_KEYSTORE_DIR
-                         fi;
-                         break;;
-                    *)  echo "Please reply 'yes' or 'no'"
-                        ;;
-                 esac
-             done
+        while :
+        do
+            read -p  "Do you want to copy $SERVER_FILE_PREFIX.jks to server directory? [Y/N]: " yn
+            case $yn in
+                [nN]|[nN][oO])
+                    break
+                    ;;
+                [yY]|[yY][eE]|[yY][eE]|[sS]|[yY]|"")
+                    read -p "(Default: $SERVER_KEYSTORE_DIR): " dir
+                     if [[ !  -z  $dir  ]]; then
+                        DESTINATION=$dir;
+                     else
+                        DESTINATION=$SERVER_KEYSTORE_DIR
+                     fi;
+                     break;;
+                *)  echo "Please reply 'yes' or 'no'"
+                    ;;
+             esac
+         done
     else
         DESTINATION=$COPY_DIR
     fi
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
index e9b8e22..cecc42d 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
@@ -96,17 +96,6 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
     }
 
     @Override
-    public void onError(SessionException e) {
-        if (e instanceof SessionAuthException) {
-            log.warn("[{}] onError: {}", sessionId, e.getMessage());
-            exchange.respond(ResponseCode.UNAUTHORIZED);
-        } else {
-            log.warn("[{}] onError: {}", sessionId, e.getMessage(), e);
-            exchange.respond(ResponseCode.BAD_REQUEST);
-        }
-    }
-
-    @Override
     public SessionId getSessionId() {
         return sessionId;
     }
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
index efaa0cd..4bee595 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
@@ -141,11 +141,6 @@ public class HttpSessionCtx extends DeviceAwareSessionContext {
     }
 
     @Override
-    public void onError(SessionException e) {
-
-    }
-
-    @Override
     public boolean isClosed() {
         return false;
     }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index c2cd86e..bf033dc 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -15,23 +15,31 @@
  */
 package org.thingsboard.server.transport.mqtt.adaptors;
 
-import com.google.gson.*;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.handler.codec.mqtt.*;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
+import org.thingsboard.server.common.data.id.SessionId;
 import org.thingsboard.server.common.msg.core.*;
 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
 import org.thingsboard.server.common.msg.session.*;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.transport.mqtt.MqttTopics;
+import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
-import org.thingsboard.server.transport.mqtt.session.MqttSessionCtx;
 
 import java.nio.charset.Charset;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * @author Andrew Shvayka
@@ -45,7 +53,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
 
     @Override
-    public AdaptorToSessionActorMsg convertToActorMsg(MqttSessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException {
+    public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException {
         FromDeviceMsg msg;
         switch (type) {
             case POST_TELEMETRY_REQUEST:
@@ -83,7 +91,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     }
 
     @Override
-    public Optional<MqttMessage> convertToAdaptorMsg(MqttSessionCtx ctx, SessionActorToAdaptorMsg sessionMsg) throws AdaptorException {
+    public Optional<MqttMessage> convertToAdaptorMsg(DeviceSessionCtx ctx, SessionActorToAdaptorMsg sessionMsg) throws AdaptorException {
         MqttMessage result = null;
         ToDeviceMsg msg = sessionMsg.getMsg();
         switch (msg.getMsgType()) {
@@ -100,7 +108,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
                             GetAttributesResponse response = (GetAttributesResponse) msg;
                             if (response.isSuccess()) {
                                 result = createMqttPublishMsg(ctx,
-                                        MqttTransportHandler.ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
+                                        MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,
                                         response.getData().get(), true);
                             } else {
                                 throw new AdaptorException(response.getError().get());
@@ -115,16 +123,16 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
                 break;
             case ATTRIBUTES_UPDATE_NOTIFICATION:
                 AttributesUpdateNotification notification = (AttributesUpdateNotification) msg;
-                result = createMqttPublishMsg(ctx, MqttTransportHandler.ATTRIBUTES_TOPIC, notification.getData(), false);
+                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, notification.getData(), false);
                 break;
             case TO_DEVICE_RPC_REQUEST:
                 ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;
-                result = createMqttPublishMsg(ctx, MqttTransportHandler.RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(),
+                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(),
                         rpcRequest);
                 break;
             case TO_SERVER_RPC_RESPONSE:
                 ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;
-                result = createMqttPublishMsg(ctx, MqttTransportHandler.RPC_REQUESTS_TOPIC + rpcResponse.getRequestId(),
+                result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcResponse.getRequestId(),
                         rpcResponse);
                 break;
             case RULE_ENGINE_ERROR:
@@ -135,19 +143,19 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         return Optional.ofNullable(result);
     }
 
-    private MqttPublishMessage createMqttPublishMsg(MqttSessionCtx ctx, String topic, AttributesKVMsg msg, boolean asMap) {
+    private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, AttributesKVMsg msg, boolean asMap) {
         return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, asMap));
     }
 
-    private MqttPublishMessage createMqttPublishMsg(MqttSessionCtx ctx, String topic, ToDeviceRpcRequestMsg msg) {
+    private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToDeviceRpcRequestMsg msg) {
         return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false));
     }
 
-    private MqttPublishMessage createMqttPublishMsg(MqttSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) {
+    private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) {
         return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg));
     }
 
-    private MqttPublishMessage createMqttPublishMsg(MqttSessionCtx ctx, String topic, JsonElement json) {
+    private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
         MqttFixedHeader mqttFixedHeader =
                 new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
         MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
@@ -156,10 +164,10 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         return new MqttPublishMessage(mqttFixedHeader, header, payload);
     }
 
-    private FromDeviceMsg convertToGetAttributesRequest(MqttSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+    private FromDeviceMsg convertToGetAttributesRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
         String topicName = inbound.variableHeader().topicName();
         try {
-            Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
+            Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
             String payload = inbound.payload().toString(UTF8);
             JsonElement requestBody = new JsonParser().parse(payload);
             Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
@@ -175,10 +183,10 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         }
     }
 
-    private FromDeviceMsg convertToRpcCommandResponse(MqttSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+    private FromDeviceMsg convertToRpcCommandResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
         String topicName = inbound.variableHeader().topicName();
         try {
-            Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.RPC_RESPONSE_TOPIC.length()));
+            Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));
             String payload = inbound.payload().toString(UTF8);
             return new ToDeviceRpcResponseMsg(
                     requestId,
@@ -199,7 +207,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     }
 
     private UpdateAttributesRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
-        String payload = validatePayload(ctx, inbound.payload());
+        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
         try {
             return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId());
         } catch (IllegalStateException | JsonSyntaxException ex) {
@@ -208,7 +216,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
     }
 
     private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
-        String payload = validatePayload(ctx, inbound.payload());
+        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
         try {
             return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId());
         } catch (IllegalStateException | JsonSyntaxException ex) {
@@ -216,22 +224,31 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         }
     }
 
-    private FromDeviceMsg convertToServerRpcRequest(MqttSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
+    private FromDeviceMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
         String topicName = inbound.variableHeader().topicName();
-        String payload = validatePayload(ctx, inbound.payload());
+        String payload = validatePayload(ctx.getSessionId(), inbound.payload());
         try {
-            Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.RPC_REQUESTS_TOPIC.length()));
+            Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length()));
             return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId);
         } catch (IllegalStateException | JsonSyntaxException ex) {
             throw new AdaptorException(ex);
         }
     }
 
-    private String validatePayload(SessionContext ctx, ByteBuf payloadData) throws AdaptorException {
+    public static JsonElement validateJsonPayload(SessionId sessionId, ByteBuf payloadData) throws AdaptorException {
+        String payload = validatePayload(sessionId, payloadData);
+        try {
+            return new JsonParser().parse(payload);
+        } catch (JsonSyntaxException ex) {
+            throw new AdaptorException(ex);
+        }
+    }
+
+    public static String validatePayload(SessionId sessionId, ByteBuf payloadData) throws AdaptorException {
         try {
             String payload = payloadData.toString(UTF8);
             if (payload == null) {
-                log.warn("[{}] Payload is empty!", ctx.getSessionId());
+                log.warn("[{}] Payload is empty!", sessionId.toUidStr());
                 throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
             }
             return payload;
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java
new file mode 100644
index 0000000..5641af5
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.adaptors;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
+import org.thingsboard.server.common.msg.session.MsgType;
+import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+public interface MqttGatewayAdaptor  {
+
+    AdaptorToSessionActorMsg convertToActorMsg(GatewaySessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException;
+
+    Optional<MqttMessage> convertToAdaptorMsg(GatewaySessionCtx ctx, SessionActorToAdaptorMsg msg) throws AdaptorException;
+
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
index 0de11e4..7f8e1d7 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java
@@ -17,10 +17,10 @@ package org.thingsboard.server.transport.mqtt.adaptors;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
 import org.thingsboard.server.common.transport.TransportAdaptor;
-import org.thingsboard.server.transport.mqtt.session.MqttSessionCtx;
+import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
 
 /**
  * @author Andrew Shvayka
  */
-public interface MqttTransportAdaptor extends TransportAdaptor<MqttSessionCtx, MqttMessage, MqttMessage> {
+public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> {
 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java
new file mode 100644
index 0000000..4f91e1a
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+public class MqttTopics {
+
+    public static final String BASE_DEVICE_API_TOPIC = "v1/devices/me";
+    public static final String DEVICE_RPC_RESPONSE_TOPIC = BASE_DEVICE_API_TOPIC + "/rpc/response/";
+    public static final String DEVICE_RPC_RESPONSE_SUB_TOPIC = DEVICE_RPC_RESPONSE_TOPIC + "+";
+    public static final String DEVICE_RPC_REQUESTS_TOPIC = BASE_DEVICE_API_TOPIC + "/rpc/request/";
+    public static final String DEVICE_RPC_REQUESTS_SUB_TOPIC = DEVICE_RPC_REQUESTS_TOPIC + "+";
+    public static final String DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC + "/attributes/response/";
+    public static final String DEVICE_ATTRIBUTES_RESPONSES_TOPIC = DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + "+";
+    public static final String DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX = BASE_DEVICE_API_TOPIC + "/attributes/request/";
+    public static final String DEVICE_TELEMETRY_TOPIC = BASE_DEVICE_API_TOPIC + "/telemetry";
+    public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes";
+
+    public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway";
+    public static final String GATEWAY_CONNECT_TOPIC = "v1/gateway/connect";
+    public static final String GATEWAY_DISCONNECT_TOPIC = "v1/gateway/disconnect";
+    public static final String GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes";
+    public static final String GATEWAY_TELEMETRY_TOPIC = "v1/gateway/telemetry";
+
+
+    private MqttTopics() {
+    }
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 30b37e3..fbc53ff 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -15,26 +15,31 @@
  */
 package org.thingsboard.server.transport.mqtt;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
 import org.thingsboard.server.common.data.security.DeviceX509Credentials;
 import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
 import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
-import org.thingsboard.server.common.msg.session.MsgType;
 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.dao.EncryptionUtil;
+import org.thingsboard.server.dao.device.DeviceService;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
-import org.thingsboard.server.transport.mqtt.session.MqttSessionCtx;
+import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
+import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
 import org.thingsboard.server.transport.mqtt.util.SslUtil;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
@@ -42,35 +47,38 @@ import javax.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.List;
 
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
+import static io.netty.handler.codec.mqtt.MqttMessageType.*;
+import static io.netty.handler.codec.mqtt.MqttQoS.*;
+import static org.thingsboard.server.common.msg.session.MsgType.*;
+import static org.thingsboard.server.transport.mqtt.MqttTopics.*;
+
 /**
  * @author Andrew Shvayka
  */
 @Slf4j
 public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>> {
 
-    public static final MqttQoS MAX_SUPPORTED_QOS_LVL = MqttQoS.AT_LEAST_ONCE;
-    public static final String BASE_TOPIC = "v1/devices/me";
-    public static final String ATTRIBUTES_TOPIC = BASE_TOPIC + "/attributes";
-    public static final String TELEMETRY_TOPIC = BASE_TOPIC + "/telemetry";
-    public static final String ATTRIBUTES_REQUEST_TOPIC_PREFIX = BASE_TOPIC + "/attributes/request/";
-    public static final String ATTRIBUTES_RESPONSE_TOPIC_PREFIX = BASE_TOPIC + "/attributes/response/";
-    public static final String ATTRIBUTES_RESPONSES_TOPIC = ATTRIBUTES_RESPONSE_TOPIC_PREFIX + "+";
-    public static final String RPC_REQUESTS_TOPIC = BASE_TOPIC + "/rpc/request/";
-    public static final String RPC_REQUESTS_SUB_TOPIC = RPC_REQUESTS_TOPIC + "+";
-    public static final String RPC_RESPONSE_TOPIC = BASE_TOPIC + "/rpc/response/";
-    public static final String RPC_RESPONSE_SUB_TOPIC = RPC_RESPONSE_TOPIC + "+";
-    private final MqttSessionCtx sessionCtx;
+    public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
+
+    private final DeviceSessionCtx deviceSessionCtx;
     private final String sessionId;
     private final MqttTransportAdaptor adaptor;
     private final SessionMsgProcessor processor;
+    private final DeviceService deviceService;
+    private final DeviceAuthService authService;
     private final SslHandler sslHandler;
+    private volatile boolean connected;
+    private volatile GatewaySessionCtx gatewaySessionCtx;
 
-    public MqttTransportHandler(SessionMsgProcessor processor, DeviceAuthService authService,
+    public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService,
                                 MqttTransportAdaptor adaptor, SslHandler sslHandler) {
         this.processor = processor;
+        this.deviceService = deviceService;
+        this.authService = authService;
         this.adaptor = adaptor;
-        this.sessionCtx = new MqttSessionCtx(processor, authService, adaptor);
-        this.sessionId = sessionCtx.getSessionId().toUidStr();
+        this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
+        this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
         this.sslHandler = sslHandler;
     }
 
@@ -83,7 +91,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     }
 
     private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
-        sessionCtx.setChannel(ctx);
+        deviceSessionCtx.setChannel(ctx);
         switch (msg.fixedHeader().messageType()) {
             case CONNECT:
                 processConnect(ctx, (MqttConnectMessage) msg);
@@ -98,36 +106,67 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                 break;
             case PINGREQ:
-                ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)));
+                if (checkConnected(ctx)) {
+                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+                }
                 break;
             case DISCONNECT:
-                processDisconnect(ctx);
+                if (checkConnected(ctx)) {
+                    processDisconnect(ctx);
+                }
                 break;
         }
     }
 
     private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
+        if (!checkConnected(ctx)) {
+            return;
+        }
         String topicName = mqttMsg.variableHeader().topicName();
         int msgId = mqttMsg.variableHeader().messageId();
         log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
+
+        if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
+            if (gatewaySessionCtx != null) {
+                gatewaySessionCtx.setChannel(ctx);
+                try {
+                    if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) {
+                        gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
+                    } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
+                        gatewaySessionCtx.onDeviceAttributes(mqttMsg);
+                    } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
+                        gatewaySessionCtx.onDeviceConnect(mqttMsg);
+                    } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) {
+                        gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
+                    }
+                } catch (RuntimeException | AdaptorException e) {
+                    log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
+                }
+            }
+        } else {
+            processDevicePublish(ctx, mqttMsg, topicName, msgId);
+        }
+    }
+
+    private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
         AdaptorToSessionActorMsg msg = null;
         try {
-            if (topicName.equals(ATTRIBUTES_TOPIC)) {
-                msg = adaptor.convertToActorMsg(sessionCtx, MsgType.POST_ATTRIBUTES_REQUEST, mqttMsg);
-            } else if (topicName.equals(TELEMETRY_TOPIC)) {
-                msg = adaptor.convertToActorMsg(sessionCtx, MsgType.POST_TELEMETRY_REQUEST, mqttMsg);
-            } else if (topicName.startsWith(ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
-                msg = adaptor.convertToActorMsg(sessionCtx, MsgType.GET_ATTRIBUTES_REQUEST, mqttMsg);
+            if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) {
+                msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_TELEMETRY_REQUEST, mqttMsg);
+            } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
+                msg = adaptor.convertToActorMsg(deviceSessionCtx, POST_ATTRIBUTES_REQUEST, mqttMsg);
+            } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
+                msg = adaptor.convertToActorMsg(deviceSessionCtx, GET_ATTRIBUTES_REQUEST, mqttMsg);
                 if (msgId >= 0) {
                     ctx.writeAndFlush(createMqttPubAckMsg(msgId));
                 }
-            } else if (topicName.startsWith(RPC_RESPONSE_TOPIC)) {
-                msg = adaptor.convertToActorMsg(sessionCtx, MsgType.TO_DEVICE_RPC_RESPONSE, mqttMsg);
+            } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) {
+                msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_DEVICE_RPC_RESPONSE, mqttMsg);
                 if (msgId >= 0) {
                     ctx.writeAndFlush(createMqttPubAckMsg(msgId));
                 }
-            } else if (topicName.startsWith(RPC_REQUESTS_TOPIC)) {
-                msg = adaptor.convertToActorMsg(sessionCtx, MsgType.TO_SERVER_RPC_REQUEST, mqttMsg);
+            } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) {
+                msg = adaptor.convertToActorMsg(deviceSessionCtx, TO_SERVER_RPC_REQUEST, mqttMsg);
                 if (msgId >= 0) {
                     ctx.writeAndFlush(createMqttPubAckMsg(msgId));
                 }
@@ -135,60 +174,65 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         } catch (AdaptorException e) {
             log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
         }
-
         if (msg != null) {
-            processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
+            processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
         } else {
-            log.warn("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
+            log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
             ctx.close();
         }
     }
 
     private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
-        log.info("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+        if (!checkConnected(ctx)) {
+            return;
+        }
+        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
         List<Integer> grantedQoSList = new ArrayList<>();
         for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
             String topicName = subscription.topicName();
             //TODO: handle this qos level.
             MqttQoS reqQoS = subscription.qualityOfService();
             try {
-                if (topicName.equals(ATTRIBUTES_TOPIC)) {
-                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(sessionCtx, MsgType.SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
+                if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
+                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
-                } else if (topicName.equals(RPC_REQUESTS_SUB_TOPIC)) {
-                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(sessionCtx, MsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
+                } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
+                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
-                } else if (topicName.equals(RPC_RESPONSE_SUB_TOPIC)) {
+                } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
-                } else if (topicName.equals(ATTRIBUTES_RESPONSES_TOPIC)) {
-                    sessionCtx.setAllowAttributeResponses();
+                } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
+                    deviceSessionCtx.setAllowAttributeResponses();
                     grantedQoSList.add(getMinSupportedQos(reqQoS));
                 } else {
                     log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
-                    grantedQoSList.add(MqttQoS.FAILURE.value());
+                    grantedQoSList.add(FAILURE.value());
                 }
             } catch (AdaptorException e) {
                 log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
-                grantedQoSList.add(MqttQoS.FAILURE.value());
+                grantedQoSList.add(FAILURE.value());
             }
         }
         ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
     }
 
     private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
-        log.info("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
+        if (!checkConnected(ctx)) {
+            return;
+        }
+        log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
         for (String topicName : mqttMsg.payload().topics()) {
             try {
-                if (topicName.equals(ATTRIBUTES_TOPIC)) {
-                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(sessionCtx, MsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
-                } else if (topicName.equals(RPC_REQUESTS_SUB_TOPIC)) {
-                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(sessionCtx, MsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
-                    processor.process(new BasicToDeviceActorSessionMsg(sessionCtx.getDevice(), msg));
-                } else if (topicName.equals(ATTRIBUTES_RESPONSES_TOPIC)) {
-                    sessionCtx.setDisallowAttributeResponses();
+                if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
+                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
+                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
+                    AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
+                    processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg));
+                } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
+                    deviceSessionCtx.setDisallowAttributeResponses();
                 }
             } catch (AdaptorException e) {
                 log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
@@ -199,7 +243,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private MqttMessage createUnSubAckMessage(int msgId) {
         MqttFixedHeader mqttFixedHeader =
-                new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+                new MqttFixedHeader(SUBACK, false, AT_LEAST_ONCE, false, 0);
         MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
         return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
     }
@@ -217,13 +261,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
         String userName = msg.payload().userName();
         if (StringUtils.isEmpty(userName)) {
-            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
+            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
             ctx.close();
-        } else if (sessionCtx.login(new DeviceTokenCredentials(msg.payload().userName()))) {
-            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED));
-        } else {
-            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED));
+        } else if (!deviceSessionCtx.login(new DeviceTokenCredentials(msg.payload().userName()))) {
+            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
             ctx.close();
+        } else {
+            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+            connected = true;
+            checkGatewaySession();
         }
     }
 
@@ -231,14 +277,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         try {
             String strCert = SslUtil.getX509CertificateString(cert);
             String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
-            if (sessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
-                ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED));
+            if (deviceSessionCtx.login(new DeviceX509Credentials(sha3Hash))) {
+                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
+                connected = true;
+                checkGatewaySession();
             } else {
-                ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED));
+                ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
                 ctx.close();
             }
         } catch (Exception e) {
-            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED));
+            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
             ctx.close();
         }
     }
@@ -258,11 +306,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private void processDisconnect(ChannelHandlerContext ctx) {
         ctx.close();
+        processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
+        if (gatewaySessionCtx != null) {
+            gatewaySessionCtx.onGatewayDisconnect();
+        }
     }
 
     private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) {
         MqttFixedHeader mqttFixedHeader =
-                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
+                new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
         MqttConnAckVariableHeader mqttConnAckVariableHeader =
                 new MqttConnAckVariableHeader(returnCode, true);
         return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
@@ -281,7 +333,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
         MqttFixedHeader mqttFixedHeader =
-                new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+                new MqttFixedHeader(SUBACK, false, AT_LEAST_ONCE, false, 0);
         MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
         MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList);
         return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
@@ -293,14 +345,35 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     public static MqttPubAckMessage createMqttPubAckMsg(int requestId) {
         MqttFixedHeader mqttFixedHeader =
-                new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+                new MqttFixedHeader(PUBACK, false, AT_LEAST_ONCE, false, 0);
         MqttMessageIdVariableHeader mqttMsgIdVariableHeader =
                 MqttMessageIdVariableHeader.from(requestId);
         return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
     }
 
+    private boolean checkConnected(ChannelHandlerContext ctx) {
+        if (connected) {
+            return true;
+        } else {
+            log.info("[{}] Closing current session due to invalid msg order [{}][{}]", sessionId);
+            ctx.close();
+            return false;
+        }
+    }
+
+    private void checkGatewaySession() {
+        Device device = deviceSessionCtx.getDevice();
+        JsonNode infoNode = device.getAdditionalInfo();
+        if (infoNode != null) {
+            JsonNode gatewayNode = infoNode.get("gateway");
+            if (gatewayNode != null && gatewayNode.asBoolean()) {
+                gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx);
+            }
+        }
+    }
+
     @Override
     public void operationComplete(Future<? super Void> future) throws Exception {
-        processor.process(SessionCloseMsg.onError(sessionCtx.getSessionId()));
+        processor.process(SessionCloseMsg.onError(deviceSessionCtx.getSessionId()));
     }
 }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
index 323ed1e..9444cdd 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportServerInitializer.java
@@ -29,6 +29,7 @@ import io.netty.handler.ssl.util.SelfSignedCertificate;
 import org.springframework.beans.factory.annotation.Value;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceService;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 
 import javax.net.ssl.SSLException;
@@ -40,13 +41,15 @@ import java.security.cert.CertificateException;
 public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> {
 
     private final SessionMsgProcessor processor;
+    private final DeviceService deviceService;
     private final DeviceAuthService authService;
     private final MqttTransportAdaptor adaptor;
     private final MqttSslHandlerProvider sslHandlerProvider;
 
-    public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor,
+    public MqttTransportServerInitializer(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, MqttTransportAdaptor adaptor,
                                           MqttSslHandlerProvider sslHandlerProvider) {
         this.processor = processor;
+        this.deviceService = deviceService;
         this.authService = authService;
         this.adaptor = adaptor;
         this.sslHandlerProvider = sslHandlerProvider;
@@ -63,7 +66,7 @@ public class MqttTransportServerInitializer extends ChannelInitializer<SocketCha
         pipeline.addLast("decoder", new MqttDecoder());
         pipeline.addLast("encoder", MqttEncoder.INSTANCE);
 
-        MqttTransportHandler handler = new MqttTransportHandler(processor, authService, adaptor, sslHandler);
+        MqttTransportHandler handler = new MqttTransportHandler(processor, deviceService, authService, adaptor, sslHandler);
         pipeline.addLast(handler);
         ch.closeFuture().addListener(handler);
     }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
index a74cb0f..8710809 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
@@ -17,13 +17,9 @@ package org.thingsboard.server.transport.mqtt;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.handler.ssl.SslHandler;
 import io.netty.util.ResourceLeakDetector;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -32,12 +28,11 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceService;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
-import javax.net.ssl.SSLEngine;
-import java.util.concurrent.Executor;
 
 /**
  * @author Andrew Shvayka
@@ -56,6 +51,9 @@ public class MqttTransportService {
     private SessionMsgProcessor processor;
 
     @Autowired(required = false)
+    private DeviceService deviceService;
+
+    @Autowired(required = false)
     private DeviceAuthService authService;
 
     @Autowired(required = false)
@@ -97,7 +95,7 @@ public class MqttTransportService {
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
-                .childHandler(new MqttTransportServerInitializer(processor, authService, adaptor, sslHandlerProvider));
+                .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, adaptor, sslHandlerProvider));
 
         serverChannel = b.bind(host, port).sync().channel();
         log.info("Mqtt transport started!");
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
new file mode 100644
index 0000000..9c4bacf
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
@@ -0,0 +1,95 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.session;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.msg.core.ResponseMsg;
+import org.thingsboard.server.common.msg.session.*;
+import org.thingsboard.server.common.msg.session.ex.SessionException;
+import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
+import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
+
+    private GatewaySessionCtx parent;
+    private final MqttSessionId sessionId;
+    private volatile boolean closed;
+
+    public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) {
+        super(parent.getProcessor(), parent.getAuthService(), device);
+        this.parent = parent;
+        this.sessionId = new MqttSessionId();
+    }
+
+    @Override
+    public SessionId getSessionId() {
+        return sessionId;
+    }
+
+    @Override
+    public SessionType getSessionType() {
+        return SessionType.ASYNC;
+    }
+
+    @Override
+    public void onMsg(SessionActorToAdaptorMsg sessionMsg) throws SessionException {
+        Optional<MqttMessage> message = getToDeviceMsg(sessionMsg);
+        message.ifPresent(parent::writeAndFlush);
+    }
+
+    private Optional<MqttMessage> getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) {
+        ToDeviceMsg msg = sessionMsg.getMsg();
+        switch (msg.getMsgType()) {
+            case STATUS_CODE_RESPONSE:
+                ResponseMsg<?> responseMsg = (ResponseMsg) msg;
+                if (responseMsg.isSuccess()) {
+                    MsgType requestMsgType = responseMsg.getRequestMsgType();
+                    Integer requestId = responseMsg.getRequestId();
+                    if (requestMsgType == MsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == MsgType.POST_TELEMETRY_REQUEST) {
+                        return Optional.of(MqttTransportHandler.createMqttPubAckMsg(requestId));
+                    }
+                }
+                break;
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public void onMsg(SessionCtrlMsg msg) throws SessionException {
+
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public void setClosed(boolean closed) {
+        this.closed = closed;
+    }
+
+    @Override
+    public long getTimeout() {
+        return 0;
+    }
+}
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
new file mode 100644
index 0000000..2badd3a
--- /dev/null
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -0,0 +1,191 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.transport.mqtt.session;
+
+import com.google.gson.*;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
+import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
+import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
+import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.common.transport.auth.DeviceAuthService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
+import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
+
+/**
+ * Created by ashvayka on 19.01.17.
+ */
+@Slf4j
+public class GatewaySessionCtx {
+
+    private static final Gson GSON = new Gson();
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
+
+    private final Device gateway;
+    private final SessionId gatewaySessionId;
+    private final SessionMsgProcessor processor;
+    private final DeviceService deviceService;
+    private final DeviceAuthService authService;
+    private final Map<String, GatewayDeviceSessionCtx> devices;
+    private ChannelHandlerContext channel;
+
+    public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) {
+        this.processor = processor;
+        this.deviceService = deviceService;
+        this.authService = authService;
+        this.gateway = gatewaySessionCtx.getDevice();
+        this.gatewaySessionId = gatewaySessionCtx.getSessionId();
+        this.devices = new HashMap<>();
+    }
+
+    public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
+        String deviceName = checkDeviceName(getDeviceName(msg));
+        Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
+        Device device = deviceOpt.orElseGet(() -> {
+            Device newDevice = new Device();
+            newDevice.setTenantId(gateway.getTenantId());
+            newDevice.setName(deviceName);
+            return deviceService.saveDevice(newDevice);
+        });
+        devices.put(deviceName, new GatewayDeviceSessionCtx(this, device));
+        ack(msg);
+    }
+
+    public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
+        String deviceName = checkDeviceName(getDeviceName(msg));
+        GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
+        deviceSessionCtx.setClosed(true);
+        ack(msg);
+    }
+
+    public void onGatewayDisconnect() {
+        devices.forEach((k, v) -> {
+            processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
+        });
+    }
+
+    public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
+        JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+        int requestId = mqttMsg.variableHeader().messageId();
+        if (json.isJsonObject()) {
+            JsonObject jsonObj = json.getAsJsonObject();
+            for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
+                String deviceName = checkDeviceConnected(deviceEntry.getKey());
+                if (!deviceEntry.getValue().isJsonArray()) {
+                    throw new JsonSyntaxException("Can't parse value: " + json);
+                }
+                BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
+                JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();
+                for (JsonElement element : deviceData) {
+                    JsonConverter.parseWithTs(request, element.getAsJsonObject());
+                }
+                GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+                processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+            }
+        } else {
+            throw new JsonSyntaxException("Can't parse value: " + json);
+        }
+    }
+
+    public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
+        JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+        int requestId = mqttMsg.variableHeader().messageId();
+        if (json.isJsonObject()) {
+            JsonObject jsonObj = json.getAsJsonObject();
+            for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
+                String deviceName = checkDeviceConnected(deviceEntry.getKey());
+                if (!deviceEntry.getValue().isJsonObject()) {
+                    throw new JsonSyntaxException("Can't parse value: " + json);
+                }
+                long ts = System.currentTimeMillis();
+                BasicUpdateAttributesRequest request = new BasicUpdateAttributesRequest(requestId);
+                JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
+                request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
+                GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+                processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+            }
+        } else {
+            throw new JsonSyntaxException("Can't parse value: " + json);
+        }
+    }
+
+    private String checkDeviceConnected(String deviceName) {
+        if (!devices.containsKey(deviceName)) {
+            throw new RuntimeException("Device is not connected!");
+        } else {
+            return deviceName;
+        }
+    }
+
+    private String checkDeviceName(String deviceName) {
+        if (StringUtils.isEmpty(deviceName)) {
+            throw new RuntimeException("Device name is empty!");
+        } else {
+            return deviceName;
+        }
+    }
+
+    private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
+        JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+        return json.getAsJsonObject().get("device").getAsString();
+    }
+
+    protected SessionMsgProcessor getProcessor() {
+        return processor;
+    }
+
+    protected DeviceAuthService getAuthService() {
+        return authService;
+    }
+
+    public void setChannel(ChannelHandlerContext channel) {
+        this.channel = channel;
+    }
+
+    private void ack(MqttPublishMessage msg) {
+        writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
+    }
+
+    protected void writeAndFlush(MqttMessage mqttMessage) {
+        channel.writeAndFlush(mqttMessage);
+    }
+}