thingsboard-aplcache
Changes
application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java 29(+25 -4)
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java 375(+375 -0)
Details
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index fad7a1b..2e1706c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -64,6 +64,7 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.mail.MailExecutorService;
import org.thingsboard.server.service.rpc.DeviceRpcService;
import org.thingsboard.server.service.script.JsExecutorService;
+import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.io.IOException;
@@ -192,6 +193,10 @@ public class ActorSystemContext {
@Getter
private MsgQueue msgQueue;
+ @Autowired
+ @Getter
+ private DeviceStateService deviceStateService;
+
@Value("${actors.session.sync.timeout}")
@Getter
private long syncSessionTimeout;
diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
index cd043bf..35adf96 100644
--- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -265,17 +265,32 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
break;
case POST_ATTRIBUTES_REQUEST:
handlePostAttributesRequest(context, msg);
+ reportActivity();
break;
case POST_TELEMETRY_REQUEST:
handlePostTelemetryRequest(context, msg);
+ reportActivity();
break;
case TO_SERVER_RPC_REQUEST:
handleClientSideRPCRequest(context, msg);
+ reportActivity();
break;
}
}
}
+ private void reportActivity() {
+ systemContext.getDeviceStateService().onDeviceActivity(deviceId);
+ }
+
+ private void reportSessionOpen() {
+ systemContext.getDeviceStateService().onDeviceConnect(deviceId);
+ }
+
+ private void reportSessionClose() {
+ systemContext.getDeviceStateService().onDeviceDisconnect(deviceId);
+ }
+
private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
@@ -488,11 +503,17 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (inMsg instanceof SessionOpenMsg) {
logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
sessions.put(sessionId, new SessionInfo(SessionType.ASYNC, msg.getServerAddress()));
+ if (sessions.size() == 1) {
+ reportSessionOpen();
+ }
} else if (inMsg instanceof SessionCloseMsg) {
logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
sessions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
rpcSubscriptions.remove(sessionId);
+ if (sessions.isEmpty()) {
+ reportSessionClose();
+ }
}
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
index 0be0385..e15eb2f 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/ActorService.java
@@ -32,4 +32,5 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
+
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index 79d8280..a35e5dc 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -63,6 +63,7 @@ import org.thingsboard.server.exception.ThingsboardErrorResponseHandler;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.security.model.SecurityUser;
+import org.thingsboard.server.service.state.DeviceStateService;
import javax.mail.MessagingException;
import javax.servlet.http.HttpServletRequest;
@@ -137,6 +138,9 @@ public abstract class BaseController {
@Autowired
protected DeviceOfflineService offlineService;
+ @Autowired
+ protected DeviceStateService deviceStateService;
+
@ExceptionHandler(ThingsboardException.class)
public void handleThingsboardException(ThingsboardException ex, HttpServletResponse response) {
errorResponseHandler.handle(ex, response);
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index f97603e..7261075 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -90,6 +90,11 @@ public class DeviceController extends BaseController {
savedDevice.getCustomerId(),
device.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null);
+ if (device.getId() == null) {
+ deviceStateService.onDeviceAdded(savedDevice);
+ } else {
+ deviceStateService.onDeviceUpdated(savedDevice);
+ }
return savedDevice;
} catch (Exception e) {
logEntityAction(emptyId(EntityType.DEVICE), device,
@@ -112,6 +117,7 @@ public class DeviceController extends BaseController {
device.getCustomerId(),
ActionType.DELETED, null, strDeviceId);
+ deviceStateService.onDeviceDeleted(device);
} catch (Exception e) {
logEntityAction(emptyId(EntityType.DEVICE),
null,
@@ -387,7 +393,7 @@ public class DeviceController extends BaseController {
@RequestMapping(value = "/device/online", method = RequestMethod.GET)
@ResponseBody
public List<Device> getOnlineDevices(@RequestParam("contactType") DeviceStatusQuery.ContactType contactType,
- @RequestParam("threshold") long threshold) throws ThingsboardException {
+ @RequestParam("threshold") long threshold) throws ThingsboardException {
try {
TenantId tenantId = getCurrentUser().getTenantId();
ListenableFuture<List<Device>> offlineDevices = offlineService.findOnlineDevices(tenantId.getId(), contactType, threshold);
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
new file mode 100644
index 0000000..3eb7a13
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
@@ -0,0 +1,375 @@
+package org.thingsboard.server.service.state;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.actors.service.ActorService;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.Tenant;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.page.TextPageLink;
+import org.thingsboard.server.common.msg.TbMsg;
+import org.thingsboard.server.common.msg.TbMsgDataType;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
+import org.thingsboard.server.dao.attributes.AttributesService;
+import org.thingsboard.server.dao.device.DeviceService;
+import org.thingsboard.server.dao.tenant.TenantService;
+import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
+
+import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT;
+import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
+import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT;
+import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT;
+
+/**
+ * Created by ashvayka on 01.05.18.
+ */
+@Service
+@Slf4j
+//TODO: refactor to use page links as cursor and not fetch all
+public class DefaultDeviceStateService implements DeviceStateService {
+
+ private static final ObjectMapper json = new ObjectMapper();
+ public static final String ACTIVITY_STATE = "active";
+ public static final String LAST_CONNECT_TIME = "lastConnectTime";
+ public static final String LAST_DISCONNECT_TIME = "lastDisconnectTime";
+ public static final String LAST_ACTIVITY_TIME = "lastActivityTime";
+ public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime";
+ public static final String INACTIVITY_TIMEOUT = "inactivityTimeout";
+
+ public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);
+
+ @Autowired
+ private TenantService tenantService;
+
+ @Autowired
+ private DeviceService deviceService;
+
+ @Autowired
+ private AttributesService attributesService;
+
+ @Autowired
+ private ActorService actorService;
+
+ @Autowired
+ private TelemetrySubscriptionService tsSubService;
+
+ @Value("${state.defaultInactivityTimeoutInSec}")
+ @Getter
+ private long defaultInactivityTimeoutInSec;
+
+ @Value("${state.defaultStateCheckIntervalInSec}")
+ @Getter
+ private long defaultStateCheckIntervalInSec;
+
+// TODO in v2.1
+// @Value("${state.defaultStatePersistenceIntervalInSec}")
+// @Getter
+// private long defaultStatePersistenceIntervalInSec;
+//
+// @Value("${state.defaultStatePersistencePack}")
+// @Getter
+// private long defaultStatePersistencePack;
+
+ private ListeningScheduledExecutorService queueExecutor;
+
+ private ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();
+ private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
+
+ @PostConstruct
+ public void init() {
+ // Should be always single threaded due to absence of locks.
+ queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
+ queueExecutor.submit(this::initStateFromDB);
+ queueExecutor.scheduleAtFixedRate(this::updateState, defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
+ //TODO: schedule persistence in v2.1;
+ }
+
+ @PreDestroy
+ public void stop() {
+ if (queueExecutor != null) {
+ queueExecutor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void onDeviceAdded(Device device) {
+ queueExecutor.submit(() -> onDeviceAddedSync(device));
+ }
+
+ @Override
+ public void onDeviceUpdated(Device device) {
+ queueExecutor.submit(() -> onDeviceUpdatedSync(device));
+ }
+
+ @Override
+ public void onDeviceConnect(DeviceId deviceId) {
+ queueExecutor.submit(() -> onDeviceConnectSync(deviceId));
+ }
+
+ @Override
+ public void onDeviceActivity(DeviceId deviceId) {
+ queueExecutor.submit(() -> onDeviceActivitySync(deviceId));
+ }
+
+ @Override
+ public void onDeviceDisconnect(DeviceId deviceId) {
+ queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId));
+ }
+
+ @Override
+ public void onDeviceDeleted(Device device) {
+ queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId()));
+ }
+
+ @Override
+ public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
+ queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout));
+ }
+
+ @Override
+ public Optional<DeviceState> getDeviceState(DeviceId deviceId) {
+ DeviceStateData state = deviceStates.get(deviceId);
+ if (state != null) {
+ return Optional.of(state.getState());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private void initStateFromDB() {
+ List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
+ for (Tenant tenant : tenants) {
+ List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
+ List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
+ for (Device device : devices) {
+ fetchFutures.add(fetchDeviceState(device));
+ }
+ try {
+ Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Failed to init device state service from DB", e);
+ }
+ }
+ }
+
+ private void addDeviceUsingState(DeviceStateData state) {
+ tenantDevices.computeIfAbsent(state.getTenantId(), id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId());
+ deviceStates.put(state.getDeviceId(), state);
+ }
+
+ private void updateState() {
+ long ts = System.currentTimeMillis();
+ Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
+ for (DeviceId deviceId : deviceIds) {
+ DeviceStateData stateData = deviceStates.get(deviceId);
+ DeviceState state = stateData.getState();
+ state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
+ if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) {
+ state.setLastInactivityAlarmTime(ts);
+ pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
+ saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
+ saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
+ }
+ }
+ }
+
+ private void onDeviceConnectSync(DeviceId deviceId) {
+ DeviceStateData stateData = deviceStates.get(deviceId);
+ if (stateData != null) {
+ long ts = System.currentTimeMillis();
+ stateData.getState().setLastConnectTime(ts);
+ pushRuleEngineMessage(stateData, CONNECT_EVENT);
+ saveAttribute(deviceId, LAST_CONNECT_TIME, ts);
+ }
+ }
+
+ private void onDeviceDisconnectSync(DeviceId deviceId) {
+ DeviceStateData stateData = deviceStates.get(deviceId);
+ if (stateData != null) {
+ long ts = System.currentTimeMillis();
+ stateData.getState().setLastDisconnectTime(ts);
+ pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
+ saveAttribute(deviceId, LAST_DISCONNECT_TIME, ts);
+ }
+ }
+
+ private void onDeviceActivitySync(DeviceId deviceId) {
+ DeviceStateData stateData = deviceStates.get(deviceId);
+ if (stateData != null) {
+ DeviceState state = stateData.getState();
+ long ts = System.currentTimeMillis();
+ state.setActive(true);
+ stateData.getState().setLastActivityTime(ts);
+ pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
+ saveAttribute(deviceId, LAST_ACTIVITY_TIME, ts);
+ saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
+ }
+ }
+
+ private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
+ if (inactivityTimeout == 0L) {
+ return;
+ }
+ DeviceStateData stateData = deviceStates.get(deviceId);
+ if (stateData != null) {
+ long ts = System.currentTimeMillis();
+ DeviceState state = stateData.getState();
+ state.setInactivityTimeout(inactivityTimeout);
+ boolean oldActive = state.isActive();
+ state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
+ if (!oldActive && state.isActive()) {
+ saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
+ }
+ }
+ }
+
+ private void onDeviceAddedSync(Device device) {
+ Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
+ @Override
+ public void onSuccess(@Nullable DeviceStateData state) {
+ addDeviceUsingState(state);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("Failed to register device to the state service", t);
+ }
+ });
+ }
+
+ private void onDeviceUpdatedSync(Device device) {
+ DeviceStateData stateData = deviceStates.get(device.getId());
+ if (stateData != null) {
+ TbMsgMetaData md = new TbMsgMetaData();
+ md.putValue("deviceName", device.getName());
+ md.putValue("deviceType", device.getType());
+ stateData.setMetaData(md);
+ }
+ }
+
+ private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
+ deviceStates.remove(deviceId);
+ Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
+ if (deviceIds != null) {
+ deviceIds.remove(deviceId);
+ if (deviceIds.isEmpty()) {
+ tenantDevices.remove(tenantId);
+ }
+ }
+ }
+
+ private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
+ ListenableFuture<List<AttributeKvEntry>> attributes = attributesService.find(device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
+ return Futures.transform(attributes, new Function<List<AttributeKvEntry>, DeviceStateData>() {
+ @Nullable
+ @Override
+ public DeviceStateData apply(@Nullable List<AttributeKvEntry> attributes) {
+ long lastActivityTime = getAttributeValue(attributes, LAST_ACTIVITY_TIME, 0L);
+ long inactivityAlarmTime = getAttributeValue(attributes, INACTIVITY_ALARM_TIME, 0L);
+ long inactivityTimeout = getAttributeValue(attributes, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
+ boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout;
+ DeviceState deviceState = DeviceState.builder()
+ .active(active)
+ .lastConnectTime(getAttributeValue(attributes, LAST_CONNECT_TIME, 0L))
+ .lastDisconnectTime(getAttributeValue(attributes, LAST_DISCONNECT_TIME, 0L))
+ .lastActivityTime(lastActivityTime)
+ .lastInactivityAlarmTime(inactivityAlarmTime)
+ .inactivityTimeout(inactivityTimeout)
+ .build();
+ TbMsgMetaData md = new TbMsgMetaData();
+ md.putValue("deviceName", device.getName());
+ md.putValue("deviceType", device.getType());
+ return DeviceStateData.builder()
+ .tenantId(device.getTenantId())
+ .deviceId(device.getId())
+ .metaData(md)
+ .state(deviceState).build();
+ }
+ });
+ }
+
+ private long getLastPersistTime(List<AttributeKvEntry> attributes) {
+ return attributes.stream().map(AttributeKvEntry::getLastUpdateTs).max(Long::compare).orElse(0L);
+ }
+
+ private long getAttributeValue(List<AttributeKvEntry> attributes, String attributeName, long defaultValue) {
+ for (AttributeKvEntry attribute : attributes) {
+ if (attribute.getKey().equals(attributeName)) {
+ return attribute.getLongValue().orElse(defaultValue);
+ }
+ }
+ return defaultValue;
+ }
+
+ private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) {
+ DeviceState state = stateData.getState();
+ try {
+ TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData(), TbMsgDataType.JSON
+ , json.writeValueAsString(state)
+ , null, null, 0L);
+ actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg));
+ } catch (Exception e) {
+ log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
+ }
+ }
+
+ private void saveAttribute(DeviceId deviceId, String key, long value) {
+ tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
+ }
+
+ private void saveAttribute(DeviceId deviceId, String key, boolean value) {
+ tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
+ }
+
+ private class AttributeSaveCallback implements FutureCallback<Void> {
+ private final DeviceId deviceId;
+ private final String key;
+ private final Object value;
+
+ AttributeSaveCallback(DeviceId deviceId, String key, Object value) {
+ this.deviceId = deviceId;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t);
+ }
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java
new file mode 100644
index 0000000..be4c352
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceState.java
@@ -0,0 +1,20 @@
+package org.thingsboard.server.service.state;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * Created by ashvayka on 01.05.18.
+ */
+@Data
+@Builder
+public class DeviceState {
+
+ private boolean active;
+ private long lastConnectTime;
+ private long lastActivityTime;
+ private long lastDisconnectTime;
+ private long lastInactivityAlarmTime;
+ private long inactivityTimeout;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java
new file mode 100644
index 0000000..c1b6b72
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateData.java
@@ -0,0 +1,22 @@
+package org.thingsboard.server.service.state;
+
+import lombok.Builder;
+import lombok.Data;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbMsgMetaData;
+
+/**
+ * Created by ashvayka on 01.05.18.
+ */
+@Data
+@Builder
+class DeviceStateData {
+
+ private final TenantId tenantId;
+ private final DeviceId deviceId;
+
+ private TbMsgMetaData metaData;
+ private final DeviceState state;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
new file mode 100644
index 0000000..43d36f3
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java
@@ -0,0 +1,29 @@
+package org.thingsboard.server.service.state;
+
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.id.DeviceId;
+
+import java.util.Optional;
+
+/**
+ * Created by ashvayka on 01.05.18.
+ */
+public interface DeviceStateService {
+
+ void onDeviceAdded(Device device);
+
+ void onDeviceUpdated(Device device);
+
+ void onDeviceDeleted(Device device);
+
+ void onDeviceConnect(DeviceId deviceId);
+
+ void onDeviceActivity(DeviceId deviceId);
+
+ void onDeviceDisconnect(DeviceId deviceId);
+
+ void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
+
+ Optional<DeviceState> getDeviceState(DeviceId deviceId);
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 58bbec5..880d2df 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,11 +20,20 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
+import org.thingsboard.server.common.data.DataConstants;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
+import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
+import org.thingsboard.server.common.data.kv.BooleanDataEntry;
+import org.thingsboard.server.common.data.kv.DoubleDataEntry;
+import org.thingsboard.server.common.data.kv.LongDataEntry;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.attributes.AttributesService;
@@ -34,11 +43,14 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
+import org.thingsboard.server.service.state.DefaultDeviceStateService;
+import org.thingsboard.server.service.state.DeviceStateService;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -70,6 +82,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
@Autowired
private ClusterRoutingService routingService;
+ @Autowired
+ @Lazy
+ private DeviceStateService stateService;
+
private ExecutorService tsCallBackExecutor;
private ExecutorService wsCallBackExecutor;
@@ -149,10 +165,41 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
addWsCallback(saveFuture, success -> onAttributesUpdate(entityId, scope, attributes));
}
+ @Override
+ public void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
+ saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
+ , System.currentTimeMillis())), callback);
+ }
+
+ @Override
+ public void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback) {
+ saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(key, value)
+ , System.currentTimeMillis())), callback);
+ }
+
+ @Override
+ public void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback<Void> callback) {
+ saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new DoubleDataEntry(key, value)
+ , System.currentTimeMillis())), callback);
+ }
+
+ @Override
+ public void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback) {
+ saveAndNotify(entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value)
+ , System.currentTimeMillis())), callback);
+ }
+
private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
Optional<ServerAddress> serverAddress = routingService.resolveById(entityId);
if (!serverAddress.isPresent()) {
onLocalAttributesUpdate(entityId, scope, attributes);
+ if (entityId.getEntityType() == EntityType.DEVICE && DataConstants.SERVER_SCOPE.equalsIgnoreCase(scope)) {
+ for (AttributeKvEntry attribute : attributes) {
+ if (attribute.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) {
+ stateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), attribute.getLongValue().orElse(0L));
+ }
+ }
+ }
} else {
// rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
}
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 744cc5b..02ae2b8 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -357,4 +357,11 @@ audit_log:
host: "${AUDIT_LOG_SINK_HOST:localhost}"
port: "${AUDIT_LOG_SINK_POST:9200}"
user_name: "${AUDIT_LOG_SINK_USER_NAME:}"
- password: "${AUDIT_LOG_SINK_PASSWORD:}"
\ No newline at end of file
+ password: "${AUDIT_LOG_SINK_PASSWORD:}"
+
+state:
+ defaultInactivityTimeoutInSec: 10
+ defaultStateCheckIntervalInSec: 10
+# TODO in v2.1
+# defaultStatePersistenceIntervalInSec: 60
+# defaultStatePersistencePack: 100
\ No newline at end of file
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
index 7d4e480..f4e9559 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
@@ -45,4 +45,9 @@ public class DataConstants {
public static final String IN = "IN";
public static final String OUT = "OUT";
+ public static final String INACTIVITY_EVENT = "INACTIVITY_EVENT";
+ public static final String CONNECT_EVENT = "CONNECT_EVENT";
+ public static final String DISCONNECT_EVENT = "DISCONNECT_EVENT";
+ public static final String ACTIVITY_EVENT = "ACTIVITY_EVENT";
+
}
diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
index 1ba18cd..aa57a02 100644
--- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
+++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
@@ -33,4 +33,12 @@ public interface RuleEngineTelemetryService {
void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback);
+ void saveAttrAndNotify(EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
+
+ void saveAttrAndNotify(EntityId entityId, String scope, String key, String value, FutureCallback<Void> callback);
+
+ void saveAttrAndNotify(EntityId entityId, String scope, String key, double value, FutureCallback<Void> callback);
+
+ void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback<Void> callback);
+
}