// DefaultActorService.java
/**
 * Copyright © 2016-2018 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.actors.service;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.app.AppActor;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcManagerActor;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.actors.session.SessionManagerActor;
import org.thingsboard.server.actors.stats.StatsActor;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;

/**
 * Default {@link ActorService} implementation.
 *
 * <p>Creates the Akka actor system and its top-level actors (app, session manager,
 * RPC manager and stats) once the bean is constructed, and forwards service-level
 * notifications and cluster messages to those actors.
 */
@Service
@Slf4j
public class DefaultActorService implements ActorService {

    private static final String ACTOR_SYSTEM_NAME = "Akka";

    // Dispatcher names. Only APP_DISPATCHER_NAME and CORE_DISPATCHER_NAME are used inside this class.
    public static final String APP_DISPATCHER_NAME = "app-dispatcher";
    public static final String CORE_DISPATCHER_NAME = "core-dispatcher";
    public static final String SYSTEM_RULE_DISPATCHER_NAME = "system-rule-dispatcher";
    public static final String SYSTEM_PLUGIN_DISPATCHER_NAME = "system-plugin-dispatcher";
    public static final String TENANT_RULE_DISPATCHER_NAME = "rule-dispatcher";
    public static final String TENANT_PLUGIN_DISPATCHER_NAME = "plugin-dispatcher";
    public static final String SESSION_DISPATCHER_NAME = "session-dispatcher";
    public static final String RPC_DISPATCHER_NAME = "rpc-dispatcher";

    @Autowired
    private ActorSystemContext actorContext;

    @Autowired
    private ClusterRpcService rpcService;

    @Autowired
    private DiscoveryService discoveryService;

    private ActorSystem system;

    private ActorRef appActor;

    private ActorRef sessionManagerActor;

    private ActorRef rpcManagerActor;

    /**
     * Creates the actor system and the top-level actors, publishes them to the
     * {@link ActorSystemContext}, then registers this service with the cluster RPC
     * service and as a listener of the discovery service.
     */
    @PostConstruct
    public void initActorSystem() {
        log.info("Initializing Actor system. {}", actorContext.getRuleChainService());
        actorContext.setActorService(this);
        system = ActorSystem.create(ACTOR_SYSTEM_NAME, actorContext.getConfig());
        actorContext.setActorSystem(system);

        appActor = system.actorOf(Props.create(new AppActor.ActorCreator(actorContext)).withDispatcher(APP_DISPATCHER_NAME), "appActor");
        actorContext.setAppActor(appActor);

        sessionManagerActor = system.actorOf(Props.create(new SessionManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME),
                "sessionManagerActor");
        actorContext.setSessionManagerActor(sessionManagerActor);

        // The RPC manager actor is kept only in this service; it is not published to the context here.
        rpcManagerActor = system.actorOf(Props.create(new RpcManagerActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME),
                "rpcManagerActor");

        ActorRef statsActor = system.actorOf(Props.create(new StatsActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME), "statsActor");
        actorContext.setStatsActor(statsActor);

        rpcService.init(this);

        discoveryService.addListener(this);
        log.info("Actor system initialized.");
    }

    /**
     * Terminates the actor system and blocks, without a timeout, until termination
     * completes. Failures are logged and not propagated.
     */
    @PreDestroy
    public void stopActorSystem() {
        Future<Terminated> status = system.terminate();
        try {
            Terminated terminated = Await.result(status, Duration.Inf());
            log.info("Actor system terminated: {}", terminated);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the shutdown path can observe it.
            Thread.currentThread().interrupt();
            log.error("Failed to terminate actor system.", e);
        } catch (Exception e) {
            log.error("Failed to terminate actor system.", e);
        }
    }

    /**
     * Forwards a send-to-cluster request to the app actor.
     *
     * @param msg the message to forward
     */
    @Override
    public void onMsg(SendToClusterMsg msg) {
        appActor.tell(msg, ActorRef.noSender());
    }

    /**
     * Forwards a session-aware message to the session manager actor.
     *
     * @param msg the message to forward
     */
    @Override
    public void process(SessionAwareMsg msg) {
        log.debug("Processing session aware msg: {}", msg);
        sessionManagerActor.tell(msg, ActorRef.noSender());
    }

    /**
     * Notifies the local top-level actors that a server was added.
     *
     * @param server the added server instance
     */
    @Override
    public void onServerAdded(ServerInstance server) {
        log.trace("Processing onServerAdded msg: {}", server);
        broadcast(new ClusterEventMsg(server.getServerAddress(), true));
    }

    /**
     * Server updates are intentionally ignored.
     *
     * @param server the updated server instance
     */
    @Override
    public void onServerUpdated(ServerInstance server) {
        //Do nothing
    }

    /**
     * Notifies the local top-level actors that a server was removed.
     *
     * @param server the removed server instance
     */
    @Override
    public void onServerRemoved(ServerInstance server) {
        log.trace("Processing onServerRemoved msg: {}", server);
        broadcast(new ClusterEventMsg(server.getServerAddress(), false));
    }

    /**
     * Broadcasts a component lifecycle event for the given entity.
     *
     * @param tenantId the tenant that owns the entity
     * @param entityId the entity whose state changed
     * @param state    the lifecycle event
     */
    @Override
    public void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state) {
        log.trace("[{}] Processing {} state change event: {}", tenantId, entityId.getEntityType(), state);
        broadcast(new ComponentLifecycleMsg(tenantId, entityId, state));
    }

    /**
     * Sends a credentials-update notification for the device to the app actor,
     * wrapped in a {@link SendToClusterMsg} addressed by the device id.
     *
     * @param tenantId the tenant that owns the device
     * @param deviceId the device whose credentials changed
     */
    @Override
    public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
        DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
        appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
    }

    /**
     * Sends a name-or-type update notification for the device to the app actor,
     * wrapped in a {@link SendToClusterMsg} addressed by the device id.
     *
     * @param tenantId   the tenant that owns the device
     * @param deviceId   the updated device
     * @param deviceName the device name carried in the notification
     * @param deviceType the device type carried in the notification
     */
    @Override
    public void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType) {
        log.trace("[{}] Processing onDeviceNameOrTypeUpdate event, deviceName: {}, deviceType: {}", deviceId, deviceName, deviceType);
        DeviceNameOrTypeUpdateMsg msg = new DeviceNameOrTypeUpdateMsg(tenantId, deviceId, deviceName, deviceType);
        appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
    }

    /**
     * Forwards a service-to-rule-engine message to the app actor.
     *
     * @param msg the message to forward
     */
    @Override
    public void onMsg(ServiceToRuleEngineMsg msg) {
        appActor.tell(msg, ActorRef.noSender());
    }

    /**
     * Encodes the message, hands it to the cluster RPC service for broadcast as a
     * {@code CLUSTER_ACTOR_MESSAGE}, and then delivers it to the local app actor.
     *
     * @param msg the message to broadcast
     */
    public void broadcast(ToAllNodesMsg msg) {
        // Encode exactly once; the result is used directly as the payload.
        rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage
                .newBuilder()
                .setPayload(ByteString
                        .copyFrom(actorContext.getEncodingService().encode(msg)))
                .setMessageType(CLUSTER_ACTOR_MESSAGE)
                .build()));
        appActor.tell(msg, ActorRef.noSender());
    }

    /**
     * Delivers a cluster topology event to the app, session manager and RPC manager actors.
     *
     * @param msg the cluster event
     */
    private void broadcast(ClusterEventMsg msg) {
        this.appActor.tell(msg, ActorRef.noSender());
        this.sessionManagerActor.tell(msg, ActorRef.noSender());
        this.rpcManagerActor.tell(msg, ActorRef.noSender());
    }

    /**
     * Dispatches a message received from another server according to its message type.
     *
     * <p>{@code CLUSTER_ACTOR_MESSAGE} payloads are decoded and sent to the app actor;
     * telemetry subscription messages go to the telemetry subscription service;
     * device RPC responses go to the device RPC service. {@code TO_ALL_NODES_MSG}
     * is not handled yet, and message types without a case are ignored.
     *
     * @param source the address of the sending server
     * @param msg    the received cluster message
     */
    @Override
    public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) {
        ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort());
        log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
        // Parameterized debug logging: the message is only rendered when DEBUG is enabled.
        log.debug("MSG: {}", msg);
        switch (msg.getMessageType()) {
            case CLUSTER_ACTOR_MESSAGE:
                java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
                        .decode(msg.getPayload().toByteArray());
                if (decodedMsg.isPresent()) {
                    appActor.tell(decodedMsg.get(), ActorRef.noSender());
                } else {
                    log.error("Error during decoding cluster proto message");
                }
                break;
            case TO_ALL_NODES_MSG:
                //TODO
                break;
            case CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE:
                actorContext.getTsSubService().onNewRemoteSubscription(serverAddress, msg.getPayload().toByteArray());
                break;
            case CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE:
                actorContext.getTsSubService().onRemoteSubscriptionUpdate(serverAddress, msg.getPayload().toByteArray());
                break;
            case CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE:
                actorContext.getTsSubService().onRemoteSubscriptionClose(serverAddress, msg.getPayload().toByteArray());
                break;
            case CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE:
                actorContext.getTsSubService().onRemoteSessionClose(serverAddress, msg.getPayload().toByteArray());
                break;
            case CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE:
                actorContext.getTsSubService().onRemoteAttributesUpdate(serverAddress, msg.getPayload().toByteArray());
                break;
            case CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE:
                actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
                break;
            case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
                actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray());
                break;
        }
    }

    /**
     * Forwards an outgoing cluster message to the RPC manager actor.
     *
     * @param msg the cluster message to send
     */
    @Override
    public void onSendMsg(ClusterAPIProtos.ClusterMessage msg) {
        rpcManagerActor.tell(msg, ActorRef.noSender());
    }

    /**
     * Forwards an RPC session create request to the RPC manager actor.
     *
     * @param msg the session create request
     */
    @Override
    public void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg) {
        rpcManagerActor.tell(msg, ActorRef.noSender());
    }

    /**
     * Forwards an RPC broadcast message to the RPC manager actor.
     *
     * @param msg the broadcast message
     */
    @Override
    public void onBroadcastMsg(RpcBroadcastMsg msg) {
        rpcManagerActor.tell(msg, ActorRef.noSender());
    }

}