package azkaban.migration.schedule2trigger;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import azkaban.executor.ExecutionOptions;
import static azkaban.migration.schedule2trigger.CommonParams.*;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Utils;
@SuppressWarnings("deprecation")
public class Schedule2Trigger {
private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
private static Props props;
private static File outputDir;
public static void main(String[] args) throws Exception{
if(args.length < 1) {
printUsage();
}
File confFile = new File(args[0]);
try {
logger.info("Trying to load config from " + confFile.getAbsolutePath());
props = loadAzkabanConfig(confFile);
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
return;
}
try {
outputDir = File.createTempFile("schedules", null);
logger.info("Creating temp dir for dumping existing schedules.");
outputDir.delete();
outputDir.mkdir();
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
return;
}
try {
schedule2File();
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
return;
}
try {
file2ScheduleTrigger();
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
return;
}
logger.info("Uploaded all schedules. Removing temp dir.");
FileUtils.deleteDirectory(outputDir);
System.exit(0);
}
private static Props loadAzkabanConfig(File confFile) throws IOException {
return new Props(null, confFile);
}
private static void printUsage() {
System.out.println("Usage: schedule2Trigger PATH_TO_CONFIG_FILE");
}
private static void schedule2File() throws Exception {
azkaban.migration.scheduler.ScheduleLoader scheduleLoader = new azkaban.migration.scheduler.JdbcScheduleLoader(props);
logger.info("Loading old schedule info from DB.");
List<azkaban.migration.scheduler.Schedule> schedules = scheduleLoader.loadSchedules();
for(azkaban.migration.scheduler.Schedule sched : schedules) {
writeScheduleFile(sched, outputDir);
}
}
private static void writeScheduleFile(azkaban.migration.scheduler.Schedule sched, File outputDir) throws IOException {
String scheduleFileName = sched.getProjectName()+"-"+sched.getFlowName();
File outputFile = new File(outputDir, scheduleFileName);
outputFile.createNewFile();
Props props = new Props();
props.put("flowName", sched.getFlowName());
props.put("projectName", sched.getProjectName());
props.put("projectId", String.valueOf(sched.getProjectId()));
props.put("period", azkaban.migration.scheduler.Schedule.createPeriodString(sched.getPeriod()));
props.put("firstScheduleTimeLong", sched.getFirstSchedTime());
props.put("timezone", sched.getTimezone().getID());
props.put("submitUser", sched.getSubmitUser());
props.put("submitTimeLong", sched.getSubmitTime());
props.put("nextExecTimeLong", sched.getNextExecTime());
ExecutionOptions executionOptions = sched.getExecutionOptions();
if(executionOptions != null) {
props.put("executionOptionsObj", JSONUtils.toJSON(executionOptions.toObject()));
}
azkaban.migration.sla.SlaOptions slaOptions = sched.getSlaOptions();
if(slaOptions != null) {
List<Map<String, Object>> settingsObj = new ArrayList<Map<String,Object>>();
List<azkaban.migration.sla.SLA.SlaSetting> settings = slaOptions.getSettings();
for(azkaban.migration.sla.SLA.SlaSetting set : settings) {
Map<String, Object> setObj = new HashMap<String, Object>();
String setId = set.getId();
azkaban.migration.sla.SLA.SlaRule rule = set.getRule();
Map<String, Object> info = new HashMap<String, Object>();
info.put(INFO_DURATION, azkaban.migration.scheduler.Schedule.createPeriodString(set.getDuration()));
info.put(INFO_EMAIL_LIST, slaOptions.getSlaEmails());
List<String> actionsList = new ArrayList<String>();
for(azkaban.migration.sla.SLA.SlaAction act : set.getActions()) {
if(act.equals(azkaban.migration.sla.SLA.SlaAction.EMAIL)) {
actionsList.add(ACTION_ALERT);
info.put(ALERT_TYPE, "email");
} else if(act.equals(azkaban.migration.sla.SLA.SlaAction.KILL)) {
actionsList.add(ACTION_CANCEL_FLOW);
}
}
setObj.put("actions", actionsList);
if(setId.equals("")) {
info.put(INFO_FLOW_NAME, sched.getFlowName());
if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
setObj.put("type", TYPE_FLOW_FINISH);
} else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
setObj.put("type", TYPE_FLOW_SUCCEED);
}
} else {
info.put(INFO_JOB_NAME, setId);
if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
setObj.put("type", TYPE_JOB_FINISH);
} else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
setObj.put("type", TYPE_JOB_SUCCEED);
}
}
setObj.put("info", info);
settingsObj.add(setObj);
}
props.put("slaOptionsObj", JSONUtils.toJSON(settingsObj));
}
props.storeLocal(outputFile);
}
@SuppressWarnings("unchecked")
private static void file2ScheduleTrigger() throws Exception {
TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
for(File scheduleFile : outputDir.listFiles()) {
logger.info("Trying to load schedule from " + scheduleFile.getAbsolutePath());
if(scheduleFile.isFile()) {
Props schedProps = new Props(null, scheduleFile);
String flowName = schedProps.getString("flowName");
String projectName = schedProps.getString("projectName");
int projectId = schedProps.getInt("projectId");
long firstSchedTimeLong = schedProps.getLong("firstScheduleTimeLong");
// DateTime firstSchedTime = new DateTime(firstSchedTimeLong);
String timezoneId = schedProps.getString("timezone");
DateTimeZone timezone = DateTimeZone.forID(timezoneId);
ReadablePeriod period = Utils.parsePeriodString(schedProps.getString("period"));
// DateTime lastModifyTime = DateTime.now();
long nextExecTimeLong = schedProps.getLong("nextExecTimeLong");
// DateTime nextExecTime = new DateTime(nextExecTimeLong);
long submitTimeLong = schedProps.getLong("submitTimeLong");
// DateTime submitTime = new DateTime(submitTimeLong);
String submitUser = schedProps.getString("submitUser");
ExecutionOptions executionOptions = null;
if(schedProps.containsKey("executionOptionsObj")) {
String executionOptionsObj = schedProps.getString("executionOptionsObj");
executionOptions = ExecutionOptions.createFromObject(JSONUtils.parseJSONFromString(executionOptionsObj));
} else {
executionOptions = new ExecutionOptions();
}
List<azkaban.sla.SlaOption> slaOptions = null;
if(schedProps.containsKey("slaOptionsObj")) {
slaOptions = new ArrayList<azkaban.sla.SlaOption>();
List<Map<String, Object>> settingsObj = (List<Map<String, Object>>) JSONUtils.parseJSONFromString(schedProps.getString("slaOptionsObj"));
for(Map<String, Object> sla : settingsObj) {
String type = (String) sla.get("type");
Map<String, Object> info = (Map<String, Object>) sla.get("info");
List<String> actions = (List<String>) sla.get("actions");
azkaban.sla.SlaOption slaOption = new azkaban.sla.SlaOption(type, actions, info);
slaOptions.add(slaOption);
}
}
azkaban.scheduler.Schedule schedule = new azkaban.scheduler.Schedule(-1, projectId, projectName, flowName, "ready", firstSchedTimeLong, timezone, period, DateTime.now().getMillis(), nextExecTimeLong, submitTimeLong, submitUser, executionOptions, slaOptions);
Trigger t = scheduleToTrigger(schedule);
logger.info("Ready to insert trigger " + t.getDescription());
triggerLoader.addTrigger(t);
}
}
}
private static Trigger scheduleToTrigger(azkaban.scheduler.Schedule s) {
Condition triggerCondition = createTimeTriggerCondition(s);
Condition expireCondition = createTimeExpireCondition(s);
List<TriggerAction> actions = createActions(s);
Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), azkaban.scheduler.ScheduleManager.triggerSource, triggerCondition, expireCondition, actions);
if(s.isRecurring()) {
t.setResetOnTrigger(true);
}
return t;
}
private static List<TriggerAction> createActions (azkaban.scheduler.Schedule s) {
List<TriggerAction> actions = new ArrayList<TriggerAction>();
ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
actions.add(executeAct);
return actions;
}
private static Condition createTimeTriggerCondition (azkaban.scheduler.Schedule s) {
Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
checkers.put(checker.getId(), checker);
String expr = checker.getId() + ".eval()";
Condition cond = new Condition(checkers, expr);
return cond;
}
// if failed to trigger, auto expire?
private static Condition createTimeExpireCondition (azkaban.scheduler.Schedule s) {
Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
checkers.put(checker.getId(), checker);
String expr = checker.getId() + ".eval()";
Condition cond = new Condition(checkers, expr);
return cond;
}
}