TriggerManager.java

110 lines | 4.157 kB Blame History Raw Download
/*
 * Copyright 2017 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.execapp;

import azkaban.execapp.action.KillExecutionAction;
import azkaban.execapp.action.KillJobAction;
import azkaban.sla.SlaOption;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.builtin.SlaAlertAction;
import azkaban.trigger.builtin.SlaChecker;
import azkaban.utils.Utils;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.joda.time.ReadablePeriod;


/**
 * Schedules SLA triggers for flow executions.
 *
 * <p>For every SLA option attached to an execution, a {@code Trigger} is built from a
 * "failed" condition, a "passed" (expiry) condition and the list of actions named by the
 * option. The trigger is then handed to a fixed-size scheduled thread pool and runs once,
 * after a delay equal to the SLA duration.
 */
public class TriggerManager {

  private static final int SCHEDULED_THREAD_POOL_SIZE = 4;
  private static final Logger logger = Logger.getLogger(TriggerManager.class);
  private final ScheduledExecutorService scheduledService;

  /**
   * Creates the manager together with its scheduled thread pool of
   * {@link #SCHEDULED_THREAD_POOL_SIZE} threads.
   */
  @Inject
  public TriggerManager() {
    this.scheduledService = Executors.newScheduledThreadPool(SCHEDULED_THREAD_POOL_SIZE);
  }

  /**
   * Builds a condition backed by a single {@link SlaChecker}.
   *
   * @param sla the SLA option the checker evaluates
   * @param execId id of the execution being watched
   * @param checkerName name given to the new checker
   * @param checkerMethod checker method call, e.g. {@code "isSlaFailed()"}, appended to the
   *     checker id to form the condition expression
   * @return a condition whose expression is {@code <checkerId>.<checkerMethod>}
   */
  private Condition createCondition(final SlaOption sla, final int execId, final String checkerName,
      final String checkerMethod) {
    final SlaChecker checker = new SlaChecker(checkerName, sla, execId);
    final String checkerId = checker.getId();

    final Map<String, ConditionChecker> checkersById = new HashMap<>();
    checkersById.put(checkerId, checker);

    final String expression = checkerId + "." + checkerMethod;
    return new Condition(checkersById, expression);
  }

  /**
   * Maps a single SLA action name onto its trigger action.
   *
   * @param sla the SLA option the action belongs to
   * @param execId id of the execution the action targets
   * @param actionType one of the {@code SlaOption.ACTION_*} names
   * @return the matching action, or {@code null} when the name is not recognised (the
   *     unknown name is logged)
   */
  private TriggerAction createAction(final SlaOption sla, final int execId,
      final String actionType) {
    switch (actionType) {
      case SlaOption.ACTION_ALERT:
        return new SlaAlertAction(SlaOption.ACTION_ALERT, sla, execId);
      case SlaOption.ACTION_CANCEL_FLOW:
        return new KillExecutionAction(SlaOption.ACTION_CANCEL_FLOW, execId);
      case SlaOption.ACTION_KILL_JOB:
        // The job to kill is read from the SLA option's info map.
        final String targetJob = (String) sla.getInfo().get(SlaOption.INFO_JOB_NAME);
        return new KillJobAction(SlaOption.ACTION_KILL_JOB, execId, targetJob);
      default:
        logger.info("Unknown action type " + actionType);
        return null;
    }
  }

  /**
   * Builds the trigger actions requested by an SLA option, preserving the order in which the
   * option lists them. Unrecognised action names are skipped.
   *
   * @param sla the SLA option whose actions are converted
   * @param execId id of the execution the actions target
   * @return the list of recognised actions; possibly empty
   */
  private List<TriggerAction> createActions(final SlaOption sla, final int execId) {
    final List<TriggerAction> result = new ArrayList<>();
    for (final String actionType : sla.getActions()) {
      final TriggerAction created = createAction(sla, execId, actionType);
      if (created == null) {
        continue;
      }
      result.add(created);
    }
    return result;
  }

  /**
   * Reads the SLA duration from the option's info map and converts it to milliseconds.
   *
   * <p>NOTE(review): Joda-Time's {@code Period.toStandardDuration()} rejects periods that
   * contain months or years; confirm SLA durations never use those units.
   *
   * @param sla the SLA option carrying the duration string
   * @return the SLA duration in milliseconds
   */
  private static long slaDurationMillis(final SlaOption sla) {
    final String rawDuration = (String) sla.getInfo().get(SlaOption.INFO_DURATION);
    final ReadablePeriod period = Utils.parsePeriodString(rawDuration);
    return period.toPeriod().toStandardDuration().getMillis();
  }

  /**
   * Creates one trigger per SLA option and schedules each to run once, after the option's
   * SLA duration has elapsed.
   *
   * @param execId id of the execution the SLA options apply to
   * @param slaOptions the SLA options to turn into scheduled triggers
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  public void addTrigger(final int execId, final List<SlaOption> slaOptions) {
    for (final SlaOption sla : slaOptions) {
      final Condition failedCondition =
          createCondition(sla, execId, "slaFailChecker", "isSlaFailed()");

      // Expiry condition: once the SLA is reported as passed, the trigger has nothing to do.
      final Condition passedCondition =
          createCondition(sla, execId, "slaPassChecker", "isSlaPassed()");

      final List<TriggerAction> slaActions = createActions(sla, execId);
      final Trigger slaTrigger = new Trigger(execId, failedCondition, passedCondition, slaActions);
      final long delayMillis = slaDurationMillis(sla);

      logger.info("Adding sla trigger " + sla.toString() + " to execution " + execId
          + ", scheduled to trigger in " + delayMillis / 1000 + " seconds");
      this.scheduledService.schedule(slaTrigger, delayMillis, TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Shuts the scheduler down immediately via {@code shutdownNow()}; triggers still waiting
   * for their delay to elapse will not be run.
   */
  public void shutdown() {
    this.scheduledService.shutdownNow();
  }
}