azkaban-uncached

Merge branch 'master' of github.com:azkaban/azkaban2 into

2/13/2013 10:28:53 AM

Changes

src/web/css/azkaban.css 292(+277 -15)

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index a5add77..35208cb 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -631,7 +631,7 @@ public class ExecutorManager {
 		return flow;
 	}
 	
-	private boolean isFinished(ExecutableFlow flow) {
+	public boolean isFinished(ExecutableFlow flow) {
 		switch(flow.getStatus()) {
 		case SUCCEEDED:
 		case FAILED:
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index caba099..ee9c812 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -17,12 +17,14 @@
 package azkaban.scheduler;
 
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 
@@ -36,58 +38,86 @@ import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA;
+import azkaban.sla.SLAManagerException;
+import azkaban.sla.JdbcSLALoader.EncodingType;
 import azkaban.utils.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
 import azkaban.utils.Props;
 
 
 public class JdbcScheduleLoader implements ScheduleLoader {
+
+	private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
 	
-	private DataSource dataSource;
-	private static DateTimeFormatter FILE_DATEFORMAT = DateTimeFormat.forPattern("yyyy-MM-dd.HH.mm.ss.SSS");
-    private static Logger logger = Logger.getLogger(JdbcScheduleLoader.class);
+	public static enum EncodingType {
+		PLAIN(1), GZIP(2);
+
+		private int numVal;
+
+		EncodingType(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static EncodingType fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return PLAIN;
+			case 2:
+				return GZIP;
+			default:
+				return PLAIN;
+			}
+		}
+	}
 	
-	private static final String SCHEDULE = "schedule";
-	// schedule ids
-//	private static final String PROJECTGUID = "projectGuid";
-//	private static final String FLOWGUID = "flowGuid";
-//	
-//	private static final String SCHEDULEID = "scheduleId";
-	private static final String PROJECTID = "projectId";
-	private static final String PROJECTNAME = "projectName";
-	private static final String FLOWNAME = "flowName";
-	// status
-	private static final String STATUS = "status";
-	// schedule info
-	private static final String FIRSTSCHEDTIME = "firstSchedTime";
-	private static final String TIMEZONE = "timezone";
-	private static final String PERIOD = "period";
-	private static final String LASTMODIFYTIME = "lastModifyTime";
-	private static final String NEXTEXECTIME = "nextExecTime";
-	// auditing info
-	private static final String SUBMITTIME = "submitTime";
-	private static final String SUBMITUSER = "userSubmit";
+	private DataSource dataSource;
+	private EncodingType defaultEncodingType = EncodingType.GZIP;
 	
 	private static final String scheduleTableName = "schedules";
 
-	private static String SELECT_SCHEDULE_BY_KEY = 
-			"SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
-	
 	private static String SELECT_ALL_SCHEDULES =
-			"SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user FROM " + scheduleTableName;
+			"SELECT project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options FROM " + scheduleTableName;
 	
 	private static String INSERT_SCHEDULE = 
-			"INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user) values (?,?,?,?,?,?,?,?,?,?,?)";
+			"INSERT INTO " + scheduleTableName + " ( project_id, project_name, flow_name, status, first_sched_time, timezone, period, last_modify_time, next_exec_time, submit_time, submit_user, enc_type, schedule_options) values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
 	
 	private static String REMOVE_SCHEDULE_BY_KEY = 
 			"DELETE FROM " + scheduleTableName + " WHERE project_id=? AND flow_name=?";
 	
 	private static String UPDATE_SCHEDULE_BY_KEY = 
-			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=? WHERE project_id=? AND flow_name=?";
+			"UPDATE " + scheduleTableName + " SET status=?, first_sched_time=?, timezone=?, period=?, last_modify_time=?, next_exec_time=?, submit_time=?, submit_user=?, enc_type=?, schedule_options=? WHERE project_id=? AND flow_name=?";
 	
 	private static String UPDATE_NEXT_EXEC_TIME = 
 			"UPDATE " + scheduleTableName + " SET next_exec_time=? WHERE project_id=? AND flow_name=?";
 
+	private Connection getConnection() throws ScheduleManagerException {
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (Exception e) {
+			DbUtils.closeQuietly(connection);
+			throw new ScheduleManagerException("Error getting DB connection.", e);
+		}
+		
+		return connection;
+	}
+	
+	public EncodingType getDefaultEncodingType() {
+		return defaultEncodingType;
+	}
 
+	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+		this.defaultEncodingType = defaultEncodingType;
+	}
+	
 	public JdbcScheduleLoader(Props props) {
 		String databaseType = props.getString("database.type");
 		
@@ -106,15 +136,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 	@Override
 	public List<Schedule> loadSchedules() throws ScheduleManagerException {
 		logger.info("Loading all schedules from db.");
-		Connection connection = null;
-		try {
-			connection = dataSource.getConnection();
-		} catch (SQLException e1) {
-			logger.error("Failed to get db connection!");
-			e1.printStackTrace();
-			DbUtils.closeQuietly(connection);
-			throw new ScheduleManagerException("Failed to get db connection!", e1);
-		}
+		Connection connection = getConnection();
 
 		QueryRunner runner = new QueryRunner();
 		ResultSetHandler<List<Schedule>> handler = new ScheduleResultHandler();
@@ -139,7 +161,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 			}
 			else {
 				try {
-					updateNextExecTime(sched, connection);
+					updateNextExecTime(sched);
 				} catch (Exception e) {
 					DbUtils.closeQuietly(connection);
 					throw new ScheduleManagerException("Update next execution time failed.", e);
@@ -172,10 +194,29 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 		}
 	}
 	
-	@Override
+	
 	public void insertSchedule(Schedule s) throws ScheduleManagerException {
 		logger.info("Inserting schedule " + s.getScheduleName() + " into db.");
+		insertSchedule(s, defaultEncodingType);
+	}
 
+	public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+		
+		String json = JSONUtils.toJSON(s.optionsToObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
+		}
+		
 		QueryRunner runner = new QueryRunner(dataSource);
 		try {
 			int inserts =  runner.update( 
@@ -190,7 +231,9 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 					s.getLastModifyTime(), 
 					s.getNextExecTime(), 
 					s.getSubmitTime(), 
-					s.getSubmitUser());
+					s.getSubmitUser(),
+					encType.getNumVal(),
+					data);
 			if (inserts == 0) {
 				throw new ScheduleManagerException("No schedule has been inserted.");
 			}
@@ -200,8 +243,10 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 		}
 	}
 	
-	private void updateNextExecTime(Schedule s, Connection connection) throws ScheduleManagerException 
+	@Override
+	public void updateNextExecTime(Schedule s) throws ScheduleManagerException 
 	{
+		Connection connection = getConnection();
 		QueryRunner runner = new QueryRunner();
 		try {
 			runner.update(connection, UPDATE_NEXT_EXEC_TIME, s.getNextExecTime(), s.getProjectId(), s.getFlowName()); 
@@ -214,6 +259,25 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 	@Override
 	public void updateSchedule(Schedule s) throws ScheduleManagerException {
 		logger.info("Updating schedule " + s.getScheduleName() + " into db.");
+		updateSchedule(s, defaultEncodingType);
+	}
+		
+	public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
+
+		String json = JSONUtils.toJSON(s.optionsToObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
+		}
 
 		QueryRunner runner = new QueryRunner(dataSource);
 	
@@ -227,7 +291,9 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 					s.getLastModifyTime(), 
 					s.getNextExecTime(), 
 					s.getSubmitTime(), 
-					s.getSubmitUser(), 
+					s.getSubmitUser(), 	
+					encType.getNumVal(),
+					data,
 					s.getProjectId(), 
 					s.getFlowName());
 			if (updates == 0) {
@@ -238,46 +304,6 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 			throw new ScheduleManagerException("Update schedule " + s.getScheduleName() + " into db failed. ", e);
 		}
 	}
-	
-//	private Schedule fromJSON(HashMap<String, Object> obj) {
-//		long projectGuid = Long.valueOf((String) obj.get(PROJECTGUID));
-//		String projectId = (String) obj.get(PROJECTID);
-//		long flowGuid = Long.valueOf((String) obj.get(FLOWGUID));
-//		String flowId = (String) obj.get(FLOWID);
-//		String status = (String) obj.get(STATUS);
-//		long firstSchedTime = Long.valueOf((String) obj.get(FIRSTSCHEDTIME));
-//		String timezone = (String) obj.get(TIMEZONE);
-//		String period = (String) obj.get(PERIOD);
-//		long lastModifyTime = Long.valueOf((String) obj.get(LASTMODIFYTIME));
-//		long nextExecTime = Long.valueOf((String) obj.get(NEXTEXECTIME));
-//		long submitTime = Long.valueOf((String) obj.get(SUBMITTIME));
-//		String submitUser = (String) obj.get(SUBMITUSER);
-//		
-//		return new Schedule(projectId, flowId, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
-//
-//	}
-//
-//	private HashMap<String, Object> toJSON(Schedule s) {
-//		HashMap<String, Object> object = new HashMap<String, Object>();
-////		object.put(PROJECTGUID, s.getProjectGuid());
-//		object.put(SCHEDULEID, s.getScheduleId());
-//		object.put(PROJECTID, s.getProjectId());
-////		object.put(FLOWGUID, s.getFlowGuid());
-//		object.put(FLOWID, s.getFlowId());
-//		
-//		object.put(STATUS, s.getStatus());
-//		
-//		object.put(FIRSTSCHEDTIME, s.getFirstSchedTime());
-//		object.put(TIMEZONE, s.getTimezone());
-//		object.put(PERIOD, s.getPeriod());
-//		
-//		object.put(LASTMODIFYTIME, s.getLastModifyTime());
-//		object.put(NEXTEXECTIME, s.getNextExecTime());
-//		object.put(SUBMITTIME, s.getSubmitTime());
-//		object.put(SUBMITUSER, s.getSubmitUser());
-//
-//		return object;
-//	}
 
 	public class ScheduleResultHandler implements ResultSetHandler<List<Schedule>> {
 		@Override
@@ -299,8 +325,33 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 				long nextExecTime = rs.getLong(9);
 				long submitTime = rs.getLong(10);
 				String submitUser = rs.getString(11);
+				int encodingType = rs.getInt(12);
+				byte[] data = rs.getBytes(13);
+				
+				FlowOptions flowOptions = null;
+				SlaOptions slaOptions = null;
+				if (data != null) {
+					EncodingType encType = EncodingType.fromInteger(encodingType);
+					Object optsObj;
+					try {
+						// Convoluted way to inflate strings. Should find common package or helper function.
+						if (encType == EncodingType.GZIP) {
+							// Decompress the sucker.
+							String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+							optsObj = JSONUtils.parseJSONFromString(jsonString);
+						}
+						else {
+							String jsonString = new String(data, "UTF-8");
+							optsObj = JSONUtils.parseJSONFromString(jsonString);
+						}						
+						flowOptions = Schedule.createFlowOptionFromObject(optsObj);
+						slaOptions = Schedule.createSlaOptionFromObject(optsObj);
+					} catch (IOException e) {
+						throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
+					}
+				}
 				
-				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
 				
 				schedules.add(s);
 			} while (rs.next());
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index a8c0c91..1c80c58 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -16,7 +16,10 @@
 
 package azkaban.scheduler;
 
-
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -29,11 +32,153 @@ import org.joda.time.ReadablePeriod;
 import org.joda.time.Seconds;
 import org.joda.time.Weeks;
 
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaSetting;
 import azkaban.utils.Pair;
 
 public class Schedule{
 	
+	
+	public static class FlowOptions {
+		
+		public List<String> getFailureEmails() {
+			return failureEmails;
+		}
+		public void setFailureEmails(List<String> failureEmails) {
+			this.failureEmails = failureEmails;
+		}
+		public List<String> getSuccessEmails() {
+			return successEmails;
+		}
+		public void setSuccessEmails(List<String> successEmails) {
+			this.successEmails = successEmails;
+		}
+		public FailureAction getFailureAction() {
+			return failureAction;
+		}
+		public void setFailureAction(FailureAction failureAction) {
+			this.failureAction = failureAction;
+		}
+		public boolean isnotifyOnFirstFailure() {
+			return notifyOnFirstFailure;
+		}
+		public void setNotifyOnFirstFailure(boolean notifyOnFirstFailure) {
+			this.notifyOnFirstFailure = notifyOnFirstFailure;
+		}
+		public boolean isnotifyOnLastFailure() {
+			return notifyOnLastFailure;
+		}
+		public void setNotifyOnLastFailure(boolean notifyOnLastFailure) {
+			this.notifyOnLastFailure = notifyOnLastFailure;
+		}
+		public Map<String, String> getFlowOverride() {
+			return flowOverride;
+		}
+		public void setFlowOverride(Map<String, String> flowOverride) {
+			this.flowOverride = flowOverride;
+		}
+		public List<String> getDisabledJobs() {
+			return disabledJobs;
+		}
+		public void setDisabledJobs(List<String> disabledJobs) {
+			this.disabledJobs = disabledJobs;
+		}
+		private List<String> failureEmails;
+		private List<String> successEmails;
+		private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
+		private boolean notifyOnFirstFailure;
+		private boolean notifyOnLastFailure;
+		Map<String, String> flowOverride;
+		private List<String> disabledJobs;
+		public Object toObject() {
+			Map<String, Object> obj = new HashMap<String, Object>();
+			obj.put("failureEmails", failureEmails);
+			obj.put("successEmails", successEmails);
+			obj.put("failureAction", failureAction.toString());
+			obj.put("notifyOnFirstFailure", notifyOnFirstFailure);
+			obj.put("notifyOnLastFailure", notifyOnLastFailure);
+			obj.put("flowOverride", flowOverride);
+			obj.put("disabledJobs", disabledJobs);
+			return obj;
+		}
+		@SuppressWarnings("unchecked")
+		public static FlowOptions fromObject(Object object) {
+			if(object != null) {
+				FlowOptions flowOptions = new FlowOptions();
+				Map<String, Object> obj = (HashMap<String, Object>) object;
+				if(obj.containsKey("failureEmails")) {
+					flowOptions.setFailureEmails((List<String>) obj.get("failureEmails"));
+				}
+				if(obj.containsKey("successEmails")) {
+					flowOptions.setSuccessEmails((List<String>) obj.get("SuccessEmails"));
+				}
+				if(obj.containsKey("failureAction")) {
+					flowOptions.setFailureAction(FailureAction.valueOf((String)obj.get("failureAction")));
+				}
+				if(obj.containsKey("notifyOnFirstFailure")) {
+					flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnFirstFailure"));
+				}
+				if(obj.containsKey("notifyOnLastFailure")) {
+					flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnLastFailure"));
+				}
+				if(obj.containsKey("flowOverride")) {
+					flowOptions.setFlowOverride((Map<String, String>) obj.get("flowOverride"));
+				}
+				if(obj.containsKey("disabledJobs")) {
+					flowOptions.setDisabledJobs((List<String>) obj.get("disabledJobs"));
+				}
+				return flowOptions;
+			}
+			return null;
+		}
+	}
 
+	public static class SlaOptions {
+
+		public List<String> getSlaEmails() {
+			return slaEmails;
+		}
+		public void setSlaEmails(List<String> slaEmails) {
+			this.slaEmails = slaEmails;
+		}
+		public List<SlaSetting> getSettings() {
+			return settings;
+		}
+		public void setSettings(List<SlaSetting> settings) {
+			this.settings = settings;
+		}
+		private List<String> slaEmails;
+		private List<SlaSetting> settings;
+		public Object toObject() {
+			Map<String, Object> obj = new HashMap<String, Object>();
+			obj.put("slaEmails", slaEmails);
+			List<Object> slaSettings = new ArrayList<Object>();
+			for(SlaSetting s : settings) {
+				slaSettings.add(s.toObject());
+			}
+			obj.put("settings", slaSettings);
+			return obj;
+		}
+		@SuppressWarnings("unchecked")
+		public static SlaOptions fromObject(Object object) {
+			if(object != null) {
+				SlaOptions slaOptions = new SlaOptions();
+				Map<String, Object> obj = (HashMap<String, Object>) object;
+				slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+				List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+				for(Object set: (List<Object>)obj.get("settings")) {
+					slaSets.add(SlaSetting.fromObject(set));
+				}
+				slaOptions.setSettings(slaSets);
+				return slaOptions;
+			}
+			return null;			
+		}
+		
+	}
+	
 //	private long projectGuid;
 //	private long flowGuid;
 	
@@ -51,6 +196,9 @@ public class Schedule{
 	private String status;
 	private long submitTime;
 	
+	private FlowOptions flowOptions;
+	private SlaOptions slaOptions;
+	
 	public Schedule(
 						int projectId,
 						String projectName,
@@ -75,31 +223,84 @@ public class Schedule{
 		this.submitUser = submitUser;
 		this.status = status;
 		this.submitTime = submitTime;
+		this.flowOptions = null;
+		this.slaOptions = null;
 	}
-	
+
 	public Schedule(
-			int projectId,
-			String projectName,
-			String flowName,
-			String status,
-			long firstSchedTime,
-			String timezone,
-			String period,
-			long lastModifyTime,						
-			long nextExecTime,						
-			long submitTime,
-			String submitUser) {
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						String timezoneId,
+						String period,
+						long lastModifyTime,						
+						long nextExecTime,						
+						long submitTime,
+						String submitUser,
+						FlowOptions flowOptions,
+						SlaOptions slaOptions
+			) {
 		this.projectId = projectId;
 		this.projectName = projectName;
 		this.flowName = flowName;
 		this.firstSchedTime = firstSchedTime;
-		this.timezone = DateTimeZone.forID(timezone);
+		this.timezone = DateTimeZone.forID(timezoneId);
 		this.lastModifyTime = lastModifyTime;
 		this.period = parsePeriodString(period);
 		this.nextExecTime = nextExecTime;
 		this.submitUser = submitUser;
 		this.status = status;
 		this.submitTime = submitTime;
+		this.flowOptions = flowOptions;
+		this.slaOptions = slaOptions;
+	}
+
+	public Schedule(
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						DateTimeZone timezone,
+						ReadablePeriod period,
+						long lastModifyTime,						
+						long nextExecTime,						
+						long submitTime,
+						String submitUser,
+						FlowOptions flowOptions,
+						SlaOptions slaOptions
+						) {
+		this.projectId = projectId;
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.firstSchedTime = firstSchedTime;
+		this.timezone = timezone;
+		this.lastModifyTime = lastModifyTime;
+		this.period = period;
+		this.nextExecTime = nextExecTime;
+		this.submitUser = submitUser;
+		this.status = status;
+		this.submitTime = submitTime;
+		this.flowOptions = flowOptions;
+		this.slaOptions = slaOptions;
+	}
+
+	public FlowOptions getFlowOptions() {
+		return flowOptions;
+	}
+
+	public void setFlowOptions(FlowOptions flowOptions) {
+		this.flowOptions = flowOptions;
+	}
+
+	public SlaOptions getSlaOptions() {
+		return slaOptions;
+	}
+
+	public void setSlaOptions(SlaOptions slaOptions) {
+		this.slaOptions = slaOptions;
 	}
 
 	public String getScheduleName() {
@@ -155,85 +356,77 @@ public class Schedule{
 	}
 
 	public boolean updateTime() {
-        if (new DateTime(nextExecTime).isAfterNow()) {
-            return true;
-        }
-
-        if (period != null) {
-            DateTime nextTime = getNextRuntime(nextExecTime, timezone, period);
-
-            this.nextExecTime = nextTime.getMillis();
-            return true;
-        }
-
-        return false;
-    }
-//	public String getFlowId(){
-//		return this.scheduleId.split("\\.")[1];
-//	}
-//
-//	public String getProjectId(){
-//		return this.scheduleId.split("\\.")[0];
-//	}
-	
+		if (new DateTime(nextExecTime).isAfterNow()) {
+			return true;
+		}
+
+		if (period != null) {
+			DateTime nextTime = getNextRuntime(nextExecTime, timezone, period);
+
+			this.nextExecTime = nextTime.getMillis();
+			return true;
+		}
+
+		return false;
+	}
 	
-    private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
-        DateTime now = new DateTime();
-        DateTime date = new DateTime(scheduleTime).withZone(timezone);
-        int count = 0;
-        while (!now.isBefore(date)) {
-            if (count > 100000) {
-                throw new IllegalStateException(
-                        "100000 increments of period did not get to present time.");
-            }
-
-            if (period == null) {
-                break;
-            } else {
-                date = date.plus(period);
-            }
-
-            count += 1;
-        }
-
-        return date;
-    }
-
-    public static ReadablePeriod parsePeriodString(String periodStr) {
-        ReadablePeriod period;
-        char periodUnit = periodStr.charAt(periodStr.length() - 1);
-        if (periodUnit == 'n') {
-            return null;
-        }
-
-        int periodInt = Integer.parseInt(periodStr.substring(0,
-                periodStr.length() - 1));
-        switch (periodUnit) {
-        case 'M':
-            period = Months.months(periodInt);
-            break;
-        case 'w':
-            period = Weeks.weeks(periodInt);
-            break;
-        case 'd':
-            period = Days.days(periodInt);
-            break;
-        case 'h':
-            period = Hours.hours(periodInt);
-            break;
-        case 'm':
-            period = Minutes.minutes(periodInt);
-            break;
-        case 's':
-            period = Seconds.seconds(periodInt);
-            break;
-        default:
-            throw new IllegalArgumentException("Invalid schedule period unit '"
-                    + periodUnit);
-        }
-
-        return period;
-    }
+	private DateTime getNextRuntime(long scheduleTime, DateTimeZone timezone, ReadablePeriod period) {
+		DateTime now = new DateTime();
+		DateTime date = new DateTime(scheduleTime).withZone(timezone);
+		int count = 0;
+		while (!now.isBefore(date)) {
+			if (count > 100000) {
+				throw new IllegalStateException(
+						"100000 increments of period did not get to present time.");
+			}
+
+			if (period == null) {
+				break;
+			} else {
+				date = date.plus(period);
+			}
+
+			count += 1;
+		}
+
+		return date;
+	}
+
+	public static ReadablePeriod parsePeriodString(String periodStr) {
+		ReadablePeriod period;
+		char periodUnit = periodStr.charAt(periodStr.length() - 1);
+		if (periodUnit == 'n') {
+			return null;
+		}
+
+		int periodInt = Integer.parseInt(periodStr.substring(0,
+				periodStr.length() - 1));
+		switch (periodUnit) {
+		case 'M':
+			period = Months.months(periodInt);
+			break;
+		case 'w':
+			period = Weeks.weeks(periodInt);
+			break;
+		case 'd':
+			period = Days.days(periodInt);
+			break;
+		case 'h':
+			period = Hours.hours(periodInt);
+			break;
+		case 'm':
+			period = Minutes.minutes(periodInt);
+			break;
+		case 's':
+			period = Seconds.seconds(periodInt);
+			break;
+		default:
+			throw new IllegalArgumentException("Invalid schedule period unit '"
+					+ periodUnit);
+		}
+
+		return period;
+	}
 
 	public static String createPeriodString(ReadablePeriod period) {
 		String periodStr = "n";
@@ -264,7 +457,44 @@ public class Schedule{
 
 		return periodStr;
 	}
-    
+	
 
+	public Map<String,Object> optionsToObject() {
+		if(flowOptions != null || slaOptions != null) {
+			HashMap<String, Object> schedObj = new HashMap<String, Object>();
+			
+			if(flowOptions != null) {
+				schedObj.put("flowOptions", flowOptions.toObject());
+			}
+			if(slaOptions != null) {
+				schedObj.put("slaOptions", slaOptions.toObject());
+			}
+	
+			return schedObj;
+		}
+		return null;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public static FlowOptions createFlowOptionFromObject(Object obj) {
+		if(obj != null) {
+			Map<String, Object> options = (HashMap<String, Object>) obj;
+			if(options.containsKey("flowOptions")) {
+				return FlowOptions.fromObject(options.get("flowOptions"));
+			}
+		}		
+		return null;
+	}
 	
+	@SuppressWarnings("unchecked")
+	public static SlaOptions createSlaOptionFromObject(Object obj) {
+		if(obj != null) {
+			Map<String, Object> options = (HashMap<String, Object>) obj;
+			if(options.containsKey("slaOptions")) {
+				return SlaOptions.fromObject(options.get("slaOptions"));
+			}
+		}		
+		return null;
+	}
+
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleLoader.java b/src/java/azkaban/scheduler/ScheduleLoader.java
index 4b55b4e..9caed47 100644
--- a/src/java/azkaban/scheduler/ScheduleLoader.java
+++ b/src/java/azkaban/scheduler/ScheduleLoader.java
@@ -16,6 +16,7 @@
 
 package azkaban.scheduler;
 
+import java.sql.Connection;
 import java.util.List;
 
 public interface ScheduleLoader {
@@ -28,4 +29,6 @@ public interface ScheduleLoader {
 	
 	public void removeSchedule(Schedule s) throws ScheduleManagerException;
 
+	public void updateNextExecTime(Schedule s) throws ScheduleManagerException;
+
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index ef0f076..ad67def 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -17,6 +17,7 @@
 package azkaban.scheduler;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -34,6 +35,8 @@ import org.joda.time.format.DateTimeFormatter;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
 
 import azkaban.flow.Flow;
 import azkaban.jobExecutor.utils.JobExecutionException;
@@ -41,6 +44,12 @@ import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 
 
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SLA.SlaSetting;
 import azkaban.utils.Pair;
 
 
@@ -60,6 +69,7 @@ public class ScheduleManager {
 	private final ScheduleRunner runner;
 	private final ExecutorManager executorManager;
 	private final ProjectManager projectManager;
+	private final SLAManager slaManager;
 
 	/**
 	 * Give the schedule manager a loader class that will properly load the
@@ -69,10 +79,12 @@ public class ScheduleManager {
 	 */
 	public ScheduleManager(ExecutorManager executorManager,
 							ProjectManager projectManager, 
+							SLAManager slaManager,
 							ScheduleLoader loader) 
 	{
 		this.executorManager = executorManager;
 		this.projectManager = projectManager;
+		this.slaManager = slaManager;
 		this.loader = loader;
 		this.runner = new ScheduleRunner();
 
@@ -169,9 +181,11 @@ public class ScheduleManager {
 			final long lastModifyTime,
 			final long nextExecTime,
 			final long submitTime,
-			final String submitUser
+			final String submitUser,
+			final FlowOptions flowOptions,
+			final SlaOptions slaOptions
 			) {
-		Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+		Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
@@ -362,19 +376,33 @@ public class ScheduleManager {
 										// Create ExecutableFlow
 										ExecutableFlow exflow = new ExecutableFlow(flow);
 										exflow.setSubmitUser(runningSched.getSubmitUser());
-	
-										// TODO make disabled in scheduled flow
-										// Map<String, String> paramGroup =
-										// this.getParamGroup(req, "disabled");
-										// for (Map.Entry<String, String> entry:
-										// paramGroup.entrySet()) {
-										// boolean nodeDisabled =
-										// Boolean.parseBoolean(entry.getValue());
-										// exflow.setStatus(entry.getKey(),
-										// nodeDisabled ? Status.DISABLED :
-										// Status.READY);
-										// }
-	
+										
+										FlowOptions flowOptions = runningSched.getFlowOptions();
+										
+										if(flowOptions != null) {
+											if (flowOptions.getFailureAction() != null) {
+												exflow.setFailureAction(flowOptions.getFailureAction());
+											}
+											if (flowOptions.getFailureEmails() != null) {
+												exflow.setFailureEmails(flowOptions.getFailureEmails());
+											}
+											if (flowOptions.getSuccessEmails() != null) {
+												exflow.setSuccessEmails(flowOptions.getSuccessEmails());
+											}
+											exflow.setNotifyOnFirstFailure(flowOptions.isnotifyOnFirstFailure());
+											exflow.setNotifyOnLastFailure(flowOptions.isnotifyOnLastFailure());
+											
+											exflow.addFlowParameters(flowOptions.getFlowOverride());
+											
+											List<String> disabled = flowOptions.getDisabledJobs();
+											// Setup disabled
+											if(disabled != null) {
+												for (String job : disabled) {
+													exflow.setNodeStatus(job, Status.DISABLED);
+												}
+											}
+										}
+										
 										try {
 											executorManager.submitExecutableFlow(exflow);
 											logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -383,9 +411,30 @@ public class ScheduleManager {
 											logger.error(e.getMessage());
 											return;
 										}
-									} catch (JobExecutionException e) {
-										logger.info("Could not run flow. " + e.getMessage());
+										
+										SlaOptions slaOptions = runningSched.getSlaOptions();
+										
+										if(slaOptions != null) {
+											// submit flow slas
+											List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+											for(SlaSetting set : slaOptions.getSettings()) {
+												if(set.getId().equals("")) {
+													DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+													slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
+												}
+												else {
+													jobsettings.add(set);
+												}
+											}
+											if(jobsettings.size() > 0) {
+												slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+											}
+										}
+										
+									} catch (Exception e) {
+										logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
 									}
+									
 								}
 								
 								removeRunnerSchedule(runningSched);
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
new file mode 100644
index 0000000..cf6cd2f
--- /dev/null
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -0,0 +1,330 @@
+package azkaban.sla;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.utils.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+
+public class JdbcSLALoader implements SLALoader {
+
+	private static final Logger logger = Logger.getLogger(JdbcSLALoader.class);
+	
+	/**
+	 * Used for when we store text data. Plain uses UTF8 encoding.
+	 */
+	public static enum EncodingType {
+		PLAIN(1), GZIP(2);
+
+		private int numVal;
+
+		EncodingType(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static EncodingType fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return PLAIN;
+			case 2:
+				return GZIP;
+			default:
+				return PLAIN;
+			}
+		}
+	}
+	
+	private DataSource dataSource;
+	private EncodingType defaultEncodingType = EncodingType.GZIP;
+	
+	private static String slaTblName = "active_sla";
+	
+	final private static String LOAD_ALL_SLA =
+			"SELECT exec_id, job_name, check_time, rule, enc_type, options FROM " + slaTblName;
+	
+	final private static String INSERT_SLA = 
+			"INSERT INTO " + slaTblName + " ( exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
+
+//	final private static String UPDATE_SLA = 
+//			"UPDATE " + slaTblName + " SET exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
+//	
+	// use "" for flow
+	final private static String REMOVE_SLA = 
+			"DELETE FROM " + slaTblName + " WHERE exec_id=? AND job_name=? AND check_time=? AND rule=?";
+	
+	public JdbcSLALoader(Props props) {
+		String databaseType = props.getString("database.type");
+
+		if (databaseType.equals("mysql")) {
+			int port = props.getInt("mysql.port");
+			String host = props.getString("mysql.host");
+			String database = props.getString("mysql.database");
+			String user = props.getString("mysql.user");
+			String password = props.getString("mysql.password");
+			int numConnections = props.getInt("mysql.numconnections");
+			
+			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		}
+	}
+
+	private Connection getConnection() throws SLAManagerException {
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (Exception e) {
+			DbUtils.closeQuietly(connection);
+			throw new SLAManagerException("Error getting DB connection.", e);
+		}
+		return connection;
+	}
+	
+	public EncodingType getDefaultEncodingType() {
+		return defaultEncodingType;
+	}
+
+	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+		this.defaultEncodingType = defaultEncodingType;
+	}
+
+	@Override
+	public List<SLA> loadSLAs() throws SLAManagerException {
+		logger.info("Loading all SLAs from db.");
+		
+		Connection connection = getConnection();
+		List<SLA> SLAs;
+		try{
+			SLAs= loadSLAs(connection, defaultEncodingType);
+		}
+		catch(SLAManagerException e) {
+			throw new SLAManagerException("Failed to load SLAs" + e.getCause());
+		}
+		finally{
+			DbUtils.closeQuietly(connection);
+		}
+				
+		logger.info("Loaded " + SLAs.size() + " SLAs.");
+		
+		return SLAs;
+
+	}
+
+	public List<SLA> loadSLAs(Connection connection, EncodingType encType) throws SLAManagerException {
+		logger.info("Loading all SLAs from db.");
+		
+		QueryRunner runner = new QueryRunner();
+		ResultSetHandler<List<SLA>> handler = new SLAResultHandler();
+		List<SLA> SLAs;
+		
+		try {
+			SLAs = runner.query(connection, LOAD_ALL_SLA, handler);
+		} catch (SQLException e) {
+			logger.error(LOAD_ALL_SLA + " failed.");
+			throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
+		}
+		
+		return SLAs;
+
+	}
+	
+	@Override
+	public void removeSLA(SLA s) throws SLAManagerException {
+		Connection connection = getConnection();
+		try{
+			removeSLA(connection, s);
+		}
+		catch(SLAManagerException e) {
+			logger.error(LOAD_ALL_SLA + " failed.");
+			throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+		
+	}
+	
+	private void removeSLA(Connection connection, SLA s) throws SLAManagerException {
+
+		logger.info("Removing SLA " + s.toString() + " from db.");
+
+		QueryRunner runner = new QueryRunner(dataSource);
+	
+		try {
+			int removes =  runner.update(REMOVE_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal());
+			if (removes == 0) {
+				throw new SLAManagerException("No schedule has been removed.");
+			}
+		} catch (SQLException e) {
+			logger.error("Remove SLA failed. " + s.toString());
+			throw new SLAManagerException("Remove SLA " + s.toString() + " from db failed. "+ e.getCause());
+		}
+		
+	}
+
+	@Override
+	public	void insertSLA(SLA s) throws SLAManagerException {
+		Connection connection = getConnection();
+		try{
+			insertSLA(connection, s, defaultEncodingType);
+		}
+		catch(SLAManagerException e){
+			logger.error("Insert SLA failed. " + s.toString());
+			throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+	
+	private void insertSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
+
+		logger.debug("Inserting new SLA into DB. " + s.toString());
+		
+		QueryRunner runner = new QueryRunner();
+
+		String json = JSONUtils.toJSON(s.optionToObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new SLAManagerException("Error encoding the sla options.");
+		}
+		
+		try {
+			int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
+
+			if (inserts == 0) {
+				throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
+			}
+		} catch (SQLException e) {
+			logger.error("Insert SLA failed.");
+			throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
+		}
+		
+	}
+	
+//	@Override
+//	public	void updateSLA(SLA s) throws SLAManagerException {
+//		Connection connection = getConnection();
+//		try{
+//			insertSLA(connection, s, defaultEncodingType);
+//		}
+//		catch(SLAManagerException e){
+//			logger.error("Insert SLA failed. " + s.toString());
+//			throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
+//		}
+//		finally {
+//			DbUtils.closeQuietly(connection);
+//		}
+//	}
+//	
+//	private void updateSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
+//
+//		logger.debug("Inserting new SLA into DB. " + s.toString());
+//		
+//		QueryRunner runner = new QueryRunner();
+//
+//		String json = JSONUtils.toJSON(s.optionToObject());
+//		byte[] data = null;
+//		try {
+//			byte[] stringData = json.getBytes("UTF-8");
+//			data = stringData;
+//	
+//			if (encType == EncodingType.GZIP) {
+//				data = GZIPUtils.gzipBytes(stringData);
+//			}
+//			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+//		}
+//		catch (IOException e) {
+//			throw new SLAManagerException("Error encoding the sla options.");
+//		}
+//		
+//		try {
+//			int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
+//
+//			if (inserts == 0) {
+//				throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
+//			}
+//		} catch (SQLException e) {
+//			logger.error("Insert SLA failed.");
+//			throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
+//		}
+//		
+//	}
+	
+	public class SLAResultHandler implements ResultSetHandler<List<SLA>> {
+		@Override
+		public List<SLA> handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return Collections.<SLA>emptyList();
+			}
+			
+			ArrayList<SLA> SLAs = new ArrayList<SLA>();
+			
+			do {
+				int execId = rs.getInt(1);
+				String jobName = rs.getString(2);
+				DateTime checkTime = new DateTime(rs.getLong(3));
+				SlaRule rule = SlaRule.fromInteger(rs.getInt(4));
+				int encodingType = rs.getInt(5);
+				byte[] data = rs.getBytes(6);
+				
+				SLA s = null;
+
+				EncodingType encType = EncodingType.fromInteger(encodingType);
+				Object optsObj;
+				try {
+					// Convoluted way to inflate strings. Should find common package or helper function.
+					if (encType == EncodingType.GZIP) {
+						// Decompress the sucker.
+						String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+						optsObj = JSONUtils.parseJSONFromString(jsonString);
+					}
+					else {
+						String jsonString = new String(data, "UTF-8");
+						optsObj = JSONUtils.parseJSONFromString(jsonString);
+					}						
+					s = SLA.createSlaFromObject(execId, jobName, checkTime, rule, optsObj);
+				} catch (IOException e) {
+					throw new SQLException("Error reconstructing SLA options. " + execId + " " + jobName + " " + checkTime.toDateTimeISO() + e.getCause());
+				}
+				SLAs.add(s);
+
+			} while (rs.next());
+			
+			return SLAs;
+		}
+		
+	}
+	
+}
diff --git a/src/java/azkaban/sla/SLA.java b/src/java/azkaban/sla/SLA.java
new file mode 100644
index 0000000..eb08229
--- /dev/null
+++ b/src/java/azkaban/sla/SLA.java
@@ -0,0 +1,252 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.scheduler.Schedule;
+
+public class SLA {
+
+	public static enum SlaRule {
+		SUCCESS(1), FINISH(2), WAITANDCHECKJOB(3);
+
+		private int numVal;
+
+		SlaRule(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static SlaRule fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return SUCCESS;
+			case 2:
+				return FINISH;
+			case 3:
+				return WAITANDCHECKJOB;
+			default:
+				return SUCCESS;
+			}
+		}
+	}
+	
+	public static enum SlaAction {
+		EMAIL(1), KILL(2);
+
+		private int numVal;
+
+		SlaAction(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static SlaAction fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return EMAIL;
+			case 2:
+				return KILL;
+			default:
+				return EMAIL;
+			}
+		}
+	}
+	
+	public static class SlaSetting {
+		public String getId() {
+			return id;
+		}
+		public void setId(String id) {
+			this.id = id;
+		}
+		public ReadablePeriod getDuration() {
+			return duration;
+		}
+		public void setDuration(ReadablePeriod duration) {
+			this.duration = duration;
+		}
+		public SlaRule getRule() {
+			return rule;
+		}
+		public void setRule(SlaRule rule) {
+			this.rule = rule;
+		}
+		public List<SlaAction> getActions() {
+			return actions;
+		}
+		public void setActions(List<SlaAction> actions) {
+			this.actions = actions;
+		}
+		
+		public Object toObject() {
+			Map<String, Object> obj = new HashMap<String, Object>();
+			obj.put("id", id);
+			obj.put("duration", Schedule.createPeriodString(duration));
+//			List<String> rulesObj = new ArrayList<String>();
+//			for(SlaRule rule : rules) {
+//				rulesObj.add(rule.toString());
+//			}
+//			obj.put("rules", rulesObj);
+			obj.put("rule", rule.toString());
+			List<String> actionsObj = new ArrayList<String>();
+			for(SlaAction act : actions) {
+				actionsObj.add(act.toString());
+			}
+			obj.put("actions", actionsObj);
+			return obj;
+		}
+		
+		@SuppressWarnings("unchecked")
+		public static SlaSetting fromObject(Object obj) {
+			Map<String, Object> slaObj = (HashMap<String, Object>) obj;
+			String subId = (String) slaObj.get("id");
+			ReadablePeriod dur = Schedule.parsePeriodString((String) slaObj.get("duration"));
+//			List<String> rulesObj = (ArrayList<String>) slaObj.get("rules");
+//			List<SlaRule> slaRules = new ArrayList<SLA.SlaRule>();
+//			for(String rule : rulesObj) {
+//				slaRules.add(SlaRule.valueOf(rule));
+//			}
+			SlaRule slaRule = SlaRule.valueOf((String) slaObj.get("rule"));
+			List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+			List<SlaAction> slaActs = new ArrayList<SlaAction>();
+			for(String act : actsObj) {
+				slaActs.add(SlaAction.valueOf(act));
+			}
+			
+			SlaSetting ret = new SlaSetting();
+			ret.setId(subId);
+			ret.setDuration(dur);
+			ret.setRule(slaRule);
+			ret.setActions(slaActs);
+			return ret;
+		}
+		
+		private String id;
+		private ReadablePeriod duration;
+		private SlaRule rule = SlaRule.SUCCESS;
+		private List<SlaAction> actions;
+	}
+	
+	private int execId;
+	private String jobName;
+	private DateTime checkTime;
+	private List<String> emails;
+	private List<SlaAction> actions;
+	private List<SlaSetting> jobSettings;
+	private SlaRule rule;
+	
+	public SLA(
+			int execId,
+			String jobName,
+			DateTime checkTime,
+			List<String> emails,
+			List<SlaAction> slaActions,
+			List<SlaSetting> jobSettings,
+			SlaRule slaRule
+	) {
+		this.execId = execId;
+		this.jobName = jobName;
+		this.checkTime = checkTime;
+		this.emails = emails;
+		this.actions = slaActions;
+		this.jobSettings = jobSettings;
+		this.rule = slaRule;
+	}
+
+	public int getExecId() {
+		return execId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	public DateTime getCheckTime() {
+		return checkTime;
+	}
+
+	public List<String> getEmails() {
+		return emails;
+	}
+
+	public List<SlaAction> getActions() {
+		return actions;
+	}
+	
+	public List<SlaSetting> getJobSettings() {
+		return jobSettings;
+	}
+
+	public SlaRule getRule() {
+		return rule;
+	}
+	
+	public String toString() {
+		return execId + " " + jobName + " to be checked at " + checkTime.toDateTimeISO();
+	}
+	
+	public Map<String,Object> optionToObject() {
+		HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+		slaObj.put("emails", emails);
+//		slaObj.put("rule", rule.toString());
+
+		List<String> actionsObj = new ArrayList<String>();
+		for(SlaAction act : actions) {
+			actionsObj.add(act.toString());
+		}
+		slaObj.put("actions", actionsObj);
+		
+		if(jobSettings != null && jobSettings.size() > 0) {
+			List<Object> settingsObj = new ArrayList<Object>();
+			for(SlaSetting set : jobSettings) {
+				settingsObj.add(set.toObject());
+			}
+			slaObj.put("jobSettings", settingsObj);
+		}
+		
+
+		return slaObj;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static SLA createSlaFromObject(int execId, String jobName, DateTime checkTime, SlaRule rule, Object obj) {
+		
+		HashMap<String, Object> slaObj = (HashMap<String,Object>)obj;
+
+		List<String> emails = (List<String>)slaObj.get("emails");
+//		SlaRule rule = SlaRule.valueOf((String)slaObj.get("rule"));
+		List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+		List<SlaAction> slaActs = new ArrayList<SlaAction>();
+		for(String act : actsObj) {
+			slaActs.add(SlaAction.valueOf(act));
+		}
+		List<SlaSetting> jobSets = null;
+		if(slaObj.containsKey("jobSettings") && slaObj.get("jobSettings") != null) {
+			jobSets = new ArrayList<SLA.SlaSetting>();
+			for(Object set : (List<Object>)slaObj.get("jobSettings")) {
+				SlaSetting jobSet = SlaSetting.fromObject(set);
+				jobSets.add(jobSet);
+			}
+		}
+		
+		return new SLA(execId, jobName, checkTime, emails, slaActs, jobSets, rule);
+	}
+
+	public void setCheckTime(DateTime time) {
+		this.checkTime = time;
+	}
+
+}
diff --git a/src/java/azkaban/sla/SLALoader.java b/src/java/azkaban/sla/SLALoader.java
new file mode 100644
index 0000000..2c32b33
--- /dev/null
+++ b/src/java/azkaban/sla/SLALoader.java
@@ -0,0 +1,14 @@
+package azkaban.sla;
+
+import java.util.List;
+
+public interface SLALoader {
+
+	public void insertSLA(SLA s) throws SLAManagerException;
+	
+	public List<SLA> loadSLAs() throws SLAManagerException;
+	
+	public void removeSLA(SLA s) throws SLAManagerException;
+
+//	public void updateSLA(SLA s) throws SLAManagerException;
+}
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
new file mode 100644
index 0000000..17a67d0
--- /dev/null
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -0,0 +1,228 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.mail.MessagingException;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutableNode;
+import azkaban.sla.SLA;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class SlaMailer {
+	private static Logger logger = Logger.getLogger(SlaMailer.class);
+	
+	private boolean testMode = false;
+	private String clientHostname;
+	private String clientPortNumber;
+	
+	private String mailHost;
+	private String mailUser;
+	private String mailPassword;
+	private String mailSender;
+	private String azkabanName;
+	
+	public SlaMailer(Props props) {
+		this.azkabanName = props.getString("azkaban.name", "azkaban");
+		this.mailHost = props.getString("mail.host", "localhost");
+		this.mailUser = props.getString("mail.user", "");
+		this.mailPassword = props.getString("mail.password", "");
+		this.mailSender = props.getString("mail.sender", "");
+		
+		this.clientHostname = props.getString("jetty.hostname", "localhost");
+		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+		
+		testMode = props.getBoolean("test.mode", false);
+	}
+	
+	public void sendFirstErrorMessage(ExecutableFlow flow) {
+		List<String> emailList = flow.getFailureEmails();
+		int execId = flow.getExecutionId();
+		
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+			
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+			
+			if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
+				message.println("This flow is set to cancel all currently running jobs.");
+			}
+			else if (flow.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
+				message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
+			}
+			else {
+				message.println("This flow is set to complete all currently running jobs before stopping.");
+			}
+			
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+			
+			message.println("");
+			message.println("<h3>Reason</h3>");
+			List<String> failedJobs = findFailedJobs(flow);
+			message.println("<ul>");
+			for (String jobId : failedJobs) {
+				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+			}
+			
+			message.println("</ul>");
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+	
+	public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
+		List<String> emailList = flow.getFailureEmails();
+		int execId = flow.getExecutionId();
+		
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+			
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+			
+			message.println("");
+			message.println("<h3>Reason</h3>");
+			List<String> failedJobs = findFailedJobs(flow);
+			message.println("<ul>");
+			for (String jobId : failedJobs) {
+				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+			}
+			for (String reasons: extraReasons) {
+				message.println("<li>" + reasons + "</li>");
+			}
+			
+			message.println("</ul>");
+			
+			
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+
+	public void sendSuccessEmail(ExecutableFlow flow) {
+		List<String> emailList = flow.getSuccessEmails();
+
+		int execId = flow.getExecutionId();
+		
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+			
+			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+	
+	private List<String> findFailedJobs(ExecutableFlow flow) {
+		ArrayList<String> failedJobs = new ArrayList<String>();
+		for (ExecutableNode node: flow.getExecutableNodes()) {
+			if (node.getStatus() == Status.FAILED) {
+				failedJobs.add(node.getJobId());
+			}
+		}
+		
+		return failedJobs;
+	}
+
+	public void sendSlaEmail(SLA s, String ... extraReasons) {
+		List<String> emailList = s.getEmails();
+		
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("SLA violation on " + azkabanName);
+			
+//			message.println("<h2 style=\"color:#FF0000\"> Execution '" + s.getExecId() + "' of flow '" + flow.getFlowId() + "' failed to meet SLA on " + azkabanName + "</h2>");
+//			message.println("<table>");
+//			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+//			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+//			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+//			message.println("</table>");
+//			message.println("");
+//			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+//			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+//			
+//			message.println("");
+//			message.println("<h3>Reason</h3>");
+//			List<String> failedJobs = findFailedJobs(flow);
+//			message.println("<ul>");
+//			for (String jobId : failedJobs) {
+//				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+//			}
+			for (String reasons: extraReasons) {
+				message.println("<li>" + reasons + "</li>");
+			}
+			
+			message.println("</ul>");
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
new file mode 100644
index 0000000..ae17595
--- /dev/null
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -0,0 +1,467 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.velocity.runtime.parser.node.GetExecutor;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorMailer;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLA.SlaSetting;
+import azkaban.user.User;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+
+/**
+ * The SLAManager stores and checks the SLA (Service Level Agreement). It uses a single thread
+ * instead and waits until correct check time for the flow, and individual jobs in the flow if their SLA is set. 
+ */
+public class SLAManager {
+	private static Logger logger = Logger.getLogger(SLAManager.class);
+
+	private SLALoader loader;
+
+	private final SLARunner runner;
+	private final SLAPreRunner prerunner;
+	private final ExecutorManager executorManager;
+	private SlaMailer mailer;
+
+	/**
+	 * Give the sla manager a loader class that will properly load the
+	 * sla.
+	 * 
+	 * @param loader
+	 * @throws SLAManagerException 
+	 */
+	public SLAManager(ExecutorManager executorManager,
+							SLALoader loader,
+							Props props) throws SLAManagerException 
+	{
+		this.executorManager = executorManager;
+		this.loader = loader;
+		this.mailer = new SlaMailer(props);
+		this.runner = new SLARunner();
+		this.prerunner = new SLAPreRunner();
+
+		List<SLA> SLAList = null;
+		try {
+			SLAList = loader.loadSLAs();
+		} catch (SLAManagerException e) {
+			// TODO Auto-generated catch block
+			throw e;
+		}
+
+		for (SLA sla : SLAList) {
+			runner.addRunnerSLA(sla);
+		}
+
+		this.runner.start();
+	}
+
+	/**
+	 * Shutdowns the sla thread. After shutdown, it may not be safe to use
+	 * it again.
+	 */
+	public void shutdown() {
+		this.runner.shutdown();
+		this.prerunner.shutdown();
+	}
+
+	/**
+	 * Removes the flow from the SLA if it exists.
+	 * 
+	 * @param id
+	 * @throws SLAManagerException 
+	 */
+	public void removeSLA(SLA s) throws SLAManagerException {
+		logger.info("Removing SLA " + s.toString());
+		runner.removeRunnerSLA(s);
+		loader.removeSLA(s);
+	}
+
+	public void submitSla(
+			int execId, 
+			String id,
+			DateTime checkTime, 
+			List<String> emails,
+			List<SlaAction> slaActions,
+			List<SlaSetting> jobSettings,
+			SlaRule slaRule
+			) throws SLAManagerException {
+		SLA s = new SLA(execId, id, checkTime, emails, slaActions, jobSettings, slaRule);
+		logger.info("Submitting SLA " + s.toString());
+		try {
+			loader.insertSLA(s);
+			runner.addRunnerSLA(s);
+		}
+		catch (SLAManagerException e) {
+			throw new SLAManagerException("Failed to add new SLA!" + e.getCause());
+		}
+	}
+	
+	/**
+	 * Thread that simply invokes the checking of flows when the SLA is
+	 * ready.
+	 * 
+	 */
+	public class SLARunner extends Thread {
+		private final PriorityBlockingQueue<SLA> SLAs;
+		private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+		// Five minute minimum intervals
+		private static final int TIMEOUT_MS = 60000;
+
+		public SLARunner() {
+			SLAs = new PriorityBlockingQueue<SLA>(1,new SLAComparator());
+		}
+
+		public void shutdown() {
+			logger.error("Shutting down SLA runner thread");
+			stillAlive.set(false);
+			this.interrupt();
+		}
+
+		/**
+		 * Return a list of flow with SLAs
+		 * 
+		 * @return
+		 */
+		protected synchronized List<SLA> getRunnerSLAs() {
+			return new ArrayList<SLA>(SLAs);
+		}
+
+		/**
+		 * Adds SLA into runner and then interrupts so it will update
+		 * its wait time.
+		 * 
+		 * @param flow
+		 */
+		public synchronized void addRunnerSLA(SLA s) {
+			logger.info("Adding " + s + " to SLA runner.");
+			SLAs.add(s);
+			this.interrupt();
+		}
+		
+		/**
+		 * Remove runner SLA. Does not interrupt.
+		 * 
+		 * @param flow
+		 * @throws SLAManagerException 
+		 */
+		public synchronized void removeRunnerSLA(SLA s) {
+			logger.info("Removing " + s + " from the SLA runner.");
+			SLAs.remove(s);
+		}
+
+		public void run() {
+			while (stillAlive.get()) {
+				synchronized (this) {
+					try {
+						// TODO clear up the exception handling
+						SLA s = SLAs.peek();
+
+						if (s == null) {
+							// If null, wake up every minute or so to see if
+							// there's something to do. Most likely there will not be.
+							try {
+								this.wait(TIMEOUT_MS);
+							} catch (InterruptedException e) {
+								// interruption should occur when items are added or removed from the queue.
+							}
+						} else {
+							// We've passed the flow execution time, so we will run.
+							if (!(new DateTime(s.getCheckTime())).isAfterNow()) {
+								// Run flow. The invocation of flows should be quick.
+								SLA runningSLA = SLAs.poll();
+								
+								logger.info("Checking sla " + runningSLA.toString() );
+								
+								int execId = s.getExecId();
+								ExecutableFlow exflow = executorManager.getExecutableFlow(execId);
+								
+								if(runningSLA.getJobName().equals("") && runningSLA.getRule().equals(SlaRule.WAITANDCHECKJOB)) {
+									// do the checking of potential jobsla submissions
+									List<SlaSetting> jobSettings = runningSLA.getJobSettings();
+									List<SlaSetting> removeSettings = new ArrayList<SLA.SlaSetting>();
+									for(SlaSetting set : jobSettings) {
+										ExecutableNode node = exflow.getExecutableNode(set.getId());
+										if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
+											submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
+											removeSettings.add(set);
+											logger.info("Job " + set.getId() + " already started, monitoring SLA.");
+										}
+									}
+									for(SlaSetting remove : removeSettings) {
+										jobSettings.remove(remove);
+									}
+									if(jobSettings.size() == 0) {
+										removeRunnerSLA(runningSLA);
+										loader.removeSLA(runningSLA);
+									}
+									else {
+										removeRunnerSLA(runningSLA);
+										loader.removeSLA(runningSLA);
+										runningSLA.setCheckTime(runningSLA.getCheckTime().plusMillis(TIMEOUT_MS));
+										addRunnerSLA(runningSLA);
+										loader.insertSLA(runningSLA);
+									}
+								}
+								else {
+									if(!metSla(runningSLA, exflow)) {
+										takeSLAActions(runningSLA, exflow);
+									}
+
+
+									removeRunnerSLA(runningSLA);
+									loader.removeSLA(runningSLA);
+								}
+							} else {
+								// wait until flow run
+								long millisWait = Math.max(0, s.getCheckTime().getMillis() - (new DateTime()).getMillis());
+								try {
+									this.wait(Math.min(millisWait, TIMEOUT_MS));
+								} catch (InterruptedException e) {
+									// interruption should occur when items are
+									// added or removed from the queue.
+								}
+							}
+						}
+					} catch (Exception e) {
+						logger.error("Unexpected exception has been thrown in scheduler", e);
+					} catch (Throwable e) {
+						logger.error("Unexpected throwable has been thrown in scheduler", e);
+					}
+				}
+			}
+		}
+
+		private boolean metSla(SLA s, ExecutableFlow exflow) {
+			SlaRule rule = s.getRule();
+			long finishTime;
+			Status status;
+			if(s.getJobName().equals("")) {
+				finishTime = exflow.getEndTime();
+				status = exflow.getStatus();
+			}
+			else {
+				ExecutableNode exnode = exflow.getExecutableNode(s.getJobName());
+				finishTime = exnode.getEndTime();
+				status = exnode.getStatus();
+			}
+			
+			switch(rule) {
+				case FINISH:	// check finish time
+					return finishTime != -1 && finishTime < s.getCheckTime().getMillis();
+				case SUCCESS: 	// check finish and successful
+					return status == Status.SUCCEEDED && finishTime < s.getCheckTime().getMillis();
+				default: 
+					logger.error("Unknown SLA rules!");
+					return false;
+			}
+		}
+
+		/**
+		 * Class to sort the sla based on time.
+		 * 
+		 */
+		private class SLAComparator implements Comparator<SLA> {
+			@Override
+			public int compare(SLA arg0, SLA arg1) {
+				long first = arg1.getCheckTime().getMillis();
+				long second = arg0.getCheckTime().getMillis();
+
+				if (first == second) {
+					return 0;
+				} else if (first < second) {
+					return 1;
+				}
+
+				return -1;
+			}
+		}
+	}
+
+	private void takeSLAActions(SLA s, ExecutableFlow exflow) {
+		logger.info("SLA " + s.toString() + " missed! Taking predefined actions");
+		List<SlaAction> actions = s.getActions();
+		for(SlaAction act : actions) {
+			if(act.equals(SlaAction.EMAIL)) {
+				try {
+					sendSlaAlertEmail(s, exflow);
+				}
+				catch (Exception e) {
+					logger.error("Failed to send out SLA alert email. " + e.getCause());
+				}
+			} else if(act.equals(SlaAction.KILL)) {
+				try {
+					executorManager.cancelFlow(exflow, "azkaban");
+					//sendSlaKillEmail(s, exflow);
+				} catch (ExecutorManagerException e) {
+					// TODO Auto-generated catch block
+					logger.error("Cancel flow failed." + e.getCause());
+				}
+			}
+		}
+	}
+	
+	private void sendSlaAlertEmail(SLA s, ExecutableFlow exflow) {
+		String message = null;
+		ExecutableNode exnode;
+		switch(s.getRule()) {
+			case FINISH:
+				if(s.getJobName().equals("")) {
+					message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
+					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+				}
+				else {
+					exnode = exflow.getExecutableNode(s.getJobName());
+					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
+					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+				}
+				break;
+			case SUCCESS:
+				if(s.getJobName().equals("")) {
+					message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA. " + String.format("%n");
+					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("  %n");
+					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+				}
+				else {
+					exnode = exflow.getExecutableNode(s.getJobName());
+					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA." + String.format("%n"); 
+					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+				}
+				break;
+			default:
+				logger.error("Unknown SLA rules!");
+				message = "Unknown SLA was not met!";
+				break;
+		}
+		mailer.sendSlaEmail(s, message);
+	}
+	
+	public class SLAPreRunner extends Thread {
+		private final List<SLA> preSlas;
+		private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+		// Five minute minimum intervals
+		private static final int TIMEOUT_MS = 300000;
+
+		public SLAPreRunner() {
+			preSlas = new ArrayList<SLA>();
+		}
+
+		public void shutdown() {
+			logger.error("Shutting down pre-sla checker thread");
+			stillAlive.set(false);
+			this.interrupt();
+		}
+
+		/**
+		 * Return a list of flow with SLAs
+		 * 
+		 * @return
+		 */
+		protected synchronized List<SLA> getPreSlas() {
+			return new ArrayList<SLA>(preSlas);
+		}
+
+		/**
+		 * Adds SLA into runner and then interrupts so it will update
+		 * its wait time.
+		 * 
+		 * @param flow
+		 */
+		public synchronized void addCheckerPreSla(SLA s) {
+			logger.info("Adding " + s + " to pre-sla checker.");
+			preSlas.add(s);
+			this.interrupt();
+		}
+		
+		/**
+		 * Remove runner SLA. Does not interrupt.
+		 * 
+		 * @param flow
+		 * @throws SLAManagerException 
+		 */
+		public synchronized void removeCheckerPreSla(SLA s) {
+			logger.info("Removing " + s + " from the pre-sla checker.");
+			preSlas.remove(s);
+		}
+
+		public void run() {
+			while (stillAlive.get()) {
+				synchronized (this) {
+					try {
+						// TODO clear up the exception handling
+
+						if (preSlas.size() == 0) {
+							try {
+								this.wait(TIMEOUT_MS);
+							} catch (InterruptedException e) {
+								// interruption should occur when items are added or removed from the queue.
+							}
+						} else {
+							for(SLA s : preSlas) {
+								ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
+								String id = s.getJobName();
+								if(!s.equals("")) {
+									ExecutableNode exnode = exflow.getExecutableNode(id);
+									if(exnode.getStartTime() != -1) {
+										
+									}
+								}
+							}
+						}
+					} catch (Exception e) {
+						logger.error("Unexpected exception has been thrown in scheduler", e);
+					} catch (Throwable e) {
+						logger.error("Unexpected throwable has been thrown in scheduler", e);
+					}
+				}
+			}
+		}
+	}
+
+	
+}
diff --git a/src/java/azkaban/sla/SLAManagerException.java b/src/java/azkaban/sla/SLAManagerException.java
new file mode 100644
index 0000000..a3468a2
--- /dev/null
+++ b/src/java/azkaban/sla/SLAManagerException.java
@@ -0,0 +1,13 @@
+package azkaban.sla;
+
+public class SLAManagerException extends Exception{
+	private static final long serialVersionUID = 1L;
+
+	public SLAManagerException(String message) {
+		super(message);
+	}
+	
+	public SLAManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
diff --git a/src/java/azkaban/utils/EmailMessage.java b/src/java/azkaban/utils/EmailMessage.java
index 1ac7607..b510d6c 100644
--- a/src/java/azkaban/utils/EmailMessage.java
+++ b/src/java/azkaban/utils/EmailMessage.java
@@ -21,7 +21,10 @@ import javax.mail.internet.MimeBodyPart;
 import javax.mail.internet.MimeMessage;
 import javax.mail.internet.MimeMultipart;
 
+import com.sun.mail.smtp.SMTPTransport;
+
 public class EmailMessage {
+	private static String protocol = "smtp";
 	private List<String> _toAddress = new ArrayList<String>();
 	private String _mailHost;
 	private String _mailUser;
@@ -129,12 +132,13 @@ public class EmailMessage {
 	public void sendEmail() throws MessagingException {
 		checkSettings();
 		Properties props = new Properties();
-		props.setProperty("mail.transport.protocol", "smtp");
-		props.put("mail.host", _mailHost);
+//		props.setProperty("mail.transport.protocol", "smtp");
+		props.put("mail."+protocol+".host", _mailHost);
+		props.put("mail."+protocol+".auth", "true");
 		props.put("mail.user", _mailUser);
 		props.put("mail.password", _mailPassword);
 
-		Session session = Session.getDefaultInstance(props);
+		Session session = Session.getInstance(props, null);
 		Message message = new MimeMessage(session);
 		InternetAddress from = new InternetAddress(_fromAddress, false);
 		message.setFrom(from);
@@ -160,11 +164,13 @@ public class EmailMessage {
 			message.setContent(_body.toString(), _mimeType);
 		}
 
-		Transport transport = session.getTransport();
-		transport.connect();
-		transport.sendMessage(message,
+//		Transport transport = session.getTransport();
+		
+		SMTPTransport t = (SMTPTransport) session.getTransport(protocol);
+		t.connect(_mailHost, _mailUser, _mailPassword);
+		t.sendMessage(message,
 				message.getRecipients(Message.RecipientType.TO));
-		transport.close();
+		t.close();
 	}
 
 	public void setBody(String body) {
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 430af3d..e3cceae 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -51,6 +51,9 @@ import azkaban.project.ProjectManager;
 
 import azkaban.scheduler.JdbcScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
+import azkaban.sla.JdbcSLALoader;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SLAManagerException;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
 import azkaban.utils.FileIOUtils;
@@ -118,6 +121,7 @@ public class AzkabanWebServer implements AzkabanServer {
 	private ProjectManager projectManager;
 	private ExecutorManager executorManager;
 	private ScheduleManager scheduleManager;
+	private SLAManager slaManager;
 
 	private final ClassLoader baseClassLoader;
 	
@@ -144,7 +148,8 @@ public class AzkabanWebServer implements AzkabanServer {
 		userManager = loadUserManager(props);
 		projectManager = loadProjectManager(props);
 		executorManager = loadExecutorManager(props);
-		scheduleManager = loadScheduleManager(executorManager, props);
+		slaManager = loadSLAManager(props);
+		scheduleManager = loadScheduleManager(executorManager, slaManager, props);
 		baseClassLoader = getBaseClassloader();
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -159,6 +164,8 @@ public class AzkabanWebServer implements AzkabanServer {
 		}
 	}
 	
+	
+
 	private void setViewerPlugins(List<ViewerPlugin> viewerPlugins) {
 		this.viewerPlugins = viewerPlugins;
 	}
@@ -202,12 +209,17 @@ public class AzkabanWebServer implements AzkabanServer {
 		return execManager;
 	}
 
-	private ScheduleManager loadScheduleManager(ExecutorManager execManager, Props props ) throws Exception {
-		ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, new JdbcScheduleLoader(props));
+	private ScheduleManager loadScheduleManager(ExecutorManager execManager, SLAManager slaManager, Props props ) throws Exception {
+		ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, slaManager, new JdbcScheduleLoader(props));
 
 		return schedManager;
 	}
 
+	private SLAManager loadSLAManager(Props props) throws SLAManagerException {
+		SLAManager slaManager = new SLAManager(executorManager, new JdbcSLALoader(props), props);
+		return slaManager;
+	}
+	
 	/**
 	 * Returns the web session cache.
 	 * 
@@ -249,6 +261,10 @@ public class AzkabanWebServer implements AzkabanServer {
 		return executorManager;
 	}
 	
+	public SLAManager getSLAManager() {
+		return slaManager;
+	}
+	
 	public ScheduleManager getScheduleManager() {
 		return scheduleManager;
 	}
@@ -654,4 +670,6 @@ public class AzkabanWebServer implements AzkabanServer {
 		
 		return props;
 	}
+
+	
 }
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index fd17899..b5da554 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -17,6 +17,8 @@
 package azkaban.webapp.servlet;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -25,15 +27,24 @@ import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.swing.text.StyledEditorKit.BoldAction;
 
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.joda.time.Hours;
 import org.joda.time.LocalDateTime;
+import org.joda.time.Minutes;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Flow;
+import azkaban.flow.Node;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.project.ProjectLogEvent.EventType;
@@ -46,13 +57,21 @@ import azkaban.utils.Pair;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
 import azkaban.scheduler.Schedule;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
 import azkaban.scheduler.ScheduleManager;
+import azkaban.sla.SLA;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaSetting;
 
 public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1L;
 	private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
 	private ProjectManager projectManager;
 	private ScheduleManager scheduleManager;
+	private SLAManager slaManager;
 	private UserManager userManager;
 
 	@Override
@@ -62,51 +81,252 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		projectManager = server.getProjectManager();
 		scheduleManager = server.getScheduleManager();
 		userManager = server.getUserManager();
+		slaManager = server.getSLAManager();
 	}
 	
 	@Override
 	protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
 			Session session) throws ServletException, IOException {
+		if (hasParam(req, "ajax")) {
+			handleAJAXAction(req, resp, session);
+		}
+		else {
+			handleGetAllSchedules(req, resp, session);
+		}
+	}
+	
+	private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
+		HashMap<String, Object> ret = new HashMap<String, Object>();
+		String ajaxName = getParam(req, "ajax");
+		
+		if (ajaxName.equals("slaInfo")) {
+			ajaxSlaInfo(req, ret, session.getUser());
+		}
+		else if(ajaxName.equals("setSla")) {
+			ajaxSetSla(req, ret, session.getUser());
+		}
+		else if(ajaxName.equals("advSchedule")) {
+			ajaxAdvSchedule(req, ret, session.getUser());
+		}
+
+		if (ret != null) {
+			this.writeJSON(resp, ret);
+		}
+	}
+
+	private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+		try {
+			
+			int projectId = getIntParam(req, "projectId");
+			String flowName = getParam(req, "flowName");
+			
+			Project project = projectManager.getProject(projectId);
+			if(!hasPermission(project, user, Permission.Type.SCHEDULE)) {
+				ret.put("error", "User " + user + " does not have permission to set SLA for this flow.");
+				return;
+			}
+			
+			Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+			
+			SlaOptions slaOptions= new SlaOptions();
+			
+			String slaEmails = getParam(req, "slaEmails");
+			System.out.println(slaEmails); 
+			String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			
+			Map<String, String> settings = getParamGroup(req, "settings");
+			System.out.println(settings);
+			List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
+			for(String set : settings.keySet()) {
+				SlaSetting s;
+				try {
+				s = parseSlaSetting(settings.get(set));
+				}
+				catch (Exception e) {
+					throw new ServletException(e);
+				}
+				if(s != null) {
+					slaSettings.add(s);
+				}
+			}
+			
+			if(slaSettings.size() > 0) {
+				slaOptions.setSlaEmails(Arrays.asList(emailSplit));
+				slaOptions.setSettings(slaSettings);
+			}
+			else {
+				slaOptions = null;
+			}
+			sched.setSlaOptions(slaOptions);
+			scheduleManager.insertSchedule(sched);
+
+		} catch (ServletException e) {
+			ret.put("error", e);
+		}
+		
+	}
+
+	
+	private SlaSetting parseSlaSetting(String set) {
+		// "" + Duration + EmailAction + KillAction
+		String[] parts = set.split(",", -1);
+		String id = parts[0];
+		String rule = parts[1];
+		String duration = parts[2];
+		String emailAction = parts[3];
+		String killAction = parts[4];
+		if(emailAction.equals("on") || killAction.equals("on")) {
+			SlaSetting r = new SlaSetting();			
+			r.setId(id);
+			r.setRule(SlaRule.valueOf(rule));
+			ReadablePeriod dur = parseDuration(duration);
+			r.setDuration(dur);
+			List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+			if(emailAction.equals("on")) {
+				actions.add(SlaAction.EMAIL);
+			}
+			if(killAction.equals("on")) {
+				actions.add(SlaAction.KILL);
+			}
+			r.setActions(actions);
+			return r;
+		}
+		return null;
+	}
+
+	private ReadablePeriod parseDuration(String duration) {
+		int hour = Integer.parseInt(duration.split(":")[0]);
+		int min = Integer.parseInt(duration.split(":")[1]);
+		return Minutes.minutes(min+hour*60).toPeriod();
+	}
+
+	@SuppressWarnings("unchecked")
+	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
+		int projId;
+		String flowName;
+		try {
+			projId = getIntParam(req, "projId");
+			flowName = getParam(req, "flowName");
+			
+			Project project = getProjectAjaxByPermission(ret, projId, user, Type.READ);
+			if (project == null) {
+				ret.put("error", "Error loading project. Project " + projId + " doesn't exist");
+				return;
+			}
+			
+			Flow flow = project.getFlow(flowName);
+			if (flow == null) {
+				ret.put("error", "Error loading flow. Flow " + flowName + " doesn't exist in " + projId);
+				return;
+			}
+			
+			Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projId, flowName));
+			
+			SlaOptions slaOptions = sched.getSlaOptions();
+			FlowOptions flowOptions = sched.getFlowOptions();
+			
+			if(slaOptions != null) {
+				ret.put("slaEmails", slaOptions.getSlaEmails());
+				List<SlaSetting> settings = slaOptions.getSettings();
+				List<Object> setObj = new ArrayList<Object>();
+				for(SlaSetting set: settings) {
+					setObj.add(set.toObject());
+				}
+				ret.put("settings", setObj);
+			}
+			else if (flowOptions != null) {
+				if(flowOptions.getFailureEmails() != null) {
+					List<String> emails = flowOptions.getFailureEmails();
+					if(emails.size() > 0) {
+						ret.put("slaEmails", emails);
+					}
+				}
+			}
+			else {
+				if(flow.getFailureEmails() != null) {
+					List<String> emails = flow.getFailureEmails();
+					if(emails.size() > 0) {
+						ret.put("slaEmails", emails);
+					}
+				}
+			}
+			
+			List<String> disabledJobs;
+			if(flowOptions != null) {
+				disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
+			}
+			else {
+				disabledJobs = new ArrayList<String>();
+			}
+				
+			List<String> allJobs = new ArrayList<String>();
+			for(Node n : flow.getNodes()) {
+				if(!disabledJobs.contains(n.getId())) {
+					allJobs.add(n.getId());
+				}
+			}
+			ret.put("allJobNames", allJobs);
+		} catch (ServletException e) {
+			ret.put("error", e);
+		}
+		
+	}
+
+	protected Project getProjectAjaxByPermission(Map<String, Object> ret, int projectId, User user, Permission.Type type) {
+		Project project = projectManager.getProject(projectId);
+		
+		if (project == null) {
+			ret.put("error", "Project '" + project + "' not found.");
+		}
+		else if (!hasPermission(project, user, type)) {
+			ret.put("error", "User '" + user.getUserId() + "' doesn't have " + type.name() + " permissions on " + project.getName());
+		}
+		else {
+			return project;
+		}
+		
+		return null;
+	}
+	
+	private void handleGetAllSchedules(HttpServletRequest req, HttpServletResponse resp,
+			Session session) throws ServletException, IOException{
+		
 		Page page = newPage(req, resp, session, "azkaban/webapp/servlet/velocity/scheduledflowpage.vm");
 		
 		List<Schedule> schedules = scheduleManager.getSchedules();
 		page.add("schedules", schedules);
+//		
+//		List<SLA> slas = slaManager.getSLAs();
+//		page.add("slas", slas);
 
 		page.render();
 	}
 	
 	@Override
 	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-		HashMap<String, Object> ret = new HashMap<String, Object>();
-		if (hasParam(req, "action")) {
-			String action = getParam(req, "action");
-			if (action.equals("scheduleFlow")) {
-				ajaxScheduleFlow(req, ret, session.getUser());
-			}
-			else if(action.equals("removeSched")){
-				ajaxRemoveSched(req, ret, session.getUser());
+		if (hasParam(req, "ajax")) {
+			handleAJAXAction(req, resp, session);
+		}
+		else {
+			HashMap<String, Object> ret = new HashMap<String, Object>();
+			if (hasParam(req, "action")) {
+				String action = getParam(req, "action");
+				if (action.equals("scheduleFlow")) {
+					ajaxScheduleFlow(req, ret, session.getUser());
+				}
+				else if(action.equals("removeSched")){
+					ajaxRemoveSched(req, ret, session.getUser());
+				}
 			}
+			
+			if(ret.get("status") == ("success"))
+				setSuccessMessageInCookie(resp, (String) ret.get("message"));
+			else
+				setErrorMessageInCookie(resp, (String) ret.get("message"));
+			
+			this.writeJSON(resp, ret);
 		}
-		
-		if(ret.get("status") == ("success"))
-			setSuccessMessageInCookie(resp, (String) ret.get("message"));
-		else
-			setErrorMessageInCookie(resp, (String) ret.get("message"));
-		
-		this.writeJSON(resp, ret);
 	}
-
-//	private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-//		HashMap<String, Object> ret = new HashMap<String, Object>();
-//		String ajaxName = getParam(req, "ajax");
-//		
-//		if (ajaxName.equals("scheduleFlow")) {
-//				ajaxScheduleFlow(req, ret, session.getUser());
-//		}
-////		}
-//		this.writeJSON(resp, ret);
-//	}
-//	
 	
 	private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
 		int projectId = getIntParam(req, "projectId");
@@ -197,40 +417,170 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		    hour += 12;
 		hour %= 24;
 
-//		String submitUser = user.getUserId();
-//		String userExec = userSubmit;//getParam(req, "userExec");
-//		String scheduleId = projectId + "." + flowName;
 		DateTime submitTime = new DateTime();
 		DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
 		
-		//ScheduledFlow schedFlow = scheduleManager.schedule(scheduleId, projectId, flowId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
-		//project.info("User '" + user.getUserId() + "' has scheduled " + flow.getId() + "[" + schedFlow.toNiceString() + "].");
-		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId());
+		Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+		FlowOptions flowOptions = null;
+		SlaOptions slaOptions = null;
+		if(sched != null) {
+			if(sched.getFlowOptions() != null) {
+				flowOptions = sched.getFlowOptions();
+			}
+			if(sched.getSlaOptions() != null) {
+				slaOptions = sched.getSlaOptions();
+			}
+		}
+		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
+		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
+		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
+		
+		ret.put("status", "success");
+		ret.put("message", projectName + "." + flowName + " scheduled.");
+	}
+
+	private void ajaxAdvSchedule(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
+		String projectName = getParam(req, "projectName");
+		String flowName = getParam(req, "flowName");
+		int projectId = getIntParam(req, "projectId");
+		
+		Project project = projectManager.getProject(projectId);
+			
+		if (project == null) {
+			ret.put("message", "Project " + projectName + " does not exist");
+			ret.put("status", "error");
+			return;
+		}
+		
+		if (!hasPermission(project, user, Type.SCHEDULE)) {
+			ret.put("status", "error");
+			ret.put("message", "Permission denied. Cannot execute " + flowName);
+			return;
+		}
+
+		Flow flow = project.getFlow(flowName);
+		if (flow == null) {
+			ret.put("status", "error");
+			ret.put("message", "Flow " + flowName + " cannot be found in project " + project);
+			return;
+		}
+		
+		String scheduleTime = getParam(req, "scheduleTime");
+		String scheduleDate = getParam(req, "scheduleDate");
+		DateTime firstSchedTime;
+		try {
+			firstSchedTime = parseDateTime(scheduleDate, scheduleTime);
+		}
+		catch (Exception e) {
+			ret.put("error", "Invalid date and/or time '" + scheduleDate + " " + scheduleTime);
+	      	return;
+		}
+
+		ReadablePeriod thePeriod = null;
+		try {
+			if(hasParam(req, "is_recurring") && getParam(req, "is_recurring").equals("on")) {
+			    thePeriod = Schedule.parsePeriodString(getParam(req, "period"));	
+			}
+		}
+		catch(Exception e){
+			ret.put("error", e.getMessage());
+		}
+		
+		Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+		FlowOptions flowOptions = null;
+		try {
+			flowOptions = parseFlowOptions(req);
+		}
+		catch (Exception e) {
+			ret.put("error", e.getMessage());
+		}
+		SlaOptions slaOptions = null;
+		if(sched != null) {
+			if(sched.getSlaOptions() != null) {
+				slaOptions = sched.getSlaOptions();
+			}
+		}
+		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
 		
 		ret.put("status", "success");
 		ret.put("message", projectName + "." + flowName + " scheduled.");
 	}
-				
-//	private ReadablePeriod parsePeriod(HttpServletRequest req) throws ServletException {
-//			int period = getIntParam(req, "period");
-//			String periodUnits = getParam(req, "period_units");
-//			if("M".equals(periodUnits))
-//				return Months.months(period);
-//			else if("w".equals(periodUnits))
-//				return Weeks.weeks(period);
-//			else if("d".equals(periodUnits))
-//				return Days.days(period);
-//			else if("h".equals(periodUnits))
-//				return Hours.hours(period);
-//			else if("m".equals(periodUnits))
-//				return Minutes.minutes(period);
-//			else if("s".equals(periodUnits))
-//				return Seconds.seconds(period);
-//			else
-//				throw new ServletException("Unknown period unit: " + periodUnits);
-//	}
+	
+	private FlowOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
+		FlowOptions flowOptions = new FlowOptions();
+		if (hasParam(req, "failureAction")) {
+			String option = getParam(req, "failureAction");
+			if (option.equals("finishCurrent") ) {
+				flowOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+			}
+			else if (option.equals("cancelImmediately")) {
+				flowOptions.setFailureAction(FailureAction.CANCEL_ALL);
+			}
+			else if (option.equals("finishPossible")) {
+				flowOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+			}
+		}
+
+		if (hasParam(req, "failureEmails")) {
+			String emails = getParam(req, "failureEmails");
+			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			flowOptions.setFailureEmails(Arrays.asList(emailSplit));
+		}
+		if (hasParam(req, "successEmails")) {
+			String emails = getParam(req, "successEmails");
+			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			flowOptions.setSuccessEmails(Arrays.asList(emailSplit));
+		}
+		if (hasParam(req, "notifyFailureFirst")) {
+			flowOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
+		}
+		if (hasParam(req, "notifyFailureLast")) {
+			flowOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
+		}
+		if (hasParam(req, "executingJobOption")) {
+			//String option = getParam(req, "jobOption");
+			// Not set yet
+		}
+		
+		Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
+		flowOptions.setFlowOverride(flowParamGroup);
+		
+		if (hasParam(req, "disabledJobs")) {
+			String disable = getParam(req, "disabledJobs");
+			String[] disableSplit = disable.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			List<String> jobs = (List<String>) Arrays.asList(disableSplit);
+			flowOptions.setDisabledJobs(jobs.subList(1, jobs.size()));
+		}
+		return flowOptions;
+	}
+
+	private DateTime parseDateTime(String scheduleDate, String scheduleTime) {
+		// scheduleTime: 12,00,pm,PDT
+		String[] parts = scheduleTime.split(",", -1);
+		int hour = Integer.parseInt(parts[0]);
+		int minutes = Integer.parseInt(parts[1]);
+		boolean isPm = parts[2].equalsIgnoreCase("pm");
+		
+		DateTimeZone timezone = parts[3].equals("UTC") ? DateTimeZone.UTC : DateTimeZone.forID("America/Los_Angeles");
+
+		// scheduleDate: 02/10/2013
+		DateTime day = null;
+		if(scheduleDate == null || scheduleDate.trim().length() == 0) {
+			day = new LocalDateTime().toDateTime();
+		} else {
+			day = DateTimeFormat.forPattern("MM/dd/yyyy").withZone(timezone).parseDateTime(scheduleDate);
+		}
+		
+		if(isPm && hour < 12)
+		    hour += 12;
+		hour %= 24;
+
+		DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
+
+		return firstSchedTime;
+	}
 
 	private boolean hasPermission(Project project, User user, Permission.Type type) {
 		if (project.hasPermission(user, type)) {
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index e28dab6..102c43f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -34,6 +34,7 @@
 		<script type="text/javascript" src="${context}/js/azkaban.flow.graph.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.exflow.options.view.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.schedule.options.view.js"></script>
 		<script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
 		<script type="text/javascript">
 			var contextURL = "${context}";
@@ -131,6 +132,7 @@
 					</div>
 				</div>
 		<!-- modal content -->
+	
 				<div id="schedule-flow" class="modal">
 						<h3>Schedule Flow</h3>
 						<div id="errorMsg" class="box-error-message">$errorMsg</div>
@@ -173,21 +175,23 @@
 										</form>
 						</div>
 								
-
 						<div class="actions">
-								<a class="yes btn2" id="schedule-btn" href="#">Schedule The Flow</a>
-								<a class="no simplemodal-close btn3" href="#">Cancel</a>
+							<a class="yes btn2" id="schedule-btn" href="#">Schedule The Flow</a>
+							<a class="no simplemodal-close btn3" href="#">Cancel</a>
+							<a class="btn2" id="adv-schedule-opt-btn" href="#">Advanced Schedule Options</a>
 						</div>
 				</div>
 				<div id="invalid-session" class="modal">
-						<h3>Invalid Session</h3>
+					<h3>Invalid Session</h3>
 						<p>Session has expired. Please re-login.</p>
 						<div class="actions">
-								<a class="yes btn3" id="login-btn" href="#">Re-login</a>
+							<a class="yes btn3" id="login-btn" href="#">Re-login</a>
 						</div>
 				</div>
-				
+#parse( "azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm" )
+
 #parse( "azkaban/webapp/servlet/velocity/executionoptionspanel.vm" )
+
 #end
 		<ul id="jobMenu" class="contextMenu">  
 			<li class="open"><a href="#open">Open...</a></li>
@@ -214,6 +218,26 @@
 			</ul>
 		</ul>
 
+		<ul id="scheduleDisableJobMenu" class="contextMenu flowSubmenu">  
+			<li class="openwindow"><a href="#scheduleOpenwindow">Open in New Window...</a></li>
+			<li id="scheduleDisable" class="disable separator"><a href="#disable">Disable</a><div id="scheduleDisableArrow" class="context-sub-icon"></div></li>
+			<ul id="scheduleDisableSub" class="subMenu">
+				<li class="disableAll"><a href="#disableAll">All</a></li>
+				<li class="parents"><a href="#disableParents">Parents</a></li>
+				<li class="ancestors"><a href="#disableAncestors">All Ancestors</a></li>
+				<li class="children"><a href="#disableChildren">Children</a></li>
+				<li class="decendents"><a href="#disableDescendents">All Descendents</a></li>
+			</ul>
+			<li id="scheduleEnable" class="enable"><a href="#enable">Enable</a> <div id="scheduleEnableArrow" class="context-sub-icon"></div></li>
+			<ul id="scheduleEnableSub" class="subMenu">
+				<li class="enableAll"><a href="#enableAll">All</a></li>
+				<li class="parents"><a href="#enableParents">Parents</a></li>
+				<li class="ancestors"><a href="#enableAncestors">All Ancestors</a></li>
+				<li class="children"><a href="#enableChildren">Children</a></li>
+				<li class="decendents"><a href="#enableDescendents">All Descendents</a></li>
+			</ul>
+		</ul>
+
 		</div>
 	</body>
 </html>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index 09d387a..64d5a9d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -18,8 +18,12 @@
 <html>
 	<head>
 #parse( "azkaban/webapp/servlet/velocity/style.vm" )
-		<script type="text/javascript" src="${context}/js/jquery/jquery.js"></script>    
-		<script type="text/javascript" src="${context}/js/jqueryui/jquery-ui.custom.min.js"></script>
+		<link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui-timepicker-addon.css" />
+		<link rel="stylesheet" type="text/css" href="${context}/css/jquery-ui.css" />
+		<script type="text/javascript" src="${context}/js/jquery/jquery-1.8.3.min.js"></script>  
+		<script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-1.9.2.custom.min.js"></script>  
+		<script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-timepicker-addon.js"></script> 
+		<script type="text/javascript" src="${context}/js/jqueryui/jquery-ui-sliderAccess.js"></script>
 		<script type="text/javascript" src="${context}/js/namespace.js"></script>
 		<script type="text/javascript" src="${context}/js/underscore-1.2.1-min.js"></script>
 		<script type="text/javascript" src="${context}/js/backbone-0.5.3-min.js"></script>
@@ -32,7 +36,6 @@
 			var timezone = "${timezone}";
 			var errorMessage = null;
 			var successMessage = null;
-	
 		</script>
 	</head>
 	<body>
@@ -43,12 +46,12 @@
 		<div class="content">
 		
 #if($errorMsg)
-                                <div class="box-error-message">$errorMsg</div>
+		<div class="box-error-message">$errorMsg</div>
 #else
 #if($error_message != "null")
-                                <div class="box-error-message">$error_message</div>
+		<div class="box-error-message">$error_message</div>
 #elseif($success_message != "null")
-                                <div class="box-success-message">$success_message</div>
+		<div class="box-success-message">$success_message</div>
 #end
 #end		
 		
@@ -65,12 +68,12 @@
 							<!--th class="execid">Execution Id</th-->
 							<th>Flow</th>
 							<th>Project</th>
-							<th>User</th>
 							<th>Submitted By</th>
 							<th class="date">First Scheduled to Run</th>
 							<th class="date">Next Execution Time</th>
 							<th class="date">Repeats Every</th>
-							<th class="action">Action</th>
+							<th>Has SLA</th>
+							<th colspan="2" class="action">Action</th>
 						</tr>
 					</thead>
 					<tbody>
@@ -85,11 +88,12 @@
 								<a href="${context}/manager?project=${sched.projectName}">${sched.projectName}</a>
 							</td>
 							<td>${sched.submitUser}</td>
-							<td>${sched.submitUser}</td>
 							<td>$utils.formatDateTime(${sched.firstSchedTime})</td>
 							<td>$utils.formatDateTime(${sched.nextExecTime})</td>
 							<td>$utils.formatPeriod(${sched.period})</td>
+							<td>#if(${sched.slaOptions}) true #else false #end</td>
 							<td><button id="removeSchedBtn" onclick="removeSched(${sched.projectId}, '${sched.flowName}')" >Remove Schedule</button></td>
+							<td><button id="addSlaBtn" onclick="slaView.initFromSched(${sched.projectId}, '${sched.flowName}')" >Set SLA</button></td>
 						</tr>
 #end
 #else
@@ -97,8 +101,74 @@
 #end
 					</tbody>
 				</table>
-				
-				
+			</div>
+
+		<!-- modal content -->
+
+		<div id="slaModalBackground" class="modalBackground2">
+			<div id="sla-options" class="modal modalContainer2">
+				<a href='#' title='Close' class='modal-close'>x</a>
+					<h3>SLA Options</h3>
+					<div>
+						<ul class="optionsPicker">
+							<li id="slaOptions">General SLA Options</li>
+						</ul>
+					</div>
+					<div class="optionsPane">
+						<div id="generalPanel" class="generalPanel panel">
+							<div id="slaActions">
+								<h4>SLA Alert Emails</h4>
+								<dl>
+									<dt >SLA Alert Emails</dt>
+									<dd>
+										<textarea id="slaEmails"></textarea>
+									</dd>
+								</dl>
+							</div>
+							<br></br>
+							<div id="slaRules">
+								<h4>Flow SLA Rules</h4>
+								<div class="tableDiv">
+									<table id="flowRulesTbl">
+										<thead>
+											<tr>
+												<th>Flow/Job</th>
+												<th>Sla Rule</th>
+												<th>Duration</th>
+												<th>Email Action</th>
+												<th>Kill Action</th>
+											</tr>
+										</thead>
+										<tbody>
+											<tr id="addRow"><td id="addRow-col" colspan="5"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+										</tbody>
+									</table>
+								</div>
+								<!--h4 style="visibility: hidden">Job SLA Rules</h4>
+								<div class="tableDiv" style="visibility: hidden">
+									<table id="jobRulesTbl">
+										<thead>
+											<tr>
+												<th>Flow/Job Id</th>
+												<th>Finish In</th>
+												<th>Duration</th>
+												<th>Email Action</th>
+												<th>Kill Action</th>
+											</tr>
+										</thead>
+										<tbody>
+											<tr id="addRow"><td id="addRow-col" colspan="4"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+										</tbody>
+									</table>
+								</div-->
+							</div>
+						</div>
+					</div>
+					<div class="actions">
+						<!--a class="yes btn1" id="remove-sla-btn" href="#">Remove SLA</a-->
+						<a class="yes btn1" id="set-sla-btn" href="#">Set/Change SLA</a>
+						<a class="no simplemodal-close btn3" id="sla-cancel-btn" href="#">Cancel</a>
+					</div>
 			</div>
 		</div>
 	</body>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm b/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm
new file mode 100644
index 0000000..fc6bc91
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm
@@ -0,0 +1,159 @@
+<div id="scheduleModalBackground" class="modalBackground2">
+	<div id="schedule-options" class="modal modalContainer2">
+		<a href='#' title='Close' class='modal-close'>x</a>
+			<h3>Schedule Flow Options</h3>
+			<div>
+				<ul class="optionsPicker">
+					<li id="scheduleGeneralOptions">General Options</li>
+					<li id="scheduleFlowOptions">Flow Options</li>
+					<!--li id="scheduleSlaOptions">SLA Options</li-->
+				</ul>
+			</div>
+			<div class="optionsPane">
+				<!--div id="scheduleSlaPanel" class="generalPanel panel">
+					<div id="slaActions">
+						<h4>SLA Alert Emails</h4>
+						<dl>
+							<dt >SLA Alert Emails</dt>
+							<dd>
+								<textarea id="slaEmails"></textarea>
+							</dd>
+						</dl>
+					</div>
+					<div id="slaRules">
+						<h4>Flow SLA Rules</h4>
+						<div class="tableDiv">
+							<table id="flowRulesTbl">
+								<thead>
+									<tr>
+										<th>Flow/Job</th>
+										<th>Finish In</th>
+										<th>Email Action</th>
+										<th>Kill Action</th>
+									</tr>
+								</thead>
+								<tbody>
+								</tbody>
+							</table>
+						</div>
+						<h4>Job SLA Rules</h4>
+						<div class="tableDiv">
+							<table id="jobRulesTbl">
+								<thead>
+									<tr>
+										<th>Flow/Job</th>
+										<th>Finish In</th>
+										<th>Email Action</th>
+										<th>Kill Action</th>
+									</tr>
+								</thead>
+								<tbody>
+								</tbody>
+							</table>
+						</div>
+					</div>
+				</div-->
+				<div id="scheduleGeneralPanel" class="generalPanel panel">
+					<div id="scheduleInfo">
+						<dl>
+							<dt>Schedule Time</dt>
+							<dd>
+								<input id="advhour" type="text" size="2" value="12"/>
+								<input id="advminutes" type="text" size="2" value="00"/>
+								<select id="advam_pm">
+								  <option>pm</option>
+								  <option>am</option>
+								</select>
+								<select id="advtimezone">
+								  <option>PDT</option>
+								  <option>UTC</option>
+								</select>
+							</dd>
+							<dt>Schedule Date</dt><dd><input type="text" id="advdatepicker" /></dd>
+							<dt>Recurrence</dt>
+							<dd>
+								<input id="advis_recurring" type="checkbox" checked  />
+								<span>repeat every</span>
+							 	<input id="advperiod" type="text" size="2" value="1"/>
+								<select id="advperiod_units">
+								  <option value="d">Days</option>
+								  <option value="h">Hours</option>
+								  <option value="m">Minutes</option>
+								  <option value="M">Months</option>
+								  <option value="w">Weeks</option>
+								</select>
+							</dd>
+						</dl>
+					</div>
+					<br></br>
+					<br></br>
+					<div id="scheduleCompleteActions">
+						<h4>Completion Actions</h4>
+						<dl>
+							<dt>Failure Action</dt>
+							<dd>
+								<select id="scheduleFailureAction" name="failureAction">
+									<option value="finishCurrent">Finish Current Running</option>
+									<option value="cancelImmediately">Cancel All</option>
+									<option value="finishPossible">Finish All Possible</option>
+								</select>
+											</dd>
+							<dt>Failure Email</dt>
+							<dd>
+								<textarea id="scheduleFailureEmails"></textarea>
+							</dd>
+							<dt>Notify on Failure</dt>
+							<dd>
+								<input id="scheduleNotifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
+								<input id="scheduleNotifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
+							</dd>
+							<dt>Success Email</dt>
+							<dd>
+								<textarea id="scheduleSuccessEmails"></textarea>
+							</dd>
+							<dt>Concurrent Execution</dt>
+							<dd id="scheduleExecutingJob" class="disabled">
+								<input id="scheduleIgnore" class="radio" type="radio" name="concurrent" value="ignore" checked /><label class="radioLabel" for="ignore">Run Concurrently</label>
+								<input id="schedulePipeline" class="radio" type="radio" name="concurrent" value="pipeline" /><label class="radioLabel" for="pipeline">Pipeline</label>
+								<input id="scheduleQueue" class="radio" type="radio" name="concurrent" value="queue" /><label class="radioLabel" for="queue">Queue Job</label>
+							</dd>
+						</dl>
+					</div>
+					<div id="scheduleFlowPropertyOverride">
+						<h4>Flow Property Override</h4>
+						<div class="tableDiv">
+							<table>
+								<thead>
+									<tr>
+										<th>Name</th>
+										<th>Value</th>
+									</tr>
+								</thead>
+								<tbody>
+									<tr id="scheduleAddRow"><td id="addRow-col" colspan="2"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+								</tbody>
+							</table>
+						</div>
+					</div>
+				</div>
+				<div id="scheduleGraphPanel" class="graphPanel panel">
+					<div id="scheduleJobListCustom" class="jobList">
+						<div class="filterList">
+							<input class="filter" placeholder="  Job Filter" />
+						</div>
+						<div class="list">
+						</div>
+						<div class="btn5 resetPanZoomBtn" >Reset Pan Zoom</div>
+						</div>
+						<div id="scheduleSvgDivCustom" class="svgDiv" >
+							<svg class="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
+							</svg>
+						</div>
+				</div>
+		</div>
+		<div class="actions">
+	<a class="yes btn1" id="adv-schedule-btn" href="#">Schedule</a>
+	<a class="no simplemodal-close btn3" id="schedule-cancel-btn" href="#">Cancel</a>
+	</div>
+	</div>
+</div>
\ No newline at end of file
diff --git a/src/sql/create_schedule_table.sql b/src/sql/create_schedule_table.sql
index defadce..381ce6d 100644
--- a/src/sql/create_schedule_table.sql
+++ b/src/sql/create_schedule_table.sql
@@ -12,6 +12,8 @@ CREATE TABLE schedules (
 	next_exec_time BIGINT,
 	submit_time BIGINT,
 	submit_user VARCHAR(128),
+	enc_type TINYINT,
+    options LONGBLOB,
 	primary key(project_id, flow_name)
 ) ENGINE=InnoDB;
 
diff --git a/src/sql/create_sla_table.sql b/src/sql/create_sla_table.sql
new file mode 100644
index 0000000..9914a03
--- /dev/null
+++ b/src/sql/create_sla_table.sql
@@ -0,0 +1,10 @@
+DROP TABLE if exists active_sla;
+CREATE TABLE active_sla (
+	exec_id INT NOT NULL,
+	job_name VARCHAR(128) NOT NULL,
+	check_time BIGINT NOT NULL,
+	rule TINYINT NOT NULL,
+	enc_type TINYINT,
+	options LONGBLOB NOT NULL,
+	primary key(exec_id, job_name, check_time, rule)
+) ENGINE=InnoDB;

src/web/css/azkaban.css 292(+277 -15)

diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index 80f0f17..67d3287 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1346,8 +1346,8 @@ tr:hover td {
 	bottom: 0px;
 }
 
-#graphPanel {
-	background-color: #F0F0F0;
+.radioLabel.disabled {
+	opacity: 0.3;
 }
 
 #executing-options {
@@ -1357,14 +1357,6 @@ tr:hover td {
 	bottom: 40px;
 }
 
-#scheduled {
-	
-}
-
-.radioLabel.disabled {
-	opacity: 0.3;
-}
-
 #executing-options .svgDiv {
 	position: absolute;
 	background-color: #CCC;
@@ -1447,6 +1439,189 @@ tr:hover td {
 	border-bottom: 1px solid #CCC;
 }
 
+#sla-options {
+	left: 100px;
+	right: 100px;
+	top: 50px;
+	bottom: 40px;
+}
+
+#sla-options .svgDiv {
+	position: absolute;
+	background-color: #CCC;
+	padding: 1px;
+	left: 270px;
+	right: 0px;
+	top: 0px;
+	bottom: 0px;
+}
+
+#sla-options .jobList {
+	position: absolute;
+	width: 255px;
+	top: 0px;
+	bottom: 0px;
+	padding: 5px;
+	background-color: #F0F0F0;
+}
+
+#sla-options .list {
+	width: 255px;
+}
+
+#sla-options ul.optionsPicker {
+	margin-left: 30px;
+}
+
+#sla-options ul.optionsPicker li {
+	float: left;
+	font-size: 12pt;
+	font-weight: bold;
+	margin-right: 15px;
+	cursor: pointer;
+	color: #CCC;
+}
+
+#sla-options ul.optionsPicker li.selected {
+	text-decoration: underline;
+	color: #000;
+}
+
+#sla-options ul.optionsPicker li.selected:hover {
+	color: #000;
+}
+
+#sla-options ul.optionsPicker li:hover {
+	color: #888;
+}
+
+#sla-options .optionsPane {
+	position: absolute;
+	top: 85px;
+	background-color: #FFF;
+	left: 0px;
+	right: 0px;
+	bottom: 0px;
+}
+
+#sla-options .panel {
+	position: absolute;
+	width: 100%;
+	top: 0px;
+	bottom: 65px;
+}
+
+#sla-options .generalPanel.panel {
+	background-color: #F4F4F4;
+	padding-top: 15px;
+}
+
+#sla-options h3 {
+	margin-left: 20px;
+	font-size: 14pt;
+	border-bottom: 1px solid #CCC;
+}
+
+#sla-options h4 {
+	margin-left: 20px;
+	font-size: 12pt;
+	border-bottom: 1px solid #CCC;
+}
+
+
+#schedule-options {
+	left: 100px;
+	right: 100px;
+	top: 50px;
+	bottom: 40px;
+}
+
+#schedule-options .svgDiv {
+	position: absolute;
+	background-color: #CCC;
+	padding: 1px;
+	left: 270px;
+	right: 0px;
+	top: 0px;
+	bottom: 0px;
+}
+
+#schedule-options .jobList {
+	position: absolute;
+	width: 255px;
+	top: 0px;
+	bottom: 0px;
+	padding: 5px;
+	background-color: #F0F0F0;
+}
+
+#schedule-options .list {
+	width: 255px;
+}
+
+#schedule-options ul.optionsPicker {
+	margin-left: 30px;
+}
+
+#schedule-options ul.optionsPicker li {
+	float: left;
+	font-size: 12pt;
+	font-weight: bold;
+	margin-right: 15px;
+	cursor: pointer;
+	color: #CCC;
+}
+
+#schedule-options ul.optionsPicker li.selected {
+	text-decoration: underline;
+	color: #000;
+}
+
+#schedule-options ul.optionsPicker li.selected:hover {
+	color: #000;
+}
+
+#schedule-options ul.optionsPicker li:hover {
+	color: #888;
+}
+
+#schedule-options .optionsPane {
+	position: absolute;
+	top: 85px;
+	background-color: #FFF;
+	left: 0px;
+	right: 0px;
+	bottom: 0px;
+}
+
+#schedule-options .panel {
+	position: absolute;
+	width: 100%;
+	top: 0px;
+	bottom: 65px;
+}
+
+#schedule-options .generalPanel.panel {
+	background-color: #F4F4F4;
+	padding-top: 15px;
+}
+
+#schedule-options h3 {
+	margin-left: 20px;
+	font-size: 14pt;
+	border-bottom: 1px solid #CCC;
+}
+
+#schedule-options h4 {
+	margin-left: 20px;
+	font-size: 12pt;
+	border-bottom: 1px solid #CCC;
+}
+
+#graphPanel {
+	background-color: #F0F0F0;
+}
+
 #generalPanel {
 	overflow: auto;
 }
@@ -1552,6 +1727,97 @@ tr:hover td {
 	top: 100px;
 }
 
+#scheduleGraphPanel {
+	background-color: #F0F0F0;
+}
+
+#scheduleGeneralPanel {
+	overflow: auto;
+}
+
+#scheduleGeneralPanel dt {
+	width: 150px;
+	font-size: 10pt;
+	font-weight: bold;
+	margin-top: 5px;
+}
+
+#scheduleGeneralPanel textarea {
+	width: 500px;
+}
+
+#scheduleGeneralPanel table #addRow {
+	cursor: pointer;
+}
+
+#scheduleGeneralPanel table tr {
+	height: 24px;
+}
+
+#scheduleGeneralPanel table .editable {
+
+}
+
+#scheduleGeneralPanel table .editable input {
+	border: 1px solid #009FC9;
+	height: 16px;
+}
+
+#scheduleGeneralPanel table .name {
+	width: 40%;
+}
+
+#scheduleGeneralPanel span.addIcon {
+	display: block;
+	width: 16px;
+	height: 16px;
+	background-image: url("./images/addIcon.png");
+}
+
+#scheduleGeneralPanel span.removeIcon {
+	display: block;
+	visibility:hidden;
+	disabled: true;
+	width: 16px;
+	height: 16px;
+	background-image: url("./images/removeIcon.png");
+	cursor: pointer;
+}
+
+#scheduleGeneralPanel .editable:hover span.removeIcon {
+	visibility:visible;
+}
+
+#scheduleGeneralPanel {
+}
+
+#scheduleGeneralPanel span {
+	float: left;
+	margin-left: 5px;
+}
+
+#scheduleGeneralPanel dd {
+	font-size: 10pt;
+}
+
+#scheduleFlowPropertyOverride {
+	clear: both;
+	padding-top: 30px;
+}
+
+#scheduleFlowPropertyOverride .tableDiv {
+	padding-right: 20px;
+	padding-left: 20px;
+}
+
+#scheduleJobList {
+	position: absolute;
+	top: 0px;
+	left: 0px;
+	height: 100%;
+	width: 250px;
+}
+
 .filter {
 	width: 100%;
 }
@@ -1581,7 +1847,7 @@ tr:hover td {
 	background-position: 16px 0px;
 }
 
-.list ul li:hover{
+.list ul li:hover {
 	background-color: #E1E3E2;
 	color: #009FC9;
 }
@@ -2371,10 +2637,6 @@ tr.row td.tb-name {
 	cursor: pointer;
 }
 
-.job-hover-menu {
-	padding-top: 1px; 
-}
-
 .azkaban-charts .expandable-hitarea { background-position: -32px -16px; }
 .azkaban-charts .expandable-hitarea.collapse { background-position: 0 -16px; }
 /* clean up */
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index 57ff096..5f70a97 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -316,7 +316,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 			$(curTarget).append(input);
 			$(input).focus();
 			this.editingTarget = curTarget;
-	  	}
+			}
 	  },
 	  handleRemoveColumn : function(evt) {
 	  	var curTarget = evt.currentTarget;
@@ -477,4 +477,4 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 				cloneModel.trigger("change:disabled");
 			}
 		}
-});
\ No newline at end of file
+});
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 0624033..584e3a4 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -280,82 +280,6 @@ azkaban.GraphModel = Backbone.Model.extend({});
 var executionModel;
 azkaban.ExecutionModel = Backbone.Model.extend({});
 
-var scheduleFlowView;
-azkaban.ScheduleFlowView = Backbone.View.extend({
-  events : {
-    "click #schedule-btn": "handleScheduleFlow"
-  },
-  initialize : function(settings) {
-  	$( "#datepicker" ).datepicker();
-  	$( "#datepicker" ).datepicker('setDate', new Date());
-    $("#errorMsg").hide();
-  },
-  handleScheduleFlow : function(evt) {
-         // First make sure we can upload
-//     var projectName = $('#path').val();
-     var description = $('#description').val();
-
-     var hourVal = $('#hour').val();
-     var minutesVal = $('#minutes').val();
-     var ampmVal = $('#am_pm').val();
-     var timezoneVal = $('#timezone').val();
-     var dateVal = $('#datepicker').val();
-     var is_recurringVal = $('#is_recurring').val();
-     var periodVal = $('#period').val();
-     var periodUnits = $('#period_units').val();
-
-     console.log("Creating schedule for "+projectName+"."+flowName);
-     $.ajax({
-        async: "false",
-        url: "schedule",
-        dataType: "json",
-        type: "POST",
-        data: {
-		action:"scheduleFlow", 
-
-		projectId:projectId,
-		projectName:projectName, 
-		flowName:flowName,
-		hour:hourVal,
-		minutes:minutesVal,
-		am_pm:ampmVal,
-		timezone:timezoneVal,
-		date:dateVal,
-		userExec:"dummy",
-		is_recurring:is_recurringVal,
-		period:periodVal,
-		period_units:periodUnits
-		},
-        success: function(data) {
-                if (data.status == "success") {
-			console.log("Successfully scheduled for "+projectName+"."+flowName);
-                        if (data.action == "redirect") {
-                                window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
-                        }
-			else{
-				$("#success_message").text("Flow " + projectName + "." + flowName + " scheduled!" );			
- 				window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ; 
-			}
-                }
-                else {
-                        if (data.action == "login") {
-                                        window.location = "";
-                        }
-                        else {
-                                $("#errorMsg").text("ERROR: " + data.message);
-                                $("#errorMsg").slideDown("fast");
-                        }
-                }
-        }
-     });
-
-  },
-  render: function() {
-	  
-  }
-});
-
-
 $(function() {
 	var selected;
 	// Execution model has to be created before the window switches the tabs.
@@ -367,8 +291,10 @@ $(function() {
 	graphModel = new azkaban.GraphModel();
 	svgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick: {id: 'jobMenu', callback: handleJobMenuClick}});
 	jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, rightClick: {id: 'jobMenu', callback: handleJobMenuClick}});
-	scheduleFlowView = new azkaban.ScheduleFlowView({el:$('#schedule-flow')});
+	scheduleFlowView = new azkaban.ScheduleFlowView({el:$('#schedule-flow'),   model: graphModel});
+	advancedScheduleView = new azkaban.AdvancedScheduleView({el:$('#schedule-options'), model: graphModel});
 	executeFlowView = new azkaban.ExecuteFlowView({el:$('#executing-options'), model: graphModel});
+	
 	var requestURL = contextURL + "/manager";
 
 	// Set up the Flow options view. Create a new one every time :p
@@ -435,20 +361,22 @@ $(function() {
 	      "json"
 	    );
 	    
+	    
 	$('#scheduleflowbtn').click( function() {
-	  console.log("schedule button clicked");
-	  $('#schedule-flow').modal({
-          closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
-          position: ["20%",],
-          containerId: 'confirm-container',
-          containerCss: {
-            'height': '220px',
-            'width': '500px'
-          },
-          onShow: function (dialog) {
-            var modal = this;
-            $("#errorMsg").hide();
-          }
-        });
+		console.log("schedule button clicked");
+		$('#schedule-flow').modal({
+			closeHTML: "<a href='#' title='Close' class='modal-close'>x</a>",
+			position: ["20%",],
+			containerId: 'confirm-container',
+			containerCss: {
+			'height': '220px',
+			'width': '500px'
+			},
+			onShow: function (dialog) {
+				var modal = this;
+				$("#errorMsg").hide();
+			}
+		});
 	});
+	
 });
diff --git a/src/web/js/azkaban.schedule.options.view.js b/src/web/js/azkaban.schedule.options.view.js
new file mode 100644
index 0000000..5b9b760
--- /dev/null
+++ b/src/web/js/azkaban.schedule.options.view.js
@@ -0,0 +1,586 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+$.namespace('azkaban');
+
+var scheduleCustomSvgGraphView;
+var scheduleCustomJobListView;
+
+var scheduleFlowView;
+
+var scheduleFlowData;
+
+//function recurseAllAncestors(nodes, disabledMap, id, disable) {
+//	var node = nodes[id];
+//	
+//	if (node.inNodes) {
+//		for (var key in node.inNodes) {
+//			disabledMap[key] = disable;
+//			recurseAllAncestors(nodes, disabledMap, key, disable);
+//		}
+//	}
+//}
+//
+//function recurseAllDescendents(nodes, disabledMap, id, disable) {
+//	var node = nodes[id];
+//	
+//	if (node.outNodes) {
+//		for (var key in node.outNodes) {
+//			disabledMap[key] = disable;
+//			recurseAllDescendents(nodes, disabledMap, key, disable);
+//		}
+//	}
+//}
+//
+azkaban.ScheduleContextMenu = Backbone.View.extend({
+	events : {
+		"click #scheduleDisableArrow" : "handleDisabledClick",
+		"click #scheduleEnableArrow" : "handleEnabledClick"
+	},
+	initialize: function(settings) {
+		$('#scheduleDisableSub').hide();
+		$('#scheduleEnableSub').hide();
+	},
+	handleEnabledClick: function(evt) {
+		if(evt.stopPropagation) {
+			evt.stopPropagation();
+		}
+		evt.cancelBubble=true;
+		
+		if (evt.currentTarget.expanded) {
+			evt.currentTarget.expanded=false;
+			$('#scheduleEnableArrow').removeClass('collapse');
+			$('#scheduleEnableSub').hide();
+		}
+		else {
+			evt.currentTarget.expanded=true;
+			$('#scheduleEnableArrow').addClass('collapse');
+			$('#scheduleEnableSub').show();
+		}
+	},
+	handleDisabledClick: function(evt) {
+		if(evt.stopPropagation) {
+			evt.stopPropagation();
+		}
+		evt.cancelBubble=true;
+		
+		if (evt.currentTarget.expanded) {
+			evt.currentTarget.expanded=false;
+			$('#scheduleDisableArrow').removeClass('collapse');
+			$('#scheduleDisableSub').hide();
+		}
+		else {
+			evt.currentTarget.expanded=true;
+			$('#scheduleDisableArrow').addClass('collapse');
+			$('#scheduleDisableSub').show();
+		}
+	}
+});
+
+azkaban.ScheduleFlowView = Backbone.View.extend({
+	events : {
+	"click #schedule-btn": "handleScheduleFlow",
+	"click #adv-schedule-opt-btn": "handleAdvancedSchedule"
+	},
+	initialize : function(settings) {
+		$( "#datepicker" ).datepicker();
+		$( "#datepicker" ).datepicker('setDate', new Date());
+		$("#errorMsg").hide();
+	},
+	handleAdvancedSchedule : function(evt) {
+		console.log("Clicked advanced schedule options button");
+		//$('#confirm-container').hide();
+		$.modal.close();
+		advancedScheduleView.show();
+	},
+	handleScheduleFlow : function(evt) {
+
+	var hourVal = $('#hour').val();
+	var minutesVal = $('#minutes').val();
+	var ampmVal = $('#am_pm').val();
+	var timezoneVal = $('#timezone').val();
+	var dateVal = $('#datepicker').val();
+	var is_recurringVal = $('#is_recurring').val();
+	var periodVal = $('#period').val();
+	var periodUnits = $('#period_units').val();
+
+	console.log("Creating schedule for "+projectName+"."+flowName);
+	$.ajax({
+		async: "false",
+		url: "schedule",
+		dataType: "json",
+		type: "POST",
+		data: {
+			action:"scheduleFlow", 
+			projectId:projectId,
+			projectName:projectName, 
+			flowName:flowName,
+			hour:hourVal,
+			minutes:minutesVal,
+			am_pm:ampmVal,
+			timezone:timezoneVal,
+			date:dateVal,
+			userExec:"dummy",
+			is_recurring:is_recurringVal,
+			period:periodVal,
+			period_units:periodUnits
+			},
+		success: function(data) {
+			if (data.status == "success") {
+				console.log("Successfully scheduled for "+projectName+"."+flowName);
+				if (data.action == "redirect") {
+					window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
+				}
+				else{
+					$("#success_message").text("Flow " + projectName + "." + flowName + " scheduled!" );			
+	 				window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ; 
+				}
+			}
+			else {
+				if (data.action == "login") {
+					window.location = "";
+				}
+				else {
+					$("#errorMsg").text("ERROR: " + data.message);
+					$("#errorMsg").slideDown("fast");
+				}
+			}
+		}
+	});
+
+	},
+	render: function() {
+	}
+});
+
+azkaban.AdvancedScheduleView = Backbone.View.extend({
+	events : {
+		"click" : "closeEditingTarget",
+		"click #adv-schedule-btn": "handleAdvSchedule",
+		"click #schedule-cancel-btn": "handleCancel",
+		"click .modal-close": "handleCancel",
+		"click #scheduleGeneralOptions": "handleGeneralOptionsSelect",
+		"click #scheduleFlowOptions": "handleFlowOptionsSelect",
+		"click #scheduleAddRow": "handleAddRow",
+		"click table .editable": "handleEditColumn",
+		"click table .removeIcon": "handleRemoveColumn"
+	},
+	initialize: function(setting) {
+		this.contextMenu = new azkaban.ScheduleContextMenu({el:$('#scheduleDisableJobMenu')});
+		this.handleGeneralOptionsSelect();
+		$( "#advdatepicker" ).datepicker();
+		$( "#advdatepicker" ).datepicker('setDate', new Date());
+	},
+	show: function() {
+		$('#scheduleModalBackground').show();
+		$('#schedule-options').show();
+		this.handleGeneralOptionsSelect();
+
+		scheduleFlowData = this.model.clone();
+		this.flowData = scheduleFlowData;
+		var flowData = scheduleFlowData;
+		
+		var fetchData = {"project": projectName, "ajax":"flowInfo", "flow":flowName};
+
+		var executeURL = contextURL + "/executor";
+		this.executeURL = executeURL;
+		var scheduleURL = contextURL + "/schedule";
+		this.scheduleURL = scheduleURL;
+		var handleAddRow = this.handleAddRow;
+
+		var data = flowData.get("data");
+		var nodes = {};
+		for (var i=0; i < data.nodes.length; ++i) {
+			var node = data.nodes[i];
+			nodes[node.id] = node;
+		}
+
+		for (var i=0; i < data.edges.length; ++i) {
+			var edge = data.edges[i];
+			var fromNode = nodes[edge.from];
+			var toNode = nodes[edge.target];
+
+			if (!fromNode.outNodes) {
+				fromNode.outNodes = {};
+			}
+			fromNode.outNodes[toNode.id] = toNode;
+
+			if (!toNode.inNodes) {
+				toNode.inNodes = {};
+			}
+			toNode.inNodes[fromNode.id] = fromNode;
+		}
+		flowData.set({nodes: nodes});
+
+		var disabled = {};
+		for (var i = 0; i < data.nodes.length; ++i) {
+			var updateNode = data.nodes[i];
+			if (updateNode.status == "DISABLED" || updateNode.status == "SKIPPED") {
+				updateNode.status = "READY";
+				disabled[updateNode.id] = true;
+			}
+			if (updateNode.status == "SUCCEEDED" || updateNode.status=="RUNNING") {
+				disabled[updateNode.id] = true;
+			}
+		}
+		flowData.set({disabled: disabled});
+
+		$.get(
+			executeURL,
+			fetchData,
+			function(data) {
+				if (data.error) {
+					alert(data.error);
+				}
+				else {
+					if (data.successEmails) {
+						$('#scheduleSuccessEmails').val(data.successEmails.join());
+					}
+					if (data.failureEmails) {
+						$('#scheduleFailureEmails').val(data.failureEmails.join());
+					}
+					
+					if (data.failureAction) {
+						$('#scheduleFailureAction').val(data.failureAction);
+					}
+					if (data.notifyFailureFirst) {
+						$('#scheduleNotifyFailureFirst').attr('checked', true);
+					}
+					if (data.notifyFailureLast) {
+						$('#scheduleNotifyFailureLast').attr('checked', true);	
+					}
+					if (data.flowParam) {
+						var flowParam = data.flowParam;
+						for (var key in flowParam) {
+							var row = handleAddRow();
+							var td = $(row).find('td');
+							$(td[0]).text(key);
+							$(td[1]).text(flowParam[key]);
+						}
+					}
+
+					if (!data.running || data.running.length == 0) {
+						$(".radio").attr("disabled", "disabled");
+						$(".radioLabel").addClass("disabled", "disabled");
+					}
+				}
+			},
+			"json"
+		);
+	},
+	handleCancel: function(evt) {
+		$('#scheduleModalBackground').hide();
+		$('#schedule-options').hide();
+	},
+	handleGeneralOptionsSelect: function(evt) {
+		$('#scheduleFlowOptions').removeClass('selected');
+		$('#scheduleGeneralOptions').addClass('selected');
+
+		$('#scheduleGeneralPanel').show();	  	
+		$('#scheduleGraphPanel').hide();
+	},
+	handleFlowOptionsSelect: function(evt) {
+		$('#scheduleGeneralOptions').removeClass('selected');
+		$('#scheduleFlowOptions').addClass('selected');
+
+		$('#scheduleGraphPanel').show();	  	
+		$('#scheduleGeneralPanel').hide();
+
+		if (this.flowSetup) {
+			return;
+		}
+
+		scheduleCustomSvgGraphView = new azkaban.SvgGraphView({el:$('#scheduleSvgDivCustom'), model: scheduleFlowData, rightClick: {id: 'scheduleDisableJobMenu', callback: this.handleDisableMenuClick}});
+		scheduleCustomJobsListView = new azkaban.JobListView({el:$('#scheduleJobListCustom'), model: scheduleFlowData, rightClick: {id: 'scheduleDisableJobMenu', callback: this.handleDisableMenuClick}});
+		scheduleFlowData.trigger("change:graph");
+		
+		this.flowSetup = true;
+	},
+	handleAdvSchedule: function(evt) {
+		var scheduleURL = this.scheduleURL;
+		var disabled = this.flowData.get("disabled");
+		var disabledJobs = "";
+		for(var job in disabled) {
+			if(disabled[job] == true) {
+				disabledJobs += "," + job;
+			}
+		}
+		var failureAction = $('#scheduleFailureAction').val();
+		var failureEmails = $('#scheduleFailureEmails').val();
+		var successEmails = $('#scheduleSuccessEmails').val();
+		var notifyFailureFirst = $('#scheduleNotifyFailureFirst').is(':checked');
+		var notifyFailureLast = $('#scheduleNotifyFailureLast').is(':checked');
+		var executingJobOption = $('input:radio[name=gender]:checked').val();
+
+		
+		var scheduleTime = $('#advhour').val() + "," + $('#advminutes').val() + "," + $('#advam_pm').val() + "," + $('#advtimezone').val();
+		var scheduleDate = $('#advdatepicker').val();
+		var is_recurring = $('#advis_recurring').val();
+		var period = $('#advperiod').val() + $('#advperiod_units').val();
+
+		var flowOverride = {};
+		var editRows = $(".editRow");
+		for (var i = 0; i < editRows.length; ++i) {
+			var row = editRows[i];
+			var td = $(row).find('td');
+			var key = $(td[0]).text();
+			var val = $(td[1]).text();
+			
+			if (key && key.length > 0) {
+				flowOverride[key] = val;
+			}
+		}
+
+		var scheduleData = {
+			projectId:projectId,
+			projectName: projectName,
+			ajax: "advSchedule",
+			flowName: flowName,
+			scheduleTime: scheduleTime,
+			scheduleDate: scheduleDate,
+			is_recurring: is_recurring,
+			period: period,
+			disabledJobs: disabledJobs,
+			failureAction: failureAction,
+			failureEmails: failureEmails,
+			successEmails: successEmails,
+			notifyFailureFirst: notifyFailureFirst,
+			notifyFailureLast: notifyFailureLast,
+			executingJobOption: executingJobOption,
+			flowOverride: flowOverride
+		};
+
+		$.post(
+				scheduleURL,
+				scheduleData,
+				function(data) {
+					if (data.error) {
+						alert(data.error);
+					}
+					else {
+						window.location = scheduleURL;
+					}
+				},
+				"json"
+		)
+	},
+	handleAddRow: function(evt) {
+		var tr = document.createElement("tr");
+		var tdName = document.createElement("td");
+		var tdValue = document.createElement("td");
+
+		var icon = document.createElement("span");
+		$(icon).addClass("removeIcon");
+		var nameData = document.createElement("span");
+		$(nameData).addClass("spanValue");
+		var valueData = document.createElement("span");
+		$(valueData).addClass("spanValue");
+
+		$(tdName).append(icon);
+		$(tdName).append(nameData);
+		$(tdName).addClass("name");
+		$(tdName).addClass("editable");
+		
+		$(tdValue).append(valueData);
+		$(tdValue).addClass("editable");
+		
+		$(tr).addClass("editRow");
+		$(tr).append(tdName);
+		$(tr).append(tdValue);
+
+		$(tr).insertBefore("#scheduleAddRow");
+		return tr;
+	},
+	handleEditColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+	
+		if (this.editingTarget != curTarget) {
+			this.closeEditingTarget();
+			
+			var text = $(curTarget).children(".spanValue").text();
+			$(curTarget).empty();
+						
+			var input = document.createElement("input");
+			$(input).attr("type", "text");
+			$(input).css("width", "100%");
+			$(input).val(text);
+			$(curTarget).addClass("editing");
+			$(curTarget).append(input);
+			$(input).focus();
+			this.editingTarget = curTarget;
+			}
+	},
+	handleRemoveColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+		// Should be the table
+		var row = curTarget.parentElement.parentElement;
+		$(row).remove();
+	},
+	closeEditingTarget: function(evt) {
+		if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
+			var input = $(this.editingTarget).children("input")[0];
+			var text = $(input).val();
+			$(input).remove();
+
+			var valueData = document.createElement("span");
+			$(valueData).addClass("spanValue");
+			$(valueData).text(text);
+
+			if ($(this.editingTarget).hasClass("name")) {
+				var icon = document.createElement("span");
+				$(icon).addClass("removeIcon");
+				$(this.editingTarget).append(icon);
+			}
+
+			$(this.editingTarget).removeClass("editing");
+			$(this.editingTarget).append(valueData);
+			this.editingTarget = null;
+		}
+	},
+	handleDisableMenuClick : function(action, el, pos) {
+		var flowData = scheduleFlowData;
+		var jobid = el[0].jobid;
+		var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
+		if (action == "open") {
+			window.location.href = requestURL;
+		}
+		else if(action == "openwindow") {
+			window.open(requestURL);
+		}
+		else if(action == "disable") {
+			var disabled = flowData.get("disabled");
+	
+			disabled[jobid] = true;
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if(action == "disableAll") {
+			var disabled = flowData.get("disabled");
+	
+			var nodes = flowData.get("nodes");
+			for (var key in nodes) {
+				disabled[key] = true;
+			}
+
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "disableParents") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			var inNodes = nodes[jobid].inNodes;
+	
+			if (inNodes) {
+				for (var key in inNodes) {
+				  disabled[key] = true;
+				}
+			}
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "disableChildren") {
+			var disabledMap = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			var outNodes = nodes[jobid].outNodes;
+	
+			if (outNodes) {
+				for (var key in outNodes) {
+				  disabledMap[key] = true;
+				}
+			}
+			
+			flowData.set({disabled: disabledMap});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "disableAncestors") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			
+			recurseAllAncestors(nodes, disabled, jobid, true);
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "disableDescendents") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			
+			recurseAllDescendents(nodes, disabled, jobid, true);
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if(action == "enable") {
+			var disabled = flowData.get("disabled");
+	
+			disabled[jobid] = false;
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if(action == "enableAll") {
+			disabled = {};
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "enableParents") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			var inNodes = nodes[jobid].inNodes;
+	
+			if (inNodes) {
+				for (var key in inNodes) {
+				  disabled[key] = false;
+				}
+			}
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "enableChildren") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			var outNodes = nodes[jobid].outNodes;
+	
+			if (outNodes) {
+				for (var key in outNodes) {
+				  disabled[key] = false;
+				}
+			}
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "enableAncestors") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			
+			recurseAllAncestors(nodes, disabled, jobid, false);
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "enableDescendents") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			
+			recurseAllDescendents(nodes, disabled, jobid, false);
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+	}
+});
\ No newline at end of file
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index 5cea0ac..b66374b 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -1,21 +1,344 @@
 $.namespace('azkaban');
 
+
 function removeSched(projectId, flowName) {
-    var scheduleURL = contextURL + "/schedule"
-    var redirectURL = contextURL + "/schedule"
-    $.post(
-         scheduleURL,
-         {"action":"removeSched", "projectId":projectId, "flowName":flowName},
-         function(data) {
-             if (data.error) {
+	var scheduleURL = contextURL + "/schedule"
+	var redirectURL = contextURL + "/schedule"
+	$.post(
+			scheduleURL,
+			{"action":"removeSched", "projectId":projectId, "flowName":flowName},
+			function(data) {
+				if (data.error) {
+//                 alert(data.error)
+					$('#errorMsg').text(data.error);
+				}
+				else {
+// 		 alert("Schedule "+schedId+" removed!")
+					window.location = redirectURL;
+				}
+			},
+			"json"
+	)
+}
+
+function removeSla(projectId, flowName) {
+	var scheduleURL = contextURL + "/schedule"
+	var redirectURL = contextURL + "/schedule"
+	$.post(
+			scheduleURL,
+			{"action":"removeSla", "projectId":projectId, "flowName":flowName},
+			function(data) {
+				if (data.error) {
 //                 alert(data.error)
-                 $('#errorMsg').text(data.error)
-             }
-	     else {
+					$('#errorMsg').text(data.error)
+				}
+				else {
 // 		 alert("Schedule "+schedId+" removed!")
-		 window.location = redirectURL
-             }
-         },
-         "json"
-   )
+					window.location = redirectURL
+				}
+			},
+			"json"
+	)
 }
+
+azkaban.ChangeSlaView = Backbone.View.extend({
+	events : {
+		"click" : "closeEditingTarget",
+		"click #set-sla-btn": "handleSetSla",	
+		"click #remove-sla-btn": "handleRemoveSla",
+		"click #sla-cancel-btn": "handleSlaCancel",
+		"click .modal-close": "handleSlaCancel",
+		"click #addRow": "handleAddRow"
+	},
+	initialize: function(setting) {
+
+	},
+	handleSlaCancel: function(evt) {
+		console.log("Clicked cancel button");
+		var scheduleURL = contextURL + "/schedule";
+
+		$('#slaModalBackground').hide();
+		$('#sla-options').hide();
+		
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		var rows = tFlowRules.rows;
+		var rowLength = rows.length
+		for(var i = 0; i < rowLength-1; i++) {
+			tFlowRules.deleteRow(0);
+		}
+		
+	},
+	initFromSched: function(projId, flowName) {
+		this.projectId = projId;
+		this.flowName = flowName;
+		
+		var scheduleURL = contextURL + "/schedule"
+		this.scheduleURL = scheduleURL;
+		var indexToName = {};
+		var nameToIndex = {};
+		var indexToText = {};
+		this.indexToName = indexToName;
+		this.nameToIndex = nameToIndex;
+		this.indexToText = indexToText;
+		var ruleBoxOptions = ["SUCCESS", "FINISH"];
+		this.ruleBoxOptions = ruleBoxOptions;
+		
+		var fetchScheduleData = {"projId": this.projectId, "ajax":"slaInfo", "flowName":this.flowName};
+		
+		$.get(
+				this.scheduleURL,
+				fetchScheduleData,
+				function(data) {
+					if (data.error) {
+						alert(data.error);
+					}
+					else {
+						if (data.slaEmails) {
+							$('#slaEmails').val(data.slaEmails.join());
+						}
+						
+						var allJobNames = data.allJobNames;
+						
+						indexToName[0] = "";
+						nameToIndex[flowName] = 0;
+						indexToText[0] = "flow " + flowName;
+						for(var i = 1; i <= allJobNames.length; i++) {
+							indexToName[i] = allJobNames[i-1];
+							nameToIndex[allJobNames[i-1]] = i;
+							indexToText[i] = "job " + allJobNames[i-1];
+						}
+						
+						
+						
+						
+						
+						// populate with existing settings
+						if(data.settings) {
+							
+							var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+							
+							for(var setting in data.settings) {
+								var rFlowRule = tFlowRules.insertRow(0);
+								
+								var cId = rFlowRule.insertCell(-1);
+								var idSelect = document.createElement("select");
+								for(var i in indexToName) {
+									idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+									if(data.settings[setting].id == indexToName[i]) {
+										idSelect.options[i].selected = true;
+									}
+								}								
+								cId.appendChild(idSelect);
+								
+								var cRule = rFlowRule.insertCell(-1);
+								var ruleSelect = document.createElement("select");
+								for(var i in ruleBoxOptions) {
+									ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+									if(data.settings[setting].rule == ruleBoxOptions[i]) {
+										ruleSelect.options[i].selected = true;
+									}
+								}
+								cRule.appendChild(ruleSelect);
+								
+								var cDuration = rFlowRule.insertCell(-1);
+								var duration = document.createElement("input");
+								duration.type = "text";
+								duration.setAttribute("class", "durationpick");
+								var rawMinutes = data.settings[setting].duration;
+								var intMinutes = rawMinutes.substring(0, rawMinutes.length-1);
+								var minutes = parseInt(intMinutes);
+								var hours = Math.floor(minutes / 60);
+								minutes = minutes % 60;
+								duration.value = hours + ":" + minutes;
+								cDuration.appendChild(duration);
+
+								var cEmail = rFlowRule.insertCell(-1);
+								var emailCheck = document.createElement("input");
+								emailCheck.type = "checkbox";
+								for(var act in data.settings[setting].actions) {
+									if(data.settings[setting].actions[act] == "EMAIL") {
+										emailCheck.checked = true;
+									}
+								}
+								cEmail.appendChild(emailCheck);
+								
+								var cKill = rFlowRule.insertCell(-1);
+								var killCheck = document.createElement("input");
+								killCheck.type = "checkbox";
+								for(var act in data.settings[setting].actions) {
+									if(data.settings[setting].actions[act] == "KILL") {
+										killCheck.checked = true;
+									}
+								}
+								cKill.appendChild(killCheck);
+								
+								$('.durationpick').timepicker({hourMax: 99});
+							}
+						}
+						$('.durationpick').timepicker({hourMax: 99});
+					}
+				},
+				"json"
+			);
+		
+		$('#slaModalBackground').show();
+		$('#sla-options').show();
+		
+//		this.schedFlowOptions = sched.flowOptions
+		console.log("Loaded schedule info. Ready to set SLA.");
+
+	},
+	handleRemoveSla: function(evt) {
+		console.log("Clicked remove sla button");
+		var scheduleURL = this.scheduleURL;
+		var redirectURL = this.scheduleURL;
+		$.post(
+				scheduleURL,
+				{"action":"removeSla", "projectId":this.projectId, "flowName":this.flowName},
+				function(data) {
+				if (data.error) {
+						$('#errorMsg').text(data.error)
+					}
+					else {
+						window.location = redirectURL
+					}
+				"json"
+				}
+			);
+
+	},
+	handleSetSla: function(evt) {
+
+		var slaEmails = $('#slaEmails').val();
+		var settings = {};
+		
+		
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		for(var row = 0; row < tFlowRules.rows.length-1; row++) {
+			var rFlowRule = tFlowRules.rows[row];
+			var id = rFlowRule.cells[0].firstChild.value;
+			var rule = rFlowRule.cells[1].firstChild.value;
+			var duration = rFlowRule.cells[2].firstChild.value;
+			var email = rFlowRule.cells[3].firstChild.value;
+			var kill = rFlowRule.cells[4].firstChild.value;
+			settings[row] = id + "," + rule + "," + duration + "," + email + "," + kill; 
+		}
+
+		var slaData = {
+			projectId: this.projectId,
+			flowName: this.flowName,
+			ajax: "setSla",			
+			slaEmails: slaEmails,
+			settings: settings
+		};
+
+		var scheduleURL = this.scheduleURL;
+		
+		$.post(
+			scheduleURL,
+			slaData,
+			function(data) {
+				if (data.error) {
+					alert(data.error);
+				}
+				else {
+					tFlowRules.length = 0;
+					window.location = scheduleURL;
+				}
+			},
+			"json"
+		);
+	},
+	handleAddRow: function(evt) {
+		
+		var indexToName = this.indexToName;
+		var nameToIndex = this.nameToIndex;
+		var indexToText = this.indexToText;
+		var ruleBoxOptions = this.ruleBoxOptions;
+
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		var rFlowRule = tFlowRules.insertRow(0);
+		
+		var cId = rFlowRule.insertCell(-1);
+		var idSelect = document.createElement("select");
+		for(var i in indexToName) {
+			idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+		}
+		
+		cId.appendChild(idSelect);
+		
+		var cRule = rFlowRule.insertCell(-1);
+		var ruleSelect = document.createElement("select");
+		for(var i in ruleBoxOptions) {
+			ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+		}
+		cRule.appendChild(ruleSelect);
+		
+		var cDuration = rFlowRule.insertCell(-1);
+		var duration = document.createElement("input");
+		duration.type = "text";
+		duration.setAttribute("class", "durationpick");
+		cDuration.appendChild(duration);
+
+		var cEmail = rFlowRule.insertCell(-1);
+		var emailCheck = document.createElement("input");
+		emailCheck.type = "checkbox";
+		cEmail.appendChild(emailCheck);
+		
+		var cKill = rFlowRule.insertCell(-1);
+		var killCheck = document.createElement("input");
+		killCheck.type = "checkbox";
+		cKill.appendChild(killCheck);
+		
+		$('.durationpick').timepicker({hourMax: 99});
+
+		return rFlowRule;
+	},
+	handleEditColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+	
+		if (this.editingTarget != curTarget) {
+			this.closeEditingTarget();
+			
+			var text = $(curTarget).children(".spanValue").text();
+			$(curTarget).empty();
+						
+			var input = document.createElement("input");
+			$(input).attr("type", "text");
+			$(input).css("width", "100%");
+			$(input).val(text);
+			$(curTarget).addClass("editing");
+			$(curTarget).append(input);
+			$(input).focus();
+			this.editingTarget = curTarget;
+		}
+	},
+	handleRemoveColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+		// Should be the table
+		var row = curTarget.parentElement.parentElement;
+		$(row).remove();
+	},
+	closeEditingTarget: function(evt) {
+
+	}
+});
+
+var slaView;
+
+$(function() {
+	var selected;
+
+
+	slaView = new azkaban.ChangeSlaView({el:$('#sla-options')});
+	
+//	var requestURL = contextURL + "/manager";
+
+	// Set up the Flow options view. Create a new one every time :p
+//	 $('#addSlaBtn').click( function() {
+//		 slaView.show();
+//	 });
+
+	 
+	
+});
\ No newline at end of file
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index f888f45..4a9491c 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -4,7 +4,9 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.sql.DataSource;
 
@@ -13,11 +15,24 @@ import junit.framework.Assert;
 import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTimeZone;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.MutablePeriod;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+import org.joda.time.ReadablePeriod;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLA.SlaSetting;
 import azkaban.utils.DataSourceUtils;
 import azkaban.utils.Props;
 
@@ -25,7 +40,7 @@ public class JdbcScheduleLoaderTest {
 	private static boolean testDBExists;
 	private static final String host = "localhost";
 	private static final int port = 3306;
-	private static final String database = "azkaban";
+	private static final String database = "azkaban2";
 	private static final String user = "azkaban";
 	private static final String password = "azkaban";
 	private static final int numConnections = 10;
@@ -92,39 +107,6 @@ public class JdbcScheduleLoaderTest {
 		}
 	}
 	
-//	@Test
-//	public void testLoadSchedule() {
-//		if (!testDBExists) {
-//			return;
-//		}
-//
-//		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
-//		Connection connection = null;
-//		try {
-//			connection = dataSource.getConnection();
-//		} catch (SQLException e) {
-//			e.printStackTrace();
-//			testDBExists = false;
-//			DbUtils.closeQuietly(connection);
-//			return;
-//		}
-//
-////		CountHandler countHandler = new CountHandler();
-//		QueryRunner runner = new QueryRunner();
-//		try {
-//			int count = runner.update(connection, "DELETE FROM schedules");
-//			
-//		} catch (SQLException e) {
-//			e.printStackTrace();
-//			testDBExists = false;
-//			DbUtils.closeQuietly(connection);
-//			return;
-//		}
-//		finally {
-//			DbUtils.closeQuietly(connection);
-//		}
-//	}
-	
 	@Test
 	public void testInsertAndLoadSchedule() throws ScheduleManagerException {
 		if (!isTestSetup()) {
@@ -134,12 +116,34 @@ public class JdbcScheduleLoaderTest {
 		
 		JdbcScheduleLoader loader = createLoader();
 		
-		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
-		Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc");
-		Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
-		Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
-		Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
-		Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
+		List<String> emails = new ArrayList<String>();
+		emails.add("email1");
+		emails.add("email2");
+		List<String> disabledJobs = new ArrayList<String>();
+		disabledJobs.add("job1");
+		disabledJobs.add("job2");
+		List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+		SlaSetting set1 = new SlaSetting();
+		List<SlaAction> actions = new ArrayList<SlaAction>();
+		actions.add(SlaAction.EMAIL);
+		set1.setActions(actions);
+		set1.setId("");
+		set1.setDuration(Schedule.parsePeriodString("1h"));
+		set1.setRule(SlaRule.FINISH);
+		slaSets.add(set1);
+		FlowOptions flowOptions = new FlowOptions();
+		flowOptions.setFailureEmails(emails);
+		flowOptions.setDisabledJobs(disabledJobs);
+		SlaOptions slaOptions = new SlaOptions();
+		slaOptions.setSlaEmails(emails);
+		slaOptions.setSettings(slaSets);
+		
+		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
+		Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 		
 		loader.insertSchedule(s1);
 		loader.insertSchedule(s2);
@@ -149,12 +153,77 @@ public class JdbcScheduleLoaderTest {
 		loader.insertSchedule(s6);
 		
 		List<Schedule> schedules = loader.loadSchedules();
+		Schedule sched = schedules.get(0);
 		
 		Assert.assertEquals(6, schedules.size());
-		Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
-		Assert.assertEquals(44444, schedules.get(0).getSubmitTime());
-		Assert.assertEquals("1d", Schedule.createPeriodString(schedules.get(0).getPeriod()));
+		Assert.assertEquals("America/Los_Angeles", sched.getTimezone().getID());
+		Assert.assertEquals(44444, sched.getSubmitTime());
+		Assert.assertEquals("1d", Schedule.createPeriodString(sched.getPeriod()));
+		FlowOptions fOpt = sched.getFlowOptions();
+		SlaOptions sOpt = sched.getSlaOptions();
+		Assert.assertEquals(SlaAction.EMAIL, sOpt.getSettings().get(0).getActions().get(0));
+		Assert.assertEquals("", sOpt.getSettings().get(0).getId());
+		Assert.assertEquals(Schedule.parsePeriodString("1h"), sOpt.getSettings().get(0).getDuration());
+		Assert.assertEquals(SlaRule.FINISH, sOpt.getSettings().get(0).getRule());
+		Assert.assertEquals(2, fOpt.getFailureEmails().size());
+		Assert.assertEquals(null, fOpt.getSuccessEmails());
+		Assert.assertEquals(2, fOpt.getDisabledJobs().size());
+		Assert.assertEquals(FailureAction.FINISH_CURRENTLY_RUNNING, fOpt.getFailureAction());
+		Assert.assertEquals(null, fOpt.getFlowOverride());
+	}
+	
+	@Test
+	public void testInsertAndUpdateSchedule() throws ScheduleManagerException {
+		if (!isTestSetup()) {
+			return;
+		}
+		clearDB();
+		
+		JdbcScheduleLoader loader = createLoader();
 		
+		List<String> emails = new ArrayList<String>();
+		emails.add("email1");
+		emails.add("email2");
+		List<String> disabledJobs = new ArrayList<String>();
+		disabledJobs.add("job1");
+		disabledJobs.add("job2");
+		List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+		SlaSetting set1 = new SlaSetting();
+		List<SlaAction> actions = new ArrayList<SlaAction>();
+		actions.add(SlaAction.EMAIL);
+		set1.setActions(actions);
+		set1.setId("");
+		set1.setDuration(Schedule.parsePeriodString("1h"));
+		set1.setRule(SlaRule.FINISH);
+		slaSets.add(set1);
+		FlowOptions flowOptions = new FlowOptions();
+		flowOptions.setFailureEmails(emails);
+		flowOptions.setDisabledJobs(disabledJobs);
+		SlaOptions slaOptions = new SlaOptions();
+		slaOptions.setSlaEmails(emails);
+		slaOptions.setSettings(slaSets);
+		
+		System.out.println("the flow options are " + flowOptions);
+		System.out.println("the sla options are " + slaOptions);
+		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+
+		loader.insertSchedule(s1);
+		
+		emails.add("email3");
+		slaOptions.setSlaEmails(emails);
+		
+		Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
+
+		loader.updateSchedule(s2);
+		
+		List<Schedule> schedules = loader.loadSchedules();
+		
+		Assert.assertEquals(1, schedules.size());
+		Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
+		Assert.assertEquals(44445, schedules.get(0).getSubmitTime());
+		Assert.assertEquals("2M", Schedule.createPeriodString(schedules.get(0).getPeriod()));
+//		System.out.println("the options are " + schedules.get(0).getSchedOptions());
+		Assert.assertEquals(3, schedules.get(0).getSlaOptions().getSlaEmails().size());
 	}
 	
 	@Test
@@ -169,11 +238,33 @@ public class JdbcScheduleLoaderTest {
 		
 		List<Schedule> schedules = new ArrayList<Schedule>();
 		
-		int stress = 100;
+		int stress = 10;
 		
 		for(int i=0; i<stress; i++)
 		{
-			Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu");
+			List<String> emails = new ArrayList<String>();
+			emails.add("email1");
+			emails.add("email2");
+			List<String> disabledJobs = new ArrayList<String>();
+			disabledJobs.add("job1");
+			disabledJobs.add("job2");
+			List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+			SlaSetting set1 = new SlaSetting();
+			List<SlaAction> actions = new ArrayList<SlaAction>();
+			actions.add(SlaAction.EMAIL);
+			set1.setActions(actions);
+			set1.setId("");
+			set1.setDuration(Schedule.parsePeriodString("1h"));
+			set1.setRule(SlaRule.FINISH);
+			slaSets.add(set1);
+			FlowOptions flowOptions = new FlowOptions();
+			flowOptions.setFailureEmails(emails);
+			flowOptions.setDisabledJobs(disabledJobs);
+			SlaOptions slaOptions = new SlaOptions();
+			slaOptions.setSlaEmails(emails);
+			slaOptions.setSettings(slaSets);
+			
+			Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 			schedules.add(s);
 			try {
 				loader.insertSchedule(s);
diff --git a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
new file mode 100644
index 0000000..02b7855
--- /dev/null
+++ b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
@@ -0,0 +1,182 @@
+package azkaban.test.sla;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.sla.JdbcSLALoader;
+import azkaban.sla.SLA;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLALoader;
+import azkaban.utils.DataSourceUtils;
+import azkaban.utils.Props;
+
+
+
+public class JdbcSLALoaderTest {
+	private static boolean testDBExists;
+	//@TODO remove this and turn into local host.
+	private static final String host = "localhost";
+	private static final int port = 3306;
+	private static final String database = "azkaban2";
+	private static final String user = "azkaban";
+	private static final String password = "azkaban";
+	private static final int numConnections = 10;
+	
+	
+	@BeforeClass
+	public static void setupDB() {
+		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		testDBExists = true;
+		
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+
+		CountHandler countHandler = new CountHandler();
+		QueryRunner runner = new QueryRunner();
+		try {
+			runner.query(connection, "SELECT COUNT(1) FROM active_sla", countHandler);
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+
+		DbUtils.closeQuietly(connection);
+		
+		clearDB();
+	}
+	
+	@AfterClass
+	public static void clearDB() {
+		if (!testDBExists) {
+			return;
+		}
+
+		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+
+		QueryRunner runner = new QueryRunner();
+		try {
+			runner.update(connection, "DELETE FROM active_sla");
+			
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+		
+		DbUtils.closeQuietly(connection);
+	}
+	
+	@Test
+	public void testInsertSLA() throws Exception {
+		if (!isTestSetup()) {
+			return;
+		}
+		
+		SLALoader loader = createLoader();
+		
+		int execId = 1;
+		String jobName = "";
+		DateTime checkTime = new DateTime(11111);
+		List<String> emails = new ArrayList<String>();
+		emails.add("email1");
+		emails.add("email2");
+//		List<SlaRule> rules = new ArrayList<SlaRule>();
+//		rules.add(SlaRule.SUCCESS);
+//		rules.add(SlaRule.FINISH);
+		List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+		actions.add(SlaAction.EMAIL);
+		SLA s = new SLA(execId, jobName, checkTime, emails, actions, null, SlaRule.FINISH);
+
+		loader.insertSLA(s);
+		
+		List<SLA> allSLAs = loader.loadSLAs();
+		SLA fetchSLA = allSLAs.get(0);
+
+		Assert.assertTrue(allSLAs.size() == 1);
+		// Shouldn't be the same object.
+		Assert.assertTrue(s != fetchSLA);
+		Assert.assertEquals(s.getExecId(), fetchSLA.getExecId());
+		Assert.assertEquals(s.getJobName(), fetchSLA.getJobName());
+		Assert.assertEquals(s.getCheckTime(), fetchSLA.getCheckTime());
+		Assert.assertEquals(s.getEmails(), fetchSLA.getEmails());
+		Assert.assertEquals(s.getRule(), fetchSLA.getRule());
+		Assert.assertEquals(s.getActions().get(0), fetchSLA.getActions().get(0));
+		
+		
+		loader.removeSLA(s);
+		
+		allSLAs = loader.loadSLAs();
+		
+		Assert.assertTrue(allSLAs.size() == 0);
+	}
+	
+	private SLALoader createLoader() {
+		Props props = new Props();
+		props.put("database.type", "mysql");
+		
+		props.put("mysql.host", host);		
+		props.put("mysql.port", port);
+		props.put("mysql.user", user);
+		props.put("mysql.database", database);
+		props.put("mysql.password", password);
+		props.put("mysql.numconnections", numConnections);
+		
+		return new JdbcSLALoader(props);
+	}
+	
+	private boolean isTestSetup() {
+		if (!testDBExists) {
+			System.err.println("Skipping DB test because Db not setup.");
+			return false;
+		}
+		
+		System.out.println("Running DB test because Db setup.");
+		return true;
+	}
+	
+	public static class CountHandler implements ResultSetHandler<Integer> {
+		@Override
+		public Integer handle(ResultSet rs) throws SQLException {
+			int val = 0;
+			while (rs.next()) {
+				val++;
+			}
+			
+			return val;
+		}
+	}
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/utils/EmailMessageTest.java b/unit/java/azkaban/test/utils/EmailMessageTest.java
new file mode 100644
index 0000000..43bf216
--- /dev/null
+++ b/unit/java/azkaban/test/utils/EmailMessageTest.java
@@ -0,0 +1,46 @@
+package azkaban.test.utils;
+
+import java.io.IOException;
+
+import javax.mail.MessagingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.EmailMessage;
+
+public class EmailMessageTest {
+
+	String host = "";
+	String sender = "";
+	String user = "";
+	String password = "";
+	
+	String toAddr = "";
+	
+	private EmailMessage em;
+	@Before
+	public void setUp() throws Exception {
+		em = new EmailMessage(host, user, password);
+		em.setFromAddress(sender);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testSendEmail() throws IOException {
+		em.addToAddress(toAddr);
+		//em.addToAddress("cyu@linkedin.com");
+		em.setSubject("azkaban test email");
+		em.setBody("azkaban test email");
+		try {
+			em.sendEmail();
+		} catch (MessagingException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+	
+}