RuleActorMessageProcessor.java

340 lines | 13.931 kB Blame History Raw Download
/**
 * Copyright © 2016 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.actors.rule;

import java.util.*;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.plugin.RuleToPluginMsgWrapper;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.common.data.id.PluginId;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.data.plugin.PluginMetaData;
import org.thingsboard.server.common.data.rule.RuleMetaData;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.core.BasicRequest;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
import org.thingsboard.server.common.msg.core.RuleEngineError;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.MsgType;
import org.thingsboard.server.common.msg.session.ToDeviceMsg;
import org.thingsboard.server.common.msg.session.ex.ProcessingTimeoutException;
import org.thingsboard.server.extensions.api.rules.*;
import org.thingsboard.server.extensions.api.plugins.PluginAction;
import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;

import com.fasterxml.jackson.databind.JsonNode;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.event.LoggingAdapter;

class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {

    private final RuleProcessingContext ruleCtx;
    private final Map<UUID, RuleProcessingMsg> pendingMsgMap;

    private RuleMetaData ruleMd;
    private ComponentLifecycleState state;
    private List<RuleFilter> filters;
    private RuleProcessor processor;
    private PluginAction action;

    private TenantId pluginTenantId;
    private PluginId pluginId;

    protected RuleActorMessageProcessor(TenantId tenantId, RuleId ruleId, ActorSystemContext systemContext, LoggingAdapter logger) {
        super(systemContext, logger, tenantId, ruleId);
        this.pendingMsgMap = new HashMap<>();
        this.ruleCtx = new RuleProcessingContext(systemContext, ruleId);
    }

    @Override
    public void start() throws Exception {
        logger.info("[{}][{}] Starting rule actor.", entityId, tenantId);
        ruleMd = systemContext.getRuleService().findRuleById(entityId);
        if (ruleMd == null) {
            throw new RuleInitializationException("Rule not found!");
        }
        state = ruleMd.getState();
        if (state == ComponentLifecycleState.ACTIVE) {
            logger.info("[{}] Rule is active. Going to initialize rule components.", entityId);
            initComponent();
        } else {
            logger.info("[{}] Rule is suspended. Skipping rule components initialization.", entityId);
        }

        logger.info("[{}][{}] Started rule actor.", entityId, tenantId);
    }

    @Override
    public void stop() throws Exception {
        onStop();
    }


    private void initComponent() throws RuleException {
        try {
            if (!ruleMd.getFilters().isArray()) {
                throw new RuntimeException("Filters are not array!");
            }
            fetchPluginInfo();
            initFilters();
            initProcessor();
            initAction();
        } catch (RuntimeException e) {
            throw new RuleInitializationException("Unknown runtime exception!", e);
        } catch (InstantiationException e) {
            throw new RuleInitializationException("No default constructor for rule implementation!", e);
        } catch (IllegalAccessException e) {
            throw new RuleInitializationException("Illegal Access Exception during rule initialization!", e);
        } catch (ClassNotFoundException e) {
            throw new RuleInitializationException("Rule Class not found!", e);
        } catch (Exception e) {
            throw new RuleException(e.getMessage(), e);
        }
    }

    private void initAction() throws Exception {
        JsonNode actionMd = ruleMd.getAction();
        action = initComponent(actionMd);
    }

    private void initProcessor() throws Exception {
        if (ruleMd.getProcessor() != null && !ruleMd.getProcessor().isNull()) {
            processor = initComponent(ruleMd.getProcessor());
        }
    }

    private void initFilters() throws Exception {
        filters = new ArrayList<>(ruleMd.getFilters().size());
        for (int i = 0; i < ruleMd.getFilters().size(); i++) {
            filters.add(initComponent(ruleMd.getFilters().get(i)));
        }
    }

    private void fetchPluginInfo() {
        PluginMetaData pluginMd = systemContext.getPluginService().findPluginByApiToken(ruleMd.getPluginToken());
        pluginTenantId = pluginMd.getTenantId();
        pluginId = pluginMd.getId();
    }

    protected void onRuleProcessingMsg(ActorContext context, RuleProcessingMsg msg) throws RuleException {
        if (state != ComponentLifecycleState.ACTIVE) {
            pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_ACTIVE_RULES);
            return;
        }
        ChainProcessingContext chainCtx = msg.getCtx();
        ToDeviceActorMsg inMsg = chainCtx.getInMsg();

        ruleCtx.update(inMsg, chainCtx.getAttributes());

        logger.debug("[{}] Going to filter in msg: {}", entityId, inMsg);
        for (RuleFilter filter : filters) {
            if (!filter.filter(ruleCtx, inMsg)) {
                logger.debug("[{}] In msg is NOT valid for processing by current rule: {}", entityId, inMsg);
                pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_FILTERS_MATCHED);
                return;
            }
        }
        RuleProcessingMetaData inMsgMd;
        if (processor != null) {
            logger.debug("[{}] Going to process in msg: {}", entityId, inMsg);
            inMsgMd = processor.process(ruleCtx, inMsg);
        } else {
            inMsgMd = new RuleProcessingMetaData();
        }
        logger.debug("[{}] Going to convert in msg: {}", entityId, inMsg);
        Optional<RuleToPluginMsg<?>> ruleToPluginMsgOptional = action.convert(ruleCtx, inMsg, inMsgMd);
        if (ruleToPluginMsgOptional.isPresent()) {
            RuleToPluginMsg<?> ruleToPluginMsg = ruleToPluginMsgOptional.get();
            logger.debug("[{}] Device msg is converter to: {}", entityId, ruleToPluginMsg);
            context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());
            if (action.isOneWayAction()) {
                pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
            } else {
                pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);
                scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout());
            }
        } else {
            logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId);
            pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_REQUEST_FROM_ACTIONS);
            return;
        }
    }

    public void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) {
        RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getUid());
        if (pendingMsg != null) {
            ChainProcessingContext ctx = pendingMsg.getCtx();
            Optional<ToDeviceMsg> ruleResponseOptional = action.convert(msg);
            if (ruleResponseOptional.isPresent()) {
                ctx.mergeResponse(ruleResponseOptional.get());
                pushToNextRule(context, ctx, null);
            } else {
                pushToNextRule(context, ctx, RuleEngineError.NO_RESPONSE_FROM_ACTIONS);
            }
        } else {
            logger.warning("[{}] Processing timeout detected: [{}]", entityId, msg.getUid());
        }
    }

    public void onTimeoutMsg(ActorContext context, RuleToPluginTimeoutMsg msg) {
        RuleProcessingMsg pendingMsg = pendingMsgMap.remove(msg.getMsgId());
        if (pendingMsg != null) {
            logger.debug("[{}] Processing timeout detected [{}]: {}", entityId, msg.getMsgId(), pendingMsg);
            ChainProcessingContext ctx = pendingMsg.getCtx();
            pushToNextRule(context, ctx, RuleEngineError.PLUGIN_TIMEOUT);
        }
    }

    private void pushToNextRule(ActorContext context, ChainProcessingContext ctx, RuleEngineError error) {
        if (error != null) {
            ctx = ctx.withError(error);
        }
        if (ctx.isFailure()) {
            logger.debug("[{}] Forwarding processing chain to device actor due to failure.", ctx.getInMsg().getDeviceId());
            ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
        } else if (!ctx.hasNext()) {
            logger.debug("[{}] Forwarding processing chain to device actor due to end of chain.", ctx.getInMsg().getDeviceId());
            ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
        } else {
            logger.debug("[{}] Forwarding processing chain to next rule actor.", ctx.getInMsg().getDeviceId());
            ChainProcessingContext nextTask = ctx.getNext();
            nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self());
        }
    }

    @Override
    public void onCreated(ActorContext context) {
        logger.info("[{}] Going to process onCreated rule.", entityId);
    }

    @Override
    public void onUpdate(ActorContext context) throws RuleException {
        RuleMetaData oldRuleMd = ruleMd;
        ruleMd = systemContext.getRuleService().findRuleById(entityId);
        logger.info("[{}] Rule configuration was updated from {} to {}.", entityId, oldRuleMd, ruleMd);
        try {
            fetchPluginInfo();
            if (!Objects.equals(oldRuleMd.getFilters(), ruleMd.getFilters())) {
                logger.info("[{}] Rule filters require restart due to json change from {} to {}.",
                        entityId, mapper.writeValueAsString(oldRuleMd.getFilters()), mapper.writeValueAsString(ruleMd.getFilters()));
                stopFilters();
                initFilters();
            }
            if (!Objects.equals(oldRuleMd.getProcessor(), ruleMd.getProcessor())) {
                logger.info("[{}] Rule processor require restart due to configuration change.", entityId);
                stopProcessor();
                initProcessor();
            }
            if (!Objects.equals(oldRuleMd.getAction(), ruleMd.getAction())) {
                logger.info("[{}] Rule action require restart due to configuration change.", entityId);
                stopAction();
                initAction();
            }
        } catch (RuntimeException e) {
            throw new RuleInitializationException("Unknown runtime exception!", e);
        } catch (InstantiationException e) {
            throw new RuleInitializationException("No default constructor for rule implementation!", e);
        } catch (IllegalAccessException e) {
            throw new RuleInitializationException("Illegal Access Exception during rule initialization!", e);
        } catch (ClassNotFoundException e) {
            throw new RuleInitializationException("Rule Class not found!", e);
        } catch (JsonProcessingException e) {
            throw new RuleInitializationException("Rule configuration is invalid!", e);
        } catch (Exception e) {
            throw new RuleInitializationException(e.getMessage(), e);
        }
    }

    @Override
    public void onActivate(ActorContext context) throws Exception {
        logger.info("[{}] Going to process onActivate rule.", entityId);
        this.state = ComponentLifecycleState.ACTIVE;
        if (action != null) {
            if (filters != null) {
                filters.forEach(f -> f.resume());
            }
            if (processor != null) {
                processor.resume();
            }
            if (action != null) {
                action.resume();
            }
            logger.info("[{}] Rule resumed.", entityId);
        } else {
            start();
        }
    }

    @Override
    public void onSuspend(ActorContext context) {
        logger.info("[{}] Going to process onSuspend rule.", entityId);
        this.state = ComponentLifecycleState.SUSPENDED;
        if (filters != null) {
            filters.forEach(f -> f.suspend());
        }
        if (processor != null) {
            processor.suspend();
        }
        if (action != null) {
            action.suspend();
        }
    }

    @Override
    public void onStop(ActorContext context) {
        logger.info("[{}] Going to process onStop rule.", entityId);
        onStop();
        scheduleMsgWithDelay(context, new RuleTerminationMsg(entityId), systemContext.getRuleActorTerminationDelay());
    }

    private void onStop() {
        this.state = ComponentLifecycleState.SUSPENDED;
        stopFilters();
        stopProcessor();
        stopAction();
    }

    @Override
    public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {

    }

    private void stopAction() {
        if (action != null) {
            action.stop();
        }
    }

    private void stopProcessor() {
        if (processor != null) {
            processor.stop();
        }
    }

    private void stopFilters() {
        if (filters != null) {
            filters.forEach(f -> f.stop());
        }
    }
}