/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT;
import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT;
import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT;
/**
 * Created by ashvayka on 01.05.18.
 *
 * <p>Keeps an in-memory view of every device's connectivity/activity state,
 * seeded from the persisted server-scope attributes listed in
 * {@link #PERSISTENT_ATTRIBUTES}. Device events are queued onto a single-threaded
 * executor; each handler updates the in-memory state, writes the changed
 * attribute(s) through the telemetry subscription service and, where applicable,
 * pushes a message to the rule engine. A periodic check flags devices whose
 * inactivity timeout has elapsed.
 *
 * <p>{@link #getDeviceState(DeviceId)} reads the state map directly on the
 * caller's thread; all event handlers are executed on the queue executor.
 */
@Service
@Slf4j
//TODO: refactor to use page links as cursor and not fetch all
public class DefaultDeviceStateService implements DeviceStateService {

    /** Serializer used to render a {@link DeviceState} as the rule-engine message body. */
    private static final ObjectMapper json = new ObjectMapper();

    // Keys of the server-scope attributes that hold the persisted device state.
    public static final String ACTIVITY_STATE = "active";
    public static final String LAST_CONNECT_TIME = "lastConnectTime";
    public static final String LAST_DISCONNECT_TIME = "lastDisconnectTime";
    public static final String LAST_ACTIVITY_TIME = "lastActivityTime";
    public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime";
    public static final String INACTIVITY_TIMEOUT = "inactivityTimeout";

    /** All attribute keys fetched when a device state is (re)built from storage. */
    public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);

    @Autowired
    private TenantService tenantService;

    @Autowired
    private DeviceService deviceService;

    @Autowired
    private AttributesService attributesService;

    @Autowired
    private ActorService actorService;

    @Autowired
    private TelemetrySubscriptionService tsSubService;

    /** Inactivity timeout (seconds) applied to devices that have no {@link #INACTIVITY_TIMEOUT} attribute. */
    @Value("${state.defaultInactivityTimeoutInSec}")
    @Getter
    private long defaultInactivityTimeoutInSec;

    /** Initial delay and period (seconds) of the scheduled inactivity check. */
    @Value("${state.defaultStateCheckIntervalInSec}")
    @Getter
    private long defaultStateCheckIntervalInSec;

    // TODO in v2.1
    // @Value("${state.defaultStatePersistenceIntervalInSec}")
    // @Getter
    // private long defaultStatePersistenceIntervalInSec;
    //
    // @Value("${state.defaultStatePersistencePack}")
    // @Getter
    // private long defaultStatePersistencePack;

    /** Single-threaded executor on which every state-changing task is run. */
    private ListeningScheduledExecutorService queueExecutor;

    /** Ids of the registered devices, grouped by owning tenant. */
    private final ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();

    /** Current in-memory state of every registered device. */
    private final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();

    /**
     * Creates the queue executor, queues the initial load from storage and
     * schedules the periodic inactivity check.
     */
    @PostConstruct
    public void init() {
        // Should be always single threaded due to absence of locks.
        queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
        queueExecutor.submit(guarded("initStateFromDB", this::initStateFromDB));
        // The task is guarded because an exception escaping a fixed-rate task
        // suppresses all of its subsequent executions.
        queueExecutor.scheduleAtFixedRate(guarded("updateState", this::updateState), defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
        //TODO: schedule persistence in v2.1;
    }

    /** Shuts the queue executor down, if it was created. */
    @PreDestroy
    public void stop() {
        if (queueExecutor != null) {
            queueExecutor.shutdownNow();
        }
    }

    /** Queues registration of a newly added device. */
    @Override
    public void onDeviceAdded(Device device) {
        queueExecutor.submit(guarded("onDeviceAdded", () -> onDeviceAddedSync(device)));
    }

    /** Queues a refresh of the cached name/type metadata of the device. */
    @Override
    public void onDeviceUpdated(Device device) {
        queueExecutor.submit(guarded("onDeviceUpdated", () -> onDeviceUpdatedSync(device)));
    }

    /** Queues handling of a device connect event. */
    @Override
    public void onDeviceConnect(DeviceId deviceId) {
        queueExecutor.submit(guarded("onDeviceConnect", () -> onDeviceConnectSync(deviceId)));
    }

    /** Queues handling of a device activity event. */
    @Override
    public void onDeviceActivity(DeviceId deviceId) {
        queueExecutor.submit(guarded("onDeviceActivity", () -> onDeviceActivitySync(deviceId)));
    }

    /** Queues handling of a device disconnect event. */
    @Override
    public void onDeviceDisconnect(DeviceId deviceId) {
        queueExecutor.submit(guarded("onDeviceDisconnect", () -> onDeviceDisconnectSync(deviceId)));
    }

    /** Queues removal of the device from the in-memory registries. */
    @Override
    public void onDeviceDeleted(Device device) {
        queueExecutor.submit(guarded("onDeviceDeleted", () -> onDeviceDeleted(device.getTenantId(), device.getId())));
    }

    /** Queues an update of the device's inactivity timeout; a value of 0 is ignored by the handler. */
    @Override
    public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
        queueExecutor.submit(guarded("onDeviceInactivityTimeoutUpdate", () -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout)));
    }

    /**
     * Looks up the in-memory state of a device.
     *
     * @param deviceId id of the device
     * @return the current state, or an empty {@link Optional} when the device is not registered
     */
    @Override
    public Optional<DeviceState> getDeviceState(DeviceId deviceId) {
        return Optional.ofNullable(deviceStates.get(deviceId)).map(DeviceStateData::getState);
    }

    /**
     * Wraps a task so that a runtime failure is logged instead of being lost in
     * the discarded future returned by the executor.
     *
     * @param taskName name used in the log message
     * @param task     the work to run
     * @return a runnable that never propagates a {@link RuntimeException}
     */
    private Runnable guarded(String taskName, Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                log.warn("Device state task [{}] failed", taskName, e);
            }
        };
    }

    /**
     * Loads the state of every device of every tenant and registers it.
     * Devices whose state could not be fetched are skipped with a warning;
     * an interrupt stops the load and restores the thread's interrupt flag.
     */
    private void initStateFromDB() {
        List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
        for (Tenant tenant : tenants) {
            List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
            List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>(devices.size());
            for (Device device : devices) {
                fetchFutures.add(fetchDeviceState(device));
            }
            try {
                // successfulAsList keeps the input order and substitutes null for every failed fetch.
                List<DeviceStateData> states = Futures.successfulAsList(fetchFutures).get();
                for (int i = 0; i < states.size(); i++) {
                    DeviceStateData state = states.get(i);
                    if (state != null) {
                        addDeviceUsingState(state);
                    } else {
                        log.warn("[{}] Failed to fetch device state from DB", devices.get(i).getId());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted while initializing device state service from DB", e);
                return;
            } catch (ExecutionException e) {
                log.warn("Failed to init device state service from DB", e);
            }
        }
    }

    /** Registers the device in the per-tenant index and stores its state. */
    private void addDeviceUsingState(DeviceStateData state) {
        tenantDevices.computeIfAbsent(state.getTenantId(), id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId());
        deviceStates.put(state.getDeviceId(), state);
    }

    /**
     * Periodic check: recomputes the active flag of every registered device.
     * A failure on one device is logged and does not affect the others.
     */
    private void updateState() {
        long ts = System.currentTimeMillis();
        Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
        for (DeviceId deviceId : deviceIds) {
            try {
                updateDeviceState(deviceId, ts);
            } catch (RuntimeException e) {
                log.warn("[{}] Failed to update device state", deviceId, e);
            }
        }
    }

    /**
     * Recomputes the active flag of one device and, when the device is inactive
     * and no alarm was recorded since its last activity, records the alarm time,
     * pushes an inactivity event and saves both attributes.
     */
    private void updateDeviceState(DeviceId deviceId, long ts) {
        DeviceStateData stateData = deviceStates.get(deviceId);
        if (stateData == null) {
            return;
        }
        DeviceState state = stateData.getState();
        state.setActive(isActiveAt(ts, state.getLastActivityTime(), state.getInactivityTimeout()));
        if (!state.isActive() && state.getLastInactivityAlarmTime() < state.getLastActivityTime()) {
            state.setLastInactivityAlarmTime(ts);
            pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
            saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
            saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
        }
    }

    /** Records the connect time, pushes a connect event and saves the attribute. */
    private void onDeviceConnectSync(DeviceId deviceId) {
        DeviceStateData stateData = deviceStates.get(deviceId);
        if (stateData != null) {
            long ts = System.currentTimeMillis();
            stateData.getState().setLastConnectTime(ts);
            pushRuleEngineMessage(stateData, CONNECT_EVENT);
            saveAttribute(deviceId, LAST_CONNECT_TIME, ts);
        }
    }

    /** Records the disconnect time, pushes a disconnect event and saves the attribute. */
    private void onDeviceDisconnectSync(DeviceId deviceId) {
        DeviceStateData stateData = deviceStates.get(deviceId);
        if (stateData != null) {
            long ts = System.currentTimeMillis();
            stateData.getState().setLastDisconnectTime(ts);
            pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
            saveAttribute(deviceId, LAST_DISCONNECT_TIME, ts);
        }
    }

    /** Marks the device active, records the activity time, pushes an activity event and saves both attributes. */
    private void onDeviceActivitySync(DeviceId deviceId) {
        DeviceStateData stateData = deviceStates.get(deviceId);
        if (stateData != null) {
            DeviceState state = stateData.getState();
            long ts = System.currentTimeMillis();
            state.setActive(true);
            state.setLastActivityTime(ts);
            pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
            saveAttribute(deviceId, LAST_ACTIVITY_TIME, ts);
            saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
        }
    }

    /**
     * Applies a new inactivity timeout and re-evaluates the active flag; the
     * flag is saved only when it actually changed. A timeout of 0 is ignored.
     */
    private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
        if (inactivityTimeout == 0L) {
            return;
        }
        DeviceStateData stateData = deviceStates.get(deviceId);
        if (stateData != null) {
            long ts = System.currentTimeMillis();
            DeviceState state = stateData.getState();
            state.setInactivityTimeout(inactivityTimeout);
            boolean oldActive = state.isActive();
            state.setActive(isActiveAt(ts, state.getLastActivityTime(), state.getInactivityTimeout()));
            if (oldActive != state.isActive()) {
                saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
            }
        }
    }

    /** Fetches the persisted state of the device and registers it once the fetch completes. */
    private void onDeviceAddedSync(Device device) {
        Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
            @Override
            public void onSuccess(@Nullable DeviceStateData state) {
                if (state == null) {
                    log.warn("[{}] Device state fetch returned no data", device.getId());
                    return;
                }
                addDeviceUsingState(state);
            }

            @Override
            public void onFailure(Throwable t) {
                log.warn("Failed to register device to the state service", t);
            }
        });
    }

    /** Replaces the cached rule-engine metadata with the device's current name and type. */
    private void onDeviceUpdatedSync(Device device) {
        DeviceStateData stateData = deviceStates.get(device.getId());
        if (stateData != null) {
            stateData.setMetaData(buildMetaData(device));
        }
    }

    /** Removes the device's state and drops the tenant entry once it has no devices left. */
    private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
        deviceStates.remove(deviceId);
        Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
        if (deviceIds != null) {
            deviceIds.remove(deviceId);
            if (deviceIds.isEmpty()) {
                tenantDevices.remove(tenantId);
            }
        }
    }

    /**
     * Reads the persisted server-scope attributes of the device and builds its
     * state from them. Missing attributes default to 0, except the inactivity
     * timeout, which defaults to {@link #defaultInactivityTimeoutInSec} converted
     * to milliseconds.
     *
     * @param device the device whose state is requested
     * @return a future completing with the assembled state
     */
    private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
        ListenableFuture<List<AttributeKvEntry>> attributes = attributesService.find(device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
        return Futures.transform(attributes, new Function<List<AttributeKvEntry>, DeviceStateData>() {
            @Nullable
            @Override
            public DeviceStateData apply(@Nullable List<AttributeKvEntry> attributes) {
                long lastActivityTime = getAttributeValue(attributes, LAST_ACTIVITY_TIME, 0L);
                long inactivityAlarmTime = getAttributeValue(attributes, INACTIVITY_ALARM_TIME, 0L);
                long inactivityTimeout = getAttributeValue(attributes, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
                boolean active = isActiveAt(System.currentTimeMillis(), lastActivityTime, inactivityTimeout);
                DeviceState deviceState = DeviceState.builder()
                        .active(active)
                        .lastConnectTime(getAttributeValue(attributes, LAST_CONNECT_TIME, 0L))
                        .lastDisconnectTime(getAttributeValue(attributes, LAST_DISCONNECT_TIME, 0L))
                        .lastActivityTime(lastActivityTime)
                        .lastInactivityAlarmTime(inactivityAlarmTime)
                        .inactivityTimeout(inactivityTimeout)
                        .build();
                return DeviceStateData.builder()
                        .tenantId(device.getTenantId())
                        .deviceId(device.getId())
                        .metaData(buildMetaData(device))
                        .state(deviceState).build();
            }
        });
    }

    /** Builds the rule-engine metadata (device name and type) attached to state messages. */
    private static TbMsgMetaData buildMetaData(Device device) {
        TbMsgMetaData md = new TbMsgMetaData();
        md.putValue("deviceName", device.getName());
        md.putValue("deviceType", device.getType());
        return md;
    }

    /** A device counts as active at {@code ts} while {@code ts} is before last activity plus the timeout. */
    private static boolean isActiveAt(long ts, long lastActivityTime, long inactivityTimeout) {
        return ts < lastActivityTime + inactivityTimeout;
    }

    /**
     * Returns the long value of the first attribute with the given key.
     *
     * @param attributes    attribute list to search; {@code null} is treated as empty
     * @param attributeName key to look for
     * @param defaultValue  value returned when the key is absent or holds no long value
     */
    private long getAttributeValue(List<AttributeKvEntry> attributes, String attributeName, long defaultValue) {
        if (attributes == null) {
            return defaultValue;
        }
        for (AttributeKvEntry attribute : attributes) {
            if (attribute.getKey().equals(attributeName)) {
                return attribute.getLongValue().orElse(defaultValue);
            }
        }
        return defaultValue;
    }

    /**
     * Sends the device state, serialized as JSON, to the rule engine as a
     * message of the given type. Failures are logged and not propagated.
     */
    private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) {
        DeviceState state = stateData.getState();
        try {
            TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON
                    , json.writeValueAsString(state)
                    , null, null, 0L);
            actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg));
        } catch (Exception e) {
            // This method sends every event type, so the log names the actual one.
            log.warn("[{}] Failed to push {} message: {}", stateData.getDeviceId(), msgType, state, e);
        }
    }

    /** Saves a long server-scope attribute and notifies subscribers; the outcome is only logged. */
    private void saveAttribute(DeviceId deviceId, String key, long value) {
        tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
    }

    /** Saves a boolean server-scope attribute and notifies subscribers; the outcome is only logged. */
    private void saveAttribute(DeviceId deviceId, String key, boolean value) {
        tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
    }

    /** Callback that logs the result of an attribute save. */
    private static class AttributeSaveCallback implements FutureCallback<Void> {
        private final DeviceId deviceId;
        private final String key;
        private final Object value;

        AttributeSaveCallback(DeviceId deviceId, String key, Object value) {
            this.deviceId = deviceId;
            this.key = key;
            this.value = value;
        }

        @Override
        public void onSuccess(@Nullable Void result) {
            log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value);
        }

        @Override
        public void onFailure(Throwable t) {
            log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t);
        }
    }
}