thingsboard-aplcache

TB-33: Implementation

1/20/2017 1:12:48 PM

Details

diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 2889148..f150574 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -76,10 +76,10 @@ mqtt:
   adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
   timeout: "${MQTT_TIMEOUT:10000}"
 # Uncomment the following lines to enable ssl for MQTT
-  ssl:
-    key_store: keystore/mqttserver.jks
-    key_store_password: password
-    key_store_type: JKS
+#  ssl:
+#    key_store: keystore/mqttserver.jks
+#    key_store_password: password
+#    key_store_type: JKS
 
 # CoAP server parameters
 coap:
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
index 6d957ce..c7baaaf 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java
@@ -24,6 +24,10 @@ public class SessionCloseMsg implements SessionCtrlMsg {
     private final boolean revoked;
     private final boolean timeout;
 
+    public static SessionCloseMsg onDisconnect(SessionId sessionId) {
+        return new SessionCloseMsg(sessionId, false, false);
+    }
+
     public static SessionCloseMsg onError(SessionId sessionId) {
         return new SessionCloseMsg(sessionId, false, false);
     }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java
index 4a39225..0b138df 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java
@@ -27,8 +27,6 @@ public interface SessionContext extends SessionAwareMsg {
 
     void onMsg(SessionCtrlMsg msg) throws SessionException;
 
-    void onError(SessionException e);
-
     boolean isClosed();
 
     long getTimeout();
diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
index 93e764e..640cce7 100644
--- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
+++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java
@@ -74,7 +74,7 @@ public class JsonConverter {
         }
     }
 
-    private static void parseWithTs(BasicTelemetryUploadRequest request, JsonObject jo) {
+    public static void parseWithTs(BasicTelemetryUploadRequest request, JsonObject jo) {
         long ts = jo.get("ts").getAsLong();
         JsonObject valuesObject = jo.get("values").getAsJsonObject();
         for (KvEntry entry : parseValues(valuesObject)) {
@@ -82,7 +82,7 @@ public class JsonConverter {
         }
     }
 
-    private static List<KvEntry> parseValues(JsonObject valuesObject) {
+    public static List<KvEntry> parseValues(JsonObject valuesObject) {
         List<KvEntry> result = new ArrayList<>();
         for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
             JsonElement element = valueEntry.getValue();
diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
index e9b8e22..cecc42d 100644
--- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
+++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java
@@ -96,17 +96,6 @@ public class CoapSessionCtx extends DeviceAwareSessionContext {
     }
 
     @Override
-    public void onError(SessionException e) {
-        if (e instanceof SessionAuthException) {
-            log.warn("[{}] onError: {}", sessionId, e.getMessage());
-            exchange.respond(ResponseCode.UNAUTHORIZED);
-        } else {
-            log.warn("[{}] onError: {}", sessionId, e.getMessage(), e);
-            exchange.respond(ResponseCode.BAD_REQUEST);
-        }
-    }
-
-    @Override
     public SessionId getSessionId() {
         return sessionId;
     }
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
index efaa0cd..4bee595 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
@@ -141,11 +141,6 @@ public class HttpSessionCtx extends DeviceAwareSessionContext {
     }
 
     @Override
-    public void onError(SessionException e) {
-
-    }
-
-    @Override
     public boolean isClosed() {
         return false;
     }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
index ae49cd5..bf033dc 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java
@@ -248,7 +248,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
         try {
             String payload = payloadData.toString(UTF8);
             if (payload == null) {
-                log.warn("[{}] Payload is empty!", sessionId);
+                log.warn("[{}] Payload is empty!", sessionId.toUidStr());
                 throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
             }
             return payload;
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index e744955..fbc53ff 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -16,7 +16,6 @@
 package org.thingsboard.server.transport.mqtt;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.gson.JsonElement;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.mqtt.*;
@@ -38,7 +37,6 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.dao.EncryptionUtil;
 import org.thingsboard.server.dao.device.DeviceService;
-import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
@@ -129,13 +127,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
 
         if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) {
-            AdaptorToSessionActorMsg msg = null;
             if (gatewaySessionCtx != null) {
+                gatewaySessionCtx.setChannel(ctx);
                 try {
-                    if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
-                        gatewaySessionCtx.connect(getDeviceName(mqttMsg));
+                    if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) {
+                        gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
+                    } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
+                        gatewaySessionCtx.onDeviceAttributes(mqttMsg);
+                    } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
+                        gatewaySessionCtx.onDeviceConnect(mqttMsg);
                     } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) {
-                        gatewaySessionCtx.disconnect(getDeviceName(mqttMsg));
+                        gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
                     }
                 } catch (RuntimeException | AdaptorException e) {
                     log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
@@ -146,11 +148,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         }
     }
 
-    private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
-        JsonElement json = JsonMqttAdaptor.validateJsonPayload(deviceSessionCtx.getSessionId(), mqttMsg.payload());
-        return json.getAsJsonObject().get("device").getAsString();
-    }
-
     private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
         AdaptorToSessionActorMsg msg = null;
         try {
@@ -309,6 +306,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private void processDisconnect(ChannelHandlerContext ctx) {
         ctx.close();
+        processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
+        if (gatewaySessionCtx != null) {
+            gatewaySessionCtx.onGatewayDisconnect();
+        }
     }
 
     private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) {
@@ -362,9 +363,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
 
     private void checkGatewaySession() {
         Device device = deviceSessionCtx.getDevice();
-        JsonNode gatewayNode = device.getAdditionalInfo().get("gateway");
-        if (gatewayNode != null && gatewayNode.asBoolean()) {
-            gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, device);
+        JsonNode infoNode = device.getAdditionalInfo();
+        if (infoNode != null) {
+            JsonNode gatewayNode = infoNode.get("gateway");
+            if (gatewayNode != null && gatewayNode.asBoolean()) {
+                gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx);
+            }
         }
     }
 
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
index f7996fa..f458b86 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java
@@ -83,11 +83,6 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext {
     }
 
     @Override
-    public void onError(SessionException e) {
-
-    }
-
-    @Override
     public boolean isClosed() {
         return false;
     }
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
index fef155f..9c4bacf 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
@@ -15,26 +15,29 @@
  */
 package org.thingsboard.server.transport.mqtt.session;
 
+import io.netty.handler.codec.mqtt.MqttMessage;
 import org.thingsboard.server.common.data.Device;
 import org.thingsboard.server.common.data.id.SessionId;
-import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
-import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
-import org.thingsboard.server.common.msg.session.SessionType;
+import org.thingsboard.server.common.msg.core.ResponseMsg;
+import org.thingsboard.server.common.msg.session.*;
 import org.thingsboard.server.common.msg.session.ex.SessionException;
-import org.thingsboard.server.common.transport.SessionMsgProcessor;
-import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
+import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
+
+import java.util.Optional;
 
 /**
  * Created by ashvayka on 19.01.17.
  */
 public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
 
+    private GatewaySessionCtx parent;
     private final MqttSessionId sessionId;
     private volatile boolean closed;
 
-    public GatewayDeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, Device device) {
-        super(processor, authService, device);
+    public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) {
+        super(parent.getProcessor(), parent.getAuthService(), device);
+        this.parent = parent;
         this.sessionId = new MqttSessionId();
     }
 
@@ -49,17 +52,30 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
     }
 
     @Override
-    public void onMsg(SessionActorToAdaptorMsg msg) throws SessionException {
-
+    public void onMsg(SessionActorToAdaptorMsg sessionMsg) throws SessionException {
+        Optional<MqttMessage> message = getToDeviceMsg(sessionMsg);
+        message.ifPresent(parent::writeAndFlush);
     }
 
-    @Override
-    public void onMsg(SessionCtrlMsg msg) throws SessionException {
-
+    private Optional<MqttMessage> getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) {
+        ToDeviceMsg msg = sessionMsg.getMsg();
+        switch (msg.getMsgType()) {
+            case STATUS_CODE_RESPONSE:
+                ResponseMsg<?> responseMsg = (ResponseMsg) msg;
+                if (responseMsg.isSuccess()) {
+                    MsgType requestMsgType = responseMsg.getRequestMsgType();
+                    Integer requestId = responseMsg.getRequestId();
+                    if (requestMsgType == MsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == MsgType.POST_TELEMETRY_REQUEST) {
+                        return Optional.of(MqttTransportHandler.createMqttPubAckMsg(requestId));
+                    }
+                }
+                break;
+        }
+        return Optional.empty();
     }
 
     @Override
-    public void onError(SessionException e) {
+    public void onMsg(SessionCtrlMsg msg) throws SessionException {
 
     }
 
diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
index 54336d9..2badd3a 100644
--- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
+++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -15,55 +15,177 @@
  */
 package org.thingsboard.server.transport.mqtt.session;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
+import com.google.gson.*;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.util.StringUtils;
 import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.SessionId;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
+import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
+import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest;
+import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
+import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
+import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
+import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
 import org.thingsboard.server.common.transport.SessionMsgProcessor;
+import org.thingsboard.server.common.transport.adaptor.AdaptorException;
+import org.thingsboard.server.common.transport.adaptor.JsonConverter;
 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
+import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
 
 /**
  * Created by ashvayka on 19.01.17.
  */
+@Slf4j
 public class GatewaySessionCtx {
 
+    private static final Gson GSON = new Gson();
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
+
     private final Device gateway;
+    private final SessionId gatewaySessionId;
     private final SessionMsgProcessor processor;
     private final DeviceService deviceService;
     private final DeviceAuthService authService;
     private final Map<String, GatewayDeviceSessionCtx> devices;
+    private ChannelHandlerContext channel;
 
-    public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, Device gateway) {
+    public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) {
         this.processor = processor;
         this.deviceService = deviceService;
         this.authService = authService;
-        this.gateway = gateway;
+        this.gateway = gatewaySessionCtx.getDevice();
+        this.gatewaySessionId = gatewaySessionCtx.getSessionId();
         this.devices = new HashMap<>();
     }
 
-    public void connect(String deviceName) {
-        checkDeviceName(deviceName);
+    public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
+        String deviceName = checkDeviceName(getDeviceName(msg));
         Optional<Device> deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
         Device device = deviceOpt.orElseGet(() -> {
             Device newDevice = new Device();
             newDevice.setTenantId(gateway.getTenantId());
+            newDevice.setName(deviceName);
             return deviceService.saveDevice(newDevice);
         });
-        devices.put(deviceName, new GatewayDeviceSessionCtx(processor, authService, device));
+        devices.put(deviceName, new GatewayDeviceSessionCtx(this, device));
+        ack(msg);
+    }
+
+    public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
+        String deviceName = checkDeviceName(getDeviceName(msg));
+        GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
+        deviceSessionCtx.setClosed(true);
+        ack(msg);
+    }
+
+    public void onGatewayDisconnect() {
+        devices.forEach((k, v) -> {
+            processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
+        });
     }
 
-    public void disconnect(String deviceName) {
-        checkDeviceName(deviceName);
-        devices.remove(deviceName);
+    public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
+        JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+        int requestId = mqttMsg.variableHeader().messageId();
+        if (json.isJsonObject()) {
+            JsonObject jsonObj = json.getAsJsonObject();
+            for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
+                String deviceName = checkDeviceConnected(deviceEntry.getKey());
+                if (!deviceEntry.getValue().isJsonArray()) {
+                    throw new JsonSyntaxException("Can't parse value: " + json);
+                }
+                BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
+                JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();
+                for (JsonElement element : deviceData) {
+                    JsonConverter.parseWithTs(request, element.getAsJsonObject());
+                }
+                GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+                processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+            }
+        } else {
+            throw new JsonSyntaxException("Can't parse value: " + json);
+        }
     }
 
-    private void checkDeviceName(String deviceName) {
+    public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
+        JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+        int requestId = mqttMsg.variableHeader().messageId();
+        if (json.isJsonObject()) {
+            JsonObject jsonObj = json.getAsJsonObject();
+            for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
+                String deviceName = checkDeviceConnected(deviceEntry.getKey());
+                if (!deviceEntry.getValue().isJsonObject()) {
+                    throw new JsonSyntaxException("Can't parse value: " + json);
+                }
+                long ts = System.currentTimeMillis();
+                BasicUpdateAttributesRequest request = new BasicUpdateAttributesRequest(requestId);
+                JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
+                request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
+                GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
+                processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
+                        new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
+            }
+        } else {
+            throw new JsonSyntaxException("Can't parse value: " + json);
+        }
+    }
+
+    private String checkDeviceConnected(String deviceName) {
+        if (!devices.containsKey(deviceName)) {
+            throw new RuntimeException("Device is not connected!");
+        } else {
+            return deviceName;
+        }
+    }
+
+    private String checkDeviceName(String deviceName) {
         if (StringUtils.isEmpty(deviceName)) {
-            throw new RuntimeException();
+            throw new RuntimeException("Device name is empty!");
+        } else {
+            return deviceName;
         }
     }
 
+    private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException {
+        JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
+        return json.getAsJsonObject().get("device").getAsString();
+    }
+
+    protected SessionMsgProcessor getProcessor() {
+        return processor;
+    }
+
+    protected DeviceAuthService getAuthService() {
+        return authService;
+    }
+
+    public void setChannel(ChannelHandlerContext channel) {
+        this.channel = channel;
+    }
+
+    private void ack(MqttPublishMessage msg) {
+        writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
+    }
+
+    protected void writeAndFlush(MqttMessage mqttMessage) {
+        channel.writeAndFlush(mqttMessage);
+    }
 }