// Schedule2Trigger.java
//
// 260 lines | 10.126 kB Blame History Raw Download
package azkaban.migration.schedule2trigger;

import java.io.File;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import azkaban.executor.ExecutionOptions;
import static azkaban.migration.schedule2trigger.CommonParams.*;
import azkaban.utils.JSONUtils;
import azkaban.utils.Props;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadablePeriod;
import azkaban.trigger.Condition;
import azkaban.trigger.ConditionChecker;
import azkaban.trigger.JdbcTriggerLoader;
import azkaban.trigger.Trigger;
import azkaban.trigger.TriggerAction;
import azkaban.trigger.TriggerLoader;
import azkaban.trigger.builtin.BasicTimeChecker;
import azkaban.trigger.builtin.ExecuteFlowAction;
import azkaban.utils.Utils;

@SuppressWarnings("deprecation")
public class Schedule2Trigger {
	
	private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
	private static Props props;
	private static File outputDir;
	
	public static void main(String[] args) throws Exception{
		if(args.length < 1) {
			printUsage();
		}
		
		File confFile = new File(args[0]);
		try {
			logger.info("Trying to load config from " + confFile.getAbsolutePath());
			props = loadAzkabanConfig(confFile);
		} catch (Exception e) {
			e.printStackTrace();
			logger.error(e);
			return;
		}
		
		try {
			outputDir = File.createTempFile("schedules", null);
			logger.info("Creating temp dir for dumping existing schedules.");
			outputDir.delete();
			outputDir.mkdir();
		} catch (Exception e) {
			e.printStackTrace();
			logger.error(e);
			return;
		}

		try {
			schedule2File();
		} catch (Exception e) {
			e.printStackTrace();
			logger.error(e);
			return;
		}
		
		try {
			file2ScheduleTrigger();
		} catch (Exception e) {
			e.printStackTrace();
			logger.error(e);
			return;
		}
		
		logger.info("Uploaded all schedules. Removing temp dir.");
		FileUtils.deleteDirectory(outputDir);
		System.exit(0);
	}
	
	private static Props loadAzkabanConfig(File confFile) throws IOException {
		return new Props(null, confFile);
	}
	
	private static void printUsage() {
		System.out.println("Usage: schedule2Trigger PATH_TO_CONFIG_FILE");
	}
	
	private static void schedule2File() throws Exception {
		azkaban.migration.scheduler.ScheduleLoader scheduleLoader = new azkaban.migration.scheduler.JdbcScheduleLoader(props);
		logger.info("Loading old schedule info from DB.");
		List<azkaban.migration.scheduler.Schedule> schedules = scheduleLoader.loadSchedules();
		for(azkaban.migration.scheduler.Schedule sched : schedules) {
			writeScheduleFile(sched, outputDir);
		}
	}
	
	private static void writeScheduleFile(azkaban.migration.scheduler.Schedule sched, File outputDir) throws IOException {
		String scheduleFileName = sched.getProjectName()+"-"+sched.getFlowName();
		File outputFile = new File(outputDir, scheduleFileName);
		outputFile.createNewFile();
		Props props = new Props();
		props.put("flowName", sched.getFlowName());
		props.put("projectName", sched.getProjectName());
		props.put("projectId", String.valueOf(sched.getProjectId()));
		props.put("period", azkaban.migration.scheduler.Schedule.createPeriodString(sched.getPeriod()));
		props.put("firstScheduleTimeLong", sched.getFirstSchedTime());
		props.put("timezone", sched.getTimezone().getID());
		props.put("submitUser", sched.getSubmitUser());
		props.put("submitTimeLong", sched.getSubmitTime());
		props.put("nextExecTimeLong", sched.getNextExecTime());
		
		ExecutionOptions executionOptions = sched.getExecutionOptions();
		if(executionOptions != null) {
			props.put("executionOptionsObj", JSONUtils.toJSON(executionOptions.toObject()));
		}
		
		azkaban.migration.sla.SlaOptions slaOptions = sched.getSlaOptions();
		if(slaOptions != null) {
			
			List<Map<String, Object>> settingsObj = new ArrayList<Map<String,Object>>();
			List<azkaban.migration.sla.SLA.SlaSetting> settings = slaOptions.getSettings();
			for(azkaban.migration.sla.SLA.SlaSetting set : settings) {
				Map<String, Object> setObj = new HashMap<String, Object>();
				String setId = set.getId();
				azkaban.migration.sla.SLA.SlaRule rule = set.getRule();
				Map<String, Object> info = new HashMap<String, Object>();
				info.put(INFO_DURATION, azkaban.migration.scheduler.Schedule.createPeriodString(set.getDuration()));
				info.put(INFO_EMAIL_LIST, slaOptions.getSlaEmails());
				List<String> actionsList = new ArrayList<String>();
				for(azkaban.migration.sla.SLA.SlaAction act : set.getActions()) {
					if(act.equals(azkaban.migration.sla.SLA.SlaAction.EMAIL)) {
						actionsList.add(ACTION_ALERT);
						info.put(ALERT_TYPE, "email");
					} else if(act.equals(azkaban.migration.sla.SLA.SlaAction.KILL)) {
						actionsList.add(ACTION_CANCEL_FLOW);
					}
				}
				setObj.put("actions", actionsList);
				if(setId.equals("")) {
					info.put(INFO_FLOW_NAME, sched.getFlowName());
					if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
						setObj.put("type", TYPE_FLOW_FINISH);
					} else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
						setObj.put("type", TYPE_FLOW_SUCCEED);
					}
				} else {
					info.put(INFO_JOB_NAME, setId);
					if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
						setObj.put("type", TYPE_JOB_FINISH);
					} else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
						setObj.put("type", TYPE_JOB_SUCCEED);
					}
				}
				setObj.put("info", info);
				settingsObj.add(setObj);
			}
			
			props.put("slaOptionsObj", JSONUtils.toJSON(settingsObj));
		}
		props.storeLocal(outputFile);
	}

	@SuppressWarnings("unchecked")
	private static void file2ScheduleTrigger() throws Exception {
		
		TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
		for(File scheduleFile : outputDir.listFiles()) {
			logger.info("Trying to load schedule from " + scheduleFile.getAbsolutePath());
			if(scheduleFile.isFile()) {
				Props schedProps = new Props(null, scheduleFile);
				String flowName = schedProps.getString("flowName");
				String projectName = schedProps.getString("projectName");
				int projectId = schedProps.getInt("projectId");
				long firstSchedTimeLong = schedProps.getLong("firstScheduleTimeLong");
//				DateTime firstSchedTime = new DateTime(firstSchedTimeLong);
				String timezoneId = schedProps.getString("timezone");
				DateTimeZone timezone = DateTimeZone.forID(timezoneId);
				ReadablePeriod period = Utils.parsePeriodString(schedProps.getString("period"));
//				DateTime lastModifyTime = DateTime.now();
				long nextExecTimeLong = schedProps.getLong("nextExecTimeLong");
//				DateTime nextExecTime = new DateTime(nextExecTimeLong);
				long submitTimeLong = schedProps.getLong("submitTimeLong");
//				DateTime submitTime = new DateTime(submitTimeLong);
				String submitUser = schedProps.getString("submitUser");
				ExecutionOptions executionOptions = null;
				if(schedProps.containsKey("executionOptionsObj")) {
					String executionOptionsObj = schedProps.getString("executionOptionsObj");
					executionOptions = ExecutionOptions.createFromObject(JSONUtils.parseJSONFromString(executionOptionsObj));
				} else {
					executionOptions = new ExecutionOptions();
				}
				List<azkaban.sla.SlaOption> slaOptions = null;
				if(schedProps.containsKey("slaOptionsObj")) {
					slaOptions = new ArrayList<azkaban.sla.SlaOption>();
					List<Map<String, Object>> settingsObj = (List<Map<String, Object>>) JSONUtils.parseJSONFromString(schedProps.getString("slaOptionsObj"));
					for(Map<String, Object> sla : settingsObj) {
						String type = (String) sla.get("type");
						Map<String, Object> info = (Map<String, Object>) sla.get("info");
						List<String> actions = (List<String>) sla.get("actions");
						azkaban.sla.SlaOption slaOption = new azkaban.sla.SlaOption(type, actions, info);
						slaOptions.add(slaOption);
					}
				}
				
				azkaban.scheduler.Schedule schedule = new azkaban.scheduler.Schedule(-1, projectId, projectName, flowName, "ready", firstSchedTimeLong, timezone, period, DateTime.now().getMillis(), nextExecTimeLong, submitTimeLong, submitUser, executionOptions, slaOptions);
				Trigger t = scheduleToTrigger(schedule);
				logger.info("Ready to insert trigger " + t.getDescription());
				triggerLoader.addTrigger(t);
				
			}
			
		}
	}
	
	
	private static Trigger scheduleToTrigger(azkaban.scheduler.Schedule s) {
		
		Condition triggerCondition = createTimeTriggerCondition(s);
		Condition expireCondition = createTimeExpireCondition(s);
		List<TriggerAction> actions = createActions(s);
		Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), azkaban.scheduler.ScheduleManager.triggerSource, triggerCondition, expireCondition, actions);
		if(s.isRecurring()) {
			t.setResetOnTrigger(true);
		}
		return t;
	}
	
	private static List<TriggerAction> createActions (azkaban.scheduler.Schedule s) {
		List<TriggerAction> actions = new ArrayList<TriggerAction>();
		ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
		actions.add(executeAct);
		
		return actions;
	}
	
	private static Condition createTimeTriggerCondition (azkaban.scheduler.Schedule s) {
		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
		checkers.put(checker.getId(), checker);
		String expr = checker.getId() + ".eval()";
		Condition cond = new Condition(checkers, expr);
		return cond;
	}
	
	// if failed to trigger, auto expire?
	private static Condition createTimeExpireCondition (azkaban.scheduler.Schedule s) {
		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
		checkers.put(checker.getId(), checker);
		String expr = checker.getId() + ".eval()";
		Condition cond = new Condition(checkers, expr);
		return cond;
	}

}