// Schedule2Trigger.java
/*
 * Copyright 2014 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package azkaban.migration.schedule2trigger;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;

import azkaban.executor.ExecutionOptions;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import azkaban.utils.Utils;

import static azkaban.migration.schedule2trigger.CommonParams.*;

@SuppressWarnings("deprecation")
public class Schedule2Trigger {

  // Log4j logger shared by all static methods of this tool.
  private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
  // Azkaban configuration; assigned in main() from the config file given on the
  // command line and handed to the JDBC schedule/trigger loaders.
  private static Props props;
  // Temporary directory created in main(); schedule2File() writes one file per
  // old schedule into it and file2ScheduleTrigger() reads them back.
  private static File outputDir;

  /**
   * Entry point of the migration tool. Loads the Azkaban config, dumps every
   * old schedule into a temporary directory ({@link #schedule2File()}), then
   * re-inserts each dumped schedule as a trigger
   * ({@link #file2ScheduleTrigger()}).
   *
   * <p>A failure in any step is reported and aborts the run by returning; the
   * temporary directory is only removed after all steps succeeded.
   *
   * @param args command line arguments; {@code args[0]} must be the path to
   *        the Azkaban config file
   * @throws Exception if removing the temporary directory fails
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      printUsage();
      // Nothing can be migrated without a config file; falling through would
      // only fail on args[0] with an ArrayIndexOutOfBoundsException.
      return;
    }

    File confFile = new File(args[0]);
    try {
      logger.info("Trying to load config from " + confFile.getAbsolutePath());
      props = loadAzkabanConfig(confFile);
    } catch (Exception e) {
      // printStackTrace is kept so the failure is visible on stderr even when
      // no log4j appender is configured for this command line tool.
      e.printStackTrace();
      logger.error("Failed to load config from " + confFile.getAbsolutePath(),
          e);
      return;
    }

    try {
      outputDir = File.createTempFile("schedules", null);
      logger.info("Creating temp dir for dumping existing schedules.");
      // createTempFile reserves a unique name as a regular file; replace that
      // file with a directory of the same name and fail loudly if we cannot.
      if (!outputDir.delete() || !outputDir.mkdir()) {
        throw new IOException("Could not create temp dir "
            + outputDir.getAbsolutePath());
      }
    } catch (Exception e) {
      e.printStackTrace();
      logger.error("Failed to create temp dir for dumping schedules.", e);
      return;
    }

    try {
      schedule2File();
    } catch (Exception e) {
      e.printStackTrace();
      logger.error("Failed to dump old schedules to "
          + outputDir.getAbsolutePath(), e);
      return;
    }

    try {
      file2ScheduleTrigger();
    } catch (Exception e) {
      e.printStackTrace();
      logger.error("Failed to convert dumped schedules in "
          + outputDir.getAbsolutePath() + " to triggers.", e);
      return;
    }

    logger.info("Uploaded all schedules. Removing temp dir.");
    FileUtils.deleteDirectory(outputDir);
    System.exit(0);
  }

  /**
   * Reads the Azkaban configuration from the given file.
   *
   * @param confFile the config file to load
   * @return a new {@link Props} built from {@code confFile}, with
   *         {@code null} passed as the first constructor argument
   * @throws IOException declared for failures while reading the file
   */
  private static Props loadAzkabanConfig(File confFile) throws IOException {
    Props azkabanConfig = new Props(null, confFile);
    return azkabanConfig;
  }

  /** Prints the one-line command line usage message to standard output. */
  private static void printUsage() {
    final String usage = "Usage: schedule2Trigger PATH_TO_CONFIG_FILE";
    System.out.println(usage);
  }

  /**
   * Loads all schedules through the old JDBC schedule loader (configured with
   * the static {@code props}) and writes each one to its own file in the
   * static {@code outputDir}.
   *
   * @throws Exception if loading the schedules or writing a file fails
   */
  private static void schedule2File() throws Exception {
    azkaban.migration.scheduler.ScheduleLoader oldLoader =
        new azkaban.migration.scheduler.JdbcScheduleLoader(props);
    logger.info("Loading old schedule info from DB.");
    // The loader is queried exactly once; the for-each header evaluates its
    // expression a single time before iterating.
    for (azkaban.migration.scheduler.Schedule oldSchedule
        : oldLoader.loadSchedules()) {
      writeScheduleFile(oldSchedule, outputDir);
    }
  }

  /**
   * Dumps one old-style schedule into a properties file named
   * {@code <projectName>-<flowName>} inside {@code outputDir}.
   *
   * <p>The file carries the schedule's basic fields, and, when present, the
   * execution options ({@code executionOptionsObj}) and the converted SLA
   * settings ({@code slaOptionsObj}) as JSON strings.
   *
   * @param sched the old schedule to dump
   * @param outputDir directory the schedule file is written into
   * @throws IOException if the file cannot be created or written
   */
  private static void writeScheduleFile(
      azkaban.migration.scheduler.Schedule sched, File outputDir)
      throws IOException {
    String scheduleFileName =
        sched.getProjectName() + "-" + sched.getFlowName();
    File outputFile = new File(outputDir, scheduleFileName);
    // NOTE(review): the result of createNewFile() is ignored, so two schedules
    // sharing project and flow name would map to the same file - confirm that
    // cannot happen in the old schedule table.
    outputFile.createNewFile();

    // Named schedProps so it does not shadow the static config field "props".
    Props schedProps = new Props();
    schedProps.put("flowName", sched.getFlowName());
    schedProps.put("projectName", sched.getProjectName());
    schedProps.put("projectId", String.valueOf(sched.getProjectId()));
    schedProps.put("period", azkaban.migration.scheduler.Schedule
        .createPeriodString(sched.getPeriod()));
    schedProps.put("firstScheduleTimeLong", sched.getFirstSchedTime());
    schedProps.put("timezone", sched.getTimezone().getID());
    schedProps.put("submitUser", sched.getSubmitUser());
    schedProps.put("submitTimeLong", sched.getSubmitTime());
    schedProps.put("nextExecTimeLong", sched.getNextExecTime());

    ExecutionOptions executionOptions = sched.getExecutionOptions();
    if (executionOptions != null) {
      schedProps.put("executionOptionsObj",
          JSONUtils.toJSON(executionOptions.toObject()));
    }

    azkaban.migration.sla.SlaOptions slaOptions = sched.getSlaOptions();
    if (slaOptions != null) {
      schedProps.put("slaOptionsObj", JSONUtils.toJSON(
          buildSlaSettingObjects(sched.getFlowName(), slaOptions)));
    }
    schedProps.storeLocal(outputFile);
  }

  /**
   * Converts every old SLA setting into a map with the keys {@code "type"},
   * {@code "actions"} and {@code "info"}, ready to be serialized as JSON.
   *
   * @param flowName name of the scheduled flow, recorded for flow-level
   *        settings
   * @param slaOptions the old SLA options whose settings are converted
   * @return one map per SLA setting, in the order the settings are returned
   */
  private static List<Map<String, Object>> buildSlaSettingObjects(
      String flowName, azkaban.migration.sla.SlaOptions slaOptions) {
    List<Map<String, Object>> settingsObj =
        new ArrayList<Map<String, Object>>();
    for (azkaban.migration.sla.SLA.SlaSetting set : slaOptions.getSettings()) {
      Map<String, Object> setObj = new HashMap<String, Object>();
      String setId = set.getId();
      azkaban.migration.sla.SLA.SlaRule rule = set.getRule();

      Map<String, Object> info = new HashMap<String, Object>();
      info.put(INFO_DURATION, azkaban.migration.scheduler.Schedule
          .createPeriodString(set.getDuration()));
      info.put(INFO_EMAIL_LIST, slaOptions.getSlaEmails());

      // Only EMAIL and KILL are translated; any other action is dropped.
      List<String> actionsList = new ArrayList<String>();
      for (azkaban.migration.sla.SLA.SlaAction act : set.getActions()) {
        if (act.equals(azkaban.migration.sla.SLA.SlaAction.EMAIL)) {
          actionsList.add(ACTION_ALERT);
          info.put(ALERT_TYPE, "email");
        } else if (act.equals(azkaban.migration.sla.SLA.SlaAction.KILL)) {
          actionsList.add(ACTION_CANCEL_FLOW);
        }
      }
      setObj.put("actions", actionsList);

      // An empty setting id is treated as a flow-level SLA; any other id is
      // recorded as the job name. A rule other than FINISH/SUCCESS leaves
      // "type" unset.
      if (setId.equals("")) {
        info.put(INFO_FLOW_NAME, flowName);
        if (rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
          setObj.put("type", TYPE_FLOW_FINISH);
        } else if (rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
          setObj.put("type", TYPE_FLOW_SUCCEED);
        }
      } else {
        info.put(INFO_JOB_NAME, setId);
        if (rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
          setObj.put("type", TYPE_JOB_FINISH);
        } else if (rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
          setObj.put("type", TYPE_JOB_SUCCEED);
        }
      }
      setObj.put("info", info);
      settingsObj.add(setObj);
    }
    return settingsObj;
  }

  /**
   * Reads every regular file in the static {@code outputDir}, rebuilds a
   * new-style schedule from its properties and inserts the corresponding
   * trigger through a {@link JdbcTriggerLoader} configured with the static
   * {@code props}.
   *
   * @throws Exception if the directory cannot be listed, a schedule file
   *         cannot be parsed, or a trigger cannot be inserted
   */
  @SuppressWarnings("unchecked")
  private static void file2ScheduleTrigger() throws Exception {

    TriggerLoader triggerLoader = new JdbcTriggerLoader(props);

    // File.listFiles() returns null when the path is not a directory or an
    // I/O error occurs; report that instead of failing with an NPE.
    File[] scheduleFiles = outputDir.listFiles();
    if (scheduleFiles == null) {
      throw new IOException("Unable to list schedule files in "
          + outputDir.getAbsolutePath());
    }

    for (File scheduleFile : scheduleFiles) {
      logger.info("Trying to load schedule from "
          + scheduleFile.getAbsolutePath());
      if (!scheduleFile.isFile()) {
        continue;
      }

      Props schedProps = new Props(null, scheduleFile);
      String flowName = schedProps.getString("flowName");
      String projectName = schedProps.getString("projectName");
      int projectId = schedProps.getInt("projectId");
      long firstSchedTimeLong = schedProps.getLong("firstScheduleTimeLong");
      String timezoneId = schedProps.getString("timezone");
      DateTimeZone timezone = DateTimeZone.forID(timezoneId);
      ReadablePeriod period =
          Utils.parsePeriodString(schedProps.getString("period"));
      long nextExecTimeLong = schedProps.getLong("nextExecTimeLong");
      long submitTimeLong = schedProps.getLong("submitTimeLong");
      String submitUser = schedProps.getString("submitUser");

      // Fall back to default execution options when none were dumped.
      ExecutionOptions executionOptions = null;
      if (schedProps.containsKey("executionOptionsObj")) {
        String executionOptionsObj =
            schedProps.getString("executionOptionsObj");
        executionOptions =
            ExecutionOptions.createFromObject(JSONUtils
                .parseJSONFromString(executionOptionsObj));
      } else {
        executionOptions = new ExecutionOptions();
      }

      // slaOptions stays null when the schedule file has no SLA settings.
      List<azkaban.sla.SlaOption> slaOptions = null;
      if (schedProps.containsKey("slaOptionsObj")) {
        slaOptions = new ArrayList<azkaban.sla.SlaOption>();
        List<Map<String, Object>> settingsObj =
            (List<Map<String, Object>>) JSONUtils
                .parseJSONFromString(schedProps.getString("slaOptionsObj"));
        for (Map<String, Object> sla : settingsObj) {
          String type = (String) sla.get("type");
          Map<String, Object> info = (Map<String, Object>) sla.get("info");
          List<String> actions = (List<String>) sla.get("actions");
          azkaban.sla.SlaOption slaOption =
              new azkaban.sla.SlaOption(type, actions, info);
          slaOptions.add(slaOption);
        }
      }

      // The literals -1 and "ready" are presumably the schedule id and the
      // status (confirm against the Schedule constructor); the current time
      // is passed between the period and the next execution time.
      azkaban.scheduler.Schedule schedule =
          new azkaban.scheduler.Schedule(-1, projectId, projectName,
              flowName, "ready", firstSchedTimeLong, timezone, period,
              DateTime.now().getMillis(), nextExecTimeLong, submitTimeLong,
              submitUser, executionOptions, slaOptions);
      Trigger t = scheduleToTrigger(schedule);
      logger.info("Ready to insert trigger " + t.getDescription());
      triggerLoader.addTrigger(t);
    }
  }

  /**
   * Builds the trigger that represents the given schedule: a time-based
   * trigger condition, a time-based expire condition and the list of actions
   * from {@link #createActions}. Recurring schedules get
   * {@code resetOnTrigger} switched on.
   *
   * @param s the schedule to convert
   * @return the trigger built from {@code s}
   */
  private static Trigger scheduleToTrigger(azkaban.scheduler.Schedule s) {
    final Condition fireCondition = createTimeTriggerCondition(s);
    final Condition expiryCondition = createTimeExpireCondition(s);
    final List<TriggerAction> triggerActions = createActions(s);

    final Trigger trigger =
        new Trigger(s.getScheduleId(), s.getLastModifyTime(),
            s.getSubmitTime(), s.getSubmitUser(),
            azkaban.scheduler.ScheduleManager.triggerSource, fireCondition,
            expiryCondition, triggerActions);

    // Only touch the flag for recurring schedules; otherwise the trigger keeps
    // whatever value its constructor chose.
    final boolean recurring = s.isRecurring();
    if (recurring) {
      trigger.setResetOnTrigger(true);
    }
    return trigger;
  }

  /**
   * Creates the action list for a schedule's trigger: a single
   * {@link ExecuteFlowAction} named {@code "executeFlowAction"} built from the
   * schedule's project, flow, submit user, execution options and SLA options.
   *
   * @param s the schedule the actions are created for
   * @return a new mutable list holding exactly that one action
   */
  private static List<TriggerAction> createActions(azkaban.scheduler.Schedule s) {
    List<TriggerAction> result = new ArrayList<TriggerAction>();
    result.add(new ExecuteFlowAction("executeFlowAction", s.getProjectId(),
        s.getProjectName(), s.getFlowName(), s.getSubmitUser(),
        s.getExecutionOptions(), s.getSlaOptions()));
    return result;
  }

  /**
   * Creates the condition under which the schedule's trigger fires: a single
   * {@link BasicTimeChecker} with id {@code "BasicTimeChecker_1"}.
   *
   * @param s the schedule supplying the time parameters
   * @return the condition {@code "BasicTimeChecker_1.eval()"}
   */
  private static Condition createTimeTriggerCondition(
      azkaban.scheduler.Schedule s) {
    return createTimeCondition("BasicTimeChecker_1", s);
  }

  /**
   * Creates the condition under which the schedule's trigger expires: a single
   * {@link BasicTimeChecker} with id {@code "BasicTimeChecker_2"}, built from
   * the same schedule parameters as the trigger condition.
   *
   * <p>Open question carried over from the original author: if failed to
   * trigger, auto expire?
   *
   * @param s the schedule supplying the time parameters
   * @return the condition {@code "BasicTimeChecker_2.eval()"}
   */
  private static Condition createTimeExpireCondition(
      azkaban.scheduler.Schedule s) {
    return createTimeCondition("BasicTimeChecker_2", s);
  }

  /**
   * Shared builder for the trigger and expire conditions, which differ only
   * in the id passed to their time checker.
   *
   * @param checkerId id passed to the {@link BasicTimeChecker} constructor
   * @param s the schedule supplying first schedule time, timezone, recurrence,
   *        skip-past-occurrences flag and period
   * @return a condition whose only checker is the new time checker and whose
   *         expression is {@code checker.getId() + ".eval()"}
   */
  private static Condition createTimeCondition(String checkerId,
      azkaban.scheduler.Schedule s) {
    Map<String, ConditionChecker> checkers =
        new HashMap<String, ConditionChecker>();
    ConditionChecker checker =
        new BasicTimeChecker(checkerId, s.getFirstSchedTime(),
            s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(),
            s.getPeriod());
    // Key and expression both use checker.getId(), exactly as before, rather
    // than assuming it equals checkerId.
    checkers.put(checker.getId(), checker);
    String expr = checker.getId() + ".eval()";
    return new Condition(checkers, expr);
  }

}