DefaultDeviceStateService.java

494 lines | 21.217 kB Blame History Raw Download
/**
 * Copyright © 2016-2018 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.service.state;

import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;

import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT;
import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT;
import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT;

/**
 * Created by ashvayka on 01.05.18.
 */
@Service
@Slf4j
//TODO: refactor to use page links as cursor and not fetch all
public class DefaultDeviceStateService implements DeviceStateService {

    private static final ObjectMapper json = new ObjectMapper();
    public static final String ACTIVITY_STATE = "active";
    public static final String LAST_CONNECT_TIME = "lastConnectTime";
    public static final String LAST_DISCONNECT_TIME = "lastDisconnectTime";
    public static final String LAST_ACTIVITY_TIME = "lastActivityTime";
    public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime";
    public static final String INACTIVITY_TIMEOUT = "inactivityTimeout";

    public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);

    @Autowired
    private TenantService tenantService;

    @Autowired
    private DeviceService deviceService;

    @Autowired
    private AttributesService attributesService;

    @Autowired
    private ActorService actorService;

    @Autowired
    private TelemetrySubscriptionService tsSubService;

    @Autowired
    private ClusterRoutingService routingService;

    @Autowired
    private ClusterRpcService clusterRpcService;

    @Value("${state.defaultInactivityTimeoutInSec}")
    @Getter
    private long defaultInactivityTimeoutInSec;

    @Value("${state.defaultStateCheckIntervalInSec}")
    @Getter
    private long defaultStateCheckIntervalInSec;

// TODO in v2.1
//    @Value("${state.defaultStatePersistenceIntervalInSec}")
//    @Getter
//    private long defaultStatePersistenceIntervalInSec;
//
//    @Value("${state.defaultStatePersistencePack}")
//    @Getter
//    private long defaultStatePersistencePack;

    private ListeningScheduledExecutorService queueExecutor;

    private ConcurrentMap<TenantId, Set<DeviceId>> tenantDevices = new ConcurrentHashMap<>();
    private ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        // Should be always single threaded due to absence of locks.
        queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
        queueExecutor.submit(this::initStateFromDB);
        queueExecutor.scheduleAtFixedRate(this::updateState, defaultStateCheckIntervalInSec, defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
        //TODO: schedule persistence in v2.1;
    }

    @PreDestroy
    public void stop() {
        if (queueExecutor != null) {
            queueExecutor.shutdownNow();
        }
    }

    @Override
    public void onDeviceAdded(Device device) {
        queueExecutor.submit(() -> onDeviceAddedSync(device));
    }

    @Override
    public void onDeviceUpdated(Device device) {
        queueExecutor.submit(() -> onDeviceUpdatedSync(device));
    }

    @Override
    public void onDeviceConnect(DeviceId deviceId) {
        queueExecutor.submit(() -> onDeviceConnectSync(deviceId));
    }

    @Override
    public void onDeviceActivity(DeviceId deviceId) {
        queueExecutor.submit(() -> onDeviceActivitySync(deviceId));
    }

    @Override
    public void onDeviceDisconnect(DeviceId deviceId) {
        queueExecutor.submit(() -> onDeviceDisconnectSync(deviceId));
    }

    @Override
    public void onDeviceDeleted(Device device) {
        queueExecutor.submit(() -> onDeviceDeleted(device.getTenantId(), device.getId()));
    }

    @Override
    public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
        queueExecutor.submit(() -> onInactivityTimeoutUpdate(deviceId, inactivityTimeout));
    }

    @Override
    public void onClusterUpdate() {
        queueExecutor.submit(this::onClusterUpdateSync);
    }

    @Override
    public void onRemoteMsg(ServerAddress serverAddress, byte[] data) {
        ClusterAPIProtos.DeviceStateServiceMsgProto proto;
        try {
            proto = ClusterAPIProtos.DeviceStateServiceMsgProto.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
        TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
        DeviceId deviceId = new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB()));
        if (proto.getDeleted()) {
            queueExecutor.submit(() -> onDeviceDeleted(tenantId, deviceId));
        } else {
            Device device = deviceService.findDeviceById(deviceId);
            if (device != null) {
                if (proto.getAdded()) {
                    onDeviceAdded(device);
                } else if (proto.getUpdated()) {
                    onDeviceUpdated(device);
                }
            }
        }
    }

    private void onClusterUpdateSync() {
        List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
        for (Tenant tenant : tenants) {
            List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
            List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
            for (Device device : devices) {
                if (!routingService.resolveById(device.getId()).isPresent()) {
                    if (!deviceStates.containsKey(device.getId())) {
                        fetchFutures.add(fetchDeviceState(device));
                    }
                } else {
                    Set<DeviceId> tenantDeviceSet = tenantDevices.get(tenant.getId());
                    if (tenantDeviceSet != null) {
                        tenantDeviceSet.remove(device.getId());
                    }
                    deviceStates.remove(device.getId());
                }
            }
            try {
                Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
            } catch (InterruptedException | ExecutionException e) {
                log.warn("Failed to init device state service from DB", e);
            }
        }
    }

    private void initStateFromDB() {
        List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData();
        for (Tenant tenant : tenants) {
            List<ListenableFuture<DeviceStateData>> fetchFutures = new ArrayList<>();
            List<Device> devices = deviceService.findDevicesByTenantId(tenant.getId(), new TextPageLink(Integer.MAX_VALUE)).getData();
            for (Device device : devices) {
                if (!routingService.resolveById(device.getId()).isPresent()) {
                    fetchFutures.add(fetchDeviceState(device));
                }
            }
            try {
                Futures.successfulAsList(fetchFutures).get().forEach(this::addDeviceUsingState);
            } catch (InterruptedException | ExecutionException e) {
                log.warn("Failed to init device state service from DB", e);
            }
        }
    }

    private void addDeviceUsingState(DeviceStateData state) {
        tenantDevices.computeIfAbsent(state.getTenantId(), id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId());
        deviceStates.put(state.getDeviceId(), state);
    }

    private void updateState() {
        long ts = System.currentTimeMillis();
        Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
        for (DeviceId deviceId : deviceIds) {
            DeviceStateData stateData = deviceStates.get(deviceId);
            DeviceState state = stateData.getState();
            state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
            if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())) {
                state.setLastInactivityAlarmTime(ts);
                pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
                saveAttribute(deviceId, INACTIVITY_ALARM_TIME, ts);
                saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
            }
        }
    }

    private void onDeviceConnectSync(DeviceId deviceId) {
        DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
        if (stateData != null) {
            long ts = System.currentTimeMillis();
            stateData.getState().setLastConnectTime(ts);
            pushRuleEngineMessage(stateData, CONNECT_EVENT);
            saveAttribute(deviceId, LAST_CONNECT_TIME, ts);
        }
    }

    private void onDeviceDisconnectSync(DeviceId deviceId) {
        DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
        if (stateData != null) {
            long ts = System.currentTimeMillis();
            stateData.getState().setLastDisconnectTime(ts);
            pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
            saveAttribute(deviceId, LAST_DISCONNECT_TIME, ts);
        }
    }

    private void onDeviceActivitySync(DeviceId deviceId) {
        DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
        if (stateData != null) {
            DeviceState state = stateData.getState();
            long ts = System.currentTimeMillis();
            state.setActive(true);
            stateData.getState().setLastActivityTime(ts);
            pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
            saveAttribute(deviceId, LAST_ACTIVITY_TIME, ts);
            saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
        }
    }

    private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
        DeviceStateData deviceStateData = deviceStates.get(deviceId);
        if (deviceStateData == null) {
            if (!routingService.resolveById(deviceId).isPresent()) {
                Device device = deviceService.findDeviceById(deviceId);
                if (device != null) {
                    try {
                        deviceStateData = fetchDeviceState(device).get();
                    } catch (InterruptedException | ExecutionException e) {
                        log.debug("[{}] Failed to fetch device state!", deviceId, e);
                    }
                }
            }
        }
        return deviceStateData;
    }

    private void onInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
        if (inactivityTimeout == 0L) {
            return;
        }
        DeviceStateData stateData = deviceStates.get(deviceId);
        if (stateData != null) {
            long ts = System.currentTimeMillis();
            DeviceState state = stateData.getState();
            state.setInactivityTimeout(inactivityTimeout);
            boolean oldActive = state.isActive();
            state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
            if (!oldActive && state.isActive() || oldActive && !state.isActive()) {
                saveAttribute(deviceId, ACTIVITY_STATE, state.isActive());
            }
        }
    }

    private void onDeviceAddedSync(Device device) {
        Optional<ServerAddress> address = routingService.resolveById(device.getId());
        if (!address.isPresent()) {
            Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
                @Override
                public void onSuccess(@Nullable DeviceStateData state) {
                    addDeviceUsingState(state);
                }

                @Override
                public void onFailure(Throwable t) {
                    log.warn("Failed to register device to the state service", t);
                }
            });
        } else {
            sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), true, false, false);
        }
    }

    private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, ServerAddress address, boolean added, boolean updated, boolean deleted) {
        log.trace("[{}][{}] Device is monitored on other server: {}", tenantId, deviceId, address);
        ClusterAPIProtos.DeviceStateServiceMsgProto.Builder builder = ClusterAPIProtos.DeviceStateServiceMsgProto.newBuilder();
        builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
        builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
        builder.setDeviceIdMSB(deviceId.getId().getMostSignificantBits());
        builder.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits());
        builder.setAdded(added);
        builder.setUpdated(updated);
        builder.setDeleted(deleted);
        clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_DEVICE_STATE_SERVICE_MESSAGE, builder.build().toByteArray());
    }

    private void onDeviceUpdatedSync(Device device) {
        Optional<ServerAddress> address = routingService.resolveById(device.getId());
        if (!address.isPresent()) {
            DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
            if (stateData != null) {
                TbMsgMetaData md = new TbMsgMetaData();
                md.putValue("deviceName", device.getName());
                md.putValue("deviceType", device.getType());
                stateData.setMetaData(md);
            }
        } else {
            sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), false, true, false);
        }
    }

    private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
        Optional<ServerAddress> address = routingService.resolveById(deviceId);
        if (!address.isPresent()) {
            deviceStates.remove(deviceId);
            Set<DeviceId> deviceIds = tenantDevices.get(tenantId);
            if (deviceIds != null) {
                deviceIds.remove(deviceId);
                if (deviceIds.isEmpty()) {
                    tenantDevices.remove(tenantId);
                }
            }
        } else {
            sendDeviceEvent(tenantId, deviceId, address.get(), false, false, true);
        }
    }

    private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
        ListenableFuture<List<AttributeKvEntry>> attributes = attributesService.find(device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
        return Futures.transform(attributes, new Function<List<AttributeKvEntry>, DeviceStateData>() {
            @Nullable
            @Override
            public DeviceStateData apply(@Nullable List<AttributeKvEntry> attributes) {
                long lastActivityTime = getAttributeValue(attributes, LAST_ACTIVITY_TIME, 0L);
                long inactivityAlarmTime = getAttributeValue(attributes, INACTIVITY_ALARM_TIME, 0L);
                long inactivityTimeout = getAttributeValue(attributes, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
                boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout;
                DeviceState deviceState = DeviceState.builder()
                        .active(active)
                        .lastConnectTime(getAttributeValue(attributes, LAST_CONNECT_TIME, 0L))
                        .lastDisconnectTime(getAttributeValue(attributes, LAST_DISCONNECT_TIME, 0L))
                        .lastActivityTime(lastActivityTime)
                        .lastInactivityAlarmTime(inactivityAlarmTime)
                        .inactivityTimeout(inactivityTimeout)
                        .build();
                TbMsgMetaData md = new TbMsgMetaData();
                md.putValue("deviceName", device.getName());
                md.putValue("deviceType", device.getType());
                return DeviceStateData.builder()
                        .tenantId(device.getTenantId())
                        .deviceId(device.getId())
                        .metaData(md)
                        .state(deviceState).build();
            }
        });
    }

    private long getAttributeValue(List<AttributeKvEntry> attributes, String attributeName, long defaultValue) {
        for (AttributeKvEntry attribute : attributes) {
            if (attribute.getKey().equals(attributeName)) {
                return attribute.getLongValue().orElse(defaultValue);
            }
        }
        return defaultValue;
    }

    private void pushRuleEngineMessage(DeviceStateData stateData, String msgType) {
        DeviceState state = stateData.getState();
        try {
            TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON
                    , json.writeValueAsString(state)
                    , null, null, 0L);
            actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg));
        } catch (Exception e) {
            log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
        }
    }

    private void saveAttribute(DeviceId deviceId, String key, long value) {
        tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
    }

    private void saveAttribute(DeviceId deviceId, String key, boolean value) {
        tsSubService.saveAttrAndNotify(deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value));
    }

    private class AttributeSaveCallback implements FutureCallback<Void> {
        private final DeviceId deviceId;
        private final String key;
        private final Object value;

        AttributeSaveCallback(DeviceId deviceId, String key, Object value) {
            this.deviceId = deviceId;
            this.key = key;
            this.value = value;
        }

        @Override
        public void onSuccess(@Nullable Void result) {
            log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value);
        }

        @Override
        public void onFailure(Throwable t) {
            log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t);
        }
    }
}