thingsboard-memoizeit

Details

diff --git a/application/pom.xml b/application/pom.xml
index bff96ec..dd1bade 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -60,14 +60,14 @@
             <groupId>org.thingsboard.common</groupId>
             <artifactId>transport</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.thingsboard.transport</groupId>
-            <artifactId>http</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.thingsboard.transport</groupId>
-            <artifactId>coap</artifactId>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.thingsboard.transport</groupId>-->
+            <!--<artifactId>http</artifactId>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>org.thingsboard.transport</groupId>-->
+            <!--<artifactId>coap</artifactId>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>org.thingsboard.transport</groupId>
             <artifactId>mqtt-common</artifactId>
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 6448086..05902a6 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -455,6 +455,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
     private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
         UUID sessionId = getSessionId(sessionInfo);
         if (msg.getEvent() == SessionEvent.OPEN) {
+            if(sessions.containsKey(sessionId)){
+                logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
+                return;
+            }
             logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
             if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
                 UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
index 945317f..66a8226 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
@@ -51,6 +51,7 @@ import javax.annotation.PostConstruct;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Created by ashvayka on 05.10.18.
@@ -97,6 +98,8 @@ public class RemoteTransportApiService implements TransportApiService {
 
     private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
 
+    private ReentrantLock deviceCreationLock = new ReentrantLock();
+
     @PostConstruct
     public void init() {
         this.transportCallbackExecutor = Executors.newCachedThreadPool();
@@ -156,23 +159,26 @@ public class RemoteTransportApiService implements TransportApiService {
         DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
         ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);
         return Futures.transform(gatewayFuture, gateway -> {
-            Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), gateway.getName());
-            if (device == null) {
-                device = new Device();
-                device.setTenantId(gateway.getTenantId());
-                device.setName(requestMsg.getDeviceName());
-                device.setType(requestMsg.getDeviceType());
-                device.setCustomerId(gateway.getCustomerId());
-                device = deviceService.saveDevice(device);
-                relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
-                deviceStateService.onDeviceAdded(device);
-            }
+            deviceCreationLock.lock();
             try {
+                Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
+                if (device == null) {
+                    device = new Device();
+                    device.setTenantId(gateway.getTenantId());
+                    device.setName(requestMsg.getDeviceName());
+                    device.setType(requestMsg.getDeviceType());
+                    device.setCustomerId(gateway.getCustomerId());
+                    device = deviceService.saveDevice(device);
+                    relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
+                    deviceStateService.onDeviceAdded(device);
+                }
                 return TransportApiResponseMsg.newBuilder()
                         .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
             } catch (JsonProcessingException e) {
                 log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
                 throw new RuntimeException(e);
+            } finally {
+                deviceCreationLock.unlock();
             }
         }, transportCallbackExecutor);
     }
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index bc7eeff..3c7c521 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -122,7 +122,8 @@ public class DeviceApiController {
                                                               @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
                                                               HttpServletRequest request) {
 
-        return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request);
+//        return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request);
+        return null;
     }
 
     @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
@@ -174,15 +175,15 @@ public class DeviceApiController {
     public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
                                                                 @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
                                                                 HttpServletRequest httpRequest) {
-
-        return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest);
+        return null;
+//        return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest);
     }
 
-    private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) {
-        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
-        if (quotaExceeded(httpRequest, responseWriter)) {
-            return responseWriter;
-        }
+//    private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) {
+//        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
+//        if (quotaExceeded(httpRequest, responseWriter)) {
+//            return responseWriter;
+//        }
 //        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
 //        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
 //            try {
@@ -193,21 +194,22 @@ public class DeviceApiController {
 //        } else {
 //            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
 //        }
-        return responseWriter;
-    }
+//        return responseWriter;
+//    }
 
     private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter) {
         return getHttpSessionCtx(responseWriter, defaultTimeout);
     }
 
     private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter, long timeout) {
-        return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout);
+        return null;
+//        return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout);
     }
 
-    private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
-        AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
-//        processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
-    }
+//    private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
+//        AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
+////        processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
+//    }
 
     private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
         if (quotaService.isQuotaExceeded(request.getRemoteAddr())) {
diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
index 62999cc..0e087d4 100644
--- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
+++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java
@@ -29,6 +29,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService;
 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
 
 import java.util.Optional;
+import java.util.UUID;
 import java.util.function.Consumer;
 
 /**
@@ -37,127 +38,136 @@ import java.util.function.Consumer;
 @Slf4j
 public class HttpSessionCtx extends DeviceAwareSessionContext {
 
-    private final SessionId sessionId;
-    private final long timeout;
-    private final DeferredResult<ResponseEntity> responseWriter;
-
-    public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult<ResponseEntity> responseWriter, long timeout) {
-        super();
-        this.sessionId = new HttpSessionId();
-        this.responseWriter = responseWriter;
-        this.timeout = timeout;
-    }
-
-    @Override
-    public SessionType getSessionType() {
-        return SessionType.SYNC;
+    public HttpSessionCtx(UUID sessionId) {
+        super(sessionId);
     }
 
     @Override
-    public void onMsg(SessionActorToAdaptorMsg source) throws SessionException {
-        ToDeviceMsg msg = source.getMsg();
-        switch (msg.getSessionMsgType()) {
-            case GET_ATTRIBUTES_RESPONSE:
-                reply((GetAttributesResponse) msg);
-                return;
-            case STATUS_CODE_RESPONSE:
-                reply((StatusCodeResponse) msg);
-                return;
-            case ATTRIBUTES_UPDATE_NOTIFICATION:
-                reply((AttributesUpdateNotification) msg);
-                return;
-            case TO_DEVICE_RPC_REQUEST:
-                reply((ToDeviceRpcRequestMsg) msg);
-                return;
-            case TO_SERVER_RPC_RESPONSE:
-                reply((ToServerRpcResponseMsg) msg);
-                return;
-            case RULE_ENGINE_ERROR:
-                reply((RuleEngineErrorMsg) msg);
-                return;
-            default:
-                break;
-        }
-    }
-
-    private void reply(RuleEngineErrorMsg msg) {
-        HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;
-        switch (msg.getError()) {
-            case QUEUE_PUT_TIMEOUT:
-                status = HttpStatus.REQUEST_TIMEOUT;
-                break;
-            default:
-                if (msg.getInSessionMsgType() == SessionMsgType.TO_SERVER_RPC_REQUEST) {
-                    status = HttpStatus.BAD_REQUEST;
-                }
-                break;
-        }
-        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toErrorJson(msg.getErrorMsg()).toString(), status));
-    }
-
-    private <T> void reply(ResponseMsg<? extends T> msg, Consumer<T> f) {
-        Optional<Exception> msgError = msg.getError();
-        if (!msgError.isPresent()) {
-            Optional<? extends T> msgData = msg.getData();
-            if (msgData.isPresent()) {
-                f.accept(msgData.get());
-            }
-        } else {
-            Exception e = msgError.get();
-            responseWriter.setResult(new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR));
-        }
+    public int nextMsgId() {
+        return 0;
     }
 
-    private void reply(ToDeviceRpcRequestMsg msg) {
-        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
-    }
-
-    private void reply(ToServerRpcResponseMsg msg) {
-//        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
-    }
-
-    private void reply(AttributesUpdateNotification msg) {
-        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg.getData(), false).toString(), HttpStatus.OK));
-    }
-
-    private void reply(GetAttributesResponse msg) {
-        reply(msg, payload -> {
-            if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) {
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND));
-            } else {
-                JsonObject result = JsonConverter.toJson(payload, false);
-                responseWriter.setResult(new ResponseEntity<>(result.toString(), HttpStatus.OK));
-            }
-        });
-    }
-
-    private void reply(StatusCodeResponse msg) {
-        reply(msg, payload -> {
-            if (payload == 0) {
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
-            } else {
-                responseWriter.setResult(new ResponseEntity<>(HttpStatus.valueOf(payload)));
-            }
-        });
-    }
-
-    @Override
-    public void onMsg(SessionCtrlMsg msg) throws SessionException {
-        //Do nothing
-    }
-
-    @Override
-    public boolean isClosed() {
-        return false;
-    }
-
-    @Override
-    public long getTimeout() {
-        return timeout;
-    }
-
-    @Override
-    public SessionId getSessionId() {
-        return sessionId;
-    }
+    //    private final SessionId sessionId;
+//    private final long timeout;
+//    private final DeferredResult<ResponseEntity> responseWriter;
+//
+//    public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult<ResponseEntity> responseWriter, long timeout) {
+//        super();
+//        this.sessionId = new HttpSessionId();
+//        this.responseWriter = responseWriter;
+//        this.timeout = timeout;
+//    }
+//
+//    @Override
+//    public SessionType getSessionType() {
+//        return SessionType.SYNC;
+//    }
+//
+//    @Override
+//    public void onMsg(SessionActorToAdaptorMsg source) throws SessionException {
+//        ToDeviceMsg msg = source.getMsg();
+//        switch (msg.getSessionMsgType()) {
+//            case GET_ATTRIBUTES_RESPONSE:
+//                reply((GetAttributesResponse) msg);
+//                return;
+//            case STATUS_CODE_RESPONSE:
+//                reply((StatusCodeResponse) msg);
+//                return;
+//            case ATTRIBUTES_UPDATE_NOTIFICATION:
+//                reply((AttributesUpdateNotification) msg);
+//                return;
+//            case TO_DEVICE_RPC_REQUEST:
+//                reply((ToDeviceRpcRequestMsg) msg);
+//                return;
+//            case TO_SERVER_RPC_RESPONSE:
+//                reply((ToServerRpcResponseMsg) msg);
+//                return;
+//            case RULE_ENGINE_ERROR:
+//                reply((RuleEngineErrorMsg) msg);
+//                return;
+//            default:
+//                break;
+//        }
+//    }
+//
+//    private void reply(RuleEngineErrorMsg msg) {
+//        HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;
+//        switch (msg.getError()) {
+//            case QUEUE_PUT_TIMEOUT:
+//                status = HttpStatus.REQUEST_TIMEOUT;
+//                break;
+//            default:
+//                if (msg.getInSessionMsgType() == SessionMsgType.TO_SERVER_RPC_REQUEST) {
+//                    status = HttpStatus.BAD_REQUEST;
+//                }
+//                break;
+//        }
+//        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toErrorJson(msg.getErrorMsg()).toString(), status));
+//    }
+//
+//    private <T> void reply(ResponseMsg<? extends T> msg, Consumer<T> f) {
+//        Optional<Exception> msgError = msg.getError();
+//        if (!msgError.isPresent()) {
+//            Optional<? extends T> msgData = msg.getData();
+//            if (msgData.isPresent()) {
+//                f.accept(msgData.get());
+//            }
+//        } else {
+//            Exception e = msgError.get();
+//            responseWriter.setResult(new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR));
+//        }
+//    }
+//
+//    private void reply(ToDeviceRpcRequestMsg msg) {
+//        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
+//    }
+//
+//    private void reply(ToServerRpcResponseMsg msg) {
+////        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
+//    }
+//
+//    private void reply(AttributesUpdateNotification msg) {
+//        responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg.getData(), false).toString(), HttpStatus.OK));
+//    }
+//
+//    private void reply(GetAttributesResponse msg) {
+//        reply(msg, payload -> {
+//            if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) {
+//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+//            } else {
+//                JsonObject result = JsonConverter.toJson(payload, false);
+//                responseWriter.setResult(new ResponseEntity<>(result.toString(), HttpStatus.OK));
+//            }
+//        });
+//    }
+//
+//    private void reply(StatusCodeResponse msg) {
+//        reply(msg, payload -> {
+//            if (payload == 0) {
+//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
+//            } else {
+//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.valueOf(payload)));
+//            }
+//        });
+//    }
+//
+//    @Override
+//    public void onMsg(SessionCtrlMsg msg) throws SessionException {
+//        //Do nothing
+//    }
+//
+//    @Override
+//    public boolean isClosed() {
+//        return false;
+//    }
+//
+//    @Override
+//    public long getTimeout() {
+//        return timeout;
+//    }
+//
+//    @Override
+//    public SessionId getSessionId() {
+//        return sessionId;
+//    }
 }
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index b570c74..b1b176d 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -54,7 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenR
 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
-import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
+import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
 import org.thingsboard.server.transport.mqtt.util.SslUtil;
 
 import javax.net.ssl.SSLPeerUnverifiedException;
@@ -98,7 +98,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private volatile SessionInfoProto sessionInfo;
     private volatile InetSocketAddress address;
     private volatile DeviceSessionCtx deviceSessionCtx;
-    private volatile GatewaySessionCtx gatewaySessionCtx;
+    private volatile GatewaySessionHandler gatewaySessionHandler;
 
     MqttTransportHandler(MqttTransportContext context) {
         this.sessionId = UUID.randomUUID();
@@ -175,7 +175,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
 
         if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
-            if (gatewaySessionCtx != null) {
+            if (gatewaySessionHandler != null) {
                 handleGatewayPublishMsg(topicName, msgId, mqttMsg);
             }
         } else {
@@ -187,22 +187,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         try {
             switch (topicName) {
                 case MqttTopics.GATEWAY_TELEMETRY_TOPIC:
-                    gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
+                    gatewaySessionHandler.onDeviceTelemetry(mqttMsg);
                     break;
                 case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
-                    gatewaySessionCtx.onDeviceAttributes(mqttMsg);
+                    gatewaySessionHandler.onDeviceAttributes(mqttMsg);
                     break;
                 case MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
-                    gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
+                    gatewaySessionHandler.onDeviceAttributesRequest(mqttMsg);
                     break;
                 case MqttTopics.GATEWAY_RPC_TOPIC:
-                    gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
+                    gatewaySessionHandler.onDeviceRpcResponse(mqttMsg);
                     break;
                 case MqttTopics.GATEWAY_CONNECT_TOPIC:
-                    gatewaySessionCtx.onDeviceConnect(mqttMsg);
+                    gatewaySessionHandler.onDeviceConnect(mqttMsg);
                     break;
                 case MqttTopics.GATEWAY_DISCONNECT_TOPIC:
-                    gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
+                    gatewaySessionHandler.onDeviceDisconnect(mqttMsg);
                     break;
             }
         } catch (RuntimeException | AdaptorException e) {
@@ -405,8 +405,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         if (deviceSessionCtx.isConnected()) {
             transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
             transportService.deregisterSession(sessionInfo);
-            if (gatewaySessionCtx != null) {
-                gatewaySessionCtx.onGatewayDisconnect();
+            if (gatewaySessionHandler != null) {
+                gatewaySessionHandler.onGatewayDisconnect();
             }
         }
     }
@@ -467,7 +467,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
             if (infoNode != null) {
                 JsonNode gatewayNode = infoNode.get("gateway");
                 if (gatewayNode != null && gatewayNode.asBoolean()) {
-                    gatewaySessionCtx = new GatewaySessionCtx(context, deviceSessionCtx, sessionId);
+                    gatewaySessionHandler = new GatewaySessionHandler(context, deviceSessionCtx, sessionId);
                 }
             }
         } catch (IOException e) {
diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
index 8560b35..1e2c2ca 100644
--- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
+++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java
@@ -30,10 +30,10 @@ import java.util.concurrent.ConcurrentMap;
 @Slf4j
 public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
 
-    private final GatewaySessionCtx parent;
+    private final GatewaySessionHandler parent;
     private final SessionInfoProto sessionInfo;
 
-    public GatewayDeviceSessionCtx(GatewaySessionCtx parent, DeviceInfoProto deviceInfo, ConcurrentMap<String, Integer> mqttQoSMap) {
+    public GatewayDeviceSessionCtx(GatewaySessionHandler parent, DeviceInfoProto deviceInfo, ConcurrentMap<String, Integer> mqttQoSMap) {
         super(UUID.randomUUID(), mqttQoSMap);
         this.parent = parent;
         this.sessionInfo = SessionInfoProto.newBuilder()
diff --git a/transport/pom.xml b/transport/pom.xml
index 671e3c1..7915988 100644
--- a/transport/pom.xml
+++ b/transport/pom.xml
@@ -35,7 +35,7 @@
     </properties>
 
     <modules>
-        <!--<module>http</module>-->
+        <module>http</module>
         <!--<module>coap</module>-->
         <module>mqtt-common</module>
         <module>mqtt-transport</module>