thingsboard-memoizeit

HTTP transport implementation

10/17/2018 12:59:52 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index a7590c9..b800172 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.common.transport.SessionMsgListener;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.common.transport.service.AbstractTransportService;
 import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
@@ -56,6 +57,8 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -67,11 +70,7 @@ import java.util.function.Consumer;
 @Slf4j
 @Service
 @ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "local")
-public class LocalTransportService implements TransportService, RuleEngineTransportService {
-
-    private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
-
-    private ExecutorService transportCallbackExecutor;
+public class LocalTransportService extends AbstractTransportService implements RuleEngineTransportService {
 
     @Autowired
     private TransportApiService transportApiService;
@@ -89,14 +88,12 @@ public class LocalTransportService implements TransportService, RuleEngineTransp
 
     @PostConstruct
     public void init() {
-        this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+        super.init();
     }
 
     @PreDestroy
     public void destroy() {
-        if (transportCallbackExecutor != null) {
-            transportCallbackExecutor.shutdownNow();
-        }
+        super.destroy();
     }
 
     @Override
@@ -176,51 +173,13 @@ public class LocalTransportService implements TransportService, RuleEngineTransp
     }
 
     @Override
-    public void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
-        sessions.putIfAbsent(toId(sessionInfo), listener);
-        //TODO: monitor sessions periodically: PING REQ/RESP, etc.
-    }
-
-    @Override
-    public void deregisterSession(SessionInfoProto sessionInfo) {
-        sessions.remove(toId(sessionInfo));
-    }
-
-    private UUID toId(SessionInfoProto sessionInfo) {
-        return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
-    }
-
-    @Override
     public void process(String nodeId, DeviceActorToTransportMsg msg) {
         process(nodeId, msg, null, null);
     }
 
     @Override
     public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
-        UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
-        SessionMsgListener listener = sessions.get(sessionId);
-        if (listener != null) {
-            transportCallbackExecutor.submit(() -> {
-                if (msg.hasGetAttributesResponse()) {
-                    listener.onGetAttributesResponse(msg.getGetAttributesResponse());
-                }
-                if (msg.hasAttributeUpdateNotification()) {
-                    listener.onAttributeUpdate(msg.getAttributeUpdateNotification());
-                }
-                if (msg.hasSessionCloseNotification()) {
-                    listener.onRemoteSessionCloseCommand(msg.getSessionCloseNotification());
-                }
-                if (msg.hasToDeviceRequest()) {
-                    listener.onToDeviceRpcRequest(msg.getToDeviceRequest());
-                }
-                if (msg.hasToServerResponse()) {
-                    listener.onToServerRpcResponse(msg.getToServerResponse());
-                }
-            });
-        } else {
-            //TODO: should we notify the device actor about missed session?
-            log.debug("[{}] Missing session.", sessionId);
-        }
+        processToTransportMsg(msg);
         if (onSuccess != null) {
             onSuccess.run();
         }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index ca50944..0eb624b 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -439,7 +439,7 @@ transport:
   # Local MQTT transport parameters
   mqtt:
     # Enable/disable mqtt transport protocol.
-    enabled: "${MQTT_ENABLED:true}"
+    enabled: "${MQTT_ENABLED:false}"
     bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
     bind_port: "${MQTT_BIND_PORT:1883}"
     adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}"
diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
index bca05c0..8d579d4 100644
--- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
+++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
@@ -15,6 +15,8 @@
  */
 package org.thingsboard.server.transport.http;
 
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -33,9 +35,20 @@ import org.thingsboard.server.common.transport.TransportContext;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
-import org.thingsboard.server.gen.transport.TransportProtos;
-import org.thingsboard.server.gen.transport.TransportProtos.*;
-import org.thingsboard.server.transport.http.session.HttpSessionCtx;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
+import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
 
 import javax.servlet.http.HttpServletRequest;
 import java.util.Arrays;
@@ -76,7 +89,7 @@ public class DeviceApiController {
                         request.addAllSharedAttributeNames(sharedKeySet);
                     }
                     TransportService transportService = transportContext.getTransportService();
-                    transportService.registerSession(sessionInfo, TransportProtos.SessionType.SYNC, new HttpSessionListener(transportContext, responseWriter));
+                    transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
                     transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
                 }));
         return responseWriter;
@@ -85,20 +98,16 @@ public class DeviceApiController {
     @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST)
     public DeferredResult<ResponseEntity> postDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
                                                                @RequestBody String json, HttpServletRequest request) {
-        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
+        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
         if (quotaExceeded(request, responseWriter)) {
             return responseWriter;
         }
-//        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-//            try {
-//                process(ctx, JsonConverter.convertToAttributes(new JsonParser().parse(json)));
-//            } catch (IllegalStateException | JsonSyntaxException ex) {
-//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-//            }
-//        } else {
-//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-//        }
+        transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
+                new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
+                    TransportService transportService = transportContext.getTransportService();
+                    transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
+                            new HttpOkCallback(responseWriter));
+                }));
         return responseWriter;
     }
 
@@ -109,26 +118,33 @@ public class DeviceApiController {
         if (quotaExceeded(request, responseWriter)) {
             return responseWriter;
         }
-        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-//            try {
-//                process(ctx, JsonConverter.convertToTelemetry(new JsonParser().parse(json)));
-//            } catch (IllegalStateException | JsonSyntaxException ex) {
-//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-//            }
-//        } else {
-//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-//        }
+        transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
+                new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
+                    TransportService transportService = transportContext.getTransportService();
+                    transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
+                            new HttpOkCallback(responseWriter));
+                }));
         return responseWriter;
     }
 
     @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json")
     public DeferredResult<ResponseEntity> subscribeToCommands(@PathVariable("deviceToken") String deviceToken,
                                                               @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
-                                                              HttpServletRequest request) {
+                                                              HttpServletRequest httpRequest) {
+        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
+        if (quotaExceeded(httpRequest, responseWriter)) {
+            return responseWriter;
+        }
+        transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
+                new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
+                    TransportService transportService = transportContext.getTransportService();
+                    transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
+                            timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
+                    transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(),
+                            new SessionCloseOnErrorCallback(transportService, sessionInfo));
 
-//        return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request);
-        return null;
+                }));
+        return responseWriter;
     }
 
     @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
@@ -139,17 +155,11 @@ public class DeviceApiController {
         if (quotaExceeded(request, responseWriter)) {
             return responseWriter;
         }
-//        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-//            try {
-//                JsonObject response = new JsonParser().parse(json).getAsJsonObject();
-//                process(ctx, new ToDeviceRpcResponseMsg(requestId, response.toString()));
-//            } catch (IllegalStateException | JsonSyntaxException ex) {
-//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-//            }
-//        } else {
-//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-//        }
+        transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
+                new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
+                    TransportService transportService = transportContext.getTransportService();
+                    transportService.process(sessionInfo, ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(json).build(), new HttpOkCallback(responseWriter));
+                }));
         return responseWriter;
     }
 
@@ -160,19 +170,16 @@ public class DeviceApiController {
         if (quotaExceeded(httpRequest, responseWriter)) {
             return responseWriter;
         }
-        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
-//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-//            try {
-//                JsonObject request = new JsonParser().parse(json).getAsJsonObject();
-//                process(ctx, new ToServerRpcRequestMsg(0,
-//                        request.get("method").getAsString(),
-//                        request.get("params").toString()));
-//            } catch (IllegalStateException | JsonSyntaxException ex) {
-//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-//            }
-//        } else {
-//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-//        }
+        transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
+                new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
+                    JsonObject request = new JsonParser().parse(json).getAsJsonObject();
+                    TransportService transportService = transportContext.getTransportService();
+                    transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
+                    transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0)
+                                    .setMethodName(request.get("method").getAsString())
+                                    .setParams(request.get("params").toString()).build(),
+                            new SessionCloseOnErrorCallback(transportService, sessionInfo));
+                }));
         return responseWriter;
     }
 
@@ -180,49 +187,30 @@ public class DeviceApiController {
     public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
                                                                 @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
                                                                 HttpServletRequest httpRequest) {
-        return null;
-//        return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest);
-    }
-
-//    private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) {
-//        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
-//        if (quotaExceeded(httpRequest, responseWriter)) {
-//            return responseWriter;
-//        }
-//        HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
-//        if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
-//            try {
-//                process(ctx, msg);
-//            } catch (IllegalStateException | JsonSyntaxException ex) {
-//                responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
-//            }
-//        } else {
-//            responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
-//        }
-//        return responseWriter;
-//    }
-
-    private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter) {
-        return getHttpSessionCtx(responseWriter, transportContext.getDefaultTimeout());
-    }
+        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
+        if (quotaExceeded(httpRequest, responseWriter)) {
+            return responseWriter;
+        }
+        transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
+                new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
+                    TransportService transportService = transportContext.getTransportService();
+                    transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
+                            timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
+                    transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(),
+                            new SessionCloseOnErrorCallback(transportService, sessionInfo));
 
-    private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter, long timeout) {
-        return null;
-//        return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout);
+                }));
+        return responseWriter;
     }
 
-//    private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
-//        AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
-////        processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
-//    }
-
     private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
         if (transportContext.getQuotaService().isQuotaExceeded(request.getRemoteAddr())) {
             log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr());
             responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED));
             return true;
+        } else {
+            return false;
         }
-        return false;
     }
 
     private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> {
@@ -274,7 +262,6 @@ public class DeviceApiController {
 
         @Override
         public void onSuccess(Void msg) {
-
         }
 
         @Override
@@ -283,13 +270,30 @@ public class DeviceApiController {
         }
     }
 
+    private static class HttpOkCallback implements TransportServiceCallback<Void> {
+        private final DeferredResult<ResponseEntity> responseWriter;
+
+        public HttpOkCallback(DeferredResult<ResponseEntity> responseWriter) {
+            this.responseWriter = responseWriter;
+        }
+
+        @Override
+        public void onSuccess(Void msg) {
+            responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            responseWriter.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
+        }
+    }
+
+
     private static class HttpSessionListener implements SessionMsgListener {
 
-        private final TransportContext transportContext;
         private final DeferredResult<ResponseEntity> responseWriter;
 
-        HttpSessionListener(TransportContext transportContext, DeferredResult<ResponseEntity> responseWriter) {
-            this.transportContext = transportContext;
+        HttpSessionListener(DeferredResult<ResponseEntity> responseWriter) {
             this.responseWriter = responseWriter;
         }
 
@@ -305,16 +309,17 @@ public class DeviceApiController {
 
         @Override
         public void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification) {
+            responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
         }
 
         @Override
-        public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
-
+        public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg msg) {
+            responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
         }
 
         @Override
-        public void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse) {
-
+        public void onToServerRpcResponse(ToServerRpcResponseMsg msg) {
+            responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
         }
     }
 }
diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java
index 87db06e..66654c1 100644
--- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java
+++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/HttpTransportContext.java
@@ -26,12 +26,12 @@ import org.thingsboard.server.common.transport.TransportContext;
  * Created by ashvayka on 04.10.18.
  */
 @Slf4j
-@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true)
+@ConditionalOnProperty(prefix = "transport.http", value = "enabled", havingValue = "true", matchIfMissing = true)
 @Component
 public class HttpTransportContext extends TransportContext {
 
     @Getter
-    @Value("${http.request_timeout}")
+    @Value("${transport.http.request_timeout}")
     private long defaultTimeout;
 
 }
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
index 16f8981..91580bf 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java
@@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.handler.ssl.SslHandler;
 import lombok.Data;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -56,6 +57,7 @@ public class MqttTransportContext extends TransportContext {
     private Integer maxPayloadSize;
 
     @Getter
+    @Setter
     private SslHandler sslHandler;
 
 }
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index 30cc9c7..d739824 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -506,7 +506,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                     .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                     .build();
             transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
-            transportService.registerSession(sessionInfo, TransportProtos.SessionType.ASYNC, this);
+            transportService.registerAsyncSession(sessionInfo, this);
             checkGatewaySession();
             ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
         }
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
index 43f9978..d600059 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
@@ -121,7 +121,7 @@ public class GatewaySessionHandler {
                                 transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
                                 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
                                 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
-                                transportService.registerSession(deviceSessionInfo, TransportProtos.SessionType.ASYNC, deviceSessionCtx);
+                                transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
                             }
                             future.set(devices.get(deviceName));
                         }
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
new file mode 100644
index 0000000..1bf4dd1
--- /dev/null
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -0,0 +1,101 @@
+package org.thingsboard.server.common.transport.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.common.transport.SessionMsgListener;
+import org.thingsboard.server.common.transport.TransportService;
+import org.thingsboard.server.gen.transport.TransportProtos;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by ashvayka on 17.10.18.
+ */
+@Slf4j
+public abstract class AbstractTransportService implements TransportService {
+
+    protected ScheduledExecutorService schedulerExecutor;
+    protected ExecutorService transportCallbackExecutor;
+    protected ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
+
+    @Override
+    public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
+        sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
+        //TODO: monitor sessions periodically: PING REQ/RESP, etc.
+    }
+
+    @Override
+    public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
+        sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener));
+        schedulerExecutor.schedule(() -> {
+            listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
+            deregisterSession(sessionInfo);
+        }, timeout, TimeUnit.MILLISECONDS);
+
+    }
+
+    @Override
+    public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) {
+        sessions.remove(toId(sessionInfo));
+    }
+
+    protected void processToTransportMsg(TransportProtos.DeviceActorToTransportMsg toSessionMsg) {
+        UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
+        SessionMetaData md = sessions.get(sessionId);
+        if (md != null) {
+            SessionMsgListener listener = md.getListener();
+            transportCallbackExecutor.submit(() -> {
+                if (toSessionMsg.hasGetAttributesResponse()) {
+                    listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
+                }
+                if (toSessionMsg.hasAttributeUpdateNotification()) {
+                    listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
+                }
+                if (toSessionMsg.hasSessionCloseNotification()) {
+                    listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification());
+                }
+                if (toSessionMsg.hasToDeviceRequest()) {
+                    listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
+                }
+                if (toSessionMsg.hasToServerResponse()) {
+                    listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
+                }
+            });
+            if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
+                deregisterSession(md.getSessionInfo());
+            }
+        } else {
+            //TODO: should we notify the device actor about missed session?
+            log.debug("[{}] Missing session.", sessionId);
+        }
+    }
+
+    protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
+        return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
+    }
+
+    String getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
+        return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
+    }
+
+    public void init() {
+        this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
+        this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+    }
+
+    public void destroy() {
+        if (schedulerExecutor != null) {
+            schedulerExecutor.shutdownNow();
+        }
+        if (transportCallbackExecutor != null) {
+            transportCallbackExecutor.shutdownNow();
+        }
+    }
+}
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index eb6dbc5..1442cc5 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- * <p>
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,7 @@ import org.springframework.stereotype.Service;
 import org.thingsboard.server.common.transport.SessionMsgListener;
 import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
+import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.*;
 import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
@@ -62,7 +63,7 @@ import java.util.concurrent.Executors;
 @ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote", matchIfMissing = true)
 @Service
 @Slf4j
-public class RemoteTransportService implements TransportService {
+public class RemoteTransportService extends AbstractTransportService {
 
     @Value("${kafka.rule_engine.topic}")
     private String ruleEngineTopic;
@@ -85,16 +86,12 @@ public class RemoteTransportService implements TransportService {
     @Value("${kafka.transport_api.response_auto_commit_interval}")
     private int autoCommitInterval;
 
-    private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
-
     @Autowired
     private TbKafkaSettings kafkaSettings;
     //We use this to get the node id. We should replace this with a component that provides the node id.
     @Autowired
     private TbNodeIdProvider nodeIdProvider;
 
-    private ExecutorService transportCallbackExecutor;
-
     private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
     private TBKafkaProducerTemplate<ToRuleEngineMsg> ruleEngineProducer;
     private TBKafkaConsumerTemplate<ToTransportMsg> mainConsumer;
@@ -105,7 +102,7 @@ public class RemoteTransportService implements TransportService {
 
     @PostConstruct
     public void init() {
-        this.transportCallbackExecutor = Executors.newCachedThreadPool();
+        super.init();
 
         TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder();
         requestBuilder.settings(kafkaSettings);
@@ -157,36 +154,7 @@ public class RemoteTransportService implements TransportService {
                         try {
                             ToTransportMsg toTransportMsg = mainConsumer.decode(record);
                             if (toTransportMsg.hasToDeviceSessionMsg()) {
-                                DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
-                                UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
-                                SessionMetaData md = sessions.get(sessionId);
-                                if (md != null) {
-                                    SessionMsgListener listener = md.getListener();
-                                    transportCallbackExecutor.submit(() -> {
-                                        if (toSessionMsg.hasGetAttributesResponse()) {
-                                            listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
-                                        }
-                                        if (toSessionMsg.hasAttributeUpdateNotification()) {
-                                            listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
-                                        }
-                                        if (toSessionMsg.hasSessionCloseNotification()) {
-                                            listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification());
-                                        }
-                                        if (toSessionMsg.hasToDeviceRequest()) {
-                                            listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
-                                        }
-                                        if (toSessionMsg.hasToServerResponse()) {
-                                            listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
-                                        }
-                                    });
-                                    if (md.getSessionType() == SessionType.SYNC) {
-                                        deregisterSession(md.getSessionInfo());
-                                    }
-                                } else {
-                                    //TODO: should we notify the device actor about missed session?
-                                    log.debug("[{}] Missing session.", sessionId);
-                                }
-
+                                processToTransportMsg(toTransportMsg.getToDeviceSessionMsg());
                             }
                         } catch (Throwable e) {
                             log.warn("Failed to process the notification.", e);
@@ -206,13 +174,11 @@ public class RemoteTransportService implements TransportService {
 
     @PreDestroy
     public void destroy() {
+        super.destroy();
         stopped = true;
         if (transportApiTemplate != null) {
             transportApiTemplate.stop();
         }
-        if (transportCallbackExecutor != null) {
-            transportCallbackExecutor.shutdownNow();
-        }
         if (mainConsumer != null) {
             mainConsumer.unsubscribe();
         }
@@ -314,25 +280,6 @@ public class RemoteTransportService implements TransportService {
         send(sessionInfo, toRuleEngineMsg, callback);
     }
 
-    @Override
-    public void registerSession(SessionInfoProto sessionInfo, SessionType sessionType, SessionMsgListener listener) {
-        sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, sessionType, listener));
-        //TODO: monitor sessions periodically: PING REQ/RESP, etc.
-    }
-
-    @Override
-    public void deregisterSession(SessionInfoProto sessionInfo) {
-        sessions.remove(toId(sessionInfo));
-    }
-
-    private UUID toId(SessionInfoProto sessionInfo) {
-        return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
-    }
-
-    private String getRoutingKey(SessionInfoProto sessionInfo) {
-        return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
-    }
-
     private static class TransportCallbackAdaptor implements Callback {
         private final TransportServiceCallback<Void> callback;
 
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
index dd52f77..1de5711 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.common.transport.service;
 
 import lombok.Data;
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
index f846cf0..ab42982 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportContext.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.thingsboard.server.common.transport;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index 9bbc40c..5d0c14d 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,7 +59,9 @@ public interface TransportService {
 
     void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
 
-    void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener);
+    void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
+
+    void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
 
     void deregisterSession(SessionInfoProto sessionInfo);