thingsboard-aplcache

Implementation of Session timeouts

10/29/2018 1:31:14 PM

Changes

.gitignore 1(+1 -0)

Details

.gitignore 1(+1 -0)

diff --git a/.gitignore b/.gitignore
index 9039e8e..da8569b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,4 @@ pom.xml.versionsBackup
 **/Californium.properties
 **/.env
 .instance_id
+rebuild-docker.sh
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 5e19d69..091b504 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -67,6 +67,7 @@ import org.thingsboard.server.service.mail.MailExecutorService;
 import org.thingsboard.server.service.rpc.DeviceRpcService;
 import org.thingsboard.server.service.script.JsExecutorService;
 import org.thingsboard.server.service.script.JsInvokeService;
+import org.thingsboard.server.service.session.DeviceSessionCacheService;
 import org.thingsboard.server.service.state.DeviceStateService;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 import org.thingsboard.server.service.transport.RuleEngineTransportService;
@@ -201,6 +202,10 @@ public class ActorSystemContext {
     @Getter
     private DeviceStateService deviceStateService;
 
+    @Autowired
+    @Getter
+    private DeviceSessionCacheService deviceSessionCacheService;
+
     @Lazy
     @Autowired
     @Getter
@@ -254,6 +259,14 @@ public class ActorSystemContext {
     @Getter
     private boolean allowSystemMailService;
 
+    @Value("${transport.sessions.inactivity_timeout}")
+    @Getter
+    private long sessionInactivityTimeout;
+
+    @Value("${transport.sessions.report_timeout}")
+    @Getter
+    private long sessionReportTimeout;
+
     @Getter
     @Setter
     private ActorSystem actorSystem;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
index bd2a0f4..7b412b1 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
@@ -44,11 +44,19 @@ public class DeviceActor extends ContextAwareActor {
     }
 
     @Override
+    public void preStart() {
+        logger.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
+        try {
+            processor.initSessionTimeout(context());
+            logger.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
+        } catch (Exception e) {
+            logger.error(e, "[{}][{}] Unknown failure", processor.tenantId, processor.deviceId);
+        }
+    }
+
+    @Override
     protected boolean process(TbActorMsg msg) {
         switch (msg.getMsgType()) {
-            case CLUSTER_EVENT_MSG:
-                processor.processClusterEventMsg((ClusterEventMsg) msg);
-                break;
             case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                 processor.process(context(), (TransportToDeviceActorMsgWrapper) msg);
                 break;
@@ -73,6 +81,9 @@ public class DeviceActor extends ContextAwareActor {
             case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
                 processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
                 break;
+            case SESSION_TIMEOUT_MSG:
+                processor.checkSessionsTimeout();
+                break;
             default:
                 return false;
         }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index 021f262..08a4454 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -88,11 +88,11 @@ import java.util.stream.Collectors;
 /**
  * @author Andrew Shvayka
  */
-public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
+class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 
-    private final TenantId tenantId;
-    private final DeviceId deviceId;
-    private final Map<UUID, SessionInfo> sessions;
+    final TenantId tenantId;
+    final DeviceId deviceId;
+    private final Map<UUID, SessionInfoMetaData> sessions;
     private final Map<UUID, SessionInfo> attributeSubscriptions;
     private final Map<UUID, SessionInfo> rpcSubscriptions;
     private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
@@ -116,6 +116,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         this.toDeviceRpcPendingMap = new HashMap<>();
         this.toServerRpcPendingMap = new HashMap<>();
         initAttributes();
+        restoreSessions();
     }
 
     private void initAttributes() {
@@ -160,7 +161,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         } else {
             logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
         }
-
     }
 
     private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
@@ -174,7 +174,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (requestMd != null) {
             logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
             systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
-                     null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
+                    null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
         }
     }
 
@@ -227,11 +227,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
         if (msg.hasPostAttributes()) {
             handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
-            reportActivity();
+            reportLogicalDeviceActivity();
         }
         if (msg.hasPostTelemetry()) {
             handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry());
-            reportActivity();
+            reportLogicalDeviceActivity();
         }
         if (msg.hasGetAttributes()) {
             handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
@@ -241,11 +241,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
         if (msg.hasToServerRPCCallRequest()) {
             handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest());
-            reportActivity();
+            reportLogicalDeviceActivity();
+        }
+        if (msg.hasSubscriptionInfo()) {
+            handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo());
         }
     }
 
-    private void reportActivity() {
+    private void reportLogicalDeviceActivity() {
         systemContext.getDeviceStateService().onDeviceActivity(deviceId);
     }
 
@@ -406,28 +409,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
     }
 
-    void processClusterEventMsg(ClusterEventMsg msg) {
-//        if (!msg.isAdded()) {
-//            logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
-//            Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()
-//                    .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);
-//            attributeSubscriptions.entrySet().removeIf(filter);
-//            rpcSubscriptions.entrySet().removeIf(filter);
-//        }
-    }
-
     private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
         UUID sessionId = getSessionId(sessionInfo);
         if (subscribeCmd.getUnsubscribe()) {
             logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
             attributeSubscriptions.remove(sessionId);
         } else {
-            SessionInfo session = sessions.get(sessionId);
-            if (session == null) {
-                session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
+            SessionInfoMetaData sessionMD = sessions.get(sessionId);
+            if (sessionMD == null) {
+                sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
             }
+            sessionMD.setSubscribedToAttributes(true);
             logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
-            attributeSubscriptions.put(sessionId, session);
+            attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
+            dumpSessions();
         }
     }
 
@@ -441,20 +436,22 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
             rpcSubscriptions.remove(sessionId);
         } else {
-            SessionInfo session = sessions.get(sessionId);
-            if (session == null) {
-                session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId());
+            SessionInfoMetaData sessionMD = sessions.get(sessionId);
+            if (sessionMD == null) {
+                sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
             }
+            sessionMD.setSubscribedToRPC(true);
             logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
-            rpcSubscriptions.put(sessionId, session);
+            rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
             sendPendingRequests(context, sessionId, sessionInfo);
+            dumpSessions();
         }
     }
 
     private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
         UUID sessionId = getSessionId(sessionInfo);
         if (msg.getEvent() == SessionEvent.OPEN) {
-            if(sessions.containsKey(sessionId)){
+            if (sessions.containsKey(sessionId)) {
                 logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
                 return;
             }
@@ -462,13 +459,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
                 UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
                 if (sessionIdToRemove != null) {
-                    closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
+                    notifyTransportAboutClosedSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
                 }
             }
-            sessions.put(sessionId, new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId()));
+            sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId())));
             if (sessions.size() == 1) {
                 reportSessionOpen();
             }
+            dumpSessions();
         } else if (msg.getEvent() == SessionEvent.CLOSED) {
             logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
             sessions.remove(sessionId);
@@ -477,21 +475,34 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
             if (sessions.isEmpty()) {
                 reportSessionClose();
             }
+            dumpSessions();
+        }
+    }
+
+    private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
+        UUID sessionId = getSessionId(sessionInfo);
+        SessionInfoMetaData sessionMD = sessions.get(sessionId);
+        if (sessionMD != null) {
+            sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
+            sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
+            sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
         }
+        dumpSessions();
     }
 
     void processCredentialsUpdate() {
-        sessions.forEach(this::closeSession);
+        sessions.forEach(this::notifyTransportAboutClosedSession);
         attributeSubscriptions.clear();
         rpcSubscriptions.clear();
+        dumpSessions();
     }
 
-    private void closeSession(UUID sessionId, SessionInfo sessionInfo) {
+    private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd) {
         DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
                 .setSessionIdMSB(sessionId.getMostSignificantBits())
                 .setSessionIdLSB(sessionId.getLeastSignificantBits())
                 .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build();
-        systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
+        systemContext.getRuleEngineTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
     }
 
     void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
@@ -605,4 +616,67 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         }
         return builder.build();
     }
+
+    private void restoreSessions() {
+        TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
+        if (sessionsDump.getSerializedSize() == 0) {
+            return;
+        }
+        for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
+            SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
+            UUID sessionId = getSessionId(sessionInfoProto);
+            SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
+            TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
+            SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
+            sessions.put(sessionId, sessionInfoMetaData);
+            if (subInfo.getAttributeSubscription()) {
+                rpcSubscriptions.put(sessionId, sessionInfo);
+            }
+            if (subInfo.getAttributeSubscription()) {
+                attributeSubscriptions.put(sessionId, sessionInfo);
+            }
+        }
+    }
+
+    private void dumpSessions() {
+        List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
+        sessions.forEach((uuid, sessionMD) -> {
+            if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
+                return;
+            }
+            SessionInfo sessionInfo = sessionMD.getSessionInfo();
+            TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder()
+                    .setLastActivityTime(sessionMD.getLastActivityTime())
+                    .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
+                    .setRpcSubscription(sessionMD.isSubscribedToRPC()).build();
+            TransportProtos.SessionInfoProto sessionInfoProto = TransportProtos.SessionInfoProto.newBuilder()
+                    .setSessionIdMSB(uuid.getMostSignificantBits())
+                    .setSessionIdLSB(uuid.getLeastSignificantBits())
+                    .setNodeId(sessionInfo.getNodeId()).build();
+            sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
+                    .setSessionInfo(sessionInfoProto)
+                    .setSubscriptionInfo(subscriptionInfoProto).build());
+        });
+        systemContext.getDeviceSessionCacheService()
+                .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
+                        .addAllSessions(sessionsList).build());
+    }
+
+    void initSessionTimeout(ActorContext context) {
+        schedulePeriodicMsgWithDelay(context, SessionTimeoutCheckMsg.instance(), systemContext.getSessionInactivityTimeout(), systemContext.getSessionInactivityTimeout());
+    }
+
+    void checkSessionsTimeout() {
+        long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
+        Map<UUID, SessionInfoMetaData> sessionsToRemove = sessions.entrySet().stream().filter(kv -> kv.getValue().getLastActivityTime() < expTime).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        sessionsToRemove.forEach((sessionId, sessionMD) -> {
+            sessions.remove(sessionId);
+            rpcSubscriptions.remove(sessionId);
+            attributeSubscriptions.remove(sessionId);
+            notifyTransportAboutClosedSession(sessionId, sessionMD);
+        });
+        if (!sessionsToRemove.isEmpty()) {
+            dumpSessions();
+        }
+    }
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
index 43ae592..fe09077 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java
@@ -25,4 +25,5 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
 public class SessionInfo {
     private final SessionType type;
     private final String nodeId;
+    private long lastActivityTime;
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java
new file mode 100644
index 0000000..dd08394
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.device;
+
+import lombok.Data;
+import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
+
+/**
+ * @author Andrew Shvayka
+ */
+@Data
+class SessionInfoMetaData {
+    private final SessionInfo sessionInfo;
+    private long lastActivityTime;
+    private boolean subscribedToAttributes;
+    private boolean subscribedToRPC;
+
+    SessionInfoMetaData(SessionInfo sessionInfo) {
+        this(sessionInfo, System.currentTimeMillis());
+    }
+
+    SessionInfoMetaData(SessionInfo sessionInfo, long lastActivityTime) {
+        this.sessionInfo = sessionInfo;
+        this.lastActivityTime = lastActivityTime;
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java
new file mode 100644
index 0000000..d9172ae
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.device;
+
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.TbActorMsg;
+
+/**
+ * Created by ashvayka on 29.10.18.
+ */
+public class SessionTimeoutCheckMsg implements TbActorMsg {
+
+    private static final SessionTimeoutCheckMsg INSTANCE = new SessionTimeoutCheckMsg();
+
+    private SessionTimeoutCheckMsg() {
+    }
+
+    public static SessionTimeoutCheckMsg instance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public MsgType getMsgType() {
+        return MsgType.SESSION_TIMEOUT_MSG;
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
index 8864486..c809782 100644
--- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java
@@ -40,43 +40,31 @@ public abstract class AbstractContextAwareMsgProcessor {
         this.logger = logger;
     }
 
-    protected ActorRef getAppActor() {
-        return systemContext.getAppActor();
-    }
-
-    protected Scheduler getScheduler() {
+    private Scheduler getScheduler() {
         return systemContext.getScheduler();
     }
 
-    protected ExecutionContextExecutor getSystemDispatcher() {
+    private ExecutionContextExecutor getSystemDispatcher() {
         return systemContext.getActorSystem().dispatcher();
     }
 
     protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs) {
-        schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs, ctx.self());
+        schedulePeriodicMsgWithDelay(msg, delayInMs, periodInMs, ctx.self());
     }
 
-    protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs, ActorRef target) {
+    private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) {
         logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
         getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
     }
 
-
     protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs) {
-        scheduleMsgWithDelay(ctx, msg, delayInMs, ctx.self());
+        scheduleMsgWithDelay(msg, delayInMs, ctx.self());
     }
 
-    protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, ActorRef target) {
+    private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) {
         logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
         getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
     }
 
-    @Data
-    @AllArgsConstructor
-    private static class ComponentConfiguration {
-        private final String clazz;
-        private final String name;
-        private final String configuration;
-    }
 
 }
diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
index 347483a..f822e4e 100644
--- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
@@ -127,7 +127,6 @@ public class TenantActor extends RuleChainManagerActor {
         ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
     }
 
-
     private void onToDeviceActorMsg(DeviceAwareMsg msg) {
         getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
     }
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
new file mode 100644
index 0000000..6201dab
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.session;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.cache.annotation.CachePut;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
+
+/**
+ * Created by ashvayka on 29.10.18.
+ */
+@Service
+@Slf4j
+public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService {
+
+    @Override
+    @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId")
+    public DeviceSessionsCacheEntry get(DeviceId deviceId) {
+        log.debug("[{}] Fetching session data from cache", deviceId);
+        return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build();
+    }
+
+    @Override
+    @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId")
+    public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) {
+        log.debug("[{}] Pushing session data from cache: {}", deviceId, sessions);
+        return sessions;
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
new file mode 100644
index 0000000..0a2e6a5
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2016-2018 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.service.session;
+
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
+
+/**
+ * Created by ashvayka on 29.10.18.
+ */
+public interface DeviceSessionCacheService {
+
+    DeviceSessionsCacheEntry get(DeviceId deviceId);
+
+    DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions);
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
index 0f72230..d3782e8 100644
--- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java
@@ -161,6 +161,13 @@ public class LocalTransportService extends AbstractTransportService implements R
     }
 
     @Override
+    public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+        if (checkLimits(sessionInfo, callback)) {
+            forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
+        }
+    }
+
+    @Override
     public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
         if (checkLimits(sessionInfo, callback)) {
             forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 54096ea..5824293 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -202,6 +202,9 @@ caffeine:
     devices:
       timeToLiveInMinutes: 1440
       maxSize: 100000
+    sessions:
+      timeToLiveInMinutes: 1440
+      maxSize: 100000
     assets:
       timeToLiveInMinutes: 1440
       maxSize: 100000
@@ -392,6 +395,9 @@ transport:
       auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
     notifications:
       topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
+  sessions:
+    inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
+    report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
   rate_limits:
     enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
     tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
index 698a69e..853caff 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
@@ -19,6 +19,7 @@ public class CacheConstants {
     public static final String DEVICE_CREDENTIALS_CACHE = "deviceCredentials";
     public static final String RELATIONS_CACHE = "relations";
     public static final String DEVICE_CACHE = "devices";
+    public static final String SESSIONS_CACHE = "sessions";
     public static final String ASSET_CACHE = "assets";
     public static final String ENTITY_VIEW_CACHE = "entityViews";
 }
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 24758b5..44a98d6 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -96,13 +96,8 @@ public enum MsgType {
      */
     DEVICE_ACTOR_TO_RULE_ENGINE_MSG,
 
-    /**
-     * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
-     */
-    ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
-    TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
     SESSION_TIMEOUT_MSG,
-    SESSION_CTRL_MSG,
+
     STATS_PERSIST_TICK_MSG,
 
 
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
index da8b3a6..a178bdf 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
@@ -43,10 +43,10 @@ import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.msg.EncryptionUtil;
+import org.thingsboard.server.common.transport.service.AbstractTransportService;
 import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
-import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
@@ -141,9 +141,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                 break;
             case PINGREQ:
-                //TODO: should we push the notification to the rule engine?
                 if (checkConnected(ctx)) {
                     ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
+                    transportService.reportActivity(sessionInfo);
+                    if (gatewaySessionHandler != null) {
+                        gatewaySessionHandler.reportActivity();
+                    }
                 }
                 break;
             case DISCONNECT:
@@ -394,7 +397,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
     private void processDisconnect(ChannelHandlerContext ctx) {
         ctx.close();
         if (deviceSessionCtx.isConnected()) {
-            transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
+            transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
             transportService.deregisterSession(sessionInfo);
             if (gatewaySessionHandler != null) {
                 gatewaySessionHandler.onGatewayDisconnect();
@@ -466,16 +469,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
         }
     }
 
-    public static SessionEventMsg getSessionEventMsg(SessionEvent event) {
-        return SessionEventMsg.newBuilder()
-                .setSessionType(TransportProtos.SessionType.ASYNC)
-                .setEvent(event).build();
-    }
-
     @Override
     public void operationComplete(Future<? super Void> future) throws Exception {
         if (deviceSessionCtx.isConnected()) {
-            transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
+            transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
             transportService.deregisterSession(sessionInfo);
         }
     }
@@ -495,7 +492,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
                     .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
                     .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                     .build();
-            transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
+            transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.OPEN), null);
             transportService.registerAsyncSession(sessionInfo, this);
             checkGatewaySession();
             ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
index d600059..ac33ba6 100644
--- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
+++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java
@@ -34,6 +34,7 @@ import org.thingsboard.server.common.transport.TransportService;
 import org.thingsboard.server.common.transport.TransportServiceCallback;
 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
+import org.thingsboard.server.common.transport.service.AbstractTransportService;
 import org.thingsboard.server.gen.transport.TransportProtos;
 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
 import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
@@ -118,7 +119,7 @@ public class GatewaySessionHandler {
                             GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
                             if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
                                 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
-                                transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
+                                transportService.process(deviceSessionInfo, AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
                                 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
                                 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
                                 transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
@@ -334,7 +335,7 @@ public class GatewaySessionHandler {
 
     private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) {
         transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
-        transportService.process(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
+        transportService.process(deviceSessionCtx.getSessionInfo(), AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
         log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
     }
 
@@ -360,11 +361,15 @@ public class GatewaySessionHandler {
         return context;
     }
 
-    public MqttTransportAdaptor getAdaptor() {
+    MqttTransportAdaptor getAdaptor() {
         return context.getAdaptor();
     }
 
-    public int nextMsgId() {
+    int nextMsgId() {
         return deviceSessionCtx.nextMsgId();
     }
+
+    public void reportActivity() {
+        devices.forEach((id, deviceCtx) -> transportService.reportActivity(deviceCtx.getSessionInfo()));
+    }
 }
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
index 265dacb..3bb84ec 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
@@ -47,9 +47,14 @@ public abstract class AbstractTransportService implements TransportService {
     private String perTenantLimitsConf;
     @Value("${transport.rate_limits.tenant}")
     private String perDevicesLimitsConf;
+    @Value("${transport.sessions.inactivity_timeout}")
+    private long sessionInactivityTimeout;
+    @Value("${transport.sessions.report_timeout}")
+    private long sessionReportTimeout;
 
     protected ScheduledExecutorService schedulerExecutor;
     protected ExecutorService transportCallbackExecutor;
+
     private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
 
     //TODO: Implement cleanup of this maps.
@@ -59,7 +64,81 @@ public abstract class AbstractTransportService implements TransportService {
     @Override
     public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
         sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
-        //TODO: monitor sessions periodically: PING REQ/RESP, etc.
+    }
+
+    @Override
+    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
+        reportActivityInternal(sessionInfo);
+    }
+
+    @Override
+    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
+        reportActivityInternal(sessionInfo);
+    }
+
+    @Override
+    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
+        reportActivityInternal(sessionInfo);
+    }
+
+    @Override
+    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
+        reportActivityInternal(sessionInfo);
+    }
+
+    @Override
+    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
+        SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
+        sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
+    }
+
+    @Override
+    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
+        SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
+        sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
+    }
+
+    @Override
+    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
+        reportActivityInternal(sessionInfo);
+    }
+
+    @Override
+    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
+        reportActivityInternal(sessionInfo);
+    }
+
+    @Override
+    public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
+        reportActivityInternal(sessionInfo);
+    }
+
+    private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
+        UUID sessionId = toId(sessionInfo);
+        SessionMetaData sessionMetaData = sessions.get(sessionId);
+        if (sessionMetaData != null) {
+            sessionMetaData.updateLastActivityTime();
+        }
+        return sessionMetaData;
+    }
+
+    private void checkInactivityAndReportActivity() {
+        long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
+        sessions.forEach((uuid, sessionMD) -> {
+            if (sessionMD.getLastActivityTime() < expTime) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Session has expired due to last activity time: {}", toId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime());
+                }
+                process(sessionMD.getSessionInfo(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
+                sessions.remove(uuid);
+                sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
+            } else {
+                process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder()
+                        .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
+                        .setRpcSubscription(sessionMD.isSubscribedToRPC())
+                        .setLastActivityTime(sessionMD.getLastActivityTime()).build(), null);
+            }
+        });
     }
 
     @Override
@@ -131,7 +210,7 @@ public abstract class AbstractTransportService implements TransportService {
         }
     }
 
-    protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
+    private UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
         return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
     }
 
@@ -147,6 +226,7 @@ public abstract class AbstractTransportService implements TransportService {
         }
         this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
         this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
+        this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS);
     }
 
     public void destroy() {
@@ -161,4 +241,10 @@ public abstract class AbstractTransportService implements TransportService {
             transportCallbackExecutor.shutdownNow();
         }
     }
+
+    public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
+        return TransportProtos.SessionEventMsg.newBuilder()
+                .setSessionType(TransportProtos.SessionType.ASYNC)
+                .setEvent(event).build();
+    }
 }
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
index 6774942..0910961 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java
@@ -228,6 +228,17 @@ public class RemoteTransportService extends AbstractTransportService {
     }
 
     @Override
+    public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
+        if (checkLimits(sessionInfo, callback)) {
+            ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
+                    TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
+                            .setSubscriptionInfo(msg).build()
+            ).build();
+            send(sessionInfo, toRuleEngineMsg, callback);
+        }
+    }
+
+    @Override
     public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
         if (checkLimits(sessionInfo, callback)) {
             ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
index 1de5711..8642e93 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java
@@ -23,10 +23,25 @@ import org.thingsboard.server.gen.transport.TransportProtos;
  * Created by ashvayka on 15.10.18.
  */
 @Data
-public class SessionMetaData {
+class SessionMetaData {
 
     private final TransportProtos.SessionInfoProto sessionInfo;
     private final TransportProtos.SessionType sessionType;
     private final SessionMsgListener listener;
 
+    private volatile long lastActivityTime;
+    private volatile boolean subscribedToAttributes;
+    private volatile boolean subscribedToRPC;
+
+    SessionMetaData(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
+        this.sessionInfo = sessionInfo;
+        this.sessionType = sessionType;
+        this.listener = listener;
+        this.lastActivityTime = System.currentTimeMillis();
+    }
+
+    void updateLastActivityTime() {
+        this.lastActivityTime = System.currentTimeMillis();
+    }
+
 }
diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
index a47438f..8944e94 100644
--- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
+++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
@@ -61,10 +61,14 @@ public interface TransportService {
 
     void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
 
+    void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
+
     void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
 
     void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
 
+    void reportActivity(SessionInfoProto sessionInfo);
+
     void deregisterSession(SessionInfoProto sessionInfo);
 
 }
diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml
index 30351b0..f96bcbe 100644
--- a/transport/coap/src/main/resources/tb-coap-transport.yml
+++ b/transport/coap/src/main/resources/tb-coap-transport.yml
@@ -23,6 +23,9 @@ transport:
     bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
     bind_port: "${COAP_BIND_PORT:5683}"
     timeout: "${COAP_TIMEOUT:10000}"
+  sessions:
+    inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
+    report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
   rate_limits:
     enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
     tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml
index 001e08a..6d593ed 100644
--- a/transport/http/src/main/resources/tb-http-transport.yml
+++ b/transport/http/src/main/resources/tb-http-transport.yml
@@ -24,6 +24,9 @@ server:
 transport:
   http:
     request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"
+  sessions:
+    inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
+    report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
   rate_limits:
     enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
     tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
index e37d14e..e7f8942 100644
--- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
+++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml
@@ -44,8 +44,8 @@ transport:
       # Type of the key store
       key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
   sessions:
-    max_per_tenant: "${TB_TRANSPORT_SESSIONS_MAX_PER_TENANT:1000}"
-    max_per_device: "${TB_TRANSPORT_SESSIONS_MAX_PER_DEVICE:2}"
+    inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
+    report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
   rate_limits:
     enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
     tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"