azkaban-uncached

Details

diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index fe807d3..dcef504 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -569,7 +569,7 @@ public class ExecutorManager {
 		return flow;
 	}
 	
-	private boolean isFinished(ExecutableFlow flow) {
+	public boolean isFinished(ExecutableFlow flow) {
 		switch(flow.getStatus()) {
 		case SUCCEEDED:
 		case FAILED:
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index 976543a..ee9c812 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -38,6 +38,8 @@ import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
 import azkaban.sla.SLA;
 import azkaban.sla.SLAManagerException;
 import azkaban.sla.JdbcSLALoader.EncodingType;
@@ -200,7 +202,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 
 	public void insertSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
 		
-		String json = JSONUtils.toJSON(s.optionToObject());
+		String json = JSONUtils.toJSON(s.optionsToObject());
 		byte[] data = null;
 		try {
 			byte[] stringData = json.getBytes("UTF-8");
@@ -212,7 +214,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
 		}
 		catch (IOException e) {
-			throw new ScheduleManagerException("Error encoding the schedule options" + s.getSchedOptions());
+			throw new ScheduleManagerException("Error encoding the schedule options. " + s.getScheduleName());
 		}
 		
 		QueryRunner runner = new QueryRunner(dataSource);
@@ -262,7 +264,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 		
 	public void updateSchedule(Schedule s, EncodingType encType) throws ScheduleManagerException {
 
-		String json = JSONUtils.toJSON(s.optionToObject());
+		String json = JSONUtils.toJSON(s.optionsToObject());
 		byte[] data = null;
 		try {
 			byte[] stringData = json.getBytes("UTF-8");
@@ -274,7 +276,7 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
 		}
 		catch (IOException e) {
-			throw new ScheduleManagerException("Error encoding the schedule options" + s.getSchedOptions());
+			throw new ScheduleManagerException("Error encoding the schedule options " + s.getScheduleName());
 		}
 
 		QueryRunner runner = new QueryRunner(dataSource);
@@ -326,7 +328,8 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 				int encodingType = rs.getInt(12);
 				byte[] data = rs.getBytes(13);
 				
-				Map<String, Object> options = null;
+				FlowOptions flowOptions = null;
+				SlaOptions slaOptions = null;
 				if (data != null) {
 					EncodingType encType = EncodingType.fromInteger(encodingType);
 					Object optsObj;
@@ -341,13 +344,14 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 							String jsonString = new String(data, "UTF-8");
 							optsObj = JSONUtils.parseJSONFromString(jsonString);
 						}						
-						options = Schedule.createScheduleOptionFromObject(optsObj);
+						flowOptions = Schedule.createFlowOptionFromObject(optsObj);
+						slaOptions = Schedule.createSlaOptionFromObject(optsObj);
 					} catch (IOException e) {
 						throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
 					}
 				}
 				
-				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, options);
+				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
 				
 				schedules.add(s);
 			} while (rs.next());
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 98b6973..1c80c58 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -16,8 +16,6 @@
 
 package azkaban.scheduler;
 
-
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -34,11 +32,153 @@ import org.joda.time.ReadablePeriod;
 import org.joda.time.Seconds;
 import org.joda.time.Weeks;
 
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaSetting;
 import azkaban.utils.Pair;
 
 public class Schedule{
 	
+	
+	public static class FlowOptions {
+		
+		public List<String> getFailureEmails() {
+			return failureEmails;
+		}
+		public void setFailureEmails(List<String> failureEmails) {
+			this.failureEmails = failureEmails;
+		}
+		public List<String> getSuccessEmails() {
+			return successEmails;
+		}
+		public void setSuccessEmails(List<String> successEmails) {
+			this.successEmails = successEmails;
+		}
+		public FailureAction getFailureAction() {
+			return failureAction;
+		}
+		public void setFailureAction(FailureAction failureAction) {
+			this.failureAction = failureAction;
+		}
+		public boolean isnotifyOnFirstFailure() {
+			return notifyOnFirstFailure;
+		}
+		public void setNotifyOnFirstFailure(boolean notifyOnFirstFailure) {
+			this.notifyOnFirstFailure = notifyOnFirstFailure;
+		}
+		public boolean isnotifyOnLastFailure() {
+			return notifyOnLastFailure;
+		}
+		public void setNotifyOnLastFailure(boolean notifyOnLastFailure) {
+			this.notifyOnLastFailure = notifyOnLastFailure;
+		}
+		public Map<String, String> getFlowOverride() {
+			return flowOverride;
+		}
+		public void setFlowOverride(Map<String, String> flowOverride) {
+			this.flowOverride = flowOverride;
+		}
+		public List<String> getDisabledJobs() {
+			return disabledJobs;
+		}
+		public void setDisabledJobs(List<String> disabledJobs) {
+			this.disabledJobs = disabledJobs;
+		}
+		private List<String> failureEmails;
+		private List<String> successEmails;
+		private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
+		private boolean notifyOnFirstFailure;
+		private boolean notifyOnLastFailure;
+		Map<String, String> flowOverride;
+		private List<String> disabledJobs;
+		public Object toObject() {
+			Map<String, Object> obj = new HashMap<String, Object>();
+			obj.put("failureEmails", failureEmails);
+			obj.put("successEmails", successEmails);
+			obj.put("failureAction", failureAction.toString());
+			obj.put("notifyOnFirstFailure", notifyOnFirstFailure);
+			obj.put("notifyOnLastFailure", notifyOnLastFailure);
+			obj.put("flowOverride", flowOverride);
+			obj.put("disabledJobs", disabledJobs);
+			return obj;
+		}
+		@SuppressWarnings("unchecked")
+		public static FlowOptions fromObject(Object object) {
+			if(object != null) {
+				FlowOptions flowOptions = new FlowOptions();
+				Map<String, Object> obj = (HashMap<String, Object>) object;
+				if(obj.containsKey("failureEmails")) {
+					flowOptions.setFailureEmails((List<String>) obj.get("failureEmails"));
+				}
+				if(obj.containsKey("successEmails")) {
+					flowOptions.setSuccessEmails((List<String>) obj.get("SuccessEmails"));
+				}
+				if(obj.containsKey("failureAction")) {
+					flowOptions.setFailureAction(FailureAction.valueOf((String)obj.get("failureAction")));
+				}
+				if(obj.containsKey("notifyOnFirstFailure")) {
+					flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnFirstFailure"));
+				}
+				if(obj.containsKey("notifyOnLastFailure")) {
+					flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnLastFailure"));
+				}
+				if(obj.containsKey("flowOverride")) {
+					flowOptions.setFlowOverride((Map<String, String>) obj.get("flowOverride"));
+				}
+				if(obj.containsKey("disabledJobs")) {
+					flowOptions.setDisabledJobs((List<String>) obj.get("disabledJobs"));
+				}
+				return flowOptions;
+			}
+			return null;
+		}
+	}
+
+	public static class SlaOptions {
 
+		public List<String> getSlaEmails() {
+			return slaEmails;
+		}
+		public void setSlaEmails(List<String> slaEmails) {
+			this.slaEmails = slaEmails;
+		}
+		public List<SlaSetting> getSettings() {
+			return settings;
+		}
+		public void setSettings(List<SlaSetting> settings) {
+			this.settings = settings;
+		}
+		private List<String> slaEmails;
+		private List<SlaSetting> settings;
+		public Object toObject() {
+			Map<String, Object> obj = new HashMap<String, Object>();
+			obj.put("slaEmails", slaEmails);
+			List<Object> slaSettings = new ArrayList<Object>();
+			for(SlaSetting s : settings) {
+				slaSettings.add(s.toObject());
+			}
+			obj.put("settings", slaSettings);
+			return obj;
+		}
+		@SuppressWarnings("unchecked")
+		public static SlaOptions fromObject(Object object) {
+			if(object != null) {
+				SlaOptions slaOptions = new SlaOptions();
+				Map<String, Object> obj = (HashMap<String, Object>) object;
+				slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+				List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+				for(Object set: (List<Object>)obj.get("settings")) {
+					slaSets.add(SlaSetting.fromObject(set));
+				}
+				slaOptions.setSettings(slaSets);
+				return slaOptions;
+			}
+			return null;			
+		}
+		
+	}
+	
 //	private long projectGuid;
 //	private long flowGuid;
 	
@@ -55,7 +195,9 @@ public class Schedule{
 	private String submitUser;
 	private String status;
 	private long submitTime;
-	private Map<String, Object> schedOptions;
+	
+	private FlowOptions flowOptions;
+	private SlaOptions slaOptions;
 	
 	public Schedule(
 						int projectId,
@@ -68,8 +210,7 @@ public class Schedule{
 						long lastModifyTime,						
 						long nextExecTime,						
 						long submitTime,
-						String submitUser,
-						Map<String, Object> schedOptions
+						String submitUser
 						) {
 		this.projectId = projectId;
 		this.projectName = projectName;
@@ -82,7 +223,8 @@ public class Schedule{
 		this.submitUser = submitUser;
 		this.status = status;
 		this.submitTime = submitTime;
-		this.schedOptions = schedOptions;
+		this.flowOptions = null;
+		this.slaOptions = null;
 	}
 
 	public Schedule(
@@ -97,7 +239,8 @@ public class Schedule{
 						long nextExecTime,						
 						long submitTime,
 						String submitUser,
-						Map<String, Object> schedOptions
+						FlowOptions flowOptions,
+						SlaOptions slaOptions
 			) {
 		this.projectId = projectId;
 		this.projectName = projectName;
@@ -110,15 +253,54 @@ public class Schedule{
 		this.submitUser = submitUser;
 		this.status = status;
 		this.submitTime = submitTime;
-		this.schedOptions = schedOptions;
+		this.flowOptions = flowOptions;
+		this.slaOptions = slaOptions;
+	}
+
+	public Schedule(
+						int projectId,
+						String projectName,
+						String flowName,
+						String status,
+						long firstSchedTime,
+						DateTimeZone timezone,
+						ReadablePeriod period,
+						long lastModifyTime,						
+						long nextExecTime,						
+						long submitTime,
+						String submitUser,
+						FlowOptions flowOptions,
+						SlaOptions slaOptions
+						) {
+		this.projectId = projectId;
+		this.projectName = projectName;
+		this.flowName = flowName;
+		this.firstSchedTime = firstSchedTime;
+		this.timezone = timezone;
+		this.lastModifyTime = lastModifyTime;
+		this.period = period;
+		this.nextExecTime = nextExecTime;
+		this.submitUser = submitUser;
+		this.status = status;
+		this.submitTime = submitTime;
+		this.flowOptions = flowOptions;
+		this.slaOptions = slaOptions;
 	}
 
-	public Map<String, Object> getSchedOptions() {
-		return schedOptions;
+	public FlowOptions getFlowOptions() {
+		return flowOptions;
 	}
 
-	public void setSchedOptions(Map<String, Object> schedOptions) {
-		this.schedOptions = schedOptions;
+	public void setFlowOptions(FlowOptions flowOptions) {
+		this.flowOptions = flowOptions;
+	}
+
+	public SlaOptions getSlaOptions() {
+		return slaOptions;
+	}
+
+	public void setSlaOptions(SlaOptions slaOptions) {
+		this.slaOptions = slaOptions;
 	}
 
 	public String getScheduleName() {
@@ -277,34 +459,42 @@ public class Schedule{
 	}
 	
 
-	public Map<String,Object> optionToObject() {
-		//HashMap<String, Object> optionObject = new HashMap<String, Object>();
-		
-
-		return schedOptions;
+	public Map<String,Object> optionsToObject() {
+		if(flowOptions != null || slaOptions != null) {
+			HashMap<String, Object> schedObj = new HashMap<String, Object>();
+			
+			if(flowOptions != null) {
+				schedObj.put("flowOptions", flowOptions.toObject());
+			}
+			if(slaOptions != null) {
+				schedObj.put("slaOptions", slaOptions.toObject());
+			}
+	
+			return schedObj;
+		}
+		return null;
 	}
 	
 	@SuppressWarnings("unchecked")
-	public static Map<String, Object> createScheduleOptionFromObject(Object obj) {
-		Map<String, Object> options = new HashMap<String, Object>();
+	public static FlowOptions createFlowOptionFromObject(Object obj) {
 		if(obj != null) {
-			HashMap<String, Object> optionObject = (HashMap<String,Object>)obj;
-			options.putAll(optionObject);
-			return options;
-		}
-		else {
-			return new HashMap<String, Object>();
-		}
+			Map<String, Object> options = (HashMap<String, Object>) obj;
+			if(options.containsKey("flowOptions")) {
+				return FlowOptions.fromObject(options.get("flowOptions"));
+			}
+		}		
+		return null;
 	}
-
+	
 	@SuppressWarnings("unchecked")
-	public List<String> getDisabledJobs() {
-		if (schedOptions.containsKey("disabled")) {
-			return (List<String>) schedOptions.get("disabled");
-		}
-		else {
-			return new ArrayList<String>();
-		}
+	public static SlaOptions createSlaOptionFromObject(Object obj) {
+		if(obj != null) {
+			Map<String, Object> options = (HashMap<String, Object>) obj;
+			if(options.containsKey("slaOptions")) {
+				return SlaOptions.fromObject(options.get("slaOptions"));
+			}
+		}		
+		return null;
 	}
-	
+
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 9c3c459..ad67def 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -44,6 +44,12 @@ import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 
 
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLAManager;
+import azkaban.sla.SLA.SlaSetting;
 import azkaban.utils.Pair;
 
 
@@ -63,6 +69,7 @@ public class ScheduleManager {
 	private final ScheduleRunner runner;
 	private final ExecutorManager executorManager;
 	private final ProjectManager projectManager;
+	private final SLAManager slaManager;
 
 	/**
 	 * Give the schedule manager a loader class that will properly load the
@@ -72,10 +79,12 @@ public class ScheduleManager {
 	 */
 	public ScheduleManager(ExecutorManager executorManager,
 							ProjectManager projectManager, 
+							SLAManager slaManager,
 							ScheduleLoader loader) 
 	{
 		this.executorManager = executorManager;
 		this.projectManager = projectManager;
+		this.slaManager = slaManager;
 		this.loader = loader;
 		this.runner = new ScheduleRunner();
 
@@ -173,9 +182,10 @@ public class ScheduleManager {
 			final long nextExecTime,
 			final long submitTime,
 			final String submitUser,
-			final Map<String, Object> options
+			final FlowOptions flowOptions,
+			final SlaOptions slaOptions
 			) {
-		Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, options);
+		Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
@@ -367,57 +377,32 @@ public class ScheduleManager {
 										ExecutableFlow exflow = new ExecutableFlow(flow);
 										exflow.setSubmitUser(runningSched.getSubmitUser());
 										
-										Map<String, Object> scheduleOptions = runningSched.getSchedOptions();
+										FlowOptions flowOptions = runningSched.getFlowOptions();
 										
-										if(scheduleOptions != null && scheduleOptions.containsKey("flowOptions")) {
-											Map<String, Object> flowOptions = (Map<String, Object>) scheduleOptions.get("flowOptions");
-										
-											if (flowOptions.containsKey("failureAction")) {
-												String option = (String) flowOptions.get("failureAction");
-												if (option.equals("finishCurrent") ) {
-													exflow.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
-												}
-												else if (option.equals("cancelImmediately")) {
-													exflow.setFailureAction(FailureAction.CANCEL_ALL);
-												}
-												else if (option.equals("finishPossible")) {
-													exflow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
-												}
-											}
-	
-											if (flowOptions.containsKey("failureEmails")) {
-												String emails = (String) flowOptions.get("failureEmails");
-												String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-												exflow.setFailureEmails(Arrays.asList(emailSplit));
-											}
-											if (flowOptions.containsKey("successEmails")) {
-												String emails = (String) flowOptions.get("successEmails");
-												String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-												exflow.setSuccessEmails(Arrays.asList(emailSplit));
-											}
-											if (flowOptions.containsKey("notifyFailureFirst")) {
-												exflow.setNotifyOnFirstFailure(Boolean.parseBoolean((String)flowOptions.get("notifyFailureFirst")));
+										if(flowOptions != null) {
+											if (flowOptions.getFailureAction() != null) {
+												exflow.setFailureAction(flowOptions.getFailureAction());
 											}
-											if (flowOptions.containsKey("notifyFailureLast")) {
-												exflow.setNotifyOnLastFailure(Boolean.parseBoolean((String)flowOptions.get("notifyFailureLast")));
+											if (flowOptions.getFailureEmails() != null) {
+												exflow.setFailureEmails(flowOptions.getFailureEmails());
 											}
-											if (flowOptions.containsKey("executingJobOption")) {
-												String option = (String)flowOptions.get("jobOption");
-												// Not set yet
+											if (flowOptions.getSuccessEmails() != null) {
+												exflow.setSuccessEmails(flowOptions.getSuccessEmails());
 											}
+											exflow.setNotifyOnFirstFailure(flowOptions.isnotifyOnFirstFailure());
+											exflow.setNotifyOnLastFailure(flowOptions.isnotifyOnLastFailure());
 											
-											Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
-											exflow.addFlowParameters(flowParamGroup);
+											exflow.addFlowParameters(flowOptions.getFlowOverride());
 											
+											List<String> disabled = flowOptions.getDisabledJobs();
 											// Setup disabled
-											Map<String, String> paramGroup = this.getParamGroup(req, "disable");
-											for (Map.Entry<String, String> entry: paramGroup.entrySet()) {
-												boolean nodeDisabled = Boolean.parseBoolean(entry.getValue());
-												exflow.setStatus(entry.getKey(), nodeDisabled ? Status.DISABLED : Status.READY);
+											if(disabled != null) {
+												for (String job : disabled) {
+													exflow.setNodeStatus(job, Status.DISABLED);
+												}
 											}
-											
 										}
-	
+										
 										try {
 											executorManager.submitExecutableFlow(exflow);
 											logger.info("Scheduler has invoked " + exflow.getExecutionId());
@@ -426,9 +411,30 @@ public class ScheduleManager {
 											logger.error(e.getMessage());
 											return;
 										}
-									} catch (JobExecutionException e) {
-										logger.info("Could not run flow. " + e.getMessage());
+										
+										SlaOptions slaOptions = runningSched.getSlaOptions();
+										
+										if(slaOptions != null) {
+											// submit flow slas
+											List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
+											for(SlaSetting set : slaOptions.getSettings()) {
+												if(set.getId().equals("")) {
+													DateTime checkTime = new DateTime(runningSched.getNextExecTime()).plus(set.getDuration());
+													slaManager.submitSla(exflow.getExecutionId(), "", checkTime, slaOptions.getSlaEmails(), set.getActions(), null, set.getRule());
+												}
+												else {
+													jobsettings.add(set);
+												}
+											}
+											if(jobsettings.size() > 0) {
+												slaManager.submitSla(exflow.getExecutionId(), "", DateTime.now(), slaOptions.getSlaEmails(), new ArrayList<SlaAction>(), jobsettings, SlaRule.WAITANDCHECKJOB);
+											}
+										}
+										
+									} catch (Exception e) {
+										logger.info("Scheduler failed to run job. " + e.getMessage() + e.getCause());
 									}
+									
 								}
 								
 								removeRunnerSchedule(runningSched);
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
new file mode 100644
index 0000000..cf6cd2f
--- /dev/null
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -0,0 +1,330 @@
+package azkaban.sla;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.scheduler.ScheduleManagerException;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.utils.DataSourceUtils;
+import azkaban.utils.GZIPUtils;
+import azkaban.utils.JSONUtils;
+import azkaban.utils.Props;
+
+public class JdbcSLALoader implements SLALoader {
+
+	private static final Logger logger = Logger.getLogger(JdbcSLALoader.class);
+	
+	/**
+	 * Used for when we store text data. Plain uses UTF8 encoding.
+	 */
+	public static enum EncodingType {
+		PLAIN(1), GZIP(2);
+
+		private int numVal;
+
+		EncodingType(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static EncodingType fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return PLAIN;
+			case 2:
+				return GZIP;
+			default:
+				return PLAIN;
+			}
+		}
+	}
+	
+	private DataSource dataSource;
+	private EncodingType defaultEncodingType = EncodingType.GZIP;
+	
+	private static String slaTblName = "active_sla";
+	
+	final private static String LOAD_ALL_SLA =
+			"SELECT exec_id, job_name, check_time, rule, enc_type, options FROM " + slaTblName;
+	
+	final private static String INSERT_SLA = 
+			"INSERT INTO " + slaTblName + " ( exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
+
+//	final private static String UPDATE_SLA = 
+//			"UPDATE " + slaTblName + " SET exec_id, job_name, check_time, rule, enc_type, options) values (?,?,?,?,?,?)";
+//	
+	// use "" for flow
+	final private static String REMOVE_SLA = 
+			"DELETE FROM " + slaTblName + " WHERE exec_id=? AND job_name=? AND check_time=? AND rule=?";
+	
+	public JdbcSLALoader(Props props) {
+		String databaseType = props.getString("database.type");
+
+		if (databaseType.equals("mysql")) {
+			int port = props.getInt("mysql.port");
+			String host = props.getString("mysql.host");
+			String database = props.getString("mysql.database");
+			String user = props.getString("mysql.user");
+			String password = props.getString("mysql.password");
+			int numConnections = props.getInt("mysql.numconnections");
+			
+			dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		}
+	}
+
+	private Connection getConnection() throws SLAManagerException {
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (Exception e) {
+			DbUtils.closeQuietly(connection);
+			throw new SLAManagerException("Error getting DB connection.", e);
+		}
+		return connection;
+	}
+	
+	public EncodingType getDefaultEncodingType() {
+		return defaultEncodingType;
+	}
+
+	public void setDefaultEncodingType(EncodingType defaultEncodingType) {
+		this.defaultEncodingType = defaultEncodingType;
+	}
+
+	@Override
+	public List<SLA> loadSLAs() throws SLAManagerException {
+		logger.info("Loading all SLAs from db.");
+		
+		Connection connection = getConnection();
+		List<SLA> SLAs;
+		try{
+			SLAs= loadSLAs(connection, defaultEncodingType);
+		}
+		catch(SLAManagerException e) {
+			throw new SLAManagerException("Failed to load SLAs" + e.getCause());
+		}
+		finally{
+			DbUtils.closeQuietly(connection);
+		}
+				
+		logger.info("Loaded " + SLAs.size() + " SLAs.");
+		
+		return SLAs;
+
+	}
+
+	public List<SLA> loadSLAs(Connection connection, EncodingType encType) throws SLAManagerException {
+		logger.info("Loading all SLAs from db.");
+		
+		QueryRunner runner = new QueryRunner();
+		ResultSetHandler<List<SLA>> handler = new SLAResultHandler();
+		List<SLA> SLAs;
+		
+		try {
+			SLAs = runner.query(connection, LOAD_ALL_SLA, handler);
+		} catch (SQLException e) {
+			logger.error(LOAD_ALL_SLA + " failed.");
+			throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
+		}
+		
+		return SLAs;
+
+	}
+	
+	@Override
+	public void removeSLA(SLA s) throws SLAManagerException {
+		Connection connection = getConnection();
+		try{
+			removeSLA(connection, s);
+		}
+		catch(SLAManagerException e) {
+			logger.error(LOAD_ALL_SLA + " failed.");
+			throw new SLAManagerException("Load SLAs from db failed. "+ e.getCause());
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+		
+	}
+	
+	private void removeSLA(Connection connection, SLA s) throws SLAManagerException {
+
+		logger.info("Removing SLA " + s.toString() + " from db.");
+
+		QueryRunner runner = new QueryRunner(dataSource);
+	
+		try {
+			int removes =  runner.update(REMOVE_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal());
+			if (removes == 0) {
+				throw new SLAManagerException("No schedule has been removed.");
+			}
+		} catch (SQLException e) {
+			logger.error("Remove SLA failed. " + s.toString());
+			throw new SLAManagerException("Remove SLA " + s.toString() + " from db failed. "+ e.getCause());
+		}
+		
+	}
+
+	@Override
+	public	void insertSLA(SLA s) throws SLAManagerException {
+		Connection connection = getConnection();
+		try{
+			insertSLA(connection, s, defaultEncodingType);
+		}
+		catch(SLAManagerException e){
+			logger.error("Insert SLA failed. " + s.toString());
+			throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
+		}
+		finally {
+			DbUtils.closeQuietly(connection);
+		}
+	}
+	
+	private void insertSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
+
+		logger.debug("Inserting new SLA into DB. " + s.toString());
+		
+		QueryRunner runner = new QueryRunner();
+
+		String json = JSONUtils.toJSON(s.optionToObject());
+		byte[] data = null;
+		try {
+			byte[] stringData = json.getBytes("UTF-8");
+			data = stringData;
+	
+			if (encType == EncodingType.GZIP) {
+				data = GZIPUtils.gzipBytes(stringData);
+			}
+			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+		}
+		catch (IOException e) {
+			throw new SLAManagerException("Error encoding the sla options.");
+		}
+		
+		try {
+			int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
+
+			if (inserts == 0) {
+				throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
+			}
+		} catch (SQLException e) {
+			logger.error("Insert SLA failed.");
+			throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
+		}
+		
+	}
+	
+//	@Override
+//	public	void updateSLA(SLA s) throws SLAManagerException {
+//		Connection connection = getConnection();
+//		try{
+//			insertSLA(connection, s, defaultEncodingType);
+//		}
+//		catch(SLAManagerException e){
+//			logger.error("Insert SLA failed. " + s.toString());
+//			throw new SLAManagerException("Insert SLA " + s.toString() + " into db failed. "+ e.getCause() + e.getMessage() + e.getStackTrace());
+//		}
+//		finally {
+//			DbUtils.closeQuietly(connection);
+//		}
+//	}
+//	
+//	private void updateSLA(Connection connection, SLA s, EncodingType encType) throws SLAManagerException {
+//
+//		logger.debug("Inserting new SLA into DB. " + s.toString());
+//		
+//		QueryRunner runner = new QueryRunner();
+//
+//		String json = JSONUtils.toJSON(s.optionToObject());
+//		byte[] data = null;
+//		try {
+//			byte[] stringData = json.getBytes("UTF-8");
+//			data = stringData;
+//	
+//			if (encType == EncodingType.GZIP) {
+//				data = GZIPUtils.gzipBytes(stringData);
+//			}
+//			logger.debug("NumChars: " + json.length() + " UTF-8:" + stringData.length + " Gzip:"+ data.length);
+//		}
+//		catch (IOException e) {
+//			throw new SLAManagerException("Error encoding the sla options.");
+//		}
+//		
+//		try {
+//			int inserts = runner.update(connection, INSERT_SLA, s.getExecId(), s.getJobName(), s.getCheckTime().getMillis(), s.getRule().getNumVal(), encType.getNumVal(), data);
+//
+//			if (inserts == 0) {
+//				throw new SLAManagerException("No SLA has been inserted. Insertion failed.");
+//			}
+//		} catch (SQLException e) {
+//			logger.error("Insert SLA failed.");
+//			throw new SLAManagerException("Insert sla " + s.toString() + " into db failed. " + e.getCause());
+//		}
+//		
+//	}
+	
+	public class SLAResultHandler implements ResultSetHandler<List<SLA>> {
+		@Override
+		public List<SLA> handle(ResultSet rs) throws SQLException {
+			if (!rs.next()) {
+				return Collections.<SLA>emptyList();
+			}
+			
+			ArrayList<SLA> SLAs = new ArrayList<SLA>();
+			
+			do {
+				int execId = rs.getInt(1);
+				String jobName = rs.getString(2);
+				DateTime checkTime = new DateTime(rs.getLong(3));
+				SlaRule rule = SlaRule.fromInteger(rs.getInt(4));
+				int encodingType = rs.getInt(5);
+				byte[] data = rs.getBytes(6);
+				
+				SLA s = null;
+
+				EncodingType encType = EncodingType.fromInteger(encodingType);
+				Object optsObj;
+				try {
+					// Convoluted way to inflate strings. Should find common package or helper function.
+					if (encType == EncodingType.GZIP) {
+						// Decompress the sucker.
+						String jsonString = GZIPUtils.unGzipString(data, "UTF-8");
+						optsObj = JSONUtils.parseJSONFromString(jsonString);
+					}
+					else {
+						String jsonString = new String(data, "UTF-8");
+						optsObj = JSONUtils.parseJSONFromString(jsonString);
+					}						
+					s = SLA.createSlaFromObject(execId, jobName, checkTime, rule, optsObj);
+				} catch (IOException e) {
+					throw new SQLException("Error reconstructing SLA options. " + execId + " " + jobName + " " + checkTime.toDateTimeISO() + e.getCause());
+				}
+				SLAs.add(s);
+
+			} while (rs.next());
+			
+			return SLAs;
+		}
+		
+	}
+	
+}
diff --git a/src/java/azkaban/sla/SLA.java b/src/java/azkaban/sla/SLA.java
new file mode 100644
index 0000000..eb08229
--- /dev/null
+++ b/src/java/azkaban/sla/SLA.java
@@ -0,0 +1,252 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.joda.time.DateTime;
+import org.joda.time.ReadablePeriod;
+
+import azkaban.scheduler.Schedule;
+
+public class SLA {
+
+	public static enum SlaRule {
+		SUCCESS(1), FINISH(2), WAITANDCHECKJOB(3);
+
+		private int numVal;
+
+		SlaRule(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static SlaRule fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return SUCCESS;
+			case 2:
+				return FINISH;
+			case 3:
+				return WAITANDCHECKJOB;
+			default:
+				return SUCCESS;
+			}
+		}
+	}
+	
+	public static enum SlaAction {
+		EMAIL(1), KILL(2);
+
+		private int numVal;
+
+		SlaAction(int numVal) {
+			this.numVal = numVal;
+		}
+
+		public int getNumVal() {
+			return numVal;
+		}
+
+		public static SlaAction fromInteger(int x) {
+			switch (x) {
+			case 1:
+				return EMAIL;
+			case 2:
+				return KILL;
+			default:
+				return EMAIL;
+			}
+		}
+	}
+	
+	public static class SlaSetting {
+		public String getId() {
+			return id;
+		}
+		public void setId(String id) {
+			this.id = id;
+		}
+		public ReadablePeriod getDuration() {
+			return duration;
+		}
+		public void setDuration(ReadablePeriod duration) {
+			this.duration = duration;
+		}
+		public SlaRule getRule() {
+			return rule;
+		}
+		public void setRule(SlaRule rule) {
+			this.rule = rule;
+		}
+		public List<SlaAction> getActions() {
+			return actions;
+		}
+		public void setActions(List<SlaAction> actions) {
+			this.actions = actions;
+		}
+		
+		public Object toObject() {
+			Map<String, Object> obj = new HashMap<String, Object>();
+			obj.put("id", id);
+			obj.put("duration", Schedule.createPeriodString(duration));
+//			List<String> rulesObj = new ArrayList<String>();
+//			for(SlaRule rule : rules) {
+//				rulesObj.add(rule.toString());
+//			}
+//			obj.put("rules", rulesObj);
+			obj.put("rule", rule.toString());
+			List<String> actionsObj = new ArrayList<String>();
+			for(SlaAction act : actions) {
+				actionsObj.add(act.toString());
+			}
+			obj.put("actions", actionsObj);
+			return obj;
+		}
+		
+		@SuppressWarnings("unchecked")
+		public static SlaSetting fromObject(Object obj) {
+			Map<String, Object> slaObj = (HashMap<String, Object>) obj;
+			String subId = (String) slaObj.get("id");
+			ReadablePeriod dur = Schedule.parsePeriodString((String) slaObj.get("duration"));
+//			List<String> rulesObj = (ArrayList<String>) slaObj.get("rules");
+//			List<SlaRule> slaRules = new ArrayList<SLA.SlaRule>();
+//			for(String rule : rulesObj) {
+//				slaRules.add(SlaRule.valueOf(rule));
+//			}
+			SlaRule slaRule = SlaRule.valueOf((String) slaObj.get("rule"));
+			List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+			List<SlaAction> slaActs = new ArrayList<SlaAction>();
+			for(String act : actsObj) {
+				slaActs.add(SlaAction.valueOf(act));
+			}
+			
+			SlaSetting ret = new SlaSetting();
+			ret.setId(subId);
+			ret.setDuration(dur);
+			ret.setRule(slaRule);
+			ret.setActions(slaActs);
+			return ret;
+		}
+		
+		private String id;
+		private ReadablePeriod duration;
+		private SlaRule rule = SlaRule.SUCCESS;
+		private List<SlaAction> actions;
+	}
+	
+	private int execId;
+	private String jobName;
+	private DateTime checkTime;
+	private List<String> emails;
+	private List<SlaAction> actions;
+	private List<SlaSetting> jobSettings;
+	private SlaRule rule;
+	
+	public SLA(
+			int execId,
+			String jobName,
+			DateTime checkTime,
+			List<String> emails,
+			List<SlaAction> slaActions,
+			List<SlaSetting> jobSettings,
+			SlaRule slaRule
+	) {
+		this.execId = execId;
+		this.jobName = jobName;
+		this.checkTime = checkTime;
+		this.emails = emails;
+		this.actions = slaActions;
+		this.jobSettings = jobSettings;
+		this.rule = slaRule;
+	}
+
+	public int getExecId() {
+		return execId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	public DateTime getCheckTime() {
+		return checkTime;
+	}
+
+	public List<String> getEmails() {
+		return emails;
+	}
+
+	public List<SlaAction> getActions() {
+		return actions;
+	}
+	
+	public List<SlaSetting> getJobSettings() {
+		return jobSettings;
+	}
+
+	public SlaRule getRule() {
+		return rule;
+	}
+	
+	public String toString() {
+		return execId + " " + jobName + " to be checked at " + checkTime.toDateTimeISO();
+	}
+	
+	public Map<String,Object> optionToObject() {
+		HashMap<String, Object> slaObj = new HashMap<String, Object>();
+
+		slaObj.put("emails", emails);
+//		slaObj.put("rule", rule.toString());
+
+		List<String> actionsObj = new ArrayList<String>();
+		for(SlaAction act : actions) {
+			actionsObj.add(act.toString());
+		}
+		slaObj.put("actions", actionsObj);
+		
+		if(jobSettings != null && jobSettings.size() > 0) {
+			List<Object> settingsObj = new ArrayList<Object>();
+			for(SlaSetting set : jobSettings) {
+				settingsObj.add(set.toObject());
+			}
+			slaObj.put("jobSettings", settingsObj);
+		}
+		
+
+		return slaObj;
+	}
+
+	@SuppressWarnings("unchecked")
+	public static SLA createSlaFromObject(int execId, String jobName, DateTime checkTime, SlaRule rule, Object obj) {
+		
+		HashMap<String, Object> slaObj = (HashMap<String,Object>)obj;
+
+		List<String> emails = (List<String>)slaObj.get("emails");
+//		SlaRule rule = SlaRule.valueOf((String)slaObj.get("rule"));
+		List<String> actsObj = (ArrayList<String>) slaObj.get("actions");
+		List<SlaAction> slaActs = new ArrayList<SlaAction>();
+		for(String act : actsObj) {
+			slaActs.add(SlaAction.valueOf(act));
+		}
+		List<SlaSetting> jobSets = null;
+		if(slaObj.containsKey("jobSettings") && slaObj.get("jobSettings") != null) {
+			jobSets = new ArrayList<SLA.SlaSetting>();
+			for(Object set : (List<Object>)slaObj.get("jobSettings")) {
+				SlaSetting jobSet = SlaSetting.fromObject(set);
+				jobSets.add(jobSet);
+			}
+		}
+		
+		return new SLA(execId, jobName, checkTime, emails, slaActs, jobSets, rule);
+	}
+
+	public void setCheckTime(DateTime time) {
+		this.checkTime = time;
+	}
+
+}
diff --git a/src/java/azkaban/sla/SLALoader.java b/src/java/azkaban/sla/SLALoader.java
new file mode 100644
index 0000000..2c32b33
--- /dev/null
+++ b/src/java/azkaban/sla/SLALoader.java
@@ -0,0 +1,14 @@
+package azkaban.sla;
+
+import java.util.List;
+
+public interface SLALoader {
+
+	public void insertSLA(SLA s) throws SLAManagerException;
+	
+	public List<SLA> loadSLAs() throws SLAManagerException;
+	
+	public void removeSLA(SLA s) throws SLAManagerException;
+
+//	public void updateSLA(SLA s) throws SLAManagerException;
+}
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
new file mode 100644
index 0000000..17a67d0
--- /dev/null
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -0,0 +1,228 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.mail.MessagingException;
+
+import org.apache.log4j.Logger;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutableNode;
+import azkaban.sla.SLA;
+import azkaban.utils.EmailMessage;
+import azkaban.utils.Props;
+import azkaban.utils.Utils;
+
+public class SlaMailer {
+	private static Logger logger = Logger.getLogger(SlaMailer.class);
+	
+	private boolean testMode = false;
+	private String clientHostname;
+	private String clientPortNumber;
+	
+	private String mailHost;
+	private String mailUser;
+	private String mailPassword;
+	private String mailSender;
+	private String azkabanName;
+	
+	public SlaMailer(Props props) {
+		this.azkabanName = props.getString("azkaban.name", "azkaban");
+		this.mailHost = props.getString("mail.host", "localhost");
+		this.mailUser = props.getString("mail.user", "");
+		this.mailPassword = props.getString("mail.password", "");
+		this.mailSender = props.getString("mail.sender", "");
+		
+		this.clientHostname = props.getString("jetty.hostname", "localhost");
+		this.clientPortNumber = Utils.nonNull(props.getString("jetty.ssl.port"));
+		
+		testMode = props.getBoolean("test.mode", false);
+	}
+	
+	public void sendFirstErrorMessage(ExecutableFlow flow) {
+		List<String> emailList = flow.getFailureEmails();
+		int execId = flow.getExecutionId();
+		
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+			
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
+			
+			if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
+				message.println("This flow is set to cancel all currently running jobs.");
+			}
+			else if (flow.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
+				message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
+			}
+			else {
+				message.println("This flow is set to complete all currently running jobs before stopping.");
+			}
+			
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+			
+			message.println("");
+			message.println("<h3>Reason</h3>");
+			List<String> failedJobs = findFailedJobs(flow);
+			message.println("<ul>");
+			for (String jobId : failedJobs) {
+				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+			}
+			
+			message.println("</ul>");
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+	
+	public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
+		List<String> emailList = flow.getFailureEmails();
+		int execId = flow.getExecutionId();
+		
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
+			
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+			
+			message.println("");
+			message.println("<h3>Reason</h3>");
+			List<String> failedJobs = findFailedJobs(flow);
+			message.println("<ul>");
+			for (String jobId : failedJobs) {
+				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+			}
+			for (String reasons: extraReasons) {
+				message.println("<li>" + reasons + "</li>");
+			}
+			
+			message.println("</ul>");
+			
+			
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+
+	public void sendSuccessEmail(ExecutableFlow flow) {
+		List<String> emailList = flow.getSuccessEmails();
+
+		int execId = flow.getExecutionId();
+		
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
+			
+			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
+			message.println("<table>");
+			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+			message.println("</table>");
+			message.println("");
+			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+			message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+	
+	private List<String> findFailedJobs(ExecutableFlow flow) {
+		ArrayList<String> failedJobs = new ArrayList<String>();
+		for (ExecutableNode node: flow.getExecutableNodes()) {
+			if (node.getStatus() == Status.FAILED) {
+				failedJobs.add(node.getJobId());
+			}
+		}
+		
+		return failedJobs;
+	}
+
+	public void sendSlaEmail(SLA s, String ... extraReasons) {
+		List<String> emailList = s.getEmails();
+		
+		if (emailList != null && !emailList.isEmpty()) {
+			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
+			message.setFromAddress(mailSender);
+			message.addAllToAddress(emailList);
+			message.setMimeType("text/html");
+			message.setSubject("SLA violation on " + azkabanName);
+			
+//			message.println("<h2 style=\"color:#FF0000\"> Execution '" + s.getExecId() + "' of flow '" + flow.getFlowId() + "' failed to meet SLA on " + azkabanName + "</h2>");
+//			message.println("<table>");
+//			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
+//			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
+//			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
+//			message.println("</table>");
+//			message.println("");
+//			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
+//			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
+//			
+//			message.println("");
+//			message.println("<h3>Reason</h3>");
+//			List<String> failedJobs = findFailedJobs(flow);
+//			message.println("<ul>");
+//			for (String jobId : failedJobs) {
+//				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
+//			}
+			for (String reasons: extraReasons) {
+				message.println("<li>" + reasons + "</li>");
+			}
+			
+			message.println("</ul>");
+			
+			if (!testMode) {
+				try {
+					message.sendEmail();
+				} catch (MessagingException e) {
+					logger.error("Email message send failed" , e);
+				}
+			}
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/sla/SLAManager.java b/src/java/azkaban/sla/SLAManager.java
new file mode 100644
index 0000000..ae17595
--- /dev/null
+++ b/src/java/azkaban/sla/SLAManager.java
@@ -0,0 +1,467 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.velocity.runtime.parser.node.GetExecutor;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.ReadablePeriod;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlow.Status;
+import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutorMailer;
+import azkaban.executor.ExecutorManager;
+import azkaban.executor.ExecutorManagerException;
+import azkaban.flow.Flow;
+import azkaban.project.Project;
+import azkaban.project.ProjectManager;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLA.SlaSetting;
+import azkaban.user.User;
+import azkaban.utils.Pair;
+import azkaban.utils.Props;
+
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+
+/**
+ * The SLAManager stores and checks the SLA (Service Level Agreement). It uses a single thread
+ * instead and waits until correct check time for the flow, and individual jobs in the flow if their SLA is set. 
+ */
+public class SLAManager {
+	private static Logger logger = Logger.getLogger(SLAManager.class);
+
+	private SLALoader loader;
+
+	private final SLARunner runner;
+	private final SLAPreRunner prerunner;
+	private final ExecutorManager executorManager;
+	private SlaMailer mailer;
+
+	/**
+	 * Give the sla manager a loader class that will properly load the
+	 * sla.
+	 * 
+	 * @param loader
+	 * @throws SLAManagerException 
+	 */
+	public SLAManager(ExecutorManager executorManager,
+							SLALoader loader,
+							Props props) throws SLAManagerException 
+	{
+		this.executorManager = executorManager;
+		this.loader = loader;
+		this.mailer = new SlaMailer(props);
+		this.runner = new SLARunner();
+		this.prerunner = new SLAPreRunner();
+
+		List<SLA> SLAList = null;
+		try {
+			SLAList = loader.loadSLAs();
+		} catch (SLAManagerException e) {
+			// TODO Auto-generated catch block
+			throw e;
+		}
+
+		for (SLA sla : SLAList) {
+			runner.addRunnerSLA(sla);
+		}
+
+		this.runner.start();
+	}
+
+	/**
+	 * Shutdowns the sla thread. After shutdown, it may not be safe to use
+	 * it again.
+	 */
+	public void shutdown() {
+		this.runner.shutdown();
+		this.prerunner.shutdown();
+	}
+
+	/**
+	 * Removes the flow from the SLA if it exists.
+	 * 
+	 * @param id
+	 * @throws SLAManagerException 
+	 */
+	public void removeSLA(SLA s) throws SLAManagerException {
+		logger.info("Removing SLA " + s.toString());
+		runner.removeRunnerSLA(s);
+		loader.removeSLA(s);
+	}
+
+	public void submitSla(
+			int execId, 
+			String id,
+			DateTime checkTime, 
+			List<String> emails,
+			List<SlaAction> slaActions,
+			List<SlaSetting> jobSettings,
+			SlaRule slaRule
+			) throws SLAManagerException {
+		SLA s = new SLA(execId, id, checkTime, emails, slaActions, jobSettings, slaRule);
+		logger.info("Submitting SLA " + s.toString());
+		try {
+			loader.insertSLA(s);
+			runner.addRunnerSLA(s);
+		}
+		catch (SLAManagerException e) {
+			throw new SLAManagerException("Failed to add new SLA!" + e.getCause());
+		}
+	}
+	
+	/**
+	 * Thread that simply invokes the checking of flows when the SLA is
+	 * ready.
+	 * 
+	 */
+	public class SLARunner extends Thread {
+		private final PriorityBlockingQueue<SLA> SLAs;
+		private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+		// Five minute minimum intervals
+		private static final int TIMEOUT_MS = 60000;
+
+		public SLARunner() {
+			SLAs = new PriorityBlockingQueue<SLA>(1,new SLAComparator());
+		}
+
+		public void shutdown() {
+			logger.error("Shutting down SLA runner thread");
+			stillAlive.set(false);
+			this.interrupt();
+		}
+
+		/**
+		 * Return a list of flow with SLAs
+		 * 
+		 * @return
+		 */
+		protected synchronized List<SLA> getRunnerSLAs() {
+			return new ArrayList<SLA>(SLAs);
+		}
+
+		/**
+		 * Adds SLA into runner and then interrupts so it will update
+		 * its wait time.
+		 * 
+		 * @param flow
+		 */
+		public synchronized void addRunnerSLA(SLA s) {
+			logger.info("Adding " + s + " to SLA runner.");
+			SLAs.add(s);
+			this.interrupt();
+		}
+		
+		/**
+		 * Remove runner SLA. Does not interrupt.
+		 * 
+		 * @param flow
+		 * @throws SLAManagerException 
+		 */
+		public synchronized void removeRunnerSLA(SLA s) {
+			logger.info("Removing " + s + " from the SLA runner.");
+			SLAs.remove(s);
+		}
+
+		public void run() {
+			while (stillAlive.get()) {
+				synchronized (this) {
+					try {
+						// TODO clear up the exception handling
+						SLA s = SLAs.peek();
+
+						if (s == null) {
+							// If null, wake up every minute or so to see if
+							// there's something to do. Most likely there will not be.
+							try {
+								this.wait(TIMEOUT_MS);
+							} catch (InterruptedException e) {
+								// interruption should occur when items are added or removed from the queue.
+							}
+						} else {
+							// We've passed the flow execution time, so we will run.
+							if (!(new DateTime(s.getCheckTime())).isAfterNow()) {
+								// Run flow. The invocation of flows should be quick.
+								SLA runningSLA = SLAs.poll();
+								
+								logger.info("Checking sla " + runningSLA.toString() );
+								
+								int execId = s.getExecId();
+								ExecutableFlow exflow = executorManager.getExecutableFlow(execId);
+								
+								if(runningSLA.getJobName().equals("") && runningSLA.getRule().equals(SlaRule.WAITANDCHECKJOB)) {
+									// do the checking of potential jobsla submissions
+									List<SlaSetting> jobSettings = runningSLA.getJobSettings();
+									List<SlaSetting> removeSettings = new ArrayList<SLA.SlaSetting>();
+									for(SlaSetting set : jobSettings) {
+										ExecutableNode node = exflow.getExecutableNode(set.getId());
+										if(node.getStartTime() != -1 || executorManager.isFinished(exflow)) {
+											submitSla(execId, set.getId(), new DateTime(node.getStartTime()).plus(set.getDuration()), runningSLA.getEmails(), set.getActions(), null, set.getRule());
+											removeSettings.add(set);
+											logger.info("Job " + set.getId() + " already started, monitoring SLA.");
+										}
+									}
+									for(SlaSetting remove : removeSettings) {
+										jobSettings.remove(remove);
+									}
+									if(jobSettings.size() == 0) {
+										removeRunnerSLA(runningSLA);
+										loader.removeSLA(runningSLA);
+									}
+									else {
+										removeRunnerSLA(runningSLA);
+										loader.removeSLA(runningSLA);
+										runningSLA.setCheckTime(runningSLA.getCheckTime().plusMillis(TIMEOUT_MS));
+										addRunnerSLA(runningSLA);
+										loader.insertSLA(runningSLA);
+									}
+								}
+								else {
+									if(!metSla(runningSLA, exflow)) {
+										takeSLAActions(runningSLA, exflow);
+									}
+
+
+									removeRunnerSLA(runningSLA);
+									loader.removeSLA(runningSLA);
+								}
+							} else {
+								// wait until flow run
+								long millisWait = Math.max(0, s.getCheckTime().getMillis() - (new DateTime()).getMillis());
+								try {
+									this.wait(Math.min(millisWait, TIMEOUT_MS));
+								} catch (InterruptedException e) {
+									// interruption should occur when items are
+									// added or removed from the queue.
+								}
+							}
+						}
+					} catch (Exception e) {
+						logger.error("Unexpected exception has been thrown in scheduler", e);
+					} catch (Throwable e) {
+						logger.error("Unexpected throwable has been thrown in scheduler", e);
+					}
+				}
+			}
+		}
+
+		private boolean metSla(SLA s, ExecutableFlow exflow) {
+			SlaRule rule = s.getRule();
+			long finishTime;
+			Status status;
+			if(s.getJobName().equals("")) {
+				finishTime = exflow.getEndTime();
+				status = exflow.getStatus();
+			}
+			else {
+				ExecutableNode exnode = exflow.getExecutableNode(s.getJobName());
+				finishTime = exnode.getEndTime();
+				status = exnode.getStatus();
+			}
+			
+			switch(rule) {
+				case FINISH:	// check finish time
+					return finishTime != -1 && finishTime < s.getCheckTime().getMillis();
+				case SUCCESS: 	// check finish and successful
+					return status == Status.SUCCEEDED && finishTime < s.getCheckTime().getMillis();
+				default: 
+					logger.error("Unknown SLA rules!");
+					return false;
+			}
+		}
+
+		/**
+		 * Class to sort the sla based on time.
+		 * 
+		 */
+		private class SLAComparator implements Comparator<SLA> {
+			@Override
+			public int compare(SLA arg0, SLA arg1) {
+				long first = arg1.getCheckTime().getMillis();
+				long second = arg0.getCheckTime().getMillis();
+
+				if (first == second) {
+					return 0;
+				} else if (first < second) {
+					return 1;
+				}
+
+				return -1;
+			}
+		}
+	}
+
+	private void takeSLAActions(SLA s, ExecutableFlow exflow) {
+		logger.info("SLA " + s.toString() + " missed! Taking predefined actions");
+		List<SlaAction> actions = s.getActions();
+		for(SlaAction act : actions) {
+			if(act.equals(SlaAction.EMAIL)) {
+				try {
+					sendSlaAlertEmail(s, exflow);
+				}
+				catch (Exception e) {
+					logger.error("Failed to send out SLA alert email. " + e.getCause());
+				}
+			} else if(act.equals(SlaAction.KILL)) {
+				try {
+					executorManager.cancelFlow(exflow, "azkaban");
+					//sendSlaKillEmail(s, exflow);
+				} catch (ExecutorManagerException e) {
+					// TODO Auto-generated catch block
+					logger.error("Cancel flow failed." + e.getCause());
+				}
+			}
+		}
+	}
+	
+	private void sendSlaAlertEmail(SLA s, ExecutableFlow exflow) {
+		String message = null;
+		ExecutableNode exnode;
+		switch(s.getRule()) {
+			case FINISH:
+				if(s.getJobName().equals("")) {
+					message = "Flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
+					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+				}
+				else {
+					exnode = exflow.getExecutableNode(s.getJobName());
+					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " failed to finish with set SLA." + String.format("%n");
+					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+				}
+				break;
+			case SUCCESS:
+				if(s.getJobName().equals("")) {
+					message = "Flow " + exflow.getFlowId() + " didn't finish successfully with set SLA. " + String.format("%n");
+					message += "Flow started at " + new DateTime(exflow.getStartTime()).toDateTimeISO() + String.format("  %n");
+					message += "Flow status at " + s.getCheckTime().toDateTimeISO() + " is " + exflow.getStatus();
+				}
+				else {
+					exnode = exflow.getExecutableNode(s.getJobName());
+					message = "Job " + s.getJobName() + " of flow " + exflow.getFlowId() + " didn't finish successfully with set SLA." + String.format("%n"); 
+					message += "Job started at " + new DateTime(exnode.getStartTime()).toDateTimeISO() + String.format("%n");
+					message += "Job status at " + s.getCheckTime().toDateTimeISO() + " is " + exnode.getStatus();
+				}
+				break;
+			default:
+				logger.error("Unknown SLA rules!");
+				message = "Unknown SLA was not met!";
+				break;
+		}
+		mailer.sendSlaEmail(s, message);
+	}
+	
+	public class SLAPreRunner extends Thread {
+		private final List<SLA> preSlas;
+		private AtomicBoolean stillAlive = new AtomicBoolean(true);
+
+		// Five minute minimum intervals
+		private static final int TIMEOUT_MS = 300000;
+
+		public SLAPreRunner() {
+			preSlas = new ArrayList<SLA>();
+		}
+
+		public void shutdown() {
+			logger.error("Shutting down pre-sla checker thread");
+			stillAlive.set(false);
+			this.interrupt();
+		}
+
+		/**
+		 * Return a list of flow with SLAs
+		 * 
+		 * @return
+		 */
+		protected synchronized List<SLA> getPreSlas() {
+			return new ArrayList<SLA>(preSlas);
+		}
+
+		/**
+		 * Adds SLA into runner and then interrupts so it will update
+		 * its wait time.
+		 * 
+		 * @param flow
+		 */
+		public synchronized void addCheckerPreSla(SLA s) {
+			logger.info("Adding " + s + " to pre-sla checker.");
+			preSlas.add(s);
+			this.interrupt();
+		}
+		
+		/**
+		 * Remove runner SLA. Does not interrupt.
+		 * 
+		 * @param flow
+		 * @throws SLAManagerException 
+		 */
+		public synchronized void removeCheckerPreSla(SLA s) {
+			logger.info("Removing " + s + " from the pre-sla checker.");
+			preSlas.remove(s);
+		}
+
+		public void run() {
+			while (stillAlive.get()) {
+				synchronized (this) {
+					try {
+						// TODO clear up the exception handling
+
+						if (preSlas.size() == 0) {
+							try {
+								this.wait(TIMEOUT_MS);
+							} catch (InterruptedException e) {
+								// interruption should occur when items are added or removed from the queue.
+							}
+						} else {
+							for(SLA s : preSlas) {
+								ExecutableFlow exflow = executorManager.getExecutableFlow(s.getExecId());
+								String id = s.getJobName();
+								if(!s.equals("")) {
+									ExecutableNode exnode = exflow.getExecutableNode(id);
+									if(exnode.getStartTime() != -1) {
+										
+									}
+								}
+							}
+						}
+					} catch (Exception e) {
+						logger.error("Unexpected exception has been thrown in scheduler", e);
+					} catch (Throwable e) {
+						logger.error("Unexpected throwable has been thrown in scheduler", e);
+					}
+				}
+			}
+		}
+	}
+
+	
+}
diff --git a/src/java/azkaban/sla/SLAManagerException.java b/src/java/azkaban/sla/SLAManagerException.java
new file mode 100644
index 0000000..a3468a2
--- /dev/null
+++ b/src/java/azkaban/sla/SLAManagerException.java
@@ -0,0 +1,13 @@
+package azkaban.sla;
+
+public class SLAManagerException extends Exception{
+	private static final long serialVersionUID = 1L;
+
+	public SLAManagerException(String message) {
+		super(message);
+	}
+	
+	public SLAManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
diff --git a/src/java/azkaban/webapp/AzkabanWebServer.java b/src/java/azkaban/webapp/AzkabanWebServer.java
index 22d1af1..46e2cad 100644
--- a/src/java/azkaban/webapp/AzkabanWebServer.java
+++ b/src/java/azkaban/webapp/AzkabanWebServer.java
@@ -53,6 +53,7 @@ import azkaban.scheduler.JdbcScheduleLoader;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.sla.JdbcSLALoader;
 import azkaban.sla.SLAManager;
+import azkaban.sla.SLAManagerException;
 import azkaban.user.UserManager;
 import azkaban.user.XmlUserManager;
 import azkaban.utils.FileIOUtils;
@@ -147,8 +148,8 @@ public class AzkabanWebServer implements AzkabanServer {
 		userManager = loadUserManager(props);
 		projectManager = loadProjectManager(props);
 		executorManager = loadExecutorManager(props);
-		scheduleManager = loadScheduleManager(executorManager, props);
-		slaManager = loadSLAManager();
+		slaManager = loadSLAManager(props);
+		scheduleManager = loadScheduleManager(executorManager, slaManager, props);
 		baseClassLoader = getBaseClassloader();
 		
 		tempDir = new File(props.getString("azkaban.temp.dir", "temp"));
@@ -208,14 +209,14 @@ public class AzkabanWebServer implements AzkabanServer {
 		return execManager;
 	}
 
-	private ScheduleManager loadScheduleManager(ExecutorManager execManager, Props props ) throws Exception {
-		ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, new JdbcScheduleLoader(props));
+	private ScheduleManager loadScheduleManager(ExecutorManager execManager, SLAManager slaManager, Props props ) throws Exception {
+		ScheduleManager schedManager = new ScheduleManager(execManager, projectManager, slaManager, new JdbcScheduleLoader(props));
 
 		return schedManager;
 	}
 
-	private SLAManager loadSLAManager() {
-		SLAManager slaManager = new SLAManager(executorManager, projectManager, new JdbcSLALoader(props));
+	private SLAManager loadSLAManager(Props props) throws SLAManagerException {
+		SLAManager slaManager = new SLAManager(executorManager, new JdbcSLALoader(props), props);
 		return slaManager;
 	}
 	
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 954de95..b5da554 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -18,6 +18,7 @@ package azkaban.webapp.servlet;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +39,10 @@ import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutorManagerException;
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutableFlow.Status;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.project.Project;
@@ -53,12 +57,14 @@ import azkaban.utils.Pair;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
 import azkaban.scheduler.Schedule;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
 import azkaban.scheduler.ScheduleManager;
-import azkaban.sla.FlowRule;
-import azkaban.sla.JobRule;
 import azkaban.sla.SLA;
+import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLAManager;
 import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaSetting;
 
 public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1L;
@@ -93,11 +99,14 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		HashMap<String, Object> ret = new HashMap<String, Object>();
 		String ajaxName = getParam(req, "ajax");
 		
-		if (ajaxName.equals("schedInfo")) {
-			ajaxSchedInfo(req, resp, ret, session.getUser());
+		if (ajaxName.equals("slaInfo")) {
+			ajaxSlaInfo(req, ret, session.getUser());
 		}
 		else if(ajaxName.equals("setSla")) {
-			ajaxSetSla(req, resp, ret, session.getUser());
+			ajaxSetSla(req, ret, session.getUser());
+		}
+		else if(ajaxName.equals("advSchedule")) {
+			ajaxAdvSchedule(req, ret, session.getUser());
 		}
 
 		if (ret != null) {
@@ -105,7 +114,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 	}
 
-	private void ajaxSetSla(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) {
+	private void ajaxSetSla(HttpServletRequest req, HashMap<String, Object> ret, User user) {
 		try {
 			
 			int projectId = getIntParam(req, "projectId");
@@ -117,26 +126,40 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 				return;
 			}
 			
-			String slaEmals = getParam(req, "slaEmails");
-			System.out.println(slaEmals);			
-
-			String flowRules = getParam(req, "flowRules");
-			FlowRule flowRule = parseFlowRule(flowRules);
+			Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+			
+			SlaOptions slaOptions= new SlaOptions();
+			
+			String slaEmails = getParam(req, "slaEmails");
+			System.out.println(slaEmails); 
+			String[] emailSplit = slaEmails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
 			
-			List<JobRule> jobRule = new ArrayList<JobRule>();
-			Map<String, String> jobRules = getParamGroup(req, "jobRules");
-			System.out.println(jobRules);
-			for(String job : jobRules.keySet()) {
-				JobRule jr = parseJobRule(job, jobRules.get(job));
-				jobRule.add(jr);
+			Map<String, String> settings = getParamGroup(req, "settings");
+			System.out.println(settings);
+			List<SlaSetting> slaSettings = new ArrayList<SlaSetting>();
+			for(String set : settings.keySet()) {
+				SlaSetting s;
+				try {
+				s = parseSlaSetting(settings.get(set));
+				}
+				catch (Exception e) {
+					throw new ServletException(e);
+				}
+				if(s != null) {
+					slaSettings.add(s);
+				}
 			}
-			Map<String, Object> options= new HashMap<String, Object>();
-			options.put("slaEmails", slaEmals);
-			options.put("flowRules", flowRules);
-			options.put("jobRules", jobRule);
-			Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
-			//slaManager.addFlowSLA(projectId, project.getName(), flowName, "ready", sched.getFirstSchedTime(), sched.getTimezone(), sched.getPeriod(), DateTime.now(), DateTime.now(), DateTime.now(), user, options);
-		
+			
+			if(slaSettings.size() > 0) {
+				slaOptions.setSlaEmails(Arrays.asList(emailSplit));
+				slaOptions.setSettings(slaSettings);
+			}
+			else {
+				slaOptions = null;
+			}
+			sched.setSlaOptions(slaOptions);
+			scheduleManager.insertSchedule(sched);
+
 		} catch (ServletException e) {
 			ret.put("error", e);
 		}
@@ -144,67 +167,46 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	}
 
 	
-	private FlowRule parseFlowRule(String flowRules) {
-		String[] parts = flowRules.split(",");
-		String duration = parts[0];
-		String emailAction = parts[1];
-		String killAction = parts[2];
+	private SlaSetting parseSlaSetting(String set) {
+		// "" + Duration + EmailAction + KillAction
+		String[] parts = set.split(",", -1);
+		String id = parts[0];
+		String rule = parts[1];
+		String duration = parts[2];
+		String emailAction = parts[3];
+		String killAction = parts[4];
 		if(emailAction.equals("on") || killAction.equals("on")) {
-			if(!duration.equals("")) {
-				FlowRule r = new FlowRule();
-				ReadablePeriod dur = parseDuration(duration);
-				r.setDuration(dur);
-				List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
-				if(emailAction.equals("on")) {
-					actions.add(SlaAction.SENDEMAIL);
-				}
-				if(killAction.equals("on")) {
-					actions.add(SlaAction.KILL);
-				}
-				r.setActions(actions);
-				return r;
+			SlaSetting r = new SlaSetting();			
+			r.setId(id);
+			r.setRule(SlaRule.valueOf(rule));
+			ReadablePeriod dur = parseDuration(duration);
+			r.setDuration(dur);
+			List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+			if(emailAction.equals("on")) {
+				actions.add(SlaAction.EMAIL);
 			}
-		}		
-		return null;
-	}
-
-	private JobRule parseJobRule(String job, String jobRule) {
-		String[] parts = jobRule.split(",");
-		String duration = parts[0];
-		String emailAction = parts[1];
-		String killAction = parts[2];
-		if(emailAction.equals("on") || killAction.equals("on")) {
-			if(!duration.equals("")) {
-				JobRule r = new JobRule();
-				r.setJobId(job);
-				ReadablePeriod dur = parseDuration(duration);
-				r.setDuration(dur);
-				List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
-				if(emailAction.equals("on")) {
-					actions.add(SlaAction.SENDEMAIL);
-				}
-				if(killAction.equals("on")) {
-					actions.add(SlaAction.KILL);
-				}
-				r.setActions(actions);
-				return r;
+			if(killAction.equals("on")) {
+				actions.add(SlaAction.KILL);
 			}
-		}	
+			r.setActions(actions);
+			return r;
+		}
 		return null;
 	}
 
 	private ReadablePeriod parseDuration(String duration) {
-		int hour = Integer.parseInt(duration.split(",")[0]);
-		int min = Integer.parseInt(duration.split(",")[1]);
-		return Hours.hours(hour).toPeriod().plus(Minutes.minutes(min).toPeriod());
+		int hour = Integer.parseInt(duration.split(":")[0]);
+		int min = Integer.parseInt(duration.split(":")[1]);
+		return Minutes.minutes(min+hour*60).toPeriod();
 	}
 
 	@SuppressWarnings("unchecked")
-	private void ajaxSchedInfo(HttpServletRequest req, HttpServletResponse resp, HashMap<String, Object> ret, User user) {
+	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
 		int projId;
+		String flowName;
 		try {
 			projId = getIntParam(req, "projId");
-			String flowName = getParam(req, "flowName");
+			flowName = getParam(req, "flowName");
 			
 			Project project = getProjectAjaxByPermission(ret, projId, user, Type.READ);
 			if (project == null) {
@@ -218,36 +220,52 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 				return;
 			}
 			
-			SLA sla = slaManager.getSLA(new Pair<Integer, String>(projId, flowName));
+			Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projId, flowName));
 			
-			if(sla != null) {
-				ret.put("slaEmails", (List<String>)sla.getSlaOptions().get("slaEmails"));
-				List<String> allJobs = new ArrayList<String>();
-				for(Node n : flow.getNodes()) {
-					allJobs.add(n.getId());
-				}
-				ret.put("allJobs", allJobs);
-				if(sla.getFlowRules() != null) {
-					ret.put("flowRules", sla.getFlowRules());
+			SlaOptions slaOptions = sched.getSlaOptions();
+			FlowOptions flowOptions = sched.getFlowOptions();
+			
+			if(slaOptions != null) {
+				ret.put("slaEmails", slaOptions.getSlaEmails());
+				List<SlaSetting> settings = slaOptions.getSettings();
+				List<Object> setObj = new ArrayList<Object>();
+				for(SlaSetting set: settings) {
+					setObj.add(set.toObject());
 				}
-				if(sla.getJobRules() != null) {
-					ret.put("jobRules", sla.getJobRules());
+				ret.put("settings", setObj);
+			}
+			else if (flowOptions != null) {
+				if(flowOptions.getFailureEmails() != null) {
+					List<String> emails = flowOptions.getFailureEmails();
+					if(emails.size() > 0) {
+						ret.put("slaEmails", emails);
+					}
 				}
 			}
 			else {
-				ret.put("slaEmails", flow.getFailureEmails());
-				List<String> allJobs = new ArrayList<String>();
-				Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projId, flowName));
-				List<String> disabled = sched.getDisabledJobs(); 
-				for(Node n : flow.getNodes()) {
-					if(!disabled.contains(n.getId())) {
-						allJobs.add(n.getId());
+				if(flow.getFailureEmails() != null) {
+					List<String> emails = flow.getFailureEmails();
+					if(emails.size() > 0) {
+						ret.put("slaEmails", emails);
 					}
 				}
-				ret.put("allJobs", allJobs);
-				
+			}
+			
+			List<String> disabledJobs;
+			if(flowOptions != null) {
+				disabledJobs = flowOptions.getDisabledJobs() == null ? new ArrayList<String>() : flowOptions.getDisabledJobs();
+			}
+			else {
+				disabledJobs = new ArrayList<String>();
+			}
 				
+			List<String> allJobs = new ArrayList<String>();
+			for(Node n : flow.getNodes()) {
+				if(!disabledJobs.contains(n.getId())) {
+					allJobs.add(n.getId());
+				}
 			}
+			ret.put("allJobNames", allJobs);
 		} catch (ServletException e) {
 			ret.put("error", e);
 		}
@@ -277,45 +295,38 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		
 		List<Schedule> schedules = scheduleManager.getSchedules();
 		page.add("schedules", schedules);
-		
-		List<SLA> slas = slaManager.getSLAs();
-		page.add("slas", slas);
+//		
+//		List<SLA> slas = slaManager.getSLAs();
+//		page.add("slas", slas);
 
 		page.render();
 	}
 	
 	@Override
 	protected void handlePost(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-		HashMap<String, Object> ret = new HashMap<String, Object>();
-		if (hasParam(req, "action")) {
-			String action = getParam(req, "action");
-			if (action.equals("scheduleFlow")) {
-				ajaxScheduleFlow(req, ret, session.getUser());
-			}
-			else if(action.equals("removeSched")){
-				ajaxRemoveSched(req, ret, session.getUser());
+		if (hasParam(req, "ajax")) {
+			handleAJAXAction(req, resp, session);
+		}
+		else {
+			HashMap<String, Object> ret = new HashMap<String, Object>();
+			if (hasParam(req, "action")) {
+				String action = getParam(req, "action");
+				if (action.equals("scheduleFlow")) {
+					ajaxScheduleFlow(req, ret, session.getUser());
+				}
+				else if(action.equals("removeSched")){
+					ajaxRemoveSched(req, ret, session.getUser());
+				}
 			}
+			
+			if(ret.get("status") == ("success"))
+				setSuccessMessageInCookie(resp, (String) ret.get("message"));
+			else
+				setErrorMessageInCookie(resp, (String) ret.get("message"));
+			
+			this.writeJSON(resp, ret);
 		}
-		
-		if(ret.get("status") == ("success"))
-			setSuccessMessageInCookie(resp, (String) ret.get("message"));
-		else
-			setErrorMessageInCookie(resp, (String) ret.get("message"));
-		
-		this.writeJSON(resp, ret);
 	}
-
-//	private void handleAJAXAction(HttpServletRequest req, HttpServletResponse resp, Session session) throws ServletException, IOException {
-//		HashMap<String, Object> ret = new HashMap<String, Object>();
-//		String ajaxName = getParam(req, "ajax");
-//		
-//		if (ajaxName.equals("scheduleFlow")) {
-//				ajaxScheduleFlow(req, ret, session.getUser());
-//		}
-////		}
-//		this.writeJSON(resp, ret);
-//	}
-//	
 	
 	private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
 		int projectId = getIntParam(req, "projectId");
@@ -406,40 +417,170 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		    hour += 12;
 		hour %= 24;
 
-//		String submitUser = user.getUserId();
-//		String userExec = userSubmit;//getParam(req, "userExec");
-//		String scheduleId = projectId + "." + flowName;
 		DateTime submitTime = new DateTime();
 		DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
 		
-		//ScheduledFlow schedFlow = scheduleManager.schedule(scheduleId, projectId, flowId, userExec, userSubmit, submitTime, firstSchedTime, thePeriod);
-		//project.info("User '" + user.getUserId() + "' has scheduled " + flow.getId() + "[" + schedFlow.toNiceString() + "].");
-		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), null);
+		Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+		FlowOptions flowOptions = null;
+		SlaOptions slaOptions = null;
+		if(sched != null) {
+			if(sched.getFlowOptions() != null) {
+				flowOptions = sched.getFlowOptions();
+			}
+			if(sched.getSlaOptions() != null) {
+				slaOptions = sched.getSlaOptions();
+			}
+		}
+		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
 		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
 		
 		ret.put("status", "success");
 		ret.put("message", projectName + "." + flowName + " scheduled.");
 	}
-				
-//	private ReadablePeriod parsePeriod(HttpServletRequest req) throws ServletException {
-//			int period = getIntParam(req, "period");
-//			String periodUnits = getParam(req, "period_units");
-//			if("M".equals(periodUnits))
-//				return Months.months(period);
-//			else if("w".equals(periodUnits))
-//				return Weeks.weeks(period);
-//			else if("d".equals(periodUnits))
-//				return Days.days(period);
-//			else if("h".equals(periodUnits))
-//				return Hours.hours(period);
-//			else if("m".equals(periodUnits))
-//				return Minutes.minutes(period);
-//			else if("s".equals(periodUnits))
-//				return Seconds.seconds(period);
-//			else
-//				throw new ServletException("Unknown period unit: " + periodUnits);
-//	}
+
+	private void ajaxAdvSchedule(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
+		String projectName = getParam(req, "projectName");
+		String flowName = getParam(req, "flowName");
+		int projectId = getIntParam(req, "projectId");
+		
+		Project project = projectManager.getProject(projectId);
+			
+		if (project == null) {
+			ret.put("message", "Project " + projectName + " does not exist");
+			ret.put("status", "error");
+			return;
+		}
+		
+		if (!hasPermission(project, user, Type.SCHEDULE)) {
+			ret.put("status", "error");
+			ret.put("message", "Permission denied. Cannot execute " + flowName);
+			return;
+		}
+
+		Flow flow = project.getFlow(flowName);
+		if (flow == null) {
+			ret.put("status", "error");
+			ret.put("message", "Flow " + flowName + " cannot be found in project " + project);
+			return;
+		}
+		
+		String scheduleTime = getParam(req, "scheduleTime");
+		String scheduleDate = getParam(req, "scheduleDate");
+		DateTime firstSchedTime;
+		try {
+			firstSchedTime = parseDateTime(scheduleDate, scheduleTime);
+		}
+		catch (Exception e) {
+			ret.put("error", "Invalid date and/or time '" + scheduleDate + " " + scheduleTime);
+	      	return;
+		}
+
+		ReadablePeriod thePeriod = null;
+		try {
+			if(hasParam(req, "is_recurring") && getParam(req, "is_recurring").equals("on")) {
+			    thePeriod = Schedule.parsePeriodString(getParam(req, "period"));	
+			}
+		}
+		catch(Exception e){
+			ret.put("error", e.getMessage());
+		}
+		
+		Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+		FlowOptions flowOptions = null;
+		try {
+			flowOptions = parseFlowOptions(req);
+		}
+		catch (Exception e) {
+			ret.put("error", e.getMessage());
+		}
+		SlaOptions slaOptions = null;
+		if(sched != null) {
+			if(sched.getSlaOptions() != null) {
+				slaOptions = sched.getSlaOptions();
+			}
+		}
+		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
+		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
+		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.getScheduleName() + " has been added.");
+		
+		ret.put("status", "success");
+		ret.put("message", projectName + "." + flowName + " scheduled.");
+	}
+	
+	private FlowOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
+		FlowOptions flowOptions = new FlowOptions();
+		if (hasParam(req, "failureAction")) {
+			String option = getParam(req, "failureAction");
+			if (option.equals("finishCurrent") ) {
+				flowOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+			}
+			else if (option.equals("cancelImmediately")) {
+				flowOptions.setFailureAction(FailureAction.CANCEL_ALL);
+			}
+			else if (option.equals("finishPossible")) {
+				flowOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+			}
+		}
+
+		if (hasParam(req, "failureEmails")) {
+			String emails = getParam(req, "failureEmails");
+			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			flowOptions.setFailureEmails(Arrays.asList(emailSplit));
+		}
+		if (hasParam(req, "successEmails")) {
+			String emails = getParam(req, "successEmails");
+			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			flowOptions.setSuccessEmails(Arrays.asList(emailSplit));
+		}
+		if (hasParam(req, "notifyFailureFirst")) {
+			flowOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
+		}
+		if (hasParam(req, "notifyFailureLast")) {
+			flowOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
+		}
+		if (hasParam(req, "executingJobOption")) {
+			//String option = getParam(req, "jobOption");
+			// Not set yet
+		}
+		
+		Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
+		flowOptions.setFlowOverride(flowParamGroup);
+		
+		if (hasParam(req, "disabledJobs")) {
+			String disable = getParam(req, "disabledJobs");
+			String[] disableSplit = disable.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			List<String> jobs = (List<String>) Arrays.asList(disableSplit);
+			flowOptions.setDisabledJobs(jobs.subList(1, jobs.size()));
+		}
+		return flowOptions;
+	}
+
+	private DateTime parseDateTime(String scheduleDate, String scheduleTime) {
+		// scheduleTime: 12,00,pm,PDT
+		String[] parts = scheduleTime.split(",", -1);
+		int hour = Integer.parseInt(parts[0]);
+		int minutes = Integer.parseInt(parts[1]);
+		boolean isPm = parts[2].equalsIgnoreCase("pm");
+		
+		DateTimeZone timezone = parts[3].equals("UTC") ? DateTimeZone.UTC : DateTimeZone.forID("America/Los_Angeles");
+
+		// scheduleDate: 02/10/2013
+		DateTime day = null;
+		if(scheduleDate == null || scheduleDate.trim().length() == 0) {
+			day = new LocalDateTime().toDateTime();
+		} else {
+			day = DateTimeFormat.forPattern("MM/dd/yyyy").withZone(timezone).parseDateTime(scheduleDate);
+		}
+		
+		if(isPm && hour < 12)
+		    hour += 12;
+		hour %= 24;
+
+		DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
+
+		return firstSchedTime;
+	}
 
 	private boolean hasPermission(Project project, User user, Permission.Type type) {
 		if (project.hasPermission(user, type)) {
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
index 0f8ddce..102c43f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowpage.vm
@@ -34,6 +34,7 @@
 		<script type="text/javascript" src="${context}/js/azkaban.flow.graph.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.flow.view.js"></script>
 		<script type="text/javascript" src="${context}/js/azkaban.exflow.options.view.js"></script>
+		<script type="text/javascript" src="${context}/js/azkaban.schedule.options.view.js"></script>
 		<script type="text/javascript" src="${context}/js/svgNavigate.js"></script>
 		<script type="text/javascript">
 			var contextURL = "${context}";
@@ -131,165 +132,7 @@
 					</div>
 				</div>
 		<!-- modal content -->
-				<div id="advScheduleModalBackground" class="modalBackground2">
-					<div id="schedule-options" class="modal modalContainer2">
-						<a href='#' title='Close' class='modal-close'>x</a>
-							<h3>Advanced Schedule Options</h3>
-							<div>
-								<ul class="optionsPicker">
-									<li id="scheduleGeneralOptions">General Options</li>
-									<li id="scheduleFlowOptions">Flow Options</li>
-									<li id="scheduleSlaOptions">SLA Options</li>
-								</ul>
-							</div>
-							<div class="optionsPane">
-						<div id="scheduleSlaPanel" class="generalPanel panel">
-							<div id="slaActions">
-								<h4>SLA Alert Emails</h4>
-								<dl>
-									<dt >SLA Alert Emails</dt>
-									<dd>
-										<textarea id="slaEmails"></textarea>
-									</dd>
-								</dl>
-							</div>
-							<div id="slaRules">
-								<h4>Flow SLA Rules</h4>
-								<div class="tableDiv">
-									<table id="flowRulesTbl">
-										<thead>
-											<tr>
-												<th>Flow/Job</th>
-												<th>Finish In</th>
-												<th>Email Action</th>
-												<th>Kill Action</th>
-											</tr>
-										</thead>
-										<tbody>
-										</tbody>
-									</table>
-								</div>
-								<h4>Job SLA Rules</h4>
-								<div class="tableDiv">
-									<table id="jobRulesTbl">
-										<thead>
-											<tr>
-												<th>Flow/Job</th>
-												<th>Finish In</th>
-												<th>Email Action</th>
-												<th>Kill Action</th>
-											</tr>
-										</thead>
-										<tbody>
-										</tbody>
-									</table>
-								</div>
-							</div>
-						</div>
-						<div id="scheduleGeneralPanel" class="generalPanel panel">
-							<div id="scheduleBasicInfo">
-								<h4>Basic Scheduling Information</h4>
-								<dl>
-								<dt>Schedule Time</dt>
-									<dd>
-										<input id="advHour" type="text" size="2" value="12"/>
-										<input id="advMinutes" type="text" size="2" value="00"/>
-										<select id="advAm_pm">
-											<option>pm</option>
-											<option>am</option>
-										</select>
-										<select id="advTimezone">
-											<option>PDT</option>
-											<option>UTC</option>
-										</select>
-									</dd>
-									<dt>Schedule Date</dt>
-									<dd><input type="text" id="advDate" /></dd>
-									<dt>Recurrence</dt>
-									<dd>
-										<input id="advIs_recurring" type="checkbox" checked  /><span>repeat every</span>
-										<input id="advPeriod" type="text" size="2" value="1"/>
-										<select id="advPeriod_units">
-											<option value="d">Days</option>
-											<option value="h">Hours</option>
-											<option value="m">Minutes</option>
-											<option value="M">Months</option>
-											<option value="w">Weeks</option>
-										</select>
-									</dd>
-								</dl>
-							</div>
-							<div id="scheduleCompleteActions">
-								<h4>Completion Actions</h4>
-								<dl>
-									<dt class="disabled">Failure Action</dt>
-									<dd>
-										<select id="scheduleFailureAction" name="failureAction">
-											<option value="finishCurrent">Finish Current Running</option>
-											<option value="cancelImmediately">Cancel All</option>
-											<option value="finishPossible">Finish All Possible</option>
-										</select>
-									</dd>
-									<dt>Failure Email</dt>
-									<dd>
-										<textarea id="scheduleFailureEmails"></textarea>
-									</dd>
-									<dt>Notify on Failure</dt>
-									<dd>
-										<input id="scheduleNotifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
-										<input id="scheduleNotifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
-									</dd>
-									<dt>Success Email</dt>
-									<dd>
-										<textarea id="scheduleSuccessEmails"></textarea>
-									</dd>
-									<dt class="disabled" >Concurrent Execution</dt>
-									<dd id="scheduleExecutingJob" class="disabled">
-										<input id="scheduleIgnore" class="radio" type="radio" name="concurrent" value="ignore" checked /><label class="radioLabel" for="ignore">Run Concurrently</label>
-										<input id="schedulePipeline" class="radio" type="radio" name="concurrent" value="pipeline" /><label class="radioLabel" for="pipeline">Pipeline</label>
-										<input id="scheduleQueue" class="radio" type="radio" name="concurrent" value="queue" /><label class="radioLabel" for="queue">Queue Job</label>
-									</dd>
-								</dl>
-							</div>
-							<div id="scheduleFlowPropertyOverride">
-								<h4>Flow Property Override</h4>
-								<div class="tableDiv">
-									<table>
-										<thead>
-											<tr>
-												<th>Name</th>
-												<th>Value</th>
-											</tr>
-										</thead>
-										<tbody>
-											<tr id="scheduleAddRow"><td id="scheduleAddRow-col" colspan="2"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
-										</tbody>
-									</table>
-								</div>
-							</div>
-						</div>
-						<div id="scheduleGraphPanel" class="graphPanel panel">
-							<div id="scheduleJobListCustom" class="jobList">
-								<div class="filterList">
-									<input class="filter" placeholder="  Job Filter" />
-								</div>
-								<div class="list">
-								</div>
-								<div class="btn5 resetPanZoomBtn" >Reset Pan Zoom</div>
-							</div>
-						    <div id="scheduleSvgDivCustom" class="svgDiv" >
-						    	<svg class="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
-								</svg>
-							</div>
-						</div>
-					</div>
-						<div class="actions">
-							<a class="yes btn1" id="adv-schedule-btn" href="#">Schedule</a>
-							<a class="no simplemodal-close btn3" id="schedule-cancel-btn" href="#">Cancel</a>
-					</div>
-					</div>
-				</div>
-		
+	
 				<div id="schedule-flow" class="modal">
 						<h3>Schedule Flow</h3>
 						<div id="errorMsg" class="box-error-message">$errorMsg</div>
@@ -345,7 +188,7 @@
 							<a class="yes btn3" id="login-btn" href="#">Re-login</a>
 						</div>
 				</div>
-
+#parse( "azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm" )
 
 #parse( "azkaban/webapp/servlet/velocity/executionoptionspanel.vm" )
 
@@ -375,6 +218,26 @@
 			</ul>
 		</ul>
 
+		<ul id="scheduleDisableJobMenu" class="contextMenu flowSubmenu">  
+			<li class="openwindow"><a href="#scheduleOpenwindow">Open in New Window...</a></li>
+			<li id="scheduleDisable" class="disable separator"><a href="#disable">Disable</a><div id="scheduleDisableArrow" class="context-sub-icon"></div></li>
+			<ul id="scheduleDisableSub" class="subMenu">
+				<li class="disableAll"><a href="#disableAll">All</a></li>
+				<li class="parents"><a href="#disableParents">Parents</a></li>
+				<li class="ancestors"><a href="#disableAncestors">All Ancestors</a></li>
+				<li class="children"><a href="#disableChildren">Children</a></li>
+				<li class="decendents"><a href="#disableDescendents">All Descendents</a></li>
+			</ul>
+			<li id="scheduleEnable" class="enable"><a href="#enable">Enable</a> <div id="scheduleEnableArrow" class="context-sub-icon"></div></li>
+			<ul id="scheduleEnableSub" class="subMenu">
+				<li class="enableAll"><a href="#enableAll">All</a></li>
+				<li class="parents"><a href="#enableParents">Parents</a></li>
+				<li class="ancestors"><a href="#enableAncestors">All Ancestors</a></li>
+				<li class="children"><a href="#enableChildren">Children</a></li>
+				<li class="decendents"><a href="#enableDescendents">All Descendents</a></li>
+			</ul>
+		</ul>
+
 		</div>
 	</body>
 </html>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index b2511ed..64d5a9d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -68,11 +68,11 @@
 							<!--th class="execid">Execution Id</th-->
 							<th>Flow</th>
 							<th>Project</th>
-							<th>User</th>
 							<th>Submitted By</th>
 							<th class="date">First Scheduled to Run</th>
 							<th class="date">Next Execution Time</th>
 							<th class="date">Repeats Every</th>
+							<th>Has SLA</th>
 							<th colspan="2" class="action">Action</th>
 						</tr>
 					</thead>
@@ -88,10 +88,10 @@
 								<a href="${context}/manager?project=${sched.projectName}">${sched.projectName}</a>
 							</td>
 							<td>${sched.submitUser}</td>
-							<td>${sched.submitUser}</td>
 							<td>$utils.formatDateTime(${sched.firstSchedTime})</td>
 							<td>$utils.formatDateTime(${sched.nextExecTime})</td>
 							<td>$utils.formatPeriod(${sched.period})</td>
+							<td>#if(${sched.slaOptions}) true #else false #end</td>
 							<td><button id="removeSchedBtn" onclick="removeSched(${sched.projectId}, '${sched.flowName}')" >Remove Schedule</button></td>
 							<td><button id="addSlaBtn" onclick="slaView.initFromSched(${sched.projectId}, '${sched.flowName}')" >Set SLA</button></td>
 						</tr>
@@ -103,54 +103,6 @@
 				</table>
 			</div>
 
-		
-			<div id="all-sla-content">
-				<div class="section-hd">
-					<h2>Flow SLAs</h2>
-				</div>
-			</div>
-			
-			<div class="scheduledFlows">
-				<table id="slaTbl">
-					<thead>
-						<tr>
-							<th>Flow</th>
-							<th>Project</th>
-							<th>User</th>
-							<th>Submitted By</th>
-							<th class="date">First SLA check Time</th>
-							<th class="date">Next SLA check Time</th>
-							<th class="date">Repeats Every</th>
-							<th colspan="2" class="action">Action</th>
-						</tr>
-					</thead>
-					<tbody>
-						#if($slas)
-#foreach($sla in $slas)
-						<tr class="row" >
-
-							<td class="tb-name">
-								<a href="${context}/manager?project=${sla.projectName}&flow=${sla.flowName}">${sla.flowName}</a>
-							</td>
-							<td>
-								<a href="${context}/manager?project=${sla.projectName}">${sla.projectName}</a>
-							</td>
-							<td>${sla.submitUser}</td>
-							<td>${sla.submitUser}</td>
-							<td>$utils.formatDateTime(${sla.firstCheckTime})</td>
-							<td>$utils.formatDateTime(${sla.nextCheckTime})</td>
-							<td>$utils.formatPeriod(${sla.period})</td>
-							<td><button id="removeSchedBtn" onclick="removeSla(${sla.projectId}, '${sla.flowName}')" >Remove SLA</button></td>
-						</tr>
-#end
-#else
-						<tr><td class="last">No Flow SLA Set</td></tr>
-#end
-					</tbody>
-				</table>
-			</div>
-		</div>
-				
 		<!-- modal content -->
 
 		<div id="slaModalBackground" class="modalBackground2">
@@ -163,7 +115,7 @@
 						</ul>
 					</div>
 					<div class="optionsPane">
-						<div id="slaPanel" class="generalPanel panel">
+						<div id="generalPanel" class="generalPanel panel">
 							<div id="slaActions">
 								<h4>SLA Alert Emails</h4>
 								<dl>
@@ -173,6 +125,7 @@
 									</dd>
 								</dl>
 							</div>
+							<br></br>
 							<div id="slaRules">
 								<h4>Flow SLA Rules</h4>
 								<div class="tableDiv">
@@ -180,36 +133,40 @@
 										<thead>
 											<tr>
 												<th>Flow/Job</th>
-												<th>Finish In</th>
+												<th>Sla Rule</th>
+												<th>Duration</th>
 												<th>Email Action</th>
 												<th>Kill Action</th>
 											</tr>
 										</thead>
 										<tbody>
+											<tr id="addRow"><td id="addRow-col" colspan="5"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
 										</tbody>
 									</table>
 								</div>
-								<h4>Job SLA Rules</h4>
-								<div class="tableDiv">
+								<!--h4 style="visibility: hidden">Job SLA Rules</h4>
+								<div class="tableDiv" style="visibility: hidden">
 									<table id="jobRulesTbl">
 										<thead>
 											<tr>
-												<th>Flow/Job</th>
+												<th>Flow/Job Id</th>
 												<th>Finish In</th>
+												<th>Duration</th>
 												<th>Email Action</th>
 												<th>Kill Action</th>
 											</tr>
 										</thead>
 										<tbody>
+											<tr id="addRow"><td id="addRow-col" colspan="4"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
 										</tbody>
 									</table>
-								</div>
+								</div-->
 							</div>
 						</div>
 					</div>
 					<div class="actions">
-						<a class="yes btn1" id="remove-sla-btn" href="#">Remove SLA</a>
-						<a class="yes btn1" id="set-sla-btn" href="#">Set SLA</a>
+						<!--a class="yes btn1" id="remove-sla-btn" href="#">Remove SLA</a-->
+						<a class="yes btn1" id="set-sla-btn" href="#">Set/Change SLA</a>
 						<a class="no simplemodal-close btn3" id="sla-cancel-btn" href="#">Cancel</a>
 					</div>
 			</div>
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm b/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm
new file mode 100644
index 0000000..fc6bc91
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduleoptionspanel.vm
@@ -0,0 +1,159 @@
+<div id="scheduleModalBackground" class="modalBackground2">
+	<div id="schedule-options" class="modal modalContainer2">
+		<a href='#' title='Close' class='modal-close'>x</a>
+			<h3>Schedule Flow Options</h3>
+			<div>
+				<ul class="optionsPicker">
+					<li id="scheduleGeneralOptions">General Options</li>
+					<li id="scheduleFlowOptions">Flow Options</li>
+					<!--li id="scheduleSlaOptions">SLA Options</li-->
+				</ul>
+			</div>
+			<div class="optionsPane">
+				<!--div id="scheduleSlaPanel" class="generalPanel panel">
+					<div id="slaActions">
+						<h4>SLA Alert Emails</h4>
+						<dl>
+							<dt >SLA Alert Emails</dt>
+							<dd>
+								<textarea id="slaEmails"></textarea>
+							</dd>
+						</dl>
+					</div>
+					<div id="slaRules">
+						<h4>Flow SLA Rules</h4>
+						<div class="tableDiv">
+							<table id="flowRulesTbl">
+								<thead>
+									<tr>
+										<th>Flow/Job</th>
+										<th>Finish In</th>
+										<th>Email Action</th>
+										<th>Kill Action</th>
+									</tr>
+								</thead>
+								<tbody>
+								</tbody>
+							</table>
+						</div>
+						<h4>Job SLA Rules</h4>
+						<div class="tableDiv">
+							<table id="jobRulesTbl">
+								<thead>
+									<tr>
+										<th>Flow/Job</th>
+										<th>Finish In</th>
+										<th>Email Action</th>
+										<th>Kill Action</th>
+									</tr>
+								</thead>
+								<tbody>
+								</tbody>
+							</table>
+						</div>
+					</div>
+				</div-->
+				<div id="scheduleGeneralPanel" class="generalPanel panel">
+					<div id="scheduleInfo">
+						<dl>
+							<dt>Schedule Time</dt>
+							<dd>
+								<input id="advhour" type="text" size="2" value="12"/>
+								<input id="advminutes" type="text" size="2" value="00"/>
+								<select id="advam_pm">
+								  <option>pm</option>
+								  <option>am</option>
+								</select>
+								<select id="advtimezone">
+								  <option>PDT</option>
+								  <option>UTC</option>
+								</select>
+							</dd>
+							<dt>Schedule Date</dt><dd><input type="text" id="advdatepicker" /></dd>
+							<dt>Recurrence</dt>
+							<dd>
+								<input id="advis_recurring" type="checkbox" checked  />
+								<span>repeat every</span>
+							 	<input id="advperiod" type="text" size="2" value="1"/>
+								<select id="advperiod_units">
+								  <option value="d">Days</option>
+								  <option value="h">Hours</option>
+								  <option value="m">Minutes</option>
+								  <option value="M">Months</option>
+								  <option value="w">Weeks</option>
+								</select>
+							</dd>
+						</dl>
+					</div>
+					<br></br>
+					<br></br>
+					<div id="scheduleCompleteActions">
+						<h4>Completion Actions</h4>
+						<dl>
+							<dt>Failure Action</dt>
+							<dd>
+								<select id="scheduleFailureAction" name="failureAction">
+									<option value="finishCurrent">Finish Current Running</option>
+									<option value="cancelImmediately">Cancel All</option>
+									<option value="finishPossible">Finish All Possible</option>
+								</select>
+											</dd>
+							<dt>Failure Email</dt>
+							<dd>
+								<textarea id="scheduleFailureEmails"></textarea>
+							</dd>
+							<dt>Notify on Failure</dt>
+							<dd>
+								<input id="scheduleNotifyFailureFirst" class="checkbox" type="checkbox" name="notify" value="first" checked >First Failure</input>
+								<input id="scheduleNotifyFailureLast" class="checkbox" type="checkbox" name="notify" value="last">Flow Stop</input>
+							</dd>
+							<dt>Success Email</dt>
+							<dd>
+								<textarea id="scheduleSuccessEmails"></textarea>
+							</dd>
+							<dt>Concurrent Execution</dt>
+							<dd id="scheduleExecutingJob" class="disabled">
+								<input id="scheduleIgnore" class="radio" type="radio" name="concurrent" value="ignore" checked /><label class="radioLabel" for="ignore">Run Concurrently</label>
+								<input id="schedulePipeline" class="radio" type="radio" name="concurrent" value="pipeline" /><label class="radioLabel" for="pipeline">Pipeline</label>
+								<input id="scheduleQueue" class="radio" type="radio" name="concurrent" value="queue" /><label class="radioLabel" for="queue">Queue Job</label>
+							</dd>
+						</dl>
+					</div>
+					<div id="scheduleFlowPropertyOverride">
+						<h4>Flow Property Override</h4>
+						<div class="tableDiv">
+							<table>
+								<thead>
+									<tr>
+										<th>Name</th>
+										<th>Value</th>
+									</tr>
+								</thead>
+								<tbody>
+									<tr id="scheduleAddRow"><td id="addRow-col" colspan="2"><span class="addIcon"></span><a href="#">Add Row</a></td></tr>
+								</tbody>
+							</table>
+						</div>
+					</div>
+				</div>
+				<div id="scheduleGraphPanel" class="graphPanel panel">
+					<div id="scheduleJobListCustom" class="jobList">
+						<div class="filterList">
+							<input class="filter" placeholder="  Job Filter" />
+						</div>
+						<div class="list">
+						</div>
+						<div class="btn5 resetPanZoomBtn" >Reset Pan Zoom</div>
+						</div>
+						<div id="scheduleSvgDivCustom" class="svgDiv" >
+							<svg class="svgGraph" xmlns="http://www.w3.org/2000/svg" version="1.1" shape-rendering="optimize-speed" text-rendering="optimize-speed" >
+							</svg>
+						</div>
+				</div>
+		</div>
+		<div class="actions">
+	<a class="yes btn1" id="adv-schedule-btn" href="#">Schedule</a>
+	<a class="no simplemodal-close btn3" id="schedule-cancel-btn" href="#">Cancel</a>
+	</div>
+	</div>
+</div>
\ No newline at end of file
diff --git a/src/sql/create_sla_table.sql b/src/sql/create_sla_table.sql
new file mode 100644
index 0000000..9914a03
--- /dev/null
+++ b/src/sql/create_sla_table.sql
@@ -0,0 +1,10 @@
+DROP TABLE if exists active_sla;
+CREATE TABLE active_sla (
+	exec_id INT NOT NULL,
+	job_name VARCHAR(128) NOT NULL,
+	check_time BIGINT NOT NULL,
+	rule TINYINT NOT NULL,
+	enc_type TINYINT,
+	options LONGBLOB NOT NULL,
+	primary key(exec_id, job_name, check_time, rule)
+) ENGINE=InnoDB;
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index a29f9a1..67d3287 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -1346,10 +1346,6 @@ tr:hover td {
 	bottom: 0px;
 }
 
-#graphPanel {
-	background-color: #F0F0F0;
-}
-
 .radioLabel.disabled {
 	opacity: 0.3;
 }
@@ -1443,6 +1439,189 @@ tr:hover td {
 	border-bottom: 1px solid #CCC;
 }
 
+#sla-options {
+	left: 100px;
+	right: 100px;
+	top: 50px;
+	bottom: 40px;
+}
+
+#sla-options .svgDiv {
+	position: absolute;
+	background-color: #CCC;
+	padding: 1px;
+	left: 270px;
+	right: 0px;
+	top: 0px;
+	bottom: 0px;
+}
+
+#sla-options .jobList {
+	position: absolute;
+	width: 255px;
+	top: 0px;
+	bottom: 0px;
+	padding: 5px;
+	background-color: #F0F0F0;
+}
+
+#sla-options .list {
+	width: 255px;
+}
+
+#sla-options ul.optionsPicker {
+	margin-left: 30px;
+}
+
+#sla-options ul.optionsPicker li {
+	float: left;
+	font-size: 12pt;
+	font-weight: bold;
+	margin-right: 15px;
+	cursor: pointer;
+	color: #CCC;
+}
+
+#sla-options ul.optionsPicker li.selected {
+	text-decoration: underline;
+	color: #000;
+}
+
+#sla-options ul.optionsPicker li.selected:hover {
+	color: #000;
+}
+
+#sla-options ul.optionsPicker li:hover {
+	color: #888;
+}
+
+#sla-options .optionsPane {
+	position: absolute;
+	top: 85px;
+	background-color: #FFF;
+	left: 0px;
+	right: 0px;
+	bottom: 0px;
+}
+
+#sla-options .panel {
+	position: absolute;
+	width: 100%;
+	top: 0px;
+	bottom: 65px;
+}
+
+#sla-options .generalPanel.panel {
+	background-color: #F4F4F4;
+	padding-top: 15px;
+}
+
+#sla-options h3 {
+	margin-left: 20px;
+	font-size: 14pt;
+	border-bottom: 1px solid #CCC;
+}
+
+#sla-options h4 {
+	margin-left: 20px;
+	font-size: 12pt;
+	border-bottom: 1px solid #CCC;
+}
+
+
+#schedule-options {
+	left: 100px;
+	right: 100px;
+	top: 50px;
+	bottom: 40px;
+}
+
+#schedule-options .svgDiv {
+	position: absolute;
+	background-color: #CCC;
+	padding: 1px;
+	left: 270px;
+	right: 0px;
+	top: 0px;
+	bottom: 0px;
+}
+
+#schedule-options .jobList {
+	position: absolute;
+	width: 255px;
+	top: 0px;
+	bottom: 0px;
+	padding: 5px;
+	background-color: #F0F0F0;
+}
+
+#schedule-options .list {
+	width: 255px;
+}
+
+#schedule-options ul.optionsPicker {
+	margin-left: 30px;
+}
+
+#schedule-options ul.optionsPicker li {
+	float: left;
+	font-size: 12pt;
+	font-weight: bold;
+	margin-right: 15px;
+	cursor: pointer;
+	color: #CCC;
+}
+
+#schedule-options ul.optionsPicker li.selected {
+	text-decoration: underline;
+	color: #000;
+}
+
+#schedule-options ul.optionsPicker li.selected:hover {
+	color: #000;
+}
+
+#schedule-options ul.optionsPicker li:hover {
+	color: #888;
+}
+
+#schedule-options .optionsPane {
+	position: absolute;
+	top: 85px;
+	background-color: #FFF;
+	left: 0px;
+	right: 0px;
+	bottom: 0px;
+}
+
+#schedule-options .panel {
+	position: absolute;
+	width: 100%;
+	top: 0px;
+	bottom: 65px;
+}
+
+#schedule-options .generalPanel.panel {
+	background-color: #F4F4F4;
+	padding-top: 15px;
+}
+
+#schedule-options h3 {
+	margin-left: 20px;
+	font-size: 14pt;
+	border-bottom: 1px solid #CCC;
+}
+
+#schedule-options h4 {
+	margin-left: 20px;
+	font-size: 12pt;
+	border-bottom: 1px solid #CCC;
+}
+
+#graphPanel {
+	background-color: #F0F0F0;
+}
+
 #generalPanel {
 	overflow: auto;
 }
@@ -1548,6 +1727,97 @@ tr:hover td {
 	top: 100px;
 }
 
+#scheduleGraphPanel {
+	background-color: #F0F0F0;
+}
+
+#scheduleGeneralPanel {
+	overflow: auto;
+}
+
+#scheduleGeneralPanel dt {
+	width: 150px;
+	font-size: 10pt;
+	font-weight: bold;
+	margin-top: 5px;
+}
+
+#scheduleGeneralPanel textarea {
+	width: 500px;
+}
+
+#scheduleGeneralPanel table #addRow {
+	cursor: pointer;
+}
+
+#scheduleGeneralPanel table tr {
+	height: 24px;
+}
+
+#scheduleGeneralPanel table .editable {
+
+}
+
+#scheduleGeneralPanel table .editable input {
+	border: 1px solid #009FC9;
+	height: 16px;
+}
+
+#scheduleGeneralPanel table .name {
+	width: 40%;
+}
+
+#scheduleGeneralPanel span.addIcon {
+	display: block;
+	width: 16px;
+	height: 16px;
+	background-image: url("./images/addIcon.png");
+}
+
+#scheduleGeneralPanel span.removeIcon {
+	display: block;
+	visibility:hidden;
+	disabled: true;
+	width: 16px;
+	height: 16px;
+	background-image: url("./images/removeIcon.png");
+	cursor: pointer;
+}
+
+#scheduleGeneralPanel .editable:hover span.removeIcon {
+	visibility:visible;
+}
+
+#scheduleGeneralPanel {
+}
+
+#scheduleGeneralPanel span {
+	float: left;
+	margin-left: 5px;
+}
+
+#scheduleGeneralPanel dd {
+	font-size: 10pt;
+}
+
+#scheduleFlowPropertyOverride {
+	clear: both;
+	padding-top: 30px;
+}
+
+#scheduleFlowPropertyOverride .tableDiv {
+	padding-right: 20px;
+	padding-left: 20px;
+}
+
+#scheduleJobList {
+	position: absolute;
+	top: 0px;
+	left: 0px;
+	height: 100%;
+	width: 250px;
+}
+
 .filter {
 	width: 100%;
 }
@@ -1577,7 +1847,7 @@ tr:hover td {
 	background-position: 16px 0px;
 }
 
-.list ul li:hover{
+.list ul li:hover {
 	background-color: #E1E3E2;
 	color: #009FC9;
 }
diff --git a/src/web/js/azkaban.exflow.options.view.js b/src/web/js/azkaban.exflow.options.view.js
index 2a036d8..6c631bb 100644
--- a/src/web/js/azkaban.exflow.options.view.js
+++ b/src/web/js/azkaban.exflow.options.view.js
@@ -15,11 +15,9 @@
  */
 
 var executeFlowView;
-var advancedScheduleView;
 var customSvgGraphView;
 var customJobListView;
 var cloneModel;
-var scheduleModel;
 
 function recurseAllAncestors(nodes, disabledMap, id, disable) {
 	var node = nodes[id];
@@ -88,425 +86,6 @@ azkaban.ContextMenu = Backbone.View.extend({
 	}
 });
 
-azkaban.AdvancedScheduleView = Backbone.View.extend({
-	  events : {
-	  	"click" : "closeEditingTarget",
-	    "click #adv-schedule-btn": "handleScheduleFlow",
-	    "click #schedule-cancel-btn": "handleCancel",
-	    "click .modal-close": "handleCancel",
-	    "click #scheduleGeneralOptions": "handleGeneralOptionsSelect",
-	    "click #scheduleFlowOptions": "handleFlowOptionsSelect",
-	    "click #scheduleSlaOptions": "handleSlaOptionsSelect",
-	    "click #scheduleAddRow": "handleAddRow",
-	    "click table .editable": "handleEditColumn",
-	    "click table .removeIcon": "handleRemoveColumn"
-	  },
-	  initialize: function(setting) {
-	  	 this.contextMenu = new azkaban.ContextMenu({el:$('#disableJobMenu')});
-	  	 this.handleGeneralOptionsSelect();
-	  },
-	  show: function() {
-	  	this.handleGeneralOptionsSelect();
-	  	$('#advScheduleModalBackground').show();
-	  	$('#schedule-options').show();
-	  	this.cloneModel = this.model.clone();
-	  	scheduleModel = this.cloneModel;
-	  	
-	  	var fetchData = {"project": projectName, "ajax":"flowInfo", "flow":flowName};
-//	  	if (execId) {
-//	  		fetchData.execid = execId;
-//	  	}
-	  	this.executeURL = contextURL + "/executor";
-	  	this.scheduleURL = contextURL + "/schedule";
-	  	var handleAddRow = this.handleAddRow;
-	  	
-	  	var data = this.cloneModel.get("data");
-	  	var nodes = {};
-	  	for (var i=0; i < data.nodes.length; ++i) {
-	      	var node = data.nodes[i];
-	      	nodes[node.id] = node;
-	    }
-    	
-    	for (var i=0; i < data.edges.length; ++i) {
-    	  	var edge = data.edges[i];
-    	  	var fromNode = nodes[edge.from];
-    	  	var toNode = nodes[edge.target];
-    	  	
-    	  	if (!fromNode.outNodes) {
-    	  		fromNode.outNodes = {};
-    	  	}
-    	  	fromNode.outNodes[toNode.id] = toNode;
-    	  	
-    	  	if (!toNode.inNodes) {
-    	  		toNode.inNodes = {};
-    	  	}
-    	  	toNode.inNodes[fromNode.id] = fromNode;
-    	}
-	    this.cloneModel.set({nodes: nodes});
-	  	
-	  	var disabled = {};
-//		for (var i = 0; i < data.nodes.length; ++i) {
-//			var updateNode = data.nodes[i];
-//			if (updateNode.status == "DISABLED" || updateNode.status == "SKIPPED") {
-//				updateNode.status = "READY";
-//				disabled[updateNode.id] = true;
-//			}
-//			if (updateNode.status == "SUCCEEDED") {
-//				disabled[updateNode.id] = true;
-//			}
-//		}
-	  	this.cloneModel.set({disabled: disabled});
-	  	
-	  	$.get(
-			this.executeURL,
-			fetchData,
-			function(data) {
-				if (data.error) {
-					alert(data.error);
-				}
-				else {
-					if (data.successEmails) {
-						$('#scheduleSuccessEmails').val(data.successEmails.join());
-					}
-					if (data.failureEmails) {
-						$('#scheduleFailureEmails').val(data.failureEmails.join());
-					}
-					
-					if (data.failureAction) {
-						$('#scheduleFailureAction').val(data.failureAction);
-					}
-					if (data.notifyFailureFirst) {
-						$('#scheduleNotifyFailureFirst').attr('checked', true);
-					}
-					if (data.notifyFailureLast) {
-						$('#scheduleNotifyFailureLast').attr('checked', true);	
-					}
-					if (data.flowParam) {
-						var flowParam = data.flowParam;
-						for (var key in flowParam) {
-							var row = handleAddRow();
-							var td = $(row).find('td');
-							$(td[0]).text(key);
-							$(td[1]).text(flowParam[key]);
-						}
-					}
-
-					if (!data.running || data.running.length == 0) {
-						$(".radio").attr("disabled", "disabled");
-						$(".radioLabel").addClass("disabled", "disabled");
-					}
-				}
-			},
-			"json"
-		);
-	  },
-	  handleCancel: function(evt) {
-	  	var scheduleURL = contextURL + "/schedule";
-		$('#advScheduleModalBackground').hide();
-	  	$('#schedule-options').hide();
-	  },
-	  handleGeneralOptionsSelect: function(evt) {
-	  	$('#scheduleFlowOptions').removeClass('selected');
-	  	$('#scheduleSlaOptions').removeClass('selected');
-	  	$('#scheduleGeneralOptions').addClass('selected');
-
-	  	$('#scheduleGeneralPanel').show();	  	
-	  	$('#scheduleGraphPanel').hide();
-	  	$('#scheduleSlaPanel').hide();
-	  },
-	  handleSlaOptionsSelect: function(evt) {
-		  	$('#scheduleFlowOptions').removeClass('selected');
-		  	$('#scheduleSlaOptions').addClass('selected');
-		  	$('#scheduleGeneralOptions').removeClass('selected');
-		  	
-		  	$('#scheduleSlaPanel').show();	  	
-		  	$('#scheduleGraphPanel').hide();
-		  	$('#scheduleGeneralPanel').hide();
-		  },
-	  handleFlowOptionsSelect: function(evt) {
-	  	$('#scheduleGeneralOptions').removeClass('selected');
-	  	$('#scheduleFlowOptions').addClass('selected');
-	  	$('#scheduleSlaOptions').removeClass('selected');
-	  	
-	  	$('#scheduleGraphPanel').show();	  	
-	  	$('#scheduleGeneralPanel').hide();
-	  	$('#scheduleSlaPanel').hide();
-	  	
-	  	if (this.flowSetup) {
-	  		return;
-	  	}
-	  	
-	  	customSvgGraphView = new azkaban.SvgGraphView({el:$('#scheduleSvgDivCustom'), model: this.cloneModel, rightClick: {id: 'disableJobMenu', callback: this.handleDisableMenuClick}});
-		customJobsListView = new azkaban.JobListView({el:$('#scheduleJobListCustom'), model: this.cloneModel, rightClick: {id: 'disableJobMenu', callback: this.handleDisableMenuClick}});
-		this.cloneModel.trigger("change:graph");
-		
-		this.flowSetup = true;
-	  },
-	  handleScheduleFlow: function(evt) {
-	  	var scheduleURL = contextURL + "/scheduler";
-	  	var disabled = this.cloneModel.get("disabled");
-	  	var failureAction = $('#failureAction').val();
-	  	var failureEmails = $('#failureEmails').val();
-	  	var successEmails = $('#successEmails').val();
-	  	var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
-	  	var notifyFailureLast = $('#notifyFailureLast').is(':checked');
-	  	var executingJobOption = $('input:radio[name=gender]:checked').val();
-	  	
-	  	var flowOverride = {};
-	  	var editRows = $(".editRow");
-		for (var i = 0; i < editRows.length; ++i) {
-			var row = editRows[i];
-			var td = $(row).find('td');
-			var key = $(td[0]).text();
-			var val = $(td[1]).text();
-			
-			if (key && key.length > 0) {
-				flowOverride[key] = val;
-			}
-		}
-
-	  	var scheduleData = {
-	  		project: projectName,
-	  		ajax: "advScheduleFlow",
-	  		flow: flowName,
-	  		hour: $('#advHour').val(),
-	  		min: $('#advMinutes').val(),
-	  		am_pm: $('#advAm_pm').val(),
-	  		timezone: $('#advTimezone').val(),
-	  		date: $('#advDate').val(),
-	  		period: $('#advPeriod').val()+$('#advPeriod_units').val(),
-	  		disable: this.cloneModel.get('disabled'),
-	  		failureAction: failureAction,
-	  		failureEmails: failureEmails,
-	  		successEmails: successEmails,
-	  		notifyFailureFirst: notifyFailureFirst,
-	  		notifyFailureLast: notifyFailureLast,
-	  		executingJobOption: executingJobOption,
-	  		flowOverride: flowOverride
-	  	};
-	  	
-		$.get(
-			scheduleURL,
-			scheduleData,
-			function(data) {
-				if (data.error) {
-					alert(data.error);
-				}
-				else {
-					var redirectURL = contextURL + "/schedule";
-					window.location.href = redirectURL;
-				}
-			},
-			"json"
-		);
-	  },
-	  handleAddRow: function(evt) {
-	  	var tr = document.createElement("tr");
-	  	var tdName = document.createElement("td");
-	    var tdValue = document.createElement("td");
-	    
-	    var icon = document.createElement("span");
-	    $(icon).addClass("removeIcon");
-	    var nameData = document.createElement("span");
-	    $(nameData).addClass("spanValue");
-	    var valueData = document.createElement("span");
-	    $(valueData).addClass("spanValue");
-	    	    
-		$(tdName).append(icon);
-		$(tdName).append(nameData);
-		$(tdName).addClass("name");
-		$(tdName).addClass("editable");
-		
-		$(tdValue).append(valueData);
-	    $(tdValue).addClass("editable");
-		
-		$(tr).addClass("editRow");
-	  	$(tr).append(tdName);
-	  	$(tr).append(tdValue);
-	   
-	  	$(tr).insertBefore("#scheduleAddRow");
-	  	return tr;
-	  },
-	  handleEditColumn : function(evt) {
-	  	var curTarget = evt.currentTarget;
-	
-	  	if (this.editingTarget != curTarget) {
-			this.closeEditingTarget();
-			
-			var text = $(curTarget).children(".spanValue").text();
-			$(curTarget).empty();
-						
-			var input = document.createElement("input");
-			$(input).attr("type", "text");
-			$(input).css("width", "100%");
-			$(input).val(text);
-			$(curTarget).addClass("editing");
-			$(curTarget).append(input);
-			$(input).focus();
-			this.editingTarget = curTarget;
-	  	}
-	  },
-	  handleRemoveColumn : function(evt) {
-	  	var curTarget = evt.currentTarget;
-	  	// Should be the table
-	  	var row = curTarget.parentElement.parentElement;
-		$(row).remove();
-	  },
-	  closeEditingTarget: function(evt) {
-	  	if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
-	  		var input = $(this.editingTarget).children("input")[0];
-	  		var text = $(input).val();
-	  		$(input).remove();
-
-		    var valueData = document.createElement("span");
-		    $(valueData).addClass("spanValue");
-		    $(valueData).text(text);
-
-	  		if ($(this.editingTarget).hasClass("name")) {
-		  		var icon = document.createElement("span");
-		    	$(icon).addClass("removeIcon");
-		    	$(this.editingTarget).append(icon);
-		    }
-		    
-		    $(this.editingTarget).removeClass("editing");
-		    $(this.editingTarget).append(valueData);
-		    this.editingTarget = null;
-	  	}
-	  },
-	  handleDisableMenuClick : function(action, el, pos) {
-			var jobid = el[0].jobid;
-			var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
-			if (action == "open") {
-				window.location.href = requestURL;
-			}
-			else if(action == "openwindow") {
-				window.open(requestURL);
-			}
-			else if(action == "disable") {
-				var disabled = scheduleModel.get("disabled");
-		
-				disabled[jobid] = true;
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if(action == "disableAll") {
-				var disabled = scheduleModel.get("disabled");
-		
-				var nodes = scheduleModel.get("nodes");
-				for (var key in nodes) {
-					disabled[key] = true;
-				}
-
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if (action == "disableParents") {
-				var disabled = scheduleModel.get("disabled");
-				var nodes = scheduleModel.get("nodes");
-				var inNodes = nodes[jobid].inNodes;
-		
-				if (inNodes) {
-					for (var key in inNodes) {
-					  disabled[key] = true;
-					}
-				}
-				
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if (action == "disableChildren") {
-				var disabledMap = scheduleModel.get("disabled");
-				var nodes = scheduleModel.get("nodes");
-				var outNodes = nodes[jobid].outNodes;
-		
-				if (outNodes) {
-					for (var key in outNodes) {
-					  disabledMap[key] = true;
-					}
-				}
-				
-				scheduleModel.set({disabled: disabledMap});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if (action == "disableAncestors") {
-				var disabled = scheduleModel.get("disabled");
-				var nodes = scheduleModel.get("nodes");
-				
-				recurseAllAncestors(nodes, disabled, jobid, true);
-				
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if (action == "disableDescendents") {
-				var disabled = scheduleModel.get("disabled");
-				var nodes = scheduleModel.get("nodes");
-				
-				recurseAllDescendents(nodes, disabled, jobid, true);
-				
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if(action == "enable") {
-				var disabled = scheduleModel.get("disabled");
-		
-				disabled[jobid] = false;
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if(action == "enableAll") {
-				disabled = {};
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if (action == "enableParents") {
-				var disabled = scheduleModel.get("disabled");
-				var nodes = scheduleModel.get("nodes");
-				var inNodes = nodes[jobid].inNodes;
-		
-				if (inNodes) {
-					for (var key in inNodes) {
-					  disabled[key] = false;
-					}
-				}
-				
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if (action == "enableChildren") {
-				var disabled = scheduleModel.get("disabled");
-				var nodes = scheduleModel.get("nodes");
-				var outNodes = nodes[jobid].outNodes;
-		
-				if (outNodes) {
-					for (var key in outNodes) {
-					  disabled[key] = false;
-					}
-				}
-				
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-			else if (action == "enableAncestors") {
-				var disabled = scheduleModel.get("disabled");
-				var nodes = scheduleModel.get("nodes");
-				
-				recurseAllAncestors(nodes, disabled, jobid, false);
-				
-				this.cloneModel.set({disabled: disabled});
-				this.cloneModel.trigger("change:disabled");
-			}
-			else if (action == "enableDescendents") {
-				var disabled = this.cloneModel.get("disabled");
-				var nodes = this.cloneModel.get("nodes");
-				
-				recurseAllDescendents(nodes, disabled, jobid, false);
-				
-				scheduleModel.set({disabled: disabled});
-				scheduleModel.trigger("change:disabled");
-			}
-		}
-});
-
 azkaban.ExecuteFlowView = Backbone.View.extend({
   	  events : {
   	  	"click" : "closeEditingTarget",
@@ -668,21 +247,21 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 				flowOverride[key] = val;
 			}
 		}
-	  	
-	  	var executingData = {
-	  		project: projectName,
-	  		ajax: "executeFlow",
-	  		flow: flowName,
-	  		disable: this.cloneModel.get('disabled'),
-	  		failureAction: failureAction,
-	  		failureEmails: failureEmails,
-	  		successEmails: successEmails,
-	  		notifyFailureFirst: notifyFailureFirst,
-	  		notifyFailureLast: notifyFailureLast,
-	  		executingJobOption: executingJobOption,
-	  		flowOverride: flowOverride
-	  	};
-	  	
+
+		var executingData = {
+			project: projectName,
+			ajax: "executeFlow",
+			flow: flowName,
+			disable: this.cloneModel.get('disabled'),
+			failureAction: failureAction,
+			failureEmails: failureEmails,
+			successEmails: successEmails,
+			notifyFailureFirst: notifyFailureFirst,
+			notifyFailureLast: notifyFailureLast,
+			executingJobOption: executingJobOption,
+			flowOverride: flowOverride
+		};
+
 		executeFlow(executingData);
 	  },
 	  handleAddRow: function(evt) {
@@ -729,7 +308,7 @@ azkaban.ExecuteFlowView = Backbone.View.extend({
 			$(curTarget).append(input);
 			$(input).focus();
 			this.editingTarget = curTarget;
-	  	}
+			}
 	  },
 	  handleRemoveColumn : function(evt) {
 	  	var curTarget = evt.currentTarget;
diff --git a/src/web/js/azkaban.flow.view.js b/src/web/js/azkaban.flow.view.js
index 7c05e77..584e3a4 100644
--- a/src/web/js/azkaban.flow.view.js
+++ b/src/web/js/azkaban.flow.view.js
@@ -280,88 +280,6 @@ azkaban.GraphModel = Backbone.Model.extend({});
 var executionModel;
 azkaban.ExecutionModel = Backbone.Model.extend({});
 
-var scheduleFlowView;
-azkaban.ScheduleFlowView = Backbone.View.extend({
-	events : {
-	"click #schedule-btn": "handleScheduleFlow",
-	"click #adv-schedule-opt-btn": "handleAdvancedSchedule"
-	},
-	initialize : function(settings) {
-		$( "#datepicker" ).datepicker();
-		$( "#datepicker" ).datepicker('setDate', new Date());
-		$("#errorMsg").hide();
-	},
-	handleAdvancedSchedule : function(evt) {
-		console.log("Clicked advanced schedule options button");
-		//$('#confirm-container').hide();
-		$.modal.close();
-		advancedScheduleView.show();
-	},
-	show: function() {
-//		this.cloneModel = this.model.clone();
-//		cloneModel = this.cloneModel;
-	},
-	handleScheduleFlow : function(evt) {
-
-	var hourVal = $('#hour').val();
-	var minutesVal = $('#minutes').val();
-	var ampmVal = $('#am_pm').val();
-	var timezoneVal = $('#timezone').val();
-	var dateVal = $('#datepicker').val();
-	var is_recurringVal = $('#is_recurring').val();
-	var periodVal = $('#period').val();
-	var periodUnits = $('#period_units').val();
-
-	console.log("Creating schedule for "+projectName+"."+flowName);
-	$.ajax({
-		async: "false",
-		url: "schedule",
-		dataType: "json",
-		type: "POST",
-		data: {
-			action:"scheduleFlow", 
-			projectId:projectId,
-			projectName:projectName, 
-			flowName:flowName,
-			hour:hourVal,
-			minutes:minutesVal,
-			am_pm:ampmVal,
-			timezone:timezoneVal,
-			date:dateVal,
-			userExec:"dummy",
-			is_recurring:is_recurringVal,
-			period:periodVal,
-			period_units:periodUnits
-			},
-		success: function(data) {
-			if (data.status == "success") {
-				console.log("Successfully scheduled for "+projectName+"."+flowName);
-				if (data.action == "redirect") {
-					window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
-				}
-				else{
-					$("#success_message").text("Flow " + projectName + "." + flowName + " scheduled!" );			
-	 				window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ; 
-				}
-			}
-			else {
-				if (data.action == "login") {
-					window.location = "";
-				}
-				else {
-					$("#errorMsg").text("ERROR: " + data.message);
-					$("#errorMsg").slideDown("fast");
-				}
-			}
-		}
-	});
-
-	},
-	render: function() {
-	}
-});
-
-
 $(function() {
 	var selected;
 	// Execution model has to be created before the window switches the tabs.
@@ -374,8 +292,9 @@ $(function() {
 	svgGraphView = new azkaban.SvgGraphView({el:$('#svgDiv'), model: graphModel, rightClick: {id: 'jobMenu', callback: handleJobMenuClick}});
 	jobsListView = new azkaban.JobListView({el:$('#jobList'), model: graphModel, rightClick: {id: 'jobMenu', callback: handleJobMenuClick}});
 	scheduleFlowView = new azkaban.ScheduleFlowView({el:$('#schedule-flow'),   model: graphModel});
-	executeFlowView = new azkaban.ExecuteFlowView({el:$('#executing-options'), model: graphModel});
 	advancedScheduleView = new azkaban.AdvancedScheduleView({el:$('#schedule-options'), model: graphModel});
+	executeFlowView = new azkaban.ExecuteFlowView({el:$('#executing-options'), model: graphModel});
+	
 	var requestURL = contextURL + "/manager";
 
 	// Set up the Flow options view. Create a new one every time :p
diff --git a/src/web/js/azkaban.schedule.options.view.js b/src/web/js/azkaban.schedule.options.view.js
new file mode 100644
index 0000000..5b9b760
--- /dev/null
+++ b/src/web/js/azkaban.schedule.options.view.js
@@ -0,0 +1,586 @@
+/*
+ * Copyright 2012 LinkedIn, Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+$.namespace('azkaban');
+
+var scheduleCustomSvgGraphView;
+var scheduleCustomJobListView;
+
+var scheduleFlowView;
+
+var scheduleFlowData;
+
+//function recurseAllAncestors(nodes, disabledMap, id, disable) {
+//	var node = nodes[id];
+//	
+//	if (node.inNodes) {
+//		for (var key in node.inNodes) {
+//			disabledMap[key] = disable;
+//			recurseAllAncestors(nodes, disabledMap, key, disable);
+//		}
+//	}
+//}
+//
+//function recurseAllDescendents(nodes, disabledMap, id, disable) {
+//	var node = nodes[id];
+//	
+//	if (node.outNodes) {
+//		for (var key in node.outNodes) {
+//			disabledMap[key] = disable;
+//			recurseAllDescendents(nodes, disabledMap, key, disable);
+//		}
+//	}
+//}
+//
+azkaban.ScheduleContextMenu = Backbone.View.extend({
+	events : {
+		"click #scheduleDisableArrow" : "handleDisabledClick",
+		"click #scheduleEnableArrow" : "handleEnabledClick"
+	},
+	initialize: function(settings) {
+		$('#scheduleDisableSub').hide();
+		$('#scheduleEnableSub').hide();
+	},
+	handleEnabledClick: function(evt) {
+		if(evt.stopPropagation) {
+			evt.stopPropagation();
+		}
+		evt.cancelBubble=true;
+		
+		if (evt.currentTarget.expanded) {
+			evt.currentTarget.expanded=false;
+			$('#scheduleEnableArrow').removeClass('collapse');
+			$('#scheduleEnableSub').hide();
+		}
+		else {
+			evt.currentTarget.expanded=true;
+			$('#scheduleEnableArrow').addClass('collapse');
+			$('#scheduleEnableSub').show();
+		}
+	},
+	handleDisabledClick: function(evt) {
+		if(evt.stopPropagation) {
+			evt.stopPropagation();
+		}
+		evt.cancelBubble=true;
+		
+		if (evt.currentTarget.expanded) {
+			evt.currentTarget.expanded=false;
+			$('#scheduleDisableArrow').removeClass('collapse');
+			$('#scheduleDisableSub').hide();
+		}
+		else {
+			evt.currentTarget.expanded=true;
+			$('#scheduleDisableArrow').addClass('collapse');
+			$('#scheduleDisableSub').show();
+		}
+	}
+});
+
+azkaban.ScheduleFlowView = Backbone.View.extend({
+	events : {
+	"click #schedule-btn": "handleScheduleFlow",
+	"click #adv-schedule-opt-btn": "handleAdvancedSchedule"
+	},
+	initialize : function(settings) {
+		$( "#datepicker" ).datepicker();
+		$( "#datepicker" ).datepicker('setDate', new Date());
+		$("#errorMsg").hide();
+	},
+	handleAdvancedSchedule : function(evt) {
+		console.log("Clicked advanced schedule options button");
+		//$('#confirm-container').hide();
+		$.modal.close();
+		advancedScheduleView.show();
+	},
+	handleScheduleFlow : function(evt) {
+
+	var hourVal = $('#hour').val();
+	var minutesVal = $('#minutes').val();
+	var ampmVal = $('#am_pm').val();
+	var timezoneVal = $('#timezone').val();
+	var dateVal = $('#datepicker').val();
+	var is_recurringVal = $('#is_recurring').val();
+	var periodVal = $('#period').val();
+	var periodUnits = $('#period_units').val();
+
+	console.log("Creating schedule for "+projectName+"."+flowName);
+	$.ajax({
+		async: "false",
+		url: "schedule",
+		dataType: "json",
+		type: "POST",
+		data: {
+			action:"scheduleFlow", 
+			projectId:projectId,
+			projectName:projectName, 
+			flowName:flowName,
+			hour:hourVal,
+			minutes:minutesVal,
+			am_pm:ampmVal,
+			timezone:timezoneVal,
+			date:dateVal,
+			userExec:"dummy",
+			is_recurring:is_recurringVal,
+			period:periodVal,
+			period_units:periodUnits
+			},
+		success: function(data) {
+			if (data.status == "success") {
+				console.log("Successfully scheduled for "+projectName+"."+flowName);
+				if (data.action == "redirect") {
+					window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ;
+				}
+				else{
+					$("#success_message").text("Flow " + projectName + "." + flowName + " scheduled!" );			
+	 				window.location = contextURL + "/manager?project=" + projectName + "&flow=" + flowName ; 
+				}
+			}
+			else {
+				if (data.action == "login") {
+					window.location = "";
+				}
+				else {
+					$("#errorMsg").text("ERROR: " + data.message);
+					$("#errorMsg").slideDown("fast");
+				}
+			}
+		}
+	});
+
+	},
+	render: function() {
+	}
+});
+
+azkaban.AdvancedScheduleView = Backbone.View.extend({
+	events : {
+		"click" : "closeEditingTarget",
+		"click #adv-schedule-btn": "handleAdvSchedule",
+		"click #schedule-cancel-btn": "handleCancel",
+		"click .modal-close": "handleCancel",
+		"click #scheduleGeneralOptions": "handleGeneralOptionsSelect",
+		"click #scheduleFlowOptions": "handleFlowOptionsSelect",
+		"click #scheduleAddRow": "handleAddRow",
+		"click table .editable": "handleEditColumn",
+		"click table .removeIcon": "handleRemoveColumn"
+	},
+	initialize: function(setting) {
+		this.contextMenu = new azkaban.ScheduleContextMenu({el:$('#scheduleDisableJobMenu')});
+		this.handleGeneralOptionsSelect();
+		$( "#advdatepicker" ).datepicker();
+		$( "#advdatepicker" ).datepicker('setDate', new Date());
+	},
+	show: function() {
+		$('#scheduleModalBackground').show();
+		$('#schedule-options').show();
+		this.handleGeneralOptionsSelect();
+
+		scheduleFlowData = this.model.clone();
+		this.flowData = scheduleFlowData;
+		var flowData = scheduleFlowData;
+		
+		var fetchData = {"project": projectName, "ajax":"flowInfo", "flow":flowName};
+
+		var executeURL = contextURL + "/executor";
+		this.executeURL = executeURL;
+		var scheduleURL = contextURL + "/schedule";
+		this.scheduleURL = scheduleURL;
+		var handleAddRow = this.handleAddRow;
+
+		var data = flowData.get("data");
+		var nodes = {};
+		for (var i=0; i < data.nodes.length; ++i) {
+			var node = data.nodes[i];
+			nodes[node.id] = node;
+		}
+
+		for (var i=0; i < data.edges.length; ++i) {
+			var edge = data.edges[i];
+			var fromNode = nodes[edge.from];
+			var toNode = nodes[edge.target];
+
+			if (!fromNode.outNodes) {
+				fromNode.outNodes = {};
+			}
+			fromNode.outNodes[toNode.id] = toNode;
+
+			if (!toNode.inNodes) {
+				toNode.inNodes = {};
+			}
+			toNode.inNodes[fromNode.id] = fromNode;
+		}
+		flowData.set({nodes: nodes});
+
+		var disabled = {};
+		for (var i = 0; i < data.nodes.length; ++i) {
+			var updateNode = data.nodes[i];
+			if (updateNode.status == "DISABLED" || updateNode.status == "SKIPPED") {
+				updateNode.status = "READY";
+				disabled[updateNode.id] = true;
+			}
+			if (updateNode.status == "SUCCEEDED" || updateNode.status=="RUNNING") {
+				disabled[updateNode.id] = true;
+			}
+		}
+		flowData.set({disabled: disabled});
+
+		$.get(
+			executeURL,
+			fetchData,
+			function(data) {
+				if (data.error) {
+					alert(data.error);
+				}
+				else {
+					if (data.successEmails) {
+						$('#scheduleSuccessEmails').val(data.successEmails.join());
+					}
+					if (data.failureEmails) {
+						$('#scheduleFailureEmails').val(data.failureEmails.join());
+					}
+					
+					if (data.failureAction) {
+						$('#scheduleFailureAction').val(data.failureAction);
+					}
+					if (data.notifyFailureFirst) {
+						$('#scheduleNotifyFailureFirst').attr('checked', true);
+					}
+					if (data.notifyFailureLast) {
+						$('#scheduleNotifyFailureLast').attr('checked', true);	
+					}
+					if (data.flowParam) {
+						var flowParam = data.flowParam;
+						for (var key in flowParam) {
+							var row = handleAddRow();
+							var td = $(row).find('td');
+							$(td[0]).text(key);
+							$(td[1]).text(flowParam[key]);
+						}
+					}
+
+					if (!data.running || data.running.length == 0) {
+						$(".radio").attr("disabled", "disabled");
+						$(".radioLabel").addClass("disabled", "disabled");
+					}
+				}
+			},
+			"json"
+		);
+	},
+	handleCancel: function(evt) {
+		$('#scheduleModalBackground').hide();
+		$('#schedule-options').hide();
+	},
+	handleGeneralOptionsSelect: function(evt) {
+		$('#scheduleFlowOptions').removeClass('selected');
+		$('#scheduleGeneralOptions').addClass('selected');
+
+		$('#scheduleGeneralPanel').show();	  	
+		$('#scheduleGraphPanel').hide();
+	},
+	handleFlowOptionsSelect: function(evt) {
+		$('#scheduleGeneralOptions').removeClass('selected');
+		$('#scheduleFlowOptions').addClass('selected');
+
+		$('#scheduleGraphPanel').show();	  	
+		$('#scheduleGeneralPanel').hide();
+
+		if (this.flowSetup) {
+			return;
+		}
+
+		scheduleCustomSvgGraphView = new azkaban.SvgGraphView({el:$('#scheduleSvgDivCustom'), model: scheduleFlowData, rightClick: {id: 'scheduleDisableJobMenu', callback: this.handleDisableMenuClick}});
+		scheduleCustomJobsListView = new azkaban.JobListView({el:$('#scheduleJobListCustom'), model: scheduleFlowData, rightClick: {id: 'scheduleDisableJobMenu', callback: this.handleDisableMenuClick}});
+		scheduleFlowData.trigger("change:graph");
+		
+		this.flowSetup = true;
+	},
+	handleAdvSchedule: function(evt) {
+		var scheduleURL = this.scheduleURL;
+		var disabled = this.flowData.get("disabled");
+		var disabledJobs = "";
+		for(var job in disabled) {
+			if(disabled[job] == true) {
+				disabledJobs += "," + job;
+			}
+		}
+		var failureAction = $('#scheduleFailureAction').val();
+		var failureEmails = $('#scheduleFailureEmails').val();
+		var successEmails = $('#scheduleSuccessEmails').val();
+		var notifyFailureFirst = $('#scheduleNotifyFailureFirst').is(':checked');
+		var notifyFailureLast = $('#scheduleNotifyFailureLast').is(':checked');
+		var executingJobOption = $('input:radio[name=gender]:checked').val();
+
+		
+		var scheduleTime = $('#advhour').val() + "," + $('#advminutes').val() + "," + $('#advam_pm').val() + "," + $('#advtimezone').val();
+		var scheduleDate = $('#advdatepicker').val();
+		var is_recurring = $('#advis_recurring').val();
+		var period = $('#advperiod').val() + $('#advperiod_units').val();
+
+		var flowOverride = {};
+		var editRows = $(".editRow");
+		for (var i = 0; i < editRows.length; ++i) {
+			var row = editRows[i];
+			var td = $(row).find('td');
+			var key = $(td[0]).text();
+			var val = $(td[1]).text();
+			
+			if (key && key.length > 0) {
+				flowOverride[key] = val;
+			}
+		}
+
+		var scheduleData = {
+			projectId:projectId,
+			projectName: projectName,
+			ajax: "advSchedule",
+			flowName: flowName,
+			scheduleTime: scheduleTime,
+			scheduleDate: scheduleDate,
+			is_recurring: is_recurring,
+			period: period,
+			disabledJobs: disabledJobs,
+			failureAction: failureAction,
+			failureEmails: failureEmails,
+			successEmails: successEmails,
+			notifyFailureFirst: notifyFailureFirst,
+			notifyFailureLast: notifyFailureLast,
+			executingJobOption: executingJobOption,
+			flowOverride: flowOverride
+		};
+
+		$.post(
+				scheduleURL,
+				scheduleData,
+				function(data) {
+					if (data.error) {
+						alert(data.error);
+					}
+					else {
+						window.location = scheduleURL;
+					}
+				},
+				"json"
+		)
+	},
+	handleAddRow: function(evt) {
+		var tr = document.createElement("tr");
+		var tdName = document.createElement("td");
+		var tdValue = document.createElement("td");
+
+		var icon = document.createElement("span");
+		$(icon).addClass("removeIcon");
+		var nameData = document.createElement("span");
+		$(nameData).addClass("spanValue");
+		var valueData = document.createElement("span");
+		$(valueData).addClass("spanValue");
+
+		$(tdName).append(icon);
+		$(tdName).append(nameData);
+		$(tdName).addClass("name");
+		$(tdName).addClass("editable");
+		
+		$(tdValue).append(valueData);
+		$(tdValue).addClass("editable");
+		
+		$(tr).addClass("editRow");
+		$(tr).append(tdName);
+		$(tr).append(tdValue);
+
+		$(tr).insertBefore("#scheduleAddRow");
+		return tr;
+	},
+	handleEditColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+	
+		if (this.editingTarget != curTarget) {
+			this.closeEditingTarget();
+			
+			var text = $(curTarget).children(".spanValue").text();
+			$(curTarget).empty();
+						
+			var input = document.createElement("input");
+			$(input).attr("type", "text");
+			$(input).css("width", "100%");
+			$(input).val(text);
+			$(curTarget).addClass("editing");
+			$(curTarget).append(input);
+			$(input).focus();
+			this.editingTarget = curTarget;
+			}
+	},
+	handleRemoveColumn : function(evt) {
+		var curTarget = evt.currentTarget;
+		// Should be the table
+		var row = curTarget.parentElement.parentElement;
+		$(row).remove();
+	},
+	closeEditingTarget: function(evt) {
+		if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
+			var input = $(this.editingTarget).children("input")[0];
+			var text = $(input).val();
+			$(input).remove();
+
+			var valueData = document.createElement("span");
+			$(valueData).addClass("spanValue");
+			$(valueData).text(text);
+
+			if ($(this.editingTarget).hasClass("name")) {
+				var icon = document.createElement("span");
+				$(icon).addClass("removeIcon");
+				$(this.editingTarget).append(icon);
+			}
+
+			$(this.editingTarget).removeClass("editing");
+			$(this.editingTarget).append(valueData);
+			this.editingTarget = null;
+		}
+	},
+	handleDisableMenuClick : function(action, el, pos) {
+		var flowData = scheduleFlowData;
+		var jobid = el[0].jobid;
+		var requestURL = contextURL + "/manager?project=" + projectName + "&flow=" + flowName + "&job=" + jobid;
+		if (action == "open") {
+			window.location.href = requestURL;
+		}
+		else if(action == "openwindow") {
+			window.open(requestURL);
+		}
+		else if(action == "disable") {
+			var disabled = flowData.get("disabled");
+	
+			disabled[jobid] = true;
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if(action == "disableAll") {
+			var disabled = flowData.get("disabled");
+	
+			var nodes = flowData.get("nodes");
+			for (var key in nodes) {
+				disabled[key] = true;
+			}
+
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "disableParents") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			var inNodes = nodes[jobid].inNodes;
+	
+			if (inNodes) {
+				for (var key in inNodes) {
+				  disabled[key] = true;
+				}
+			}
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "disableChildren") {
+			var disabledMap = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			var outNodes = nodes[jobid].outNodes;
+	
+			if (outNodes) {
+				for (var key in outNodes) {
+				  disabledMap[key] = true;
+				}
+			}
+			
+			flowData.set({disabled: disabledMap});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "disableAncestors") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			
+			recurseAllAncestors(nodes, disabled, jobid, true);
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "disableDescendents") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			
+			recurseAllDescendents(nodes, disabled, jobid, true);
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if(action == "enable") {
+			var disabled = flowData.get("disabled");
+	
+			disabled[jobid] = false;
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if(action == "enableAll") {
+			disabled = {};
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "enableParents") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			var inNodes = nodes[jobid].inNodes;
+	
+			if (inNodes) {
+				for (var key in inNodes) {
+				  disabled[key] = false;
+				}
+			}
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "enableChildren") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			var outNodes = nodes[jobid].outNodes;
+	
+			if (outNodes) {
+				for (var key in outNodes) {
+				  disabled[key] = false;
+				}
+			}
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "enableAncestors") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			
+			recurseAllAncestors(nodes, disabled, jobid, false);
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+		else if (action == "enableDescendents") {
+			var disabled = flowData.get("disabled");
+			var nodes = flowData.get("nodes");
+			
+			recurseAllDescendents(nodes, disabled, jobid, false);
+			
+			flowData.set({disabled: disabled});
+			flowData.trigger("change:disabled");
+		}
+	}
+});
\ No newline at end of file
diff --git a/src/web/js/azkaban.scheduled.view.js b/src/web/js/azkaban.scheduled.view.js
index 9b0ee9a..b66374b 100644
--- a/src/web/js/azkaban.scheduled.view.js
+++ b/src/web/js/azkaban.scheduled.view.js
@@ -10,11 +10,11 @@ function removeSched(projectId, flowName) {
 			function(data) {
 				if (data.error) {
 //                 alert(data.error)
-					$('#errorMsg').text(data.error)
+					$('#errorMsg').text(data.error);
 				}
 				else {
 // 		 alert("Schedule "+schedId+" removed!")
-					window.location = redirectURL
+					window.location = redirectURL;
 				}
 			},
 			"json"
@@ -48,6 +48,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		"click #remove-sla-btn": "handleRemoveSla",
 		"click #sla-cancel-btn": "handleSlaCancel",
 		"click .modal-close": "handleSlaCancel",
+		"click #addRow": "handleAddRow"
 	},
 	initialize: function(setting) {
 
@@ -58,12 +59,31 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 
 		$('#slaModalBackground').hide();
 		$('#sla-options').hide();
+		
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		var rows = tFlowRules.rows;
+		var rowLength = rows.length
+		for(var i = 0; i < rowLength-1; i++) {
+			tFlowRules.deleteRow(0);
+		}
+		
 	},
 	initFromSched: function(projId, flowName) {
 		this.projectId = projId;
 		this.flowName = flowName;
-		this.scheduleURL = contextURL + "/schedule"
-		var fetchScheduleData = {"projId": this.projectId, "ajax":"schedInfo", "flowName":this.flowName};
+		
+		var scheduleURL = contextURL + "/schedule"
+		this.scheduleURL = scheduleURL;
+		var indexToName = {};
+		var nameToIndex = {};
+		var indexToText = {};
+		this.indexToName = indexToName;
+		this.nameToIndex = nameToIndex;
+		this.indexToText = indexToText;
+		var ruleBoxOptions = ["SUCCESS", "FINISH"];
+		this.ruleBoxOptions = ruleBoxOptions;
+		
+		var fetchScheduleData = {"projId": this.projectId, "ajax":"slaInfo", "flowName":this.flowName};
 		
 		$.get(
 				this.scheduleURL,
@@ -76,53 +96,84 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 						if (data.slaEmails) {
 							$('#slaEmails').val(data.slaEmails.join());
 						}
-						var flowRulesTbl = document.getElementById("flowRulesTbl").tBodies[0];
-						var flowRuleRow = flowRulesTbl.insertRow(-1);
-						var cflowName = flowRuleRow.insertCell(0);
-						cflowName.innerHTML = flowName;
-						var cflowduration = flowRuleRow.insertCell(1);
-						var flowDuration = document.createElement("input");
-						flowDuration.setAttribute("type", "text");
-						flowDuration.setAttribute("id", "flowDuration");
-						flowDuration.setAttribute("class", "durationpick");
-						if(data.flowRules) {
-							flowDuration.setAttribute("value", data.flowRules.duration);
+						
+						var allJobNames = data.allJobNames;
+						
+						indexToName[0] = "";
+						nameToIndex[flowName] = 0;
+						indexToText[0] = "flow " + flowName;
+						for(var i = 1; i <= allJobNames.length; i++) {
+							indexToName[i] = allJobNames[i-1];
+							nameToIndex[allJobNames[i-1]] = i;
+							indexToText[i] = "job " + allJobNames[i-1];
 						}
-						cflowduration.appendChild(flowDuration);
-						var emailAct = flowRuleRow.insertCell(2);
-						var checkEmailAct = document.createElement("input");
-						checkEmailAct.setAttribute("type", "checkbox");
-						emailAct.appendChild(checkEmailAct);
-						var killAct = flowRuleRow.insertCell(3);
-						var checkKillAct = document.createElement("input");
-						checkKillAct.setAttribute("type", "checkbox");
-						killAct.appendChild(checkKillAct);
 						
-						var jobRulesTbl = document.getElementById("jobRulesTbl").tBodies[0];
-						var allJobs = data.allJobs;
-						for (var job in allJobs) {
+						
+						
+						
+						
+						// populate with existing settings
+						if(data.settings) {
 							
-							var jobRuleRow = jobRulesTbl.insertRow(-1);
-							var cjobName = jobRuleRow.insertCell(0);
-							cjobName.innerHTML = allJobs[job];
-							var cjobduration = jobRuleRow.insertCell(1);
-							var jobDuration = document.createElement("input");
-							jobDuration.setAttribute("type", "text");
-							jobDuration.setAttribute("id", "jobDuration");
-							jobDuration.setAttribute("class", "durationpick");
-							if(data.jobRules) {
-								jobDuration.setAttribute("value", data.jobRules[job].duration);
-							}
-							cjobduration.appendChild(jobDuration);
+							var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
 							
-							var emailAct = jobRuleRow.insertCell(2);
-							var checkEmailAct = document.createElement("input");
-							checkEmailAct.setAttribute("type", "checkbox");
-							emailAct.appendChild(checkEmailAct);
-							var killAct = jobRuleRow.insertCell(3);
-							var checkKillAct = document.createElement("input");
-							checkKillAct.setAttribute("type", "checkbox");
-							killAct.appendChild(checkKillAct);
+							for(var setting in data.settings) {
+								var rFlowRule = tFlowRules.insertRow(0);
+								
+								var cId = rFlowRule.insertCell(-1);
+								var idSelect = document.createElement("select");
+								for(var i in indexToName) {
+									idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+									if(data.settings[setting].id == indexToName[i]) {
+										idSelect.options[i].selected = true;
+									}
+								}								
+								cId.appendChild(idSelect);
+								
+								var cRule = rFlowRule.insertCell(-1);
+								var ruleSelect = document.createElement("select");
+								for(var i in ruleBoxOptions) {
+									ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+									if(data.settings[setting].rule == ruleBoxOptions[i]) {
+										ruleSelect.options[i].selected = true;
+									}
+								}
+								cRule.appendChild(ruleSelect);
+								
+								var cDuration = rFlowRule.insertCell(-1);
+								var duration = document.createElement("input");
+								duration.type = "text";
+								duration.setAttribute("class", "durationpick");
+								var rawMinutes = data.settings[setting].duration;
+								var intMinutes = rawMinutes.substring(0, rawMinutes.length-1);
+								var minutes = parseInt(intMinutes);
+								var hours = Math.floor(minutes / 60);
+								minutes = minutes % 60;
+								duration.value = hours + ":" + minutes;
+								cDuration.appendChild(duration);
+
+								var cEmail = rFlowRule.insertCell(-1);
+								var emailCheck = document.createElement("input");
+								emailCheck.type = "checkbox";
+								for(var act in data.settings[setting].actions) {
+									if(data.settings[setting].actions[act] == "EMAIL") {
+										emailCheck.checked = true;
+									}
+								}
+								cEmail.appendChild(emailCheck);
+								
+								var cKill = rFlowRule.insertCell(-1);
+								var killCheck = document.createElement("input");
+								killCheck.type = "checkbox";
+								for(var act in data.settings[setting].actions) {
+									if(data.settings[setting].actions[act] == "KILL") {
+										killCheck.checked = true;
+									}
+								}
+								cKill.appendChild(killCheck);
+								
+								$('.durationpick').timepicker({hourMax: 99});
+							}
 						}
 						$('.durationpick').timepicker({hourMax: 99});
 					}
@@ -139,8 +190,8 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 	},
 	handleRemoveSla: function(evt) {
 		console.log("Clicked remove sla button");
-		var scheduleURL = contextURL + "/schedule"
-		var redirectURL = contextURL + "/schedule"
+		var scheduleURL = this.scheduleURL;
+		var redirectURL = this.scheduleURL;
 		$.post(
 				scheduleURL,
 				{"action":"removeSla", "projectId":this.projectId, "flowName":this.flowName},
@@ -159,55 +210,90 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 	handleSetSla: function(evt) {
 
 		var slaEmails = $('#slaEmails').val();
-
-//		var flowRules = {};
-		var flowRulesTbl = document.getElementById("flowRulesTbl").tBodies[0];
-		var flowRuleRow = flowRulesTbl.rows[0];
-//		flowRules["flowDuration"] = flowRuleRow.cells[1].firstChild.value;
-//		flowRules["flowEmailAction"] = flowRuleRow.cells[2].firstChild.value;
-//		flowRules["flowKillAction"] = flowRuleRow.cells[3].firstChild.value;
-		var flowRules = flowRuleRow.cells[1].firstChild.value + ',' + flowRuleRow.cells[2].firstChild.value + ',' + flowRuleRow.cells[3].firstChild.value;
+		var settings = {};
 		
-		var jobRules = {};
-		var jobRulesTbl = document.getElementById("jobRulesTbl").tBodies[0];
-		console.log(jobRulesTbl.rows.length);
-		for(var row = 0; row < jobRulesTbl.rows.length; row++) {
-			
-			var jobRow = jobRulesTbl.rows[row];
-			var jobRule = {};
-			
-			console.log(row);
-			console.log(jobRow.cells[0].firstChild.value);
-//			jobRule["jobDuration"] = jobRow.cells[1].firstChild.value;
-//			jobRule["jobEmailAction"] = jobRow.cells[2].firstChild.value;
-//			jobRule["jobKillAction"] = jobRow.cells[3].firstChild.value;
-//			jobRules[jobRow.cells[0].innerHTML] = jobRule;
-			jobRules[jobRow.cells[0].innerHTML] = jobRow.cells[1].firstChild.value + ',' + jobRow.cells[2].firstChild.value + ',' +  jobRow.cells[3].firstChild.value;
-		}
 		
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		for(var row = 0; row < tFlowRules.rows.length-1; row++) {
+			var rFlowRule = tFlowRules.rows[row];
+			var id = rFlowRule.cells[0].firstChild.value;
+			var rule = rFlowRule.cells[1].firstChild.value;
+			var duration = rFlowRule.cells[2].firstChild.value;
+			var email = rFlowRule.cells[3].firstChild.value;
+			var kill = rFlowRule.cells[4].firstChild.value;
+			settings[row] = id + "," + rule + "," + duration + "," + email + "," + kill; 
+		}
+
 		var slaData = {
 			projectId: this.projectId,
 			flowName: this.flowName,
 			ajax: "setSla",			
 			slaEmails: slaEmails,
-			flowRules: flowRules,
-			jobRules: jobRules
+			settings: settings
 		};
 
-		$.get(
-			this.scheduleURL,
+		var scheduleURL = this.scheduleURL;
+		
+		$.post(
+			scheduleURL,
 			slaData,
 			function(data) {
 				if (data.error) {
 					alert(data.error);
 				}
 				else {
-					window.location.href = this.scheduleURL;
+					tFlowRules.length = 0;
+					window.location = scheduleURL;
 				}
 			},
 			"json"
 		);
 	},
+	handleAddRow: function(evt) {
+		
+		var indexToName = this.indexToName;
+		var nameToIndex = this.nameToIndex;
+		var indexToText = this.indexToText;
+		var ruleBoxOptions = this.ruleBoxOptions;
+
+		var tFlowRules = document.getElementById("flowRulesTbl").tBodies[0];
+		var rFlowRule = tFlowRules.insertRow(0);
+		
+		var cId = rFlowRule.insertCell(-1);
+		var idSelect = document.createElement("select");
+		for(var i in indexToName) {
+			idSelect.options[i] = new Option(indexToText[i], indexToName[i]);
+		}
+		
+		cId.appendChild(idSelect);
+		
+		var cRule = rFlowRule.insertCell(-1);
+		var ruleSelect = document.createElement("select");
+		for(var i in ruleBoxOptions) {
+			ruleSelect.options[i] = new Option(ruleBoxOptions[i], ruleBoxOptions[i]);
+		}
+		cRule.appendChild(ruleSelect);
+		
+		var cDuration = rFlowRule.insertCell(-1);
+		var duration = document.createElement("input");
+		duration.type = "text";
+		duration.setAttribute("class", "durationpick");
+		cDuration.appendChild(duration);
+
+		var cEmail = rFlowRule.insertCell(-1);
+		var emailCheck = document.createElement("input");
+		emailCheck.type = "checkbox";
+		cEmail.appendChild(emailCheck);
+		
+		var cKill = rFlowRule.insertCell(-1);
+		var killCheck = document.createElement("input");
+		killCheck.type = "checkbox";
+		cKill.appendChild(killCheck);
+		
+		$('.durationpick').timepicker({hourMax: 99});
+
+		return rFlowRule;
+	},
 	handleEditColumn : function(evt) {
 		var curTarget = evt.currentTarget;
 	
@@ -234,25 +320,7 @@ azkaban.ChangeSlaView = Backbone.View.extend({
 		$(row).remove();
 	},
 	closeEditingTarget: function(evt) {
-		if (this.editingTarget != null && this.editingTarget != evt.target && this.editingTarget != evt.target.parentElement ) {
-			var input = $(this.editingTarget).children("input")[0];
-			var text = $(input).val();
-			$(input).remove();
-			
-			var valueData = document.createElement("span");
-			$(valueData).addClass("spanValue");
-			$(valueData).text(text);
 
-			if ($(this.editingTarget).hasClass("name")) {
-				var icon = document.createElement("span");
-				$(icon).addClass("removeIcon");
-				$(this.editingTarget).append(icon);
-			}
-			
-			$(this.editingTarget).removeClass("editing");
-			$(this.editingTarget).append(valueData);
-			this.editingTarget = null;
-		}
 	}
 });
 
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index f6d73f2..4a9491c 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -4,6 +4,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -15,11 +16,23 @@ import org.apache.commons.dbutils.DbUtils;
 import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.joda.time.DateTimeZone;
+import org.joda.time.DurationFieldType;
+import org.joda.time.Hours;
+import org.joda.time.MutablePeriod;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+import org.joda.time.ReadablePeriod;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 
+import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.scheduler.Schedule.FlowOptions;
+import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLA.SlaSetting;
 import azkaban.utils.DataSourceUtils;
 import azkaban.utils.Props;
 
@@ -103,26 +116,34 @@ public class JdbcScheduleLoaderTest {
 		
 		JdbcScheduleLoader loader = createLoader();
 		
-		Map<String, Object> scheduleOptions = new HashMap<String, Object>();
-		List<String> disabled = new ArrayList<String>();
-		disabled.add("job1");
-		disabled.add("job2");
-		disabled.add("job3");
-		List<String> failEmails = new ArrayList<String>();
-		failEmails.add("email1");
-		failEmails.add("email2");
-		failEmails.add("email3");
-		boolean hasSla = true;
-		scheduleOptions.put("disabled", disabled);
-		scheduleOptions.put("failEmails", failEmails);
-		scheduleOptions.put("hasSla", hasSla);
+		List<String> emails = new ArrayList<String>();
+		emails.add("email1");
+		emails.add("email2");
+		List<String> disabledJobs = new ArrayList<String>();
+		disabledJobs.add("job1");
+		disabledJobs.add("job2");
+		List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+		SlaSetting set1 = new SlaSetting();
+		List<SlaAction> actions = new ArrayList<SlaAction>();
+		actions.add(SlaAction.EMAIL);
+		set1.setActions(actions);
+		set1.setId("");
+		set1.setDuration(Schedule.parsePeriodString("1h"));
+		set1.setRule(SlaRule.FINISH);
+		slaSets.add(set1);
+		FlowOptions flowOptions = new FlowOptions();
+		flowOptions.setFailureEmails(emails);
+		flowOptions.setDisabledJobs(disabledJobs);
+		SlaOptions slaOptions = new SlaOptions();
+		slaOptions.setSlaEmails(emails);
+		slaOptions.setSettings(slaSets);
 		
-		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
-		Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", scheduleOptions);
-		Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
-		Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
-		Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
-		Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
+		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s2 = new Schedule(1, "proj1", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "ccc", flowOptions, slaOptions);
+		Schedule s3 = new Schedule(2, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s4 = new Schedule(3, "proj2", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s5 = new Schedule(3, "proj2", "flow2", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
+		Schedule s6 = new Schedule(3, "proj2", "flow3", "error", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 		
 		loader.insertSchedule(s1);
 		loader.insertSchedule(s2);
@@ -132,13 +153,23 @@ public class JdbcScheduleLoaderTest {
 		loader.insertSchedule(s6);
 		
 		List<Schedule> schedules = loader.loadSchedules();
+		Schedule sched = schedules.get(0);
 		
 		Assert.assertEquals(6, schedules.size());
-		Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
-		Assert.assertEquals(44444, schedules.get(0).getSubmitTime());
-		Assert.assertEquals("1d", Schedule.createPeriodString(schedules.get(0).getPeriod()));
-		System.out.println("the options are " + schedules.get(0).getSchedOptions());
-		Assert.assertEquals(true, schedules.get(0).getSchedOptions().get("hasSla"));
+		Assert.assertEquals("America/Los_Angeles", sched.getTimezone().getID());
+		Assert.assertEquals(44444, sched.getSubmitTime());
+		Assert.assertEquals("1d", Schedule.createPeriodString(sched.getPeriod()));
+		FlowOptions fOpt = sched.getFlowOptions();
+		SlaOptions sOpt = sched.getSlaOptions();
+		Assert.assertEquals(SlaAction.EMAIL, sOpt.getSettings().get(0).getActions().get(0));
+		Assert.assertEquals("", sOpt.getSettings().get(0).getId());
+		Assert.assertEquals(Schedule.parsePeriodString("1h"), sOpt.getSettings().get(0).getDuration());
+		Assert.assertEquals(SlaRule.FINISH, sOpt.getSettings().get(0).getRule());
+		Assert.assertEquals(2, fOpt.getFailureEmails().size());
+		Assert.assertEquals(null, fOpt.getSuccessEmails());
+		Assert.assertEquals(2, fOpt.getDisabledJobs().size());
+		Assert.assertEquals(FailureAction.FINISH_CURRENTLY_RUNNING, fOpt.getFailureAction());
+		Assert.assertEquals(null, fOpt.getFlowOverride());
 	}
 	
 	@Test
@@ -150,29 +181,38 @@ public class JdbcScheduleLoaderTest {
 		
 		JdbcScheduleLoader loader = createLoader();
 		
-		Map<String, Object> scheduleOptions = new HashMap<String, Object>();
-		List<String> disabled = new ArrayList<String>();
-		disabled.add("job1");
-		disabled.add("job2");
-		disabled.add("job3");
-		List<String> failEmails = new ArrayList<String>();
-		failEmails.add("email1");
-		failEmails.add("email2");
-		failEmails.add("email3");
-		boolean hasSla = true;
-		scheduleOptions.put("disabled", disabled);
-		scheduleOptions.put("failEmails", failEmails);
-		scheduleOptions.put("hasSla", hasSla);
+		List<String> emails = new ArrayList<String>();
+		emails.add("email1");
+		emails.add("email2");
+		List<String> disabledJobs = new ArrayList<String>();
+		disabledJobs.add("job1");
+		disabledJobs.add("job2");
+		List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+		SlaSetting set1 = new SlaSetting();
+		List<SlaAction> actions = new ArrayList<SlaAction>();
+		actions.add(SlaAction.EMAIL);
+		set1.setActions(actions);
+		set1.setId("");
+		set1.setDuration(Schedule.parsePeriodString("1h"));
+		set1.setRule(SlaRule.FINISH);
+		slaSets.add(set1);
+		FlowOptions flowOptions = new FlowOptions();
+		flowOptions.setFailureEmails(emails);
+		flowOptions.setDisabledJobs(disabledJobs);
+		SlaOptions slaOptions = new SlaOptions();
+		slaOptions.setSlaEmails(emails);
+		slaOptions.setSettings(slaSets);
 		
-		System.out.println("the options are " + scheduleOptions);
-		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
+		System.out.println("the flow options are " + flowOptions);
+		System.out.println("the sla options are " + slaOptions);
+		Schedule s1 = new Schedule(1, "proj1", "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 
 		loader.insertSchedule(s1);
 		
-		hasSla = false;
-		scheduleOptions.put("hasSla", hasSla);
+		emails.add("email3");
+		slaOptions.setSlaEmails(emails);
 		
-		Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", scheduleOptions);
+		Schedule s2 = new Schedule(1, "proj1", "flow1", "ready", 11112, "America/Los_Angeles", "2M", 22223, 33334, 44445, "cyu", flowOptions, slaOptions);
 
 		loader.updateSchedule(s2);
 		
@@ -182,8 +222,8 @@ public class JdbcScheduleLoaderTest {
 		Assert.assertEquals("America/Los_Angeles", schedules.get(0).getTimezone().getID());
 		Assert.assertEquals(44445, schedules.get(0).getSubmitTime());
 		Assert.assertEquals("2M", Schedule.createPeriodString(schedules.get(0).getPeriod()));
-		System.out.println("the options are " + schedules.get(0).getSchedOptions());
-		Assert.assertEquals(false, schedules.get(0).getSchedOptions().get("hasSla"));
+//		System.out.println("the options are " + schedules.get(0).getSchedOptions());
+		Assert.assertEquals(3, schedules.get(0).getSlaOptions().getSlaEmails().size());
 	}
 	
 	@Test
@@ -202,21 +242,29 @@ public class JdbcScheduleLoaderTest {
 		
 		for(int i=0; i<stress; i++)
 		{
-			Map<String, Object> scheduleOptions = new HashMap<String, Object>();
-			List<String> disabled = new ArrayList<String>();
-			disabled.add("job1");
-			disabled.add("job2");
-			disabled.add("job3");
-			List<String> failEmails = new ArrayList<String>();
-			failEmails.add("email1");
-			failEmails.add("email2");
-			failEmails.add("email3");
-			boolean hasSla = true;
-			scheduleOptions.put("disabled", disabled);
-			scheduleOptions.put("failEmails", failEmails);
-			scheduleOptions.put("hasSla", hasSla);
+			List<String> emails = new ArrayList<String>();
+			emails.add("email1");
+			emails.add("email2");
+			List<String> disabledJobs = new ArrayList<String>();
+			disabledJobs.add("job1");
+			disabledJobs.add("job2");
+			List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+			SlaSetting set1 = new SlaSetting();
+			List<SlaAction> actions = new ArrayList<SlaAction>();
+			actions.add(SlaAction.EMAIL);
+			set1.setActions(actions);
+			set1.setId("");
+			set1.setDuration(Schedule.parsePeriodString("1h"));
+			set1.setRule(SlaRule.FINISH);
+			slaSets.add(set1);
+			FlowOptions flowOptions = new FlowOptions();
+			flowOptions.setFailureEmails(emails);
+			flowOptions.setDisabledJobs(disabledJobs);
+			SlaOptions slaOptions = new SlaOptions();
+			slaOptions.setSlaEmails(emails);
+			slaOptions.setSettings(slaSets);
 			
-			Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", scheduleOptions);
+			Schedule s = new Schedule(i+1, "proj"+(i+1), "flow1", "ready", 11111, "America/Los_Angeles", "1d", 22222, 33333, 44444, "cyu", flowOptions, slaOptions);
 			schedules.add(s);
 			try {
 				loader.insertSchedule(s);
diff --git a/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
new file mode 100644
index 0000000..02b7855
--- /dev/null
+++ b/unit/java/azkaban/test/sla/JdbcSLALoaderTest.java
@@ -0,0 +1,182 @@
+package azkaban.test.sla;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.ResultSetHandler;
+import org.joda.time.DateTime;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import azkaban.sla.JdbcSLALoader;
+import azkaban.sla.SLA;
+import azkaban.sla.SLA.SlaAction;
+import azkaban.sla.SLA.SlaRule;
+import azkaban.sla.SLALoader;
+import azkaban.utils.DataSourceUtils;
+import azkaban.utils.Props;
+
+
+
+public class JdbcSLALoaderTest {
+	private static boolean testDBExists;
+	//@TODO remove this and turn into local host.
+	private static final String host = "localhost";
+	private static final int port = 3306;
+	private static final String database = "azkaban2";
+	private static final String user = "azkaban";
+	private static final String password = "azkaban";
+	private static final int numConnections = 10;
+	
+	
+	@BeforeClass
+	public static void setupDB() {
+		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		testDBExists = true;
+		
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+
+		CountHandler countHandler = new CountHandler();
+		QueryRunner runner = new QueryRunner();
+		try {
+			runner.query(connection, "SELECT COUNT(1) FROM active_sla", countHandler);
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+
+		DbUtils.closeQuietly(connection);
+		
+		clearDB();
+	}
+	
+	@AfterClass
+	public static void clearDB() {
+		if (!testDBExists) {
+			return;
+		}
+
+		DataSource dataSource = DataSourceUtils.getMySQLDataSource(host, port, database, user, password, numConnections);
+		Connection connection = null;
+		try {
+			connection = dataSource.getConnection();
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+
+		QueryRunner runner = new QueryRunner();
+		try {
+			runner.update(connection, "DELETE FROM active_sla");
+			
+		} catch (SQLException e) {
+			e.printStackTrace();
+			testDBExists = false;
+			DbUtils.closeQuietly(connection);
+			return;
+		}
+		
+		DbUtils.closeQuietly(connection);
+	}
+	
+	@Test
+	public void testInsertSLA() throws Exception {
+		if (!isTestSetup()) {
+			return;
+		}
+		
+		SLALoader loader = createLoader();
+		
+		int execId = 1;
+		String jobName = "";
+		DateTime checkTime = new DateTime(11111);
+		List<String> emails = new ArrayList<String>();
+		emails.add("email1");
+		emails.add("email2");
+//		List<SlaRule> rules = new ArrayList<SlaRule>();
+//		rules.add(SlaRule.SUCCESS);
+//		rules.add(SlaRule.FINISH);
+		List<SlaAction> actions = new ArrayList<SLA.SlaAction>();
+		actions.add(SlaAction.EMAIL);
+		SLA s = new SLA(execId, jobName, checkTime, emails, actions, null, SlaRule.FINISH);
+
+		loader.insertSLA(s);
+		
+		List<SLA> allSLAs = loader.loadSLAs();
+		SLA fetchSLA = allSLAs.get(0);
+
+		Assert.assertTrue(allSLAs.size() == 1);
+		// Shouldn't be the same object.
+		Assert.assertTrue(s != fetchSLA);
+		Assert.assertEquals(s.getExecId(), fetchSLA.getExecId());
+		Assert.assertEquals(s.getJobName(), fetchSLA.getJobName());
+		Assert.assertEquals(s.getCheckTime(), fetchSLA.getCheckTime());
+		Assert.assertEquals(s.getEmails(), fetchSLA.getEmails());
+		Assert.assertEquals(s.getRule(), fetchSLA.getRule());
+		Assert.assertEquals(s.getActions().get(0), fetchSLA.getActions().get(0));
+		
+		
+		loader.removeSLA(s);
+		
+		allSLAs = loader.loadSLAs();
+		
+		Assert.assertTrue(allSLAs.size() == 0);
+	}
+	
+	private SLALoader createLoader() {
+		Props props = new Props();
+		props.put("database.type", "mysql");
+		
+		props.put("mysql.host", host);		
+		props.put("mysql.port", port);
+		props.put("mysql.user", user);
+		props.put("mysql.database", database);
+		props.put("mysql.password", password);
+		props.put("mysql.numconnections", numConnections);
+		
+		return new JdbcSLALoader(props);
+	}
+	
+	private boolean isTestSetup() {
+		if (!testDBExists) {
+			System.err.println("Skipping DB test because Db not setup.");
+			return false;
+		}
+		
+		System.out.println("Running DB test because Db setup.");
+		return true;
+	}
+	
+	public static class CountHandler implements ResultSetHandler<Integer> {
+		@Override
+		public Integer handle(ResultSet rs) throws SQLException {
+			int val = 0;
+			while (rs.next()) {
+				val++;
+			}
+			
+			return val;
+		}
+	}
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/utils/EmailMessageTest.java b/unit/java/azkaban/test/utils/EmailMessageTest.java
new file mode 100644
index 0000000..43bf216
--- /dev/null
+++ b/unit/java/azkaban/test/utils/EmailMessageTest.java
@@ -0,0 +1,46 @@
+package azkaban.test.utils;
+
+import java.io.IOException;
+
+import javax.mail.MessagingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.EmailMessage;
+
+public class EmailMessageTest {
+
+	String host = "";
+	String sender = "";
+	String user = "";
+	String password = "";
+	
+	String toAddr = "";
+	
+	private EmailMessage em;
+	@Before
+	public void setUp() throws Exception {
+		em = new EmailMessage(host, user, password);
+		em.setFromAddress(sender);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testSendEmail() throws IOException {
+		em.addToAddress(toAddr);
+		//em.addToAddress("cyu@linkedin.com");
+		em.setSubject("azkaban test email");
+		em.setBody("azkaban test email");
+		try {
+			em.sendEmail();
+		} catch (MessagingException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+	
+}