azkaban-developers

changed the way migration works from schedules to triggers. azkaban2

1/9/2014 12:38:58 AM

Changes

scheduleTriggerMigration/file2Trigger/file2Trigger 5(+0 -5)

scheduleTriggerMigration/file2Trigger/lib/azkaban-2.2.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-dbcp-1.4.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-dbutils-1.5.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-jexl-2.1.1.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-logging-1.1.1.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/commons-pool-1.6.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/file2trigger.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/jackson-core-asl-1.9.5.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/jackson-mapper-asl-1.9.5.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/joda-time-2.0.jar 0(+0 -0)

scheduleTriggerMigration/file2Trigger/lib/log4j-1.2.16.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/azkaban-2.1.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/commons-dbcp-1.4.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/commons-dbutils-1.5.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/commons-io-2.4.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/commons-pool-1.6.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/jackson-core-asl-1.9.5.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/jackson-mapper-asl-1.9.5.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/joda-time-2.0.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/log4j-1.2.16.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/lib/schedule2file.jar 0(+0 -0)

scheduleTriggerMigration/schedule2File/schedule2file 7(+0 -7)

Details

diff --git a/src/java/azkaban/migration/schedule2trigger/CommonParams.java b/src/java/azkaban/migration/schedule2trigger/CommonParams.java
new file mode 100644
index 0000000..0408f51
--- /dev/null
+++ b/src/java/azkaban/migration/schedule2trigger/CommonParams.java
@@ -0,0 +1,22 @@
+package azkaban.migration.schedule2trigger;
+
+public class CommonParams {
+	public static final String TYPE_FLOW_FINISH = "FlowFinish";
+	public static final String TYPE_FLOW_SUCCEED = "FlowSucceed";
+	public static final String TYPE_FLOW_PROGRESS = "FlowProgress";
+
+	public static final String TYPE_JOB_FINISH = "JobFinish";
+	public static final String TYPE_JOB_SUCCEED = "JobSucceed";
+	public static final String TYPE_JOB_PROGRESS = "JobProgress";
+
+	public static final String INFO_DURATION = "Duration";
+	public static final String INFO_FLOW_NAME = "FlowName";
+	public static final String INFO_JOB_NAME = "JobName";
+	public static final String INFO_PROGRESS_PERCENT = "ProgressPercent";
+	public static final String INFO_EMAIL_LIST = "EmailList";
+
+	// always alert
+	public static final String ALERT_TYPE = "SlaAlertType";
+	public static final String ACTION_CANCEL_FLOW = "SlaCancelFlow";
+	public static final String ACTION_ALERT = "SlaAlert";
+}
diff --git a/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
new file mode 100644
index 0000000..6d1bd39
--- /dev/null
+++ b/src/java/azkaban/migration/schedule2trigger/Schedule2Trigger.java
@@ -0,0 +1,256 @@
+package azkaban.migration.schedule2trigger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import azkaban.executor.ExecutionOptions;
+import static azkaban.migration.schedule2trigger.CommonParams.*;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import azkaban.trigger.Condition;
+import azkaban.trigger.ConditionChecker;
+import azkaban.trigger.JdbcTriggerLoader;
+import azkaban.trigger.Trigger;
+import azkaban.trigger.TriggerAction;
+import azkaban.trigger.TriggerLoader;
+import azkaban.trigger.builtin.BasicTimeChecker;
+import azkaban.trigger.builtin.ExecuteFlowAction;
+import azkaban.utils.Utils;
+
+public class Schedule2Trigger {
+	
+	private static final Logger logger = Logger.getLogger(Schedule2Trigger.class);
+	private static Props props;
+	private static File outputDir;
+	
+	public static void main(String[] args) throws Exception{
+		if(args.length < 1) {
+			printUsage();
+		}
+		
+		File confFile = new File(args[0]);
+		try {
+			logger.info("Trying to load config from " + confFile.getAbsolutePath());
+			props = loadAzkabanConfig(confFile);
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+			return;
+		}
+		
+		try {
+			outputDir = File.createTempFile("schedules", null);
+			logger.info("Creating temp dir for dumping existing schedules.");
+			outputDir.delete();
+			outputDir.mkdir();
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+			return;
+		}
+
+		try {
+			schedule2File();
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+			return;
+		}
+		
+		try {
+			file2ScheduleTrigger();
+		} catch (Exception e) {
+			e.printStackTrace();
+			logger.error(e);
+			return;
+		}
+		
+		logger.info("Uploaded all schedules. Removing temp dir.");
+		FileUtils.deleteDirectory(outputDir);
+		System.exit(0);
+	}
+	
+	private static Props loadAzkabanConfig(File confFile) throws IOException {
+		return new Props(null, confFile);
+	}
+	
+	private static void printUsage() {
+		System.out.println("Usage: schedule2Trigger PATH_TO_CONFIG_FILE");
+	}
+	
+	private static void schedule2File() throws Exception {
+		azkaban.migration.scheduler.ScheduleLoader scheduleLoader = new azkaban.migration.scheduler.JdbcScheduleLoader(props);
+		logger.info("Loading old schedule info from DB.");
+		List<azkaban.migration.scheduler.Schedule> schedules = scheduleLoader.loadSchedules();
+		for(azkaban.migration.scheduler.Schedule sched : schedules) {
+			writeScheduleFile(sched, outputDir);
+		}
+	}
+	
+	private static void writeScheduleFile(azkaban.migration.scheduler.Schedule sched, File outputDir) throws IOException {
+		String scheduleFileName = sched.getProjectName()+"-"+sched.getFlowName();
+		File outputFile = new File(outputDir, scheduleFileName);
+		outputFile.createNewFile();
+		Props props = new Props();
+		props.put("flowName", sched.getFlowName());
+		props.put("projectName", sched.getProjectName());
+		props.put("projectId", String.valueOf(sched.getProjectId()));
+		props.put("period", azkaban.migration.scheduler.Schedule.createPeriodString(sched.getPeriod()));
+		props.put("firstScheduleTimeLong", sched.getFirstSchedTime());
+		props.put("timezone", sched.getTimezone().getID());
+		props.put("submitUser", sched.getSubmitUser());
+		props.put("submitTimeLong", sched.getSubmitTime());
+		props.put("nextExecTimeLong", sched.getNextExecTime());
+		
+		ExecutionOptions executionOptions = sched.getExecutionOptions();
+		if(executionOptions != null) {
+			props.put("executionOptionsObj", JSONUtils.toJSON(executionOptions.toObject()));
+		}
+		
+		azkaban.migration.sla.SlaOptions slaOptions = sched.getSlaOptions();
+		if(slaOptions != null) {
+			
+			List<Map<String, Object>> settingsObj = new ArrayList<Map<String,Object>>();
+			List<azkaban.migration.sla.SLA.SlaSetting> settings = slaOptions.getSettings();
+			for(azkaban.migration.sla.SLA.SlaSetting set : settings) {
+				Map<String, Object> setObj = new HashMap<String, Object>();
+				String setId = set.getId();
+				azkaban.migration.sla.SLA.SlaRule rule = set.getRule();
+				Map<String, Object> info = new HashMap<String, Object>();
+				info.put(INFO_DURATION, azkaban.migration.scheduler.Schedule.createPeriodString(set.getDuration()));
+				info.put(INFO_EMAIL_LIST, slaOptions.getSlaEmails());
+				List<String> actionsList = new ArrayList<String>();
+				for(azkaban.migration.sla.SLA.SlaAction act : set.getActions()) {
+					if(act.equals(azkaban.migration.sla.SLA.SlaAction.EMAIL)) {
+						actionsList.add(ACTION_ALERT);
+						info.put(ALERT_TYPE, "email");
+					} else if(act.equals(azkaban.migration.sla.SLA.SlaAction.KILL)) {
+						actionsList.add(ACTION_CANCEL_FLOW);
+					}
+				}
+				setObj.put("actions", actionsList);
+				if(setId.equals("")) {
+					info.put(INFO_FLOW_NAME, sched.getFlowName());
+					if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
+						setObj.put("type", TYPE_FLOW_FINISH);
+					} else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
+						setObj.put("type", TYPE_FLOW_SUCCEED);
+					}
+				} else {
+					info.put(INFO_JOB_NAME, setId);
+					if(rule.equals(azkaban.migration.sla.SLA.SlaRule.FINISH)) {
+						setObj.put("type", TYPE_JOB_FINISH);
+					} else if(rule.equals(azkaban.migration.sla.SLA.SlaRule.SUCCESS)) {
+						setObj.put("type", TYPE_JOB_SUCCEED);
+					}
+				}
+				setObj.put("info", info);
+				settingsObj.add(setObj);
+			}
+			
+			props.put("slaOptionsObj", JSONUtils.toJSON(settingsObj));
+		}
+		props.storeLocal(outputFile);
+	}
+
+	private static void file2ScheduleTrigger() throws Exception {
+		
+		TriggerLoader triggerLoader = new JdbcTriggerLoader(props);
+		for(File scheduleFile : outputDir.listFiles()) {
+			logger.info("Trying to load schedule from " + scheduleFile.getAbsolutePath());
+			if(scheduleFile.isFile()) {
+				Props schedProps = new Props(null, scheduleFile);
+				String flowName = schedProps.getString("flowName");
+				String projectName = schedProps.getString("projectName");
+				int projectId = schedProps.getInt("projectId");
+				long firstSchedTimeLong = schedProps.getLong("firstScheduleTimeLong");
+//				DateTime firstSchedTime = new DateTime(firstSchedTimeLong);
+				String timezoneId = schedProps.getString("timezone");
+				DateTimeZone timezone = DateTimeZone.forID(timezoneId);
+				ReadablePeriod period = Utils.parsePeriodString(schedProps.getString("period"));
+//				DateTime lastModifyTime = DateTime.now();
+				long nextExecTimeLong = schedProps.getLong("nextExecTimeLong");
+//				DateTime nextExecTime = new DateTime(nextExecTimeLong);
+				long submitTimeLong = schedProps.getLong("submitTimeLong");
+//				DateTime submitTime = new DateTime(submitTimeLong);
+				String submitUser = schedProps.getString("submitUser");
+				ExecutionOptions executionOptions = null;
+				if(schedProps.containsKey("executionOptionsObj")) {
+					String executionOptionsObj = schedProps.getString("executionOptionsObj");
+					executionOptions = ExecutionOptions.createFromObject(JSONUtils.parseJSONFromString(executionOptionsObj));
+				} else {
+					executionOptions = new ExecutionOptions();
+				}
+				List<azkaban.sla.SlaOption> slaOptions = null;
+				if(schedProps.containsKey("slaOptionsObj")) {
+					slaOptions = new ArrayList<azkaban.sla.SlaOption>();
+					List<Map<String, Object>> settingsObj = (List<Map<String, Object>>) JSONUtils.parseJSONFromString(schedProps.getString("slaOptionsObj"));
+					for(Map<String, Object> sla : settingsObj) {
+						String type = (String) sla.get("type");
+						Map<String, Object> info = (Map<String, Object>) sla.get("info");
+						List<String> actions = (List<String>) sla.get("actions");
+						azkaban.sla.SlaOption slaOption = new azkaban.sla.SlaOption(type, actions, info);
+						slaOptions.add(slaOption);
+					}
+				}
+				
+				azkaban.scheduler.Schedule schedule = new azkaban.scheduler.Schedule(-1, projectId, projectName, flowName, "ready", firstSchedTimeLong, timezone, period, DateTime.now().getMillis(), nextExecTimeLong, submitTimeLong, submitUser, executionOptions, slaOptions);
+				Trigger t = scheduleToTrigger(schedule);
+				logger.info("Ready to insert trigger " + t.getDescription());
+				triggerLoader.addTrigger(t);
+				
+			}
+			
+		}
+	}
+	
+	
+	private static Trigger scheduleToTrigger(azkaban.scheduler.Schedule s) {
+		
+		Condition triggerCondition = createTimeTriggerCondition(s);
+		Condition expireCondition = createTimeExpireCondition(s);
+		List<TriggerAction> actions = createActions(s);
+		Trigger t = new Trigger(s.getScheduleId(), s.getLastModifyTime(), s.getSubmitTime(), s.getSubmitUser(), azkaban.scheduler.ScheduleManager.triggerSource, triggerCondition, expireCondition, actions);
+		if(s.isRecurring()) {
+			t.setResetOnTrigger(true);
+		}
+		return t;
+	}
+	
+	private static List<TriggerAction> createActions (azkaban.scheduler.Schedule s) {
+		List<TriggerAction> actions = new ArrayList<TriggerAction>();
+		ExecuteFlowAction executeAct = new ExecuteFlowAction("executeFlowAction", s.getProjectId(), s.getProjectName(), s.getFlowName(), s.getSubmitUser(), s.getExecutionOptions(), s.getSlaOptions());
+		actions.add(executeAct);
+		
+		return actions;
+	}
+	
+	private static Condition createTimeTriggerCondition (azkaban.scheduler.Schedule s) {
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_1", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+		checkers.put(checker.getId(), checker);
+		String expr = checker.getId() + ".eval()";
+		Condition cond = new Condition(checkers, expr);
+		return cond;
+	}
+	
+	// if failed to trigger, auto expire?
+	private static Condition createTimeExpireCondition (azkaban.scheduler.Schedule s) {
+		Map<String, ConditionChecker> checkers = new HashMap<String, ConditionChecker>();
+		ConditionChecker checker = new BasicTimeChecker("BasicTimeChecker_2", s.getFirstSchedTime(), s.getTimezone(), s.isRecurring(), s.skipPastOccurrences(), s.getPeriod());
+		checkers.put(checker.getId(), checker);
+		String expr = checker.getId() + ".eval()";
+		Condition cond = new Condition(checkers, expr);
+		return cond;
+	}
+
+}
diff --git a/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java
new file mode 100644
index 0000000..bcef168
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/JdbcScheduleLoader.java
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+
+import azkaban.database.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+@Deprecated
+public class JdbcScheduleLoader implements ScheduleLoader {
+
+	private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
+	
+	public static enum EncodingType {
+		PLAIN(1), GZIP(2);
+
+		private int numVal;
+
+		EncodingType(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static EncodingType fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return PLAIN;
+			case 2:
+				return GZIP;
+			default:
+				return PLAIN;
+			}
+		}
+	}
+	
+	private DataSource dataSource;
+	private EncodingType defaultEncodingType = EncodingType.GZIP;
+	
+	private static final String scheduleTableName = "schedules";
+
+	private static String SELECT_ALL_SCHEDULES =
+			"SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
+	
+	private static String INSERT_SCHEDULE = 
+			"INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
+	
+	private static String REMOVE_SCHEDULE_BY_KEY = 
+			"DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
+	
+	private static String UPDATE_SCHEDULE_BY_KEY = 
+			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";
+	
+	private static String UPDATE_NEXT_EXEC_TIME = 
+			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
+
+	private Connection getConnection() throws ScheduleManagerException {
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (Exception e) {
+			DbUtils.closeQuietly(connection);
+			throw new ScheduleManagerException("Error getting DB connection.", e);
+		}
+		
+		return connection;
+	}
+	
+	public EncodingType getDefaultEncodingType() {
+		return defaultEncodingType;
+	}
+
+	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+		this.defaultEncodingType = defaultEncodingType;
+	}
+	
+	public JdbcScheduleLoader(Props props) {
+		String databaseType = props.getString("database.type");
+		
+		if (databaseType.equals("mysql")) {
+			int port = props.getInt("mysql.port");
+			String host = props.getString("mysql.host");
+			String database = props.getString("mysql.database");
+			String user = props.getString("mysql.user");
+			String password = props.getString("mysql.password");
+			int numConnections = props.getInt("mysql.numconnections");
+			
+			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		}
+	}
+
+	@Override
+	public List<Schedule> loadSchedules() throws ScheduleManagerException {
+		logger.info("Loading all schedules from db.");
+		Connection connection = getConnection();
+
+		QueryRunner runner = new QueryRunner();
+		ResultSetHandler<List<Schedule>> handler = new ScheduleResultHandler();
+	
+		List<Schedule> schedules;
+		
+		try {
+			schedules = runner.query(connection, SELECT_ALL_SCHEDULES, handler);
+		} catch (SQLException e) {
+			logger.error(SELECT_ALL_SCHEDULES + " failed.");
+
+			DbUtils.closeQuietly(connection);
+			throw new ScheduleManagerException("Loading schedules from db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
+		}
+		
+		logger.info("Now trying to update the schedules");
+		
+		// filter the schedules
+        Iterator<Schedule> scheduleIterator = schedules.iterator();
+        while (scheduleIterator.hasNext()) {
+            Schedule sched = scheduleIterator.next();
+			if(!sched.updateTime()) {
+				logger.info("Schedule " + sched.getScheduleName() + " was scheduled before azkaban start, skipping it.");
+				scheduleIterator.remove();
+				removeSchedule(sched);
+			}
+			else {
+				logger.info("Recurring schedule, need to update next exec time");
+				try {
+					updateNextExecTime(sched);
+				} catch (Exception e) {
+					e.printStackTrace();
+					throw new ScheduleManagerException("Update next execution time failed.", e);
+				} 
+				logger.info("Schedule " + sched.getScheduleName() + " loaded and updated.");
+			}
+		}
+		
+		
+				
+		logger.info("Loaded " + schedules.size() + " schedules.");
+		
+		return schedules;
+	}
+
+	@Override
+	public void removeSchedule(Schedule s) throws ScheduleManagerException {		
+		logger.info("Removing schedule " + s.getScheduleName() + " from db.");
+
+		QueryRunner runner = new QueryRunner(dataSource);
+	
+		try {
+			int removes =  runner.update(REMOVE_SCHEDULE_BY_KEY, s.getProjectId(), s.getFlowName());
+			if (removes == 0) {
+				throw new ScheduleManagerException("No schedule has been removed.");
+			}
+		} catch (SQLException e) {
+			logger.error(REMOVE_SCHEDULE_BY_KEY + " failed.");
+			throw new ScheduleManagerException("Remove schedule " + s.getScheduleName() + " from db failed. ", e);
+		}
+	}
+	
+	
+	public void insertSchedule(Schedule s) throws ScheduleManagerException {
+		logger.info("Inserting schedule " + s.getScheduleName() + " into db.");
+		insertSchedule(s, defaultEncodingType);
+	}
+
+	public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+		
+		String json = JSONUtils.toJSON(s.optionsToObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
+		}
+		
+		QueryRunner runner = new QueryRunner(dataSource);
+		try {
+			int inserts =  runner.update( 
+					INSERT_SCHEDULE, 
+					s.getProjectId(),
+					s.getProjectName(),
+					s.getFlowName(), 
+					s.getStatus(), 
+					s.getFirstSchedTime(), 
+					s.getTimezone().getID(), 
+					Schedule.createPeriodString(s.getPeriod()), 
+					s.getLastModifyTime(), 
+					s.getNextExecTime(), 
+					s.getSubmitTime(), 
+					s.getSubmitUser(),
+					encType.getNumVal(),
+					data);
+			if (inserts == 0) {
+				throw new ScheduleManagerException("No schedule has been inserted.");
+			}
+		} catch (SQLException e) {
+			logger.error(INSERT_SCHEDULE + " failed.");
+			throw new ScheduleManagerException("Insert schedule " + s.getScheduleName() + " into db failed. ", e);
+		}
+	}
+	
+	@Override
+	public void updateNextExecTime(Schedule s) throws ScheduleManagerException 
+	{
+		logger.info("Update schedule " + s.getScheduleName() + " into db. ");
+		Connection connection = getConnection();
+		QueryRunner runner = new QueryRunner();
+		try {
+			
+			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName()); 
+		} catch (SQLException e) {
+			e.printStackTrace();
+			logger.error(UPDATE_NEXT_EXEC_TIME + " failed.", e);
+			throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+		} finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+	
+	@Override
+	public void updateSchedule(Schedule s) throws ScheduleManagerException {
+		logger.info("Updating schedule " + s.getScheduleName() + " into db.");
+		updateSchedule(s, defaultEncodingType);
+	}
+		
+	public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+		String json = JSONUtils.toJSON(s.optionsToObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
+		}
+
+		QueryRunner runner = new QueryRunner(dataSource);
+	
+		try {
+			int updates =  runner.update( 
+					UPDATE_SCHEDULE_BY_KEY, 
+					s.getStatus(), 
+					s.getFirstSchedTime(), 
+					s.getTimezone().getID(), 
+					Schedule.createPeriodString(s.getPeriod()), 
+					s.getLastModifyTime(), 
+					s.getNextExecTime(), 
+					s.getSubmitTime(), 
+					s.getSubmitUser(), 	
+					encType.getNumVal(),
+					data,
+					s.getProjectId(), 
+					s.getFlowName());
+			if (updates == 0) {
+				throw new ScheduleManagerException("No schedule has been updated.");
+			}
+		} catch (SQLException e) {
+			logger.error(UPDATE_SCHEDULE_BY_KEY + " failed.");
+			throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
+		}
+	}
+
+	public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {
+		@Override
+		public List<Schedule> handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return Collections.<Schedule>emptyList();
+			}
+			
+			ArrayList<Schedule> schedules = new ArrayList<Schedule>();
+			do {
+				int projectId = rs.getInt(1);
+				String projectName = rs.getString(2);
+				String flowName = rs.getString(3);
+				String status = rs.getString(4);
+				long firstSchedTime = rs.getLong(5);
+				DateTimeZone timezone = DateTimeZone.forID(rs.getString(6));
+				ReadablePeriod period = Schedule.parsePeriodString(rs.getString(7));
+				long lastModifyTime = rs.getLong(8);
+				long nextExecTime = rs.getLong(9);
+				long submitTime = rs.getLong(10);
+				String submitUser = rs.getString(11);
+				int encodingType = rs.getInt(12);
+				byte[] data = rs.getBytes(13);
+				
+				Object optsObj = null;
+				if (data != null) {
+					EncodingType encType = EncodingType.fromInteger(encodingType);
+
+					try {
+						// Convoluted way to inflate strings. Should find common package or helper function.
+						if (encType == EncodingType.GZIP) {
+							// Decompress the sucker.
+							String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+							optsObj = JSONUtils.parseJSONFromString(jsonString);
+						}
+						else {
+							String jsonString = new String(data, "UTF-8");
+							optsObj = JSONUtils.parseJSONFromString(jsonString);
+						}	
+					} catch (IOException e) {
+						throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
+					}
+				}
+				
+				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+				if (optsObj != null) {
+					s.createAndSetScheduleOptions(optsObj);
+				}
+				
+				schedules.add(s);
+			} while (rs.next());
+			
+			return schedules;
+		}
+		
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/Schedule.java b/src/java/azkaban/migration/scheduler/Schedule.java
new file mode 100644
index 0000000..9490243
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/Schedule.java
@@ -0,0 +1,363 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.Minutes;
+import org.joda.time.Months;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.Seconds;
+import org.joda.time.Weeks;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.migration.sla.SlaOptions;
+import azkaban.utils.Pair;
+
+@Deprecated
+public class Schedule{
+	
+//	private long projectGuid;
+//	private long flowGuid;
+	
+//	private String scheduleId;
+	
+	private int projectId;
+	private String projectName;
+	private String flowName;
+	private long firstSchedTime;
+	private DateTimeZone timezone;
+	private long lastModifyTime;
+	private ReadablePeriod period;
+	private long nextExecTime;
+	private String submitUser;
+	private String status;
+	private long submitTime;
+	
+	private ExecutionOptions executionOptions;
+	private SlaOptions slaOptions;
+	
+	public Schedule(
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						DateTimeZone timezone,
+						ReadablePeriod period,
+						long lastModifyTime,						
+						long nextExecTime,						
+						long submitTime,
+						String submitUser
+						) {
+		this.projectId = projectId;
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.firstSchedTime = firstSchedTime;
+		this.timezone = timezone;
+		this.lastModifyTime = lastModifyTime;
+		this.period = period;
+		this.nextExecTime = nextExecTime;
+		this.submitUser = submitUser;
+		this.status = status;
+		this.submitTime = submitTime;
+		this.executionOptions = null;
+		this.slaOptions = null;
+	}
+
+	public Schedule(
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						String timezoneId,
+						String period,
+						long lastModifyTime,						
+						long nextExecTime,						
+						long submitTime,
+						String submitUser,
+						ExecutionOptions executionOptions,
+						SlaOptions slaOptions
+			) {
+		this.projectId = projectId;
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.firstSchedTime = firstSchedTime;
+		this.timezone = DateTimeZone.forID(timezoneId);
+		this.lastModifyTime = lastModifyTime;
+		this.period = parsePeriodString(period);
+		this.nextExecTime = nextExecTime;
+		this.submitUser = submitUser;
+		this.status = status;
+		this.submitTime = submitTime;
+		this.executionOptions = executionOptions;
+		this.slaOptions = slaOptions;
+	}
+
+	public Schedule(
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						DateTimeZone timezone,
+						ReadablePeriod period,
+						long lastModifyTime,
+						long nextExecTime,
+						long submitTime,
+						String submitUser,
+						ExecutionOptions executionOptions,
+						SlaOptions slaOptions
+						) {
+		this.projectId = projectId;
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.firstSchedTime = firstSchedTime;
+		this.timezone = timezone;
+		this.lastModifyTime = lastModifyTime;
+		this.period = period;
+		this.nextExecTime = nextExecTime;
+		this.submitUser = submitUser;
+		this.status = status;
+		this.submitTime = submitTime;
+		this.executionOptions = executionOptions;
+		this.slaOptions = slaOptions;
+	}
+
+	public ExecutionOptions getExecutionOptions() {
+		return executionOptions;
+	}
+
+	public void setFlowOptions(ExecutionOptions executionOptions) {
+		this.executionOptions = executionOptions;
+	}
+
+	public SlaOptions getSlaOptions() {
+		return slaOptions;
+	}
+
+	public void setSlaOptions(SlaOptions slaOptions) {
+		this.slaOptions = slaOptions;
+	}
+
+	public String getScheduleName() {
+		return projectName + "." + flowName + " (" + projectId + ")";
+	}
+	
+	public String toString() {
+		return projectName + "." + flowName + " (" + projectId + ")" + " to be run at (starting) " + 
+				new DateTime(firstSchedTime).toDateTimeISO() + " with recurring period of " + (period == null ? "non-recurring" : createPeriodString(period));
+	}
+	
+	public Pair<Integer, String> getScheduleId() {
+		return new Pair<Integer, String>(getProjectId(), getFlowName());
+	}
+	
+	public int getProjectId() {
+		return projectId;
+	}
+
+	public String getProjectName() {
+		return projectName;
+	}
+
+	public String getFlowName() {
+		return flowName;
+	}
+
+	public long getFirstSchedTime() {
+		return firstSchedTime;
+	}
+
+	public DateTimeZone getTimezone() {
+		return timezone;
+	}
+
+	public long getLastModifyTime() {
+		return lastModifyTime;
+	}
+
+	public ReadablePeriod getPeriod() {
+		return period;
+	}
+
+	public long getNextExecTime() {
+		return nextExecTime;
+	}
+
+	public String getSubmitUser() {
+		return submitUser;
+	}
+
+	public String getStatus() {
+		return status;
+	}
+
+	public long getSubmitTime() {
+		return submitTime;
+	}
+
+	public boolean updateTime() {
+		if (new DateTime(nextExecTime).isAfterNow()) {
+			return true;
+		}
+
+		if (period != null) {
+			DateTime nextTime = getNextRuntime(nextExecTime, timezone, period);
+
+			this.nextExecTime = nextTime.getMillis();
+			return true;
+		}
+
+		return false;
+	}
+	
+	private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
+		DateTime now = new DateTime();
+		DateTime date = new DateTime(scheduleTime).withZone(timezone);
+		int count = 0;
+		while (!now.isBefore(date)) {
+			if (count > 100000) {
+				throw new IllegalStateException(
+						"100000 increments of period did not get to present time.");
+			}
+
+			if (period == null) {
+				break;
+			} else {
+				date = date.plus(period);
+			}
+
+			count += 1;
+		}
+
+		return date;
+	}
+
+	public static ReadablePeriod parsePeriodString(String periodStr) {
+		ReadablePeriod period;
+		char periodUnit = periodStr.charAt(periodStr.length() - 1);
+		if (periodUnit == 'n') {
+			return null;
+		}
+
+		int periodInt = Integer.parseInt(periodStr.substring(0,
+				periodStr.length() - 1));
+		switch (periodUnit) {
+		case 'M':
+			period = Months.months(periodInt);
+			break;
+		case 'w':
+			period = Weeks.weeks(periodInt);
+			break;
+		case 'd':
+			period = Days.days(periodInt);
+			break;
+		case 'h':
+			period = Hours.hours(periodInt);
+			break;
+		case 'm':
+			period = Minutes.minutes(periodInt);
+			break;
+		case 's':
+			period = Seconds.seconds(periodInt);
+			break;
+		default:
+			throw new IllegalArgumentException("Invalid schedule period unit '"
+					+ periodUnit);
+		}
+
+		return period;
+	}
+
+	public static String createPeriodString(ReadablePeriod period) {
+		String periodStr = "n";
+
+		if (period == null) {
+			return "n";
+		}
+
+		if (period.get(DurationFieldType.months()) > 0) {
+			int months = period.get(DurationFieldType.months());
+			periodStr = months + "M";
+		} else if (period.get(DurationFieldType.weeks()) > 0) {
+			int weeks = period.get(DurationFieldType.weeks());
+			periodStr = weeks + "w";
+		} else if (period.get(DurationFieldType.days()) > 0) {
+			int days = period.get(DurationFieldType.days());
+			periodStr = days + "d";
+		} else if (period.get(DurationFieldType.hours()) > 0) {
+			int hours = period.get(DurationFieldType.hours());
+			periodStr = hours + "h";
+		} else if (period.get(DurationFieldType.minutes()) > 0) {
+			int minutes = period.get(DurationFieldType.minutes());
+			periodStr = minutes + "m";
+		} else if (period.get(DurationFieldType.seconds()) > 0) {
+			int seconds = period.get(DurationFieldType.seconds());
+			periodStr = seconds + "s";
+		}
+
+		return periodStr;
+	}
+	
+	
+	public Map<String,Object> optionsToObject() {
+		if(executionOptions != null || slaOptions != null) {
+			HashMap<String, Object> schedObj = new HashMap<String, Object>();
+			
+			if(executionOptions != null) {
+				schedObj.put("executionOptions", executionOptions.toObject());
+			}
+			if(slaOptions != null) {
+				schedObj.put("slaOptions", slaOptions.toObject());
+			}
+	
+			return schedObj;
+		}
+		return null;
+	}
+	
+	public void createAndSetScheduleOptions(Object obj) {
+		@SuppressWarnings("unchecked")
+		HashMap<String, Object> schedObj = (HashMap<String, Object>)obj;
+		if (schedObj.containsKey("executionOptions")) {
+			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
+			this.executionOptions = execOptions;
+		}
+		else if (schedObj.containsKey("flowOptions")){
+			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
+			this.executionOptions = execOptions;
+			execOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+		}
+		else {
+			this.executionOptions = new ExecutionOptions();
+			this.executionOptions.setConcurrentOption(ExecutionOptions.CONCURRENT_OPTION_SKIP);
+		}
+
+		if (schedObj.containsKey("slaOptions")) {
+			SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
+			this.slaOptions = slaOptions;
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/ScheduleLoader.java b/src/java/azkaban/migration/scheduler/ScheduleLoader.java
new file mode 100644
index 0000000..6511d9c
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/ScheduleLoader.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+import java.util.List;
+
+@Deprecated
+public interface ScheduleLoader {
+	
+	public void insertSchedule(Schedule s) throws ScheduleManagerException;
+	
+	public void updateSchedule(Schedule s) throws ScheduleManagerException;
+	
+	public List<Schedule> loadSchedules() throws ScheduleManagerException;
+	
+	public void removeSchedule(Schedule s) throws ScheduleManagerException;
+
+	public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
+
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/scheduler/ScheduleManagerException.java b/src/java/azkaban/migration/scheduler/ScheduleManagerException.java
new file mode 100644
index 0000000..f0f6705
--- /dev/null
+++ b/src/java/azkaban/migration/scheduler/ScheduleManagerException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.migration.scheduler;
+
+@Deprecated
+public class ScheduleManagerException extends Exception{
+	private static final long serialVersionUID = 1L;
+
+	public ScheduleManagerException(String message) {
+		super(message);
+	}
+	
+	public ScheduleManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/migration/sla/SLA.java b/src/java/azkaban/migration/sla/SLA.java
new file mode 100644
index 0000000..0e75965
--- /dev/null
+++ b/src/java/azkaban/migration/sla/SLA.java
@@ -0,0 +1,252 @@
+package azkaban.migration.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.migration.scheduler.Schedule;
+
+@Deprecated
+public class SLA {
+
+	public static enum SlaRule {
+		SUCCESS(1), FINISH(2), WAITANDCHECKJOB(3);
+
+		private int numVal;
+
+		SlaRule(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static SlaRule fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return SUCCESS;
+			case 2:
+				return FINISH;
+			case 3:
+				return WAITANDCHECKJOB;
+			default:
+				return SUCCESS;
+			}
+		}
+	}
+	
+	public static enum SlaAction {
+		EMAIL(1), KILL(2);
+
+		private int numVal;
+
+		SlaAction(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static SlaAction fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return EMAIL;
+			case 2:
+				return KILL;
+			default:
+				return EMAIL;
+			}
+		}
+	}
+	
+	public static class SlaSetting {
+		public String getId() {
+			return id;
+		}
+		public void setId(String id) {
+			this.id = id;
+		}
+		public ReadablePeriod getDuration() {
+			return duration;
+		}
+		public void setDuration(ReadablePeriod duration) {
+			this.duration = duration;
+		}
+		public SlaRule getRule() {
+			return rule;
+		}
+		public void setRule(SlaRule rule) {
+			this.rule = rule;
+		}
+		public List<SlaAction> getActions() {
+			return actions;
+		}
+		public void setActions(List<SlaAction> actions) {
+			this.actions = actions;
+		}
+		
+		public Object toObject() {
+			Map<String, Object> obj = new HashMap<String, Object>();
+			obj.put("id", id);
+			obj.put("duration", Schedule.createPeriodString(duration));
+//			List<String> rulesObj = new ArrayList<String>();
+//			for(SlaRule rule : rules) {
+//				rulesObj.add(rule.toString());
+//			}
+//			obj.put("rules", rulesObj);
+			obj.put("rule", rule.toString());
+			List<String> actionsObj = new ArrayList<String>();
+			for(SlaAction act : actions) {
+				actionsObj.add(act.toString());
+			}
+			obj.put("actions", actionsObj);
+			return obj;
+		}
+		
+		@SuppressWarnings("unchecked")
+		public static SlaSetting fromObject(Object obj) {
+			Map<String, Object> slaObj = (HashMap<String, Object>) obj;
+			String subId = (String) slaObj.get("id");
+			ReadablePeriod dur = Schedule.parsePeriodString((String) slaObj.get("duration"));
+//			List<String> rulesObj = (ArrayList<String>) slaObj.get("rules");
+//			List<SlaRule> slaRules = new ArrayList<SLA.SlaRule>();
+//			for(String rule : rulesObj) {
+//				slaRules.add(SlaRule.valueOf(rule));
+//			}
+			SlaRule slaRule = SlaRule.valueOf((String) slaObj.get("rule"));
+			List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+			List<SlaAction> slaActs = new ArrayList<SlaAction>();
+			for(String act : actsObj) {
+				slaActs.add(SlaAction.valueOf(act));
+			}
+			
+			SlaSetting ret = new SlaSetting();
+			ret.setId(subId);
+			ret.setDuration(dur);
+			ret.setRule(slaRule);
+			ret.setActions(slaActs);
+			return ret;
+		}
+		
+		private String id;
+		private ReadablePeriod duration;
+		private SlaRule rule = SlaRule.SUCCESS;
+		private List<SlaAction> actions;
+	}
+	
+	private int execId;
+	private String jobName;
+	private DateTime checkTime;
+	private List<String> emails;
+	private List<SlaAction> actions;
+	private List<SlaSetting> jobSettings;
+	private SlaRule rule;
+	
+	public SLA(
+			int execId,
+			String jobName,
+			DateTime checkTime,
+			List<String> emails,
+			List<SlaAction> slaActions,
+			List<SlaSetting> jobSettings,
+			SlaRule slaRule
+	) {
+		this.execId = execId;
+		this.jobName = jobName;
+		this.checkTime = checkTime;
+		this.emails = emails;
+		this.actions = slaActions;
+		this.jobSettings = jobSettings;
+		this.rule = slaRule;
+	}
+
+	public int getExecId() {
+		return execId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	public DateTime getCheckTime() {
+		return checkTime;
+	}
+
+	public List<String> getEmails() {
+		return emails;
+	}
+
+	public List<SlaAction> getActions() {
+		return actions;
+	}
+	
+	public List<SlaSetting> getJobSettings() {
+		return jobSettings;
+	}
+
+	public SlaRule getRule() {
+		return rule;
+	}
+	
+	public String toString() {
+		return execId + " " + jobName + " to be checked at " + checkTime.toDateTimeISO();
+	}
+	
+	public Map<String,Object> optionToObject() {
+		HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+		slaObj.put("emails", emails);
+//		slaObj.put("rule", rule.toString());
+
+		List<String> actionsObj = new ArrayList<String>();
+		for(SlaAction act : actions) {
+			actionsObj.add(act.toString());
+		}
+		slaObj.put("actions", actionsObj);
+		
+		if(jobSettings != null && jobSettings.size() > 0) {
+			List<Object> settingsObj = new ArrayList<Object>();
+			for(SlaSetting set : jobSettings) {
+				settingsObj.add(set.toObject());
+			}
+			slaObj.put("jobSettings", settingsObj);
+		}
+
+		return slaObj;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static SLA createSlaFromObject(int execId, String jobName, DateTime checkTime, SlaRule rule, Object obj) {
+		
+		HashMap<String, Object> slaObj = (HashMap<String,Object>)obj;
+
+		List<String> emails = (List<String>)slaObj.get("emails");
+//		SlaRule rule = SlaRule.valueOf((String)slaObj.get("rule"));
+		List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+		List<SlaAction> slaActs = new ArrayList<SlaAction>();
+		for(String act : actsObj) {
+			slaActs.add(SlaAction.valueOf(act));
+		}
+		List<SlaSetting> jobSets = null;
+		if(slaObj.containsKey("jobSettings") && slaObj.get("jobSettings") != null) {
+			jobSets = new ArrayList<SLA.SlaSetting>();
+			for(Object set : (List<Object>)slaObj.get("jobSettings")) {
+				SlaSetting jobSet = SlaSetting.fromObject(set);
+				jobSets.add(jobSet);
+			}
+		}
+		
+		return new SLA(execId, jobName, checkTime, emails, slaActs, jobSets, rule);
+	}
+
+	public void setCheckTime(DateTime time) {
+		this.checkTime = time;
+	}
+
+}
diff --git a/src/java/azkaban/migration/sla/SlaOptions.java b/src/java/azkaban/migration/sla/SlaOptions.java
new file mode 100644
index 0000000..f7b9d49
--- /dev/null
+++ b/src/java/azkaban/migration/sla/SlaOptions.java
@@ -0,0 +1,52 @@
+package azkaban.migration.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.migration.sla.SLA.SlaSetting;
+
+@Deprecated
+public class SlaOptions {
+
+	public List<String> getSlaEmails() {
+		return slaEmails;
+	}
+	public void setSlaEmails(List<String> slaEmails) {
+		this.slaEmails = slaEmails;
+	}
+	public List<SlaSetting> getSettings() {
+		return settings;
+	}
+	public void setSettings(List<SlaSetting> settings) {
+		this.settings = settings;
+	}
+	private List<String> slaEmails;
+	private List<SlaSetting> settings;
+	public Object toObject() {
+		Map<String, Object> obj = new HashMap<String, Object>();
+		obj.put("slaEmails", slaEmails);
+		List<Object> slaSettings = new ArrayList<Object>();
+		for(SlaSetting s : settings) {
+			slaSettings.add(s.toObject());
+		}
+		obj.put("settings", slaSettings);
+		return obj;
+	}
+	@SuppressWarnings("unchecked")
+	public static SlaOptions fromObject(Object object) {
+		if(object != null) {
+			SlaOptions slaOptions = new SlaOptions();
+			Map<String, Object> obj = (HashMap<String, Object>) object;
+			slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+			List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+			for(Object set: (List<Object>)obj.get("settings")) {
+				slaSets.add(SlaSetting.fromObject(set));
+			}
+			slaOptions.setSettings(slaSets);
+			return slaOptions;
+		}
+		return null;			
+	}
+}
\ No newline at end of file
diff --git a/src/package/webserver/bin/schedule2trigger.sh b/src/package/webserver/bin/schedule2trigger.sh
new file mode 100644
index 0000000..1178cf3
--- /dev/null
+++ b/src/package/webserver/bin/schedule2trigger.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+java -cp "lib/*:extlib/*" azkaban.migration.schedule2trigger.Schedule2Trigger conf/azkaban.properties
+