thingsboard-aplcache

Device State Service

5/3/2018 1:02:04 PM

Details

diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index fad7a1b..2e1706c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -64,6 +64,7 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 import org.thingsboard.server.service.mail.MailExecutorService;
 import org.thingsboard.server.service.rpc.DeviceRpcService;
 import org.thingsboard.server.service.script.JsExecutorService;
+import org.thingsboard.server.service.state.DeviceStateService;
 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 
 import java.io.IOException;
@@ -192,6 +193,10 @@ public class ActorSystemContext {
     @Getter
     private MsgQueue msgQueue;
 
+    @Autowired
+    @Getter
+    private DeviceStateService deviceStateService;
+
     @Value("${actors.session.sync.timeout}")
     @Getter
     private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index cd043bf..35adf96 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -265,17 +265,32 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
                     break;
                 case POST_ATTRIBUTES_REQUEST:
                     handlePostAttributesRequest(context, msg);
+                    reportActivity();
                     break;
                 case POST_TELEMETRY_REQUEST:
                     handlePostTelemetryRequest(context, msg);
+                    reportActivity();
                     break;
                 case TO_SERVER_RPC_REQUEST:
                     handleClientSideRPCRequest(context, msg);
+                    reportActivity();
                     break;
             }
         }
     }
 
+    private void reportActivity() {
+        systemContext.getDeviceStateService().onDeviceActivity(deviceId);
+    }
+
+    private void reportSessionOpen() {
+        systemContext.getDeviceStateService().onDeviceConnect(deviceId);
+    }
+
+    private void reportSessionClose() {
+        systemContext.getDeviceStateService().onDeviceDisconnect(deviceId);
+    }
+
     private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
         GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
         ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
@@ -488,11 +503,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
         if (inMsg instanceof SessionOpenMsg) {
             logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
             sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
+            if (sessions.size() == 1) {
+                reportSessionOpen();
+            }
         } else if (inMsg instanceof SessionCloseMsg) {
             logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
             sessions.remove(sessionId);
             attributeSubscriptions.remove(sessionId);
             rpcSubscriptions.remove(sessionId);
+            if (sessions.isEmpty()) {
+                reportSessionClose();
+            }
         }
     }
 
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index 0be0385..e15eb2f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -32,4 +32,5 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
     void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
 
     void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
+
 }
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index 79d8280..a35e5dc 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -63,6 +63,7 @@ import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
 import org.thingsboard.server.common.data.exception.ThingsboardException;
 import org.thingsboard.server.service.component.ComponentDiscoveryService;
 import org.thingsboard.server.service.security.model.SecurityUser;
+import org.thingsboard.server.service.state.DeviceStateService;
 
 import javax.mail.MessagingException;
 import javax.servlet.http.HttpServletRequest;
@@ -137,6 +138,9 @@ public abstract class BaseController {
     @Autowired
     protected DeviceOfflineService offlineService;
 
+    @Autowired
+    protected DeviceStateService deviceStateService;
+
     @ExceptionHandler(ThingsboardException.class)
     public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) {
         errorResponseHandler.handle(ex, response);
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index f97603e..7261075 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -90,6 +90,11 @@ public class DeviceController extends BaseController {
                     savedDevice.getCustomerId(),
                     device.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null);
 
+            if (device.getId() == null) {
+                deviceStateService.onDeviceAdded(savedDevice);
+            } else {
+                deviceStateService.onDeviceUpdated(savedDevice);
+            }
             return savedDevice;
         } catch (Exception e) {
             logEntityAction(emptyId(EntityType.DEVICE), device,
@@ -112,6 +117,7 @@ public class DeviceController extends BaseController {
                     device.getCustomerId(),
                     ActionType.DELETED, null, strDeviceId);
 
+            deviceStateService.onDeviceDeleted(device);
         } catch (Exception e) {
             logEntityAction(emptyId(EntityType.DEVICE),
                     null,
@@ -387,7 +393,7 @@ public class DeviceController extends BaseController {
     @RequestMapping(value = "/device/online", method = RequestMethod.GET)
     @ResponseBody
     public List<Device> getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
-                                          @RequestParam("threshold") long threshold) throws ThingsboardException {
+                                         @RequestParam("threshold") long threshold) throws ThingsboardException {
         try {
             TenantId tenantId = getCurrentUser().getTenantId();
             ListenableFuture<List<Device>> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold);
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
new file mode 100644
index 0000000..3eb7a13
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
@@ -0,0 +1,375 @@
+package org.thingsboard.server.service.state;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.Tenant;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.page.TextPageLink;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgDataType;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.tenant.TenantService;
+import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
+
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT;
+import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
+import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT;
+import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT;
+
+/**
+ * Created by ashvayka on 01.05.18.
+ */
+@Service
+@Slf4j
+//TODO: refactor to use page links as cursor and not fetch all
+public class DefaultDeviceStateService implements DeviceStateService {
+
+    private static final ObjectMapper json = new ObjectMapper();
+    public static final String ACTIVITY_STATE = "active";
+    public static final String LAST_CONNECT_TIME = "lastConnectTime";
+    public static final String LAST_DISCONNECT_TIME = "lastDisconnectTime";
+    public static final String LAST_ACTIVITY_TIME = "lastActivityTime";
+    public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime";
+    public static final String INACTIVITY_TIMEOUT = "inactivityTimeout";
+
+    public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);
+
+    @Autowired
+    private TenantService tenantService;
+
+    @Autowired
+    private DeviceService deviceService;
+
+    @Autowired
+    private AttributesService attributesService;
+
+    @Autowired
+    private ActorService actorService;
+
+    @Autowired
+    private TelemetrySubscriptionService tsSubService;
+
+    @Value("${state.defaultInactivityTimeoutInSec}")
+    @Getter
+    private long defaultInactivityTimeoutInSec;
+
+    @Value("${state.defaultStateCheckIntervalInSec}")
+    @Getter
+    private long defaultStateCheckIntervalInSec;
+
+// TODO in v2.1
+//    @Value("${state.defaultStatePersistenceIntervalInSec}")
+//    @Getter
+//    private long defaultStatePersistenceIntervalInSec;
+//
+//    @Value("${state.defaultStatePersistencePack}")
+//    @Getter
+//    private long defaultStatePersistencePack;
+
+    private ListeningScheduledExecutorService queueExecutor;
+
+    private ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();
+    private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
+
+    @PostConstruct
+    public void init() {
+        // Should be always single threaded due to absence of locks.
+        queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
+        queueExecutor.submit(this::initStateFromDB);
+        queueExecutor.scheduleAtFixedRate(this::updateState, defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
+        //TODO: schedule persistence in v2.1;
+    }
+
+    @PreDestroy
+    public void stop() {
+        if (queueExecutor != null) {
+            queueExecutor.shutdownNow();
+        }
+    }
+
+    @Override
+    public void onDeviceAdded(Device device) {
+        queueExecutor.submit(() -> onDeviceAddedSync(device));
+    }
+
+    @Override
+    public void onDeviceUpdated(Device device) {
+        queueExecutor.submit(() -> onDeviceUpdatedSync(device));
+    }
+
+    @Override
+    public void onDeviceConnect(DeviceId deviceId) {
+        queueExecutor.submit(() -> onDeviceConnectSync(deviceId));
+    }
+
+    @Override
+    public void onDeviceActivity(DeviceId deviceId) {
+        queueExecutor.submit(() -> onDeviceActivitySync(deviceId));
+    }
+
+    @Override
+    public void onDeviceDisconnect(DeviceId deviceId) {
+        queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId));
+    }
+
+    @Override
+    public void onDeviceDeleted(Device device) {
+        queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId()));
+    }
+
+    @Override
+    public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
+        queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout));
+    }
+
+    @Override
+    public Optional<DeviceState> getDeviceState(DeviceId deviceId) {
+        DeviceStateData state = deviceStates.get(deviceId);
+        if (state != null) {
+            return Optional.of(state.getState());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private void initStateFromDB() {
+        List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
+        for (Tenant tenant : tenants) {
+            List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
+            List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
+            for (Device device : devices) {
+                fetchFutures.add(fetchDeviceState(device));
+            }
+            try {
+                Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
+            } catch (InterruptedException | ExecutionException e) {
+                log.warn("Failed to init device state service from DB", e);
+            }
+        }
+    }
+
+    private void addDeviceUsingState(DeviceStateData state) {
+        tenantDevices.computeIfAbsent(state.getTenantId(), id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId());
+        deviceStates.put(state.getDeviceId(), state);
+    }
+
+    private void updateState() {
+        long ts = System.currentTimeMillis();
+        Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
+        for (DeviceId deviceId : deviceIds) {
+            DeviceStateData stateData = deviceStates.get(deviceId);
+            DeviceState state = stateData.getState();
+            state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
+            if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) {
+                state.setLastInactivityAlarmTime(ts);
+                pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
+                saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
+                saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
+            }
+        }
+    }
+
+    private void onDeviceConnectSync(DeviceId deviceId) {
+        DeviceStateData stateData = deviceStates.get(deviceId);
+        if (stateData != null) {
+            long ts = System.currentTimeMillis();
+            stateData.getState().setLastConnectTime(ts);
+            pushRuleEngineMessage(stateData, CONNECT_EVENT);
+            saveAttribute(deviceId, LAST_CONNECT_TIME, ts);
+        }
+    }
+
+    private void onDeviceDisconnectSync(DeviceId deviceId) {
+        DeviceStateData stateData = deviceStates.get(deviceId);
+        if (stateData != null) {
+            long ts = System.currentTimeMillis();
+            stateData.getState().setLastDisconnectTime(ts);
+            pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
+            saveAttribute(deviceId, LAST_DISCONNECT_TIME, ts);
+        }
+    }
+
+    private void onDeviceActivitySync(DeviceId deviceId) {
+        DeviceStateData stateData = deviceStates.get(deviceId);
+        if (stateData != null) {
+            DeviceState state = stateData.getState();
+            long ts = System.currentTimeMillis();
+            state.setActive(true);
+            stateData.getState().setLastActivityTime(ts);
+            pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
+            saveAttribute(deviceId, LAST_ACTIVITY_TIME, ts);
+            saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
+        }
+    }
+
+    private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
+        if (inactivityTimeout == 0L) {
+            return;
+        }
+        DeviceStateData stateData = deviceStates.get(deviceId);
+        if (stateData != null) {
+            long ts = System.currentTimeMillis();
+            DeviceState state = stateData.getState();
+            state.setInactivityTimeout(inactivityTimeout);
+            boolean oldActive = state.isActive();
+            state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
+            if (!oldActive && state.isActive()) {
+                saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
+            }
+        }
+    }
+
+    private void onDeviceAddedSync(Device device) {
+        Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
+            @Override
+            public void onSuccess(@Nullable DeviceStateData state) {
+                addDeviceUsingState(state);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                log.warn("Failed to register device to the state service", t);
+            }
+        });
+    }
+
+    private void onDeviceUpdatedSync(Device device) {
+        DeviceStateData stateData = deviceStates.get(device.getId());
+        if (stateData != null) {
+            TbMsgMetaData md = new TbMsgMetaData();
+            md.putValue("deviceName", device.getName());
+            md.putValue("deviceType", device.getType());
+            stateData.setMetaData(md);
+        }
+    }
+
+    private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
+        deviceStates.remove(deviceId);
+        Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
+        if (deviceIds != null) {
+            deviceIds.remove(deviceId);
+            if (deviceIds.isEmpty()) {
+                tenantDevices.remove(tenantId);
+            }
+        }
+    }
+
+    private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
+        ListenableFuture<List<AttributeKvEntry>> attributes = attributesService.find(device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
+        return Futures.transform(attributes, new Function<List<AttributeKvEntry>, DeviceStateData>() {
+            @Nullable
+            @Override
+            public DeviceStateData apply(@Nullable List<AttributeKvEntry> attributes) {
+                long lastActivityTime = getAttributeValue(attributes, LAST_ACTIVITY_TIME, 0L);
+                long inactivityAlarmTime = getAttributeValue(attributes, INACTIVITY_ALARM_TIME, 0L);
+                long inactivityTimeout = getAttributeValue(attributes, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
+                boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout;
+                DeviceState deviceState = DeviceState.builder()
+                        .active(active)
+                        .lastConnectTime(getAttributeValue(attributes, LAST_CONNECT_TIME, 0L))
+                        .lastDisconnectTime(getAttributeValue(attributes, LAST_DISCONNECT_TIME, 0L))
+                        .lastActivityTime(lastActivityTime)
+                        .lastInactivityAlarmTime(inactivityAlarmTime)
+                        .inactivityTimeout(inactivityTimeout)
+                        .build();
+                TbMsgMetaData md = new TbMsgMetaData();
+                md.putValue("deviceName", device.getName());
+                md.putValue("deviceType", device.getType());
+                return DeviceStateData.builder()
+                        .tenantId(device.getTenantId())
+                        .deviceId(device.getId())
+                        .metaData(md)
+                        .state(deviceState).build();
+            }
+        });
+    }
+
+    private long getLastPersistTime(List<AttributeKvEntry> attributes) {
+        return attributes.stream().map(AttributeKvEntry::getLastUpdateTs).max(Long::compare).orElse(0L);
+    }
+
+    private long getAttributeValue(List<AttributeKvEntry> attributes, String attributeName, long defaultValue) {
+        for (AttributeKvEntry attribute : attributes) {
+            if (attribute.getKey().equals(attributeName)) {
+                return attribute.getLongValue().orElse(defaultValue);
+            }
+        }
+        return defaultValue;
+    }
+
+    private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) {
+        DeviceState state = stateData.getState();
+        try {
+            TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData(), TbMsgDataType.JSON
+                    , json.writeValueAsString(state)
+                    , null, null, 0L);
+            actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg));
+        } catch (Exception e) {
+            log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
+        }
+    }
+
+    private void saveAttribute(DeviceId deviceId, String key, long value) {
+        tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
+    }
+
+    private void saveAttribute(DeviceId deviceId, String key, boolean value) {
+        tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
+    }
+
+    private class AttributeSaveCallback implements FutureCallback<Void> {
+        private final DeviceId deviceId;
+        private final String key;
+        private final Object value;
+
+        AttributeSaveCallback(DeviceId deviceId, String key, Object value) {
+            this.deviceId = deviceId;
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public void onSuccess(@Nullable Void result) {
+            log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value);
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+            log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t);
+        }
+    }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java
new file mode 100644
index 0000000..be4c352
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java
@@ -0,0 +1,20 @@
+package org.thingsboard.server.service.state;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Created by ashvayka on 01.05.18.
+ */
+@Data
+@Builder
+public class DeviceState {
+
+    private boolean active;
+    private long lastConnectTime;
+    private long lastActivityTime;
+    private long lastDisconnectTime;
+    private long lastInactivityAlarmTime;
+    private long inactivityTimeout;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java
new file mode 100644
index 0000000..c1b6b72
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java
@@ -0,0 +1,22 @@
+package org.thingsboard.server.service.state;
+
+import lombok.Builder;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+
+/**
+ * Created by ashvayka on 01.05.18.
+ */
+@Data
+@Builder
+class DeviceStateData {
+
+    private final TenantId tenantId;
+    private final DeviceId deviceId;
+
+    private TbMsgMetaData metaData;
+    private final DeviceState state;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
new file mode 100644
index 0000000..43d36f3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
@@ -0,0 +1,29 @@
+package org.thingsboard.server.service.state;
+
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.DeviceId;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 01.05.18.
+ */
+public interface DeviceStateService {
+
+    void onDeviceAdded(Device device);
+
+    void onDeviceUpdated(Device device);
+
+    void onDeviceDeleted(Device device);
+
+    void onDeviceConnect(DeviceId deviceId);
+
+    void onDeviceActivity(DeviceId deviceId);
+
+    void onDeviceDisconnect(DeviceId deviceId);
+
+    void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
+
+    Optional<DeviceState> getDeviceState(DeviceId deviceId);
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 58bbec5..880d2df 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -1,12 +1,12 @@
 /**
  * Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,11 +20,20 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DoubleDataEntry;
+import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
 import org.thingsboard.server.common.data.kv.TsKvEntry;
 import org.thingsboard.server.common.msg.cluster.ServerAddress;
 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -34,11 +43,14 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
 import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.state.DefaultDeviceStateService;
+import org.thingsboard.server.service.state.DeviceStateService;
 
 import javax.annotation.Nullable;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -70,6 +82,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
     @Autowired
     private ClusterRoutingService routingService;
 
+    @Autowired
+    @Lazy
+    private DeviceStateService stateService;
+
     private ExecutorService tsCallBackExecutor;
     private ExecutorService wsCallBackExecutor;
 
@@ -149,10 +165,41 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
         addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
     }
 
+    @Override
+    public void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
+        saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
+                , System.currentTimeMillis())), callback);
+    }
+
+    @Override
+    public void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback) {
+        saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(key, value)
+                , System.currentTimeMillis())), callback);
+    }
+
+    @Override
+    public void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback<Void> callback) {
+        saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new DoubleDataEntry(key, value)
+                , System.currentTimeMillis())), callback);
+    }
+
+    @Override
+    public void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback) {
+        saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value)
+                , System.currentTimeMillis())), callback);
+    }
+
     private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
         Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
         if (!serverAddress.isPresent()) {
             onLocalAttributesUpdate(entityId, scope, attributes);
+            if (entityId.getEntityType() == EntityType.DEVICE && DataConstants.SERVER_SCOPE.equalsIgnoreCase(scope)) {
+                for (AttributeKvEntry attribute : attributes) {
+                    if (attribute.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) {
+                        stateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
+                    }
+                }
+            }
         } else {
 //            rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
         }
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 744cc5b..02ae2b8 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -357,4 +357,11 @@ audit_log:
     host: "${AUDIT_LOG_SINK_HOST:localhost}"
     port: "${AUDIT_LOG_SINK_POST:9200}"
     user_name: "${AUDIT_LOG_SINK_USER_NAME:}"
-    password: "${AUDIT_LOG_SINK_PASSWORD:}"
\ No newline at end of file
+    password: "${AUDIT_LOG_SINK_PASSWORD:}"
+
+state:
+  defaultInactivityTimeoutInSec: 10
+  defaultStateCheckIntervalInSec: 10
+# TODO in v2.1
+#  defaultStatePersistenceIntervalInSec: 60
+#  defaultStatePersistencePack: 100
\ No newline at end of file
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index 7d4e480..f4e9559 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -45,4 +45,9 @@ public class DataConstants {
     public static final String IN = "IN";
     public static final String OUT = "OUT";
 
+    public static final String INACTIVITY_EVENT = "INACTIVITY_EVENT";
+    public static final String CONNECT_EVENT = "CONNECT_EVENT";
+    public static final String DISCONNECT_EVENT = "DISCONNECT_EVENT";
+    public static final String ACTIVITY_EVENT = "ACTIVITY_EVENT";
+
 }
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
index 1ba18cd..aa57a02 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
@@ -33,4 +33,12 @@ public interface RuleEngineTelemetryService {
 
     void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
 
+    void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
+
+    void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback);
+
+    void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback<Void> callback);
+
+    void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback);
+
 }