azkaban-uncached

Changes

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 2f58a1a..0fbe48d 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -28,8 +28,9 @@ import azkaban.execapp.event.EventHandler;
 import azkaban.execapp.event.EventListener;
 import azkaban.execapp.event.FlowWatcher;
 import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.FailureAction;
 import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
@@ -105,8 +106,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 		this.execDir = new File(flow.getExecutionPath());
 		this.jobtypeManager = jobtypeManager;
 
-		this.pipelineLevel = flow.getPipelineLevel();
-		this.pipelineExecId = flow.getPipelineExecutionId();
+		ExecutionOptions options = flow.getExecutionOptions();
+		this.pipelineLevel = options.getPipelineLevel();
+		this.pipelineExecId = options.getPipelineExecutionId();
 
 		this.proxyUsers = flow.getProxyUsers();
 	}
@@ -344,8 +346,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 		Props parentProps = propsSource == null ? globalProps : sharedProps.get(propsSource);
 
 		// Set up overrides
+		ExecutionOptions options = flow.getExecutionOptions();
 		@SuppressWarnings("unchecked")
-		Props flowProps = new Props(null, flow.getFlowParameters()); 
+		Props flowProps = new Props(null, options.getFlowParameters()); 
 		
 		if (flowProps.size() > 0) {
 			flowProps.setParent(parentProps);
@@ -699,7 +702,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 		}
 		
-		if (shouldKill || flowCancelled || (flowFailed && flow.getFailureAction() != FailureAction.FINISH_ALL_POSSIBLE)) {
+		ExecutionOptions options = flow.getExecutionOptions();
+		if (shouldKill || flowCancelled || (flowFailed && options.getFailureAction() != FailureAction.FINISH_ALL_POSSIBLE)) {
 			return Status.KILLED;
 		}
 		
@@ -793,7 +797,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 						flowFailed = true;
 						if (!isFailedStatus(flow.getStatus())) {
 							flow.setStatus(Status.FAILED_FINISHING);
-							if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
+							ExecutionOptions options = flow.getExecutionOptions();
+							if (options.getFailureAction() == FailureAction.CANCEL_ALL) {
 								cancel("azkaban");
 							}
 						}
diff --git a/src/java/azkaban/execapp/FlowRunnerManager.java b/src/java/azkaban/execapp/FlowRunnerManager.java
index 9e9d8c7..cc35939 100644
--- a/src/java/azkaban/execapp/FlowRunnerManager.java
+++ b/src/java/azkaban/execapp/FlowRunnerManager.java
@@ -42,6 +42,7 @@ import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.LocalFlowWatcher;
 import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.jobtype.JobTypeManager;
@@ -368,8 +369,9 @@ public class FlowRunnerManager implements EventListener {
 		
 		// Setup flow runner
 		FlowWatcher watcher = null;
-		if (flow.getPipelineExecutionId() != null) {
-			Integer pipelineExecId = flow.getPipelineExecutionId();
+		ExecutionOptions options = flow.getExecutionOptions();
+		if (options.getPipelineExecutionId() != null) {
+			Integer pipelineExecId = options.getPipelineExecutionId();
 			FlowRunner runner = runningFlows.get(pipelineExecId);
 			
 			if (runner != null) {
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index b70baa8..ef34069 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -43,9 +43,6 @@ public class ExecutableFlow {
 	private HashMap<String, ExecutableNode> executableNodes = new HashMap<String, ExecutableNode>();
 	private ArrayList<String> startNodes;
 	private ArrayList<String> endNodes;
-	
-	private ArrayList<String> failureEmails = new ArrayList<String>();
-	private ArrayList<String> successEmails = new ArrayList<String>();
 
 	private long submitTime = -1;
 	private long startTime = -1;
@@ -54,30 +51,14 @@ public class ExecutableFlow {
 
 	private Status flowStatus = Status.READY;
 	private String submitUser;
-	private boolean notifyOnFirstFailure = true;
-	private boolean notifyOnLastFailure = false;
-	
-	private Integer pipelineLevel = null;
-	private Integer pipelineExecId = null;
-	private Integer queueLevel = null;
-	private String concurrentOption = null;
-	private Map<String, String> flowParameters = new HashMap<String, String>();
 	
 	private HashSet<String> proxyUsers = new HashSet<String>();
-
-	public enum FailureAction {
-		FINISH_CURRENTLY_RUNNING,
-		CANCEL_ALL,
-		FINISH_ALL_POSSIBLE
-	}
-	
-	private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
+	private ExecutionOptions executionOptions;
 	
 	public ExecutableFlow(Flow flow) {
 		this.projectId = flow.getProjectId();
 		this.flowId = flow.getId();
 		this.version = flow.getVersion();
-
 		this.setFlow(flow);
 	}
 	
@@ -113,14 +94,6 @@ public class ExecutableFlow {
 		return flowProps.values();
 	}
 	
-	public void addFlowParameters(Map<String, String> param) {
-		flowParameters.putAll(param);
-	}
-	
-	public Map<String, String> getFlowParameters() {
-		return flowParameters;
-	}
-	
 	public void setProxyUsers(HashSet<String> proxyUsers) {
 		this.proxyUsers = proxyUsers;
 	}
@@ -129,7 +102,17 @@ public class ExecutableFlow {
 		return this.proxyUsers;
 	}
 	
+	public void setExecutionOptions(ExecutionOptions options) {
+		executionOptions = options;
+	}
+	
+	public ExecutionOptions getExecutionOptions() {
+		return executionOptions;
+	}
+	
 	private void setFlow(Flow flow) {
+		executionOptions = new ExecutionOptions();
+		
 		for (Node node: flow.getNodes()) {
 			String id = node.getId();
 			ExecutableNode exNode = new ExecutableNode(node, this);
@@ -145,10 +128,10 @@ public class ExecutableFlow {
 		}
 		
 		if (flow.getSuccessEmails() != null) {
-			successEmails = new ArrayList<String>(flow.getSuccessEmails());
+			executionOptions.setSuccessEmails(flow.getSuccessEmails());
 		}
 		if (flow.getFailureEmails() != null) {
-			failureEmails = new ArrayList<String>(flow.getFailureEmails());
+			executionOptions.setFailureEmails(flow.getFailureEmails());
 		}
 		flowProps.putAll(flow.getAllFlowProps());
 	}
@@ -265,22 +248,6 @@ public class ExecutableFlow {
 		this.flowStatus = flowStatus;
 	}
 	
-	public void setFailureEmails(List<String> emails) {
-		this.failureEmails = emails == null ? new ArrayList<String>() : new ArrayList<String>(emails);
-	}
-	
-	public List<String> getFailureEmails() {
-		return this.failureEmails;
-	}
-	
-	public void setSuccessEmails(List<String> emails) {
-		this.successEmails = emails == null ? new ArrayList<String>() : new ArrayList<String>(emails);
-	}
-	
-	public List<String> getSuccessEmails() {
-		return this.successEmails;
-	}
-	
 	public Map<String,Object> toObject() {
 		HashMap<String, Object> flowObj = new HashMap<String, Object>();
 		flowObj.put("type", "executableflow");
@@ -293,17 +260,10 @@ public class ExecutableFlow {
 		flowObj.put("endTime", endTime);
 		flowObj.put("status", flowStatus.toString());
 		flowObj.put("submitUser", submitUser);
-		flowObj.put("flowParameters", this.flowParameters);
-		flowObj.put("notifyOnFirstFailure", this.notifyOnFirstFailure);
-		flowObj.put("notifyOnLastFailure", this.notifyOnLastFailure);
-		flowObj.put("successEmails", successEmails);
-		flowObj.put("failureEmails", failureEmails);
-		flowObj.put("failureAction", failureAction.toString());
-		flowObj.put("pipelineLevel", pipelineLevel);
-		flowObj.put("pipelineExecId", pipelineExecId);
-		flowObj.put("queueLevel", queueLevel);
 		flowObj.put("version", version);
-		flowObj.put("concurrentOption", concurrentOption);
+		
+		flowObj.put("executionOptions", this.executionOptions.toObject());
+		flowObj.put("version", version);
 		
 		ArrayList<Object> props = new ArrayList<Object>();
 		for (FlowProps fprop: flowProps.values()) {
@@ -332,14 +292,6 @@ public class ExecutableFlow {
 		return flowObj;
 	}
 
-	public void setFailureAction(FailureAction action) {
-		failureAction = action;
-	}
-	
-	public FailureAction getFailureAction() {
-		return failureAction;
-	}
-	
 	public Object toUpdateObject(long lastUpdateTime) {
 		Map<String, Object> updateData = new HashMap<String,Object>();
 		updateData.put("execId", this.executionId);
@@ -424,28 +376,14 @@ public class ExecutableFlow {
 		exFlow.flowStatus = Status.valueOf((String)flowObj.get("status"));
 		exFlow.submitUser = (String)flowObj.get("submitUser");
 		exFlow.version = (Integer)flowObj.get("version");
-
-		if (flowObj.containsKey("flowParameters")) {
-			exFlow.flowParameters = new HashMap<String, String>((Map<String,String>)flowObj.get("flowParameters"));
-		}
-		// Failure notification
-		if (flowObj.containsKey("notifyOnFirstFailure")) {
-			exFlow.notifyOnFirstFailure = (Boolean)flowObj.get("notifyOnFirstFailure");
-		}
-		if (flowObj.containsKey("notifyOnLastFailure")) {
-			exFlow.notifyOnLastFailure = (Boolean)flowObj.get("notifyOnLastFailure");
-		}
-		if (flowObj.containsKey("concurrentOption")) {
-			exFlow.concurrentOption = (String)flowObj.get("concurrentOption");
-		}
 		
-		// Failure action
-		if (flowObj.containsKey("failureAction")) {
-			exFlow.failureAction = FailureAction.valueOf((String)flowObj.get("failureAction"));
+		if (flowObj.containsKey("executionOptions")) {
+			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj.get("executionOptions"));
+		}
+		else {
+			// for backawards compatibility should remove in a few versions.
+			exFlow.executionOptions = ExecutionOptions.createFromObject(flowObj);
 		}
-		exFlow.pipelineLevel = (Integer)flowObj.get("pipelineLevel");
-		exFlow.pipelineExecId = (Integer)flowObj.get("pipelineExecId");
-		exFlow.queueLevel = (Integer)flowObj.get("queueLevel");
 		
 		// Copy nodes
 		List<Object> nodes = (List<Object>)flowObj.get("nodes");
@@ -464,11 +402,6 @@ public class ExecutableFlow {
 			exFlow.flowProps.put(source, flowProps);
 		}
 		
-		// Success emails
-		exFlow.setSuccessEmails((List<String>)flowObj.get("successEmails"));
-		// Failure emails
-		exFlow.setFailureEmails((List<String>)flowObj.get("failureEmails"));
-		
 		if(flowObj.containsKey("proxyUsers")) {
 			ArrayList<String> proxyUserList = (ArrayList<String>) flowObj.get("proxyUsers");
 			exFlow.setProxyUsers(new HashSet<String>(proxyUserList));
@@ -518,30 +451,6 @@ public class ExecutableFlow {
 	public void setSubmitUser(String submitUser) {
 		this.submitUser = submitUser;
 	}
-
-	public void setPipelineLevel(Integer level) {
-		pipelineLevel = level;
-	}
-	
-	public void setPipelineExecutionId(Integer execId) {
-		pipelineExecId = execId;
-	}
-	
-	public void setNotifyOnFirstFailure(boolean notify) {
-		this.notifyOnFirstFailure = notify;
-	}
-	
-	public void setNotifyOnLastFailure(boolean notify) {
-		this.notifyOnLastFailure = notify;
-	}
-	
-	public boolean getNotifyOnFirstFailure() {
-		return this.notifyOnFirstFailure;
-	}
-	
-	public boolean getNotifyOnLastFailure() {
-		return this.notifyOnLastFailure;
-	}
 	
 	public int getVersion() {
 		return version;
@@ -550,28 +459,4 @@ public class ExecutableFlow {
 	public void setVersion(int version) {
 		this.version = version;
 	}
-	
-	public Integer getPipelineLevel() {
-		return pipelineLevel;
-	}
-	
-	public Integer getPipelineExecutionId() {
-		return pipelineExecId;
-	}
-	
-	public Integer getQueueLevel() {
-		return queueLevel;
-	}
-	
-	public void setQueueLevel(int queue) {
-		queueLevel = queue;
-	}
-	
-	public String getConcurrentOption() {
-		return this.concurrentOption;
-	}
-	
-	public void setConcurrentOption(String concurrentOption) {
-		this.concurrentOption = concurrentOption;
-	}
 }
diff --git a/src/java/azkaban/executor/ExecutionOptions.java b/src/java/azkaban/executor/ExecutionOptions.java
new file mode 100644
index 0000000..25b9575
--- /dev/null
+++ b/src/java/azkaban/executor/ExecutionOptions.java
@@ -0,0 +1,180 @@
+package azkaban.executor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Execution options for submitted flows and scheduled flows
+ */
+public class ExecutionOptions {
+	
+	private boolean notifyOnFirstFailure = true;
+	private boolean notifyOnLastFailure = false;
+	private ArrayList<String> failureEmails = new ArrayList<String>();
+	private ArrayList<String> successEmails = new ArrayList<String>();
+	
+	private Integer pipelineLevel = null;
+	private Integer pipelineExecId = null;
+	private Integer queueLevel = null;
+	private String concurrentOption = null;
+	private Map<String, String> flowParameters = new HashMap<String, String>();
+	
+	public enum FailureAction {
+		FINISH_CURRENTLY_RUNNING,
+		CANCEL_ALL,
+		FINISH_ALL_POSSIBLE
+	}
+	
+	private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
+	
+	private Set<String> initiallyDisabledJobs = new HashSet<String>();
+	
+	public void setFlowParameters(Map<String,String> flowParam) {
+		flowParameters.get(flowParam);
+	}
+	
+	public Map<String,String> getFlowParameters() {
+		return flowParameters;
+	}
+	
+	public void setFailureEmails(Collection<String> emails) {
+		failureEmails.addAll(emails);
+	}
+	
+	public List<String> getFailureEmails() {
+		return failureEmails;
+	}
+	
+	public void setSuccessEmails(Collection<String> emails) {
+		successEmails.addAll(emails);
+	}
+	
+	public List<String> getSuccessEmails() {
+		return successEmails;
+	}
+	
+	public boolean getNotifyOnFirstFailure() {
+		return notifyOnFirstFailure;
+	}
+	
+	public boolean getNotifyOnLastFailure() {
+		return notifyOnLastFailure;
+	}
+	
+	public void setNotifyOnFirstFailure(boolean notify) {
+		this.notifyOnFirstFailure = notify;
+	}
+	
+	public void setNotifyOnLastFailure(boolean notify) {
+		this.notifyOnLastFailure = notify;
+	}
+	
+	public FailureAction getFailureAction() {
+		return failureAction;
+	}
+	
+	public void setFailureAction(FailureAction action) {
+		failureAction = action;
+	}
+	
+	public void setConcurrentOption(String concurrentOption) {
+		this.concurrentOption = concurrentOption;
+	}
+	
+	public String getConcurrentOption() {
+		return concurrentOption;
+	}
+	
+	public Integer getPipelineLevel() {
+		return pipelineLevel;
+	}
+	
+	public Integer getPipelineExecutionId() {
+		return pipelineExecId;
+	}
+	
+	public void setPipelineLevel(Integer level) {
+		pipelineLevel = level;
+	}
+	
+	public void setPipelineExecutionId(Integer id) {
+		this.pipelineExecId = id;
+	}
+	
+	public Integer getQueueLevel() {
+		return queueLevel;
+	}
+	
+	public List<String> getDisabledJobs() {
+		return new ArrayList<String>(initiallyDisabledJobs);
+	}
+	
+	public void setDisabledJobs(List<String> disabledJobs) {
+		initiallyDisabledJobs = new HashSet<String>(disabledJobs);
+	}
+	
+	public Map<String,Object> toObject() {
+		HashMap<String,Object> flowOptionObj = new HashMap<String,Object>();
+		
+		flowOptionObj.put("flowParameters", this.flowParameters);
+		flowOptionObj.put("notifyOnFirstFailure", this.notifyOnFirstFailure);
+		flowOptionObj.put("notifyOnLastFailure", this.notifyOnLastFailure);
+		flowOptionObj.put("successEmails", successEmails);
+		flowOptionObj.put("failureEmails", failureEmails);
+		flowOptionObj.put("failureAction", failureAction.toString());
+		flowOptionObj.put("pipelineLevel", pipelineLevel);
+		flowOptionObj.put("pipelineExecId", pipelineExecId);
+		flowOptionObj.put("queueLevel", queueLevel);
+		flowOptionObj.put("concurrentOption", concurrentOption);
+		flowOptionObj.put("disabled", initiallyDisabledJobs);
+		
+		return flowOptionObj;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public static ExecutionOptions createFromObject(Object obj) {
+		if (obj == null || !(obj instanceof Map)) {
+			return null;
+		}
+		
+		Map<String,Object> optionsMap = new HashMap<String,Object>();
+		
+		ExecutionOptions options = new ExecutionOptions();
+		if (optionsMap.containsKey("flowParameters")) {
+			options.flowParameters = new HashMap<String, String>((Map<String,String>)optionsMap.get("flowParameters"));
+		}
+		// Failure notification
+		if (optionsMap.containsKey("notifyOnFirstFailure")) {
+			options.notifyOnFirstFailure = (Boolean)optionsMap.get("notifyOnFirstFailure");
+		}
+		if (optionsMap.containsKey("notifyOnLastFailure")) {
+			options.notifyOnLastFailure = (Boolean)optionsMap.get("notifyOnLastFailure");
+		}
+		if (optionsMap.containsKey("concurrentOption")) {
+			options.concurrentOption = (String)optionsMap.get("concurrentOption");
+		}
+		if (optionsMap.containsKey("disabled")) {
+			options.initiallyDisabledJobs = new HashSet<String>((List<String>)optionsMap.get("disabled"));
+		}
+		
+		// Failure action
+		if (optionsMap.containsKey("failureAction")) {
+			options.failureAction = FailureAction.valueOf((String)optionsMap.get("failureAction"));
+		}
+		options.pipelineLevel = (Integer)optionsMap.get("pipelineLevel");
+		options.pipelineExecId = (Integer)optionsMap.get("pipelineExecId");
+		options.queueLevel = (Integer)optionsMap.get("queueLevel");
+		
+		// Success emails
+		options.setSuccessEmails((List<String>)optionsMap.get("successEmails"));
+		// Failure emails
+		options.setFailureEmails((List<String>)optionsMap.get("failureEmails"));
+		
+		return options;
+	}
+}
diff --git a/src/java/azkaban/executor/ExecutorMailer.java b/src/java/azkaban/executor/ExecutorMailer.java
index 59d7727..a49b4fb 100644
--- a/src/java/azkaban/executor/ExecutorMailer.java
+++ b/src/java/azkaban/executor/ExecutorMailer.java
@@ -7,7 +7,7 @@ import javax.mail.MessagingException;
 
 import org.apache.log4j.Logger;
 
-import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
 import azkaban.utils.Utils;
@@ -39,7 +39,8 @@ public class ExecutorMailer {
 	}
 	
 	public void sendFirstErrorMessage(ExecutableFlow flow) {
-		List<String> emailList = flow.getFailureEmails();
+		ExecutionOptions option = flow.getExecutionOptions();
+		List<String> emailList = option.getDisabledJobs();
 		int execId = flow.getExecutionId();
 		
 		if (emailList != null && !emailList.isEmpty()) {
@@ -51,10 +52,10 @@ public class ExecutorMailer {
 			
 			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
 			
-			if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
+			if (option.getFailureAction() == FailureAction.CANCEL_ALL) {
 				message.println("This flow is set to cancel all currently running jobs.");
 			}
-			else if (flow.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
+			else if (option.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
 				message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
 			}
 			else {
@@ -91,7 +92,9 @@ public class ExecutorMailer {
 	}
 	
 	public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
-		List<String> emailList = flow.getFailureEmails();
+		ExecutionOptions option = flow.getExecutionOptions();
+		
+		List<String> emailList = option.getFailureEmails();
 		int execId = flow.getExecutionId();
 		
 		if (emailList != null && !emailList.isEmpty()) {
@@ -101,7 +104,7 @@ public class ExecutorMailer {
 			message.setMimeType("text/html");
 			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
 			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
+			message.println("<h2 style=\"color:#FF0000\"> Execution '" + execId + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
 			message.println("<table>");
 			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
 			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
@@ -137,7 +140,8 @@ public class ExecutorMailer {
 	}
 
 	public void sendSuccessEmail(ExecutableFlow flow) {
-		List<String> emailList = flow.getSuccessEmails();
+		ExecutionOptions option = flow.getExecutionOptions();
+		List<String> emailList = option.getSuccessEmails();
 
 		int execId = flow.getExecutionId();
 		
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index ff5e2ff..b18bc6c 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -299,16 +299,24 @@ public class ExecutorManager {
 			
 			List<Integer> running = getRunningFlows(projectId, flowId);
 
+			ExecutionOptions options = exflow.getExecutionOptions();
+			
+			// Disable jobs
+			for(String disabledId : options.getDisabledJobs()) {
+				ExecutableNode node = exflow.getExecutableNode(disabledId);
+				node.setStatus(Status.DISABLED);
+			}
+			
 			String message = "";
 			if (!running.isEmpty()) {
-				if (exflow.getConcurrentOption().equals("pipeline")) {
+				if (options.getConcurrentOption().equals("pipeline")) {
 					Collections.sort(running);
 					Integer runningExecId = running.get(running.size() - 1);
 					
-					exflow.setPipelineExecutionId(runningExecId);
-					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + exflow.getPipelineLevel() + ". ";
+					options.setPipelineExecutionId(runningExecId);
+					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
 				}
-				else if (exflow.getConcurrentOption().equals("skip")) {
+				else if (options.getConcurrentOption().equals("skip")) {
 					throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
 				}
 				else {
@@ -582,10 +590,11 @@ public class ExecutorManager {
 		// TODO append to the flow log that we forced killed this flow because the target no longer had
 		// the reference.
 		
+		ExecutionOptions options = flow.getExecutionOptions();
 		// But we can definitely email them.
 		if(flow.getStatus() == Status.FAILED || flow.getStatus() == Status.KILLED)
 		{
-			if(flow.getFailureEmails() != null && !flow.getFailureEmails().isEmpty())
+			if(options.getFailureEmails() != null && !options.getFailureEmails().isEmpty())
 			{
 				try {
 					mailer.sendErrorEmail(flow, "Executor no longer seems to be running this execution. Most likely due to executor bounce.");
@@ -596,7 +605,7 @@ public class ExecutorManager {
 		}
 		else
 		{
-			if(flow.getSuccessEmails() != null && !flow.getSuccessEmails().isEmpty())
+			if(options.getSuccessEmails() != null && !options.getSuccessEmails().isEmpty())
 			{
 				try {
 					mailer.sendSuccessEmail(flow);
@@ -681,9 +690,10 @@ public class ExecutorManager {
 		flow.applyUpdateObject(updateData);
 		Status newStatus = flow.getStatus();
 		
+		ExecutionOptions options = flow.getExecutionOptions();
 		if (oldStatus != newStatus && newStatus.equals(Status.FAILED_FINISHING)) {
 			// We want to see if we should give an email status on first failure.
-			if (flow.getNotifyOnFirstFailure()) {
+			if (options.getNotifyOnFirstFailure()) {
 				mailer.sendFirstErrorMessage(flow);
 			}
 		}
diff --git a/src/java/azkaban/jobtype/JobTypeManager.java b/src/java/azkaban/jobtype/JobTypeManager.java
index 65c747e..6464ca7 100644
--- a/src/java/azkaban/jobtype/JobTypeManager.java
+++ b/src/java/azkaban/jobtype/JobTypeManager.java
@@ -25,7 +25,6 @@ import azkaban.jobExecutor.ScriptJob;
 import azkaban.utils.Props;
 import azkaban.utils.PropsUtils;
 import azkaban.utils.Utils;
-import azkaban.jobExecutor.utils.InitErrorJob;
 import azkaban.jobExecutor.utils.JobExecutionException;
 import java.io.File;
 import java.net.MalformedURLException;
diff --git a/src/java/azkaban/scheduler/JdbcScheduleLoader.java b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
index ee9c812..96130dd 100644
--- a/src/java/azkaban/scheduler/JdbcScheduleLoader.java
+++ b/src/java/azkaban/scheduler/JdbcScheduleLoader.java
@@ -24,7 +24,6 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import javax.sql.DataSource;
 
@@ -35,14 +34,7 @@ import org.apache.log4j.Logger;
 
 import org.joda.time.DateTimeZone;
 import org.joda.time.ReadablePeriod;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
-import azkaban.sla.SLA;
-import azkaban.sla.SLAManagerException;
-import azkaban.sla.JdbcSLALoader.EncodingType;
 import azkaban.utils.DataSourceUtils;
 import azkaban.utils.GZIPUtils;
 import azkaban.utils.JSONUtils;
@@ -328,11 +320,10 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 				int encodingType = rs.getInt(12);
 				byte[] data = rs.getBytes(13);
 				
-				FlowOptions flowOptions = null;
-				SlaOptions slaOptions = null;
+				Object optsObj = null;
 				if (data != null) {
 					EncodingType encType = EncodingType.fromInteger(encodingType);
-					Object optsObj;
+
 					try {
 						// Convoluted way to inflate strings. Should find common package or helper function.
 						if (encType == EncodingType.GZIP) {
@@ -343,15 +334,16 @@ public class JdbcScheduleLoader implements ScheduleLoader {
 						else {
 							String jsonString = new String(data, "UTF-8");
 							optsObj = JSONUtils.parseJSONFromString(jsonString);
-						}						
-						flowOptions = Schedule.createFlowOptionFromObject(optsObj);
-						slaOptions = Schedule.createSlaOptionFromObject(optsObj);
+						}	
 					} catch (IOException e) {
 						throw new SQLException("Error reconstructing schedule options " + projectName + "." + flowName);
 					}
 				}
 				
-				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
+				Schedule s = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser);
+				if (optsObj != null) {
+					s.createAndSetScheduleOptions(optsObj);
+				}
 				
 				schedules.add(s);
 			} while (rs.next());
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index b3382d3..2fd6074 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -16,9 +16,7 @@
 
 package azkaban.scheduler;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.joda.time.DateTime;
@@ -32,153 +30,12 @@ import org.joda.time.ReadablePeriod;
 import org.joda.time.Seconds;
 import org.joda.time.Weeks;
 
-import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
-import azkaban.sla.SLA.SlaSetting;
+import azkaban.executor.ExecutionOptions;
+import azkaban.sla.SlaOptions;
 import azkaban.utils.Pair;
 
 public class Schedule{
 	
-	
-	public static class FlowOptions {
-		
-		public List<String> getFailureEmails() {
-			return failureEmails;
-		}
-		public void setFailureEmails(List<String> failureEmails) {
-			this.failureEmails = failureEmails;
-		}
-		public List<String> getSuccessEmails() {
-			return successEmails;
-		}
-		public void setSuccessEmails(List<String> successEmails) {
-			this.successEmails = successEmails;
-		}
-		public FailureAction getFailureAction() {
-			return failureAction;
-		}
-		public void setFailureAction(FailureAction failureAction) {
-			this.failureAction = failureAction;
-		}
-		public boolean isnotifyOnFirstFailure() {
-			return notifyOnFirstFailure;
-		}
-		public void setNotifyOnFirstFailure(boolean notifyOnFirstFailure) {
-			this.notifyOnFirstFailure = notifyOnFirstFailure;
-		}
-		public boolean isnotifyOnLastFailure() {
-			return notifyOnLastFailure;
-		}
-		public void setNotifyOnLastFailure(boolean notifyOnLastFailure) {
-			this.notifyOnLastFailure = notifyOnLastFailure;
-		}
-		public Map<String, String> getFlowOverride() {
-			return flowOverride;
-		}
-		public void setFlowOverride(Map<String, String> flowOverride) {
-			this.flowOverride = flowOverride;
-		}
-		public List<String> getDisabledJobs() {
-			return disabledJobs;
-		}
-		public void setDisabledJobs(List<String> disabledJobs) {
-			this.disabledJobs = disabledJobs;
-		}
-		private List<String> failureEmails;
-		private List<String> successEmails;
-		private FailureAction failureAction = FailureAction.FINISH_CURRENTLY_RUNNING;
-		private boolean notifyOnFirstFailure;
-		private boolean notifyOnLastFailure;
-		Map<String, String> flowOverride;
-		private List<String> disabledJobs;
-		public Object toObject() {
-			Map<String, Object> obj = new HashMap<String, Object>();
-			obj.put("failureEmails", failureEmails);
-			obj.put("successEmails", successEmails);
-			obj.put("failureAction", failureAction.toString());
-			obj.put("notifyOnFirstFailure", notifyOnFirstFailure);
-			obj.put("notifyOnLastFailure", notifyOnLastFailure);
-			obj.put("flowOverride", flowOverride);
-			obj.put("disabledJobs", disabledJobs);
-			return obj;
-		}
-		@SuppressWarnings("unchecked")
-		public static FlowOptions fromObject(Object object) {
-			if(object != null) {
-				FlowOptions flowOptions = new FlowOptions();
-				Map<String, Object> obj = (HashMap<String, Object>) object;
-				if(obj.containsKey("failureEmails")) {
-					flowOptions.setFailureEmails((List<String>) obj.get("failureEmails"));
-				}
-				if(obj.containsKey("successEmails")) {
-					flowOptions.setSuccessEmails((List<String>) obj.get("SuccessEmails"));
-				}
-				if(obj.containsKey("failureAction")) {
-					flowOptions.setFailureAction(FailureAction.valueOf((String)obj.get("failureAction")));
-				}
-				if(obj.containsKey("notifyOnFirstFailure")) {
-					flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnFirstFailure"));
-				}
-				if(obj.containsKey("notifyOnLastFailure")) {
-					flowOptions.setNotifyOnFirstFailure((Boolean)obj.get("notifyOnLastFailure"));
-				}
-				if(obj.containsKey("flowOverride")) {
-					flowOptions.setFlowOverride((Map<String, String>) obj.get("flowOverride"));
-				}
-				if(obj.containsKey("disabledJobs")) {
-					flowOptions.setDisabledJobs((List<String>) obj.get("disabledJobs"));
-				}
-				return flowOptions;
-			}
-			return null;
-		}
-	}
-
-	public static class SlaOptions {
-
-		public List<String> getSlaEmails() {
-			return slaEmails;
-		}
-		public void setSlaEmails(List<String> slaEmails) {
-			this.slaEmails = slaEmails;
-		}
-		public List<SlaSetting> getSettings() {
-			return settings;
-		}
-		public void setSettings(List<SlaSetting> settings) {
-			this.settings = settings;
-		}
-		private List<String> slaEmails;
-		private List<SlaSetting> settings;
-		public Object toObject() {
-			Map<String, Object> obj = new HashMap<String, Object>();
-			obj.put("slaEmails", slaEmails);
-			List<Object> slaSettings = new ArrayList<Object>();
-			for(SlaSetting s : settings) {
-				slaSettings.add(s.toObject());
-			}
-			obj.put("settings", slaSettings);
-			return obj;
-		}
-		@SuppressWarnings("unchecked")
-		public static SlaOptions fromObject(Object object) {
-			if(object != null) {
-				SlaOptions slaOptions = new SlaOptions();
-				Map<String, Object> obj = (HashMap<String, Object>) object;
-				slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
-				List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
-				for(Object set: (List<Object>)obj.get("settings")) {
-					slaSets.add(SlaSetting.fromObject(set));
-				}
-				slaOptions.setSettings(slaSets);
-				return slaOptions;
-			}
-			return null;			
-		}
-		
-	}
-	
 //	private long projectGuid;
 //	private long flowGuid;
 	
@@ -196,7 +53,7 @@ public class Schedule{
 	private String status;
 	private long submitTime;
 	
-	private FlowOptions flowOptions;
+	private ExecutionOptions executionOptions;
 	private SlaOptions slaOptions;
 	
 	public Schedule(
@@ -223,7 +80,7 @@ public class Schedule{
 		this.submitUser = submitUser;
 		this.status = status;
 		this.submitTime = submitTime;
-		this.flowOptions = null;
+		this.executionOptions = null;
 		this.slaOptions = null;
 	}
 
@@ -239,7 +96,7 @@ public class Schedule{
 						long nextExecTime,						
 						long submitTime,
 						String submitUser,
-						FlowOptions flowOptions,
+						ExecutionOptions executionOptions,
 						SlaOptions slaOptions
 			) {
 		this.projectId = projectId;
@@ -253,7 +110,7 @@ public class Schedule{
 		this.submitUser = submitUser;
 		this.status = status;
 		this.submitTime = submitTime;
-		this.flowOptions = flowOptions;
+		this.executionOptions = executionOptions;
 		this.slaOptions = slaOptions;
 	}
 
@@ -265,11 +122,11 @@ public class Schedule{
 						long firstSchedTime,
 						DateTimeZone timezone,
 						ReadablePeriod period,
-						long lastModifyTime,						
-						long nextExecTime,						
+						long lastModifyTime,
+						long nextExecTime,
 						long submitTime,
 						String submitUser,
-						FlowOptions flowOptions,
+						ExecutionOptions executionOptions,
 						SlaOptions slaOptions
 						) {
 		this.projectId = projectId;
@@ -283,16 +140,16 @@ public class Schedule{
 		this.submitUser = submitUser;
 		this.status = status;
 		this.submitTime = submitTime;
-		this.flowOptions = flowOptions;
+		this.executionOptions = executionOptions;
 		this.slaOptions = slaOptions;
 	}
 
-	public FlowOptions getFlowOptions() {
-		return flowOptions;
+	public ExecutionOptions getExecutionOptions() {
+		return executionOptions;
 	}
 
-	public void setFlowOptions(FlowOptions flowOptions) {
-		this.flowOptions = flowOptions;
+	public void setFlowOptions(ExecutionOptions executionOptions) {
+		this.executionOptions = executionOptions;
 	}
 
 	public SlaOptions getSlaOptions() {
@@ -463,13 +320,13 @@ public class Schedule{
 		return periodStr;
 	}
 	
-
+	
 	public Map<String,Object> optionsToObject() {
-		if(flowOptions != null || slaOptions != null) {
+		if(executionOptions != null || slaOptions != null) {
 			HashMap<String, Object> schedObj = new HashMap<String, Object>();
 			
-			if(flowOptions != null) {
-				schedObj.put("flowOptions", flowOptions.toObject());
+			if(executionOptions != null) {
+				schedObj.put("executionOptions", executionOptions.toObject());
 			}
 			if(slaOptions != null) {
 				schedObj.put("slaOptions", slaOptions.toObject());
@@ -480,26 +337,16 @@ public class Schedule{
 		return null;
 	}
 	
-	@SuppressWarnings("unchecked")
-	public static FlowOptions createFlowOptionFromObject(Object obj) {
-		if(obj != null) {
-			Map<String, Object> options = (HashMap<String, Object>) obj;
-			if(options.containsKey("flowOptions")) {
-				return FlowOptions.fromObject(options.get("flowOptions"));
-			}
-		}		
-		return null;
-	}
-	
-	@SuppressWarnings("unchecked")
-	public static SlaOptions createSlaOptionFromObject(Object obj) {
-		if(obj != null) {
-			Map<String, Object> options = (HashMap<String, Object>) obj;
-			if(options.containsKey("slaOptions")) {
-				return SlaOptions.fromObject(options.get("slaOptions"));
-			}
-		}		
-		return null;
+	public void createAndSetScheduleOptions(Object obj) {
+		@SuppressWarnings("unchecked")
+		HashMap<String, Object> schedObj = (HashMap<String, Object>)obj;
+		if (schedObj.containsKey("executionOptions")) {
+			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
+			this.executionOptions = execOptions;
+		}
+		if (schedObj.containsKey("slaOptions")) {
+			SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
+			this.slaOptions = slaOptions;
+		}
 	}
-
 }
\ No newline at end of file
diff --git a/src/java/azkaban/scheduler/ScheduleManager.java b/src/java/azkaban/scheduler/ScheduleManager.java
index 7fad4e2..e298356 100644
--- a/src/java/azkaban/scheduler/ScheduleManager.java
+++ b/src/java/azkaban/scheduler/ScheduleManager.java
@@ -33,23 +33,20 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorManager;
-import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.Status;
 
 import azkaban.flow.Flow;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLAManager;
 import azkaban.sla.SLA.SlaSetting;
+import azkaban.sla.SlaOptions;
 import azkaban.utils.Pair;
 
-
 /**
  * The ScheduleManager stores and executes the schedule. It uses a single thread
  * instead and waits until correct loading time for the flow. It will not remove
@@ -127,8 +124,8 @@ public class ScheduleManager {
 	 * @param id
 	 * @return
 	*/
-	public Schedule getSchedule(Pair<Integer, String> scheduleId) {
-		return scheduleIDMap.get(scheduleId);
+	public Schedule getSchedule(int projectId, String flowId) {
+		return scheduleIDMap.get(new Pair<Integer,String>(projectId, flowId));
 	}
 
 	/**
@@ -136,7 +133,9 @@ public class ScheduleManager {
 	 * 
 	 * @param id
 	 */
-	public synchronized void removeSchedule(Pair<Integer, String> scheduleId) {
+	public synchronized void removeSchedule(int projectId, String flowId) {
+		Pair<Integer,String> scheduleId = new Pair<Integer,String>(projectId, flowId);
+		
 		Schedule sched = scheduleIDMap.get(scheduleId);
 		scheduleIDMap.remove(scheduleId);
 		
@@ -182,11 +181,27 @@ public class ScheduleManager {
 			final long lastModifyTime,
 			final long nextExecTime,
 			final long submitTime,
+			final String submitUser
+			) {
+		return scheduleFlow(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, null, null);
+	}
+	
+	public Schedule scheduleFlow(
+			final int projectId,
+			final String projectName,
+			final String flowName,
+			final String status,
+			final long firstSchedTime,
+			final DateTimeZone timezone,
+			final ReadablePeriod period,
+			final long lastModifyTime,
+			final long nextExecTime,
+			final long submitTime,
 			final String submitUser,
-			final FlowOptions flowOptions,
-			final SlaOptions slaOptions
+			ExecutionOptions execOptions,
+			SlaOptions slaOptions
 			) {
-		Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, flowOptions, slaOptions);
+		Schedule sched = new Schedule(projectId, projectName, flowName, status, firstSchedTime, timezone, period, lastModifyTime, nextExecTime, submitTime, submitUser, execOptions, slaOptions);
 		logger.info("Scheduling flow '" + sched.getScheduleName() + "' for "
 				+ _dateFormat.print(firstSchedTime) + " with a period of "
 				+ period == null ? "(non-recurring)" : period);
@@ -195,28 +210,6 @@ public class ScheduleManager {
 		return sched;
 	}
 
-//	/**
-//	 * Schedule the flow
-//	 * 
-//	 * @param flowId
-//	 * @param date
-//	 * @param ignoreDep
-//	 */
-//	public Schedule schedule(
-//			String scheduleId,
-//			String projectId,
-//			String flowId,
-//			String user, 
-//			String userSubmit,
-//			DateTime submitTime,
-//			DateTime firstSchedTime) 
-//	{
-//		logger.info("Scheduling flow '" + scheduleId + "' for " + _dateFormat.print(firstSchedTime));
-//		ScheduledFlow scheduleFlow = new ScheduledFlow(scheduleId, projectId, flowId, user, userSubmit, submitTime, firstSchedTime);
-//		schedule(scheduleFlow);
-//		return scheduleFlow;
-//	}
-
 	/**
 	 * Schedules the flow, but doesn't save the schedule afterwards.
 	 * 
@@ -381,31 +374,8 @@ public class ScheduleManager {
 										exflow.setSubmitUser(runningSched.getSubmitUser());
 										exflow.setProxyUsers(project.getProxyUsers());
 										
-										FlowOptions flowOptions = runningSched.getFlowOptions();
-										
-										if(flowOptions != null) {
-											if (flowOptions.getFailureAction() != null) {
-												exflow.setFailureAction(flowOptions.getFailureAction());
-											}
-											if (flowOptions.getFailureEmails() != null) {
-												exflow.setFailureEmails(flowOptions.getFailureEmails());
-											}
-											if (flowOptions.getSuccessEmails() != null) {
-												exflow.setSuccessEmails(flowOptions.getSuccessEmails());
-											}
-											exflow.setNotifyOnFirstFailure(flowOptions.isnotifyOnFirstFailure());
-											exflow.setNotifyOnLastFailure(flowOptions.isnotifyOnLastFailure());
-											
-											exflow.addFlowParameters(flowOptions.getFlowOverride());
-											
-											List<String> disabled = flowOptions.getDisabledJobs();
-											// Setup disabled
-											if(disabled != null) {
-												for (String job : disabled) {
-													exflow.setNodeStatus(job, Status.DISABLED);
-												}
-											}
-										}
+										ExecutionOptions flowOptions = runningSched.getExecutionOptions();
+										exflow.setExecutionOptions(flowOptions);
 										
 										try {
 											executorManager.submitExecutableFlow(exflow);
@@ -417,7 +387,6 @@ public class ScheduleManager {
 										}
 										
 										SlaOptions slaOptions = runningSched.getSlaOptions();
-										
 										if(slaOptions != null) {
 											// submit flow slas
 											List<SlaSetting> jobsettings = new ArrayList<SlaSetting>();
@@ -452,7 +421,7 @@ public class ScheduleManager {
 									loader.updateSchedule(runningSched);
 								}
 								else {
-									removeSchedule(runningSched.getScheduleId());
+									removeSchedule(runningSched.getProjectId(), runningSched.getFlowName());
 								}								
 							} else {
 								// wait until flow run
diff --git a/src/java/azkaban/sla/JdbcSLALoader.java b/src/java/azkaban/sla/JdbcSLALoader.java
index cf6cd2f..0ab7b3b 100644
--- a/src/java/azkaban/sla/JdbcSLALoader.java
+++ b/src/java/azkaban/sla/JdbcSLALoader.java
@@ -7,7 +7,6 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import javax.sql.DataSource;
 
@@ -16,10 +15,7 @@ import org.apache.commons.dbutils.QueryRunner;
 import org.apache.commons.dbutils.ResultSetHandler;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.ReadablePeriod;
 
-import azkaban.scheduler.ScheduleManagerException;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.utils.DataSourceUtils;
 import azkaban.utils.GZIPUtils;
diff --git a/src/java/azkaban/sla/SlaMailer.java b/src/java/azkaban/sla/SlaMailer.java
index 9b19da8..db9e24d 100644
--- a/src/java/azkaban/sla/SlaMailer.java
+++ b/src/java/azkaban/sla/SlaMailer.java
@@ -1,16 +1,11 @@
 package azkaban.sla;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import javax.mail.MessagingException;
 
 import org.apache.log4j.Logger;
 
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.Status;
 import azkaban.sla.SLA;
 import azkaban.utils.EmailMessage;
 import azkaban.utils.Props;
@@ -42,147 +37,6 @@ public class SlaMailer {
 		testMode = props.getBoolean("test.mode", false);
 	}
 	
-	public void sendFirstErrorMessage(ExecutableFlow flow) {
-		List<String> emailList = flow.getFailureEmails();
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
-			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has encountered a failure on " + azkabanName + "</h2>");
-			
-			if (flow.getFailureAction() == FailureAction.CANCEL_ALL) {
-				message.println("This flow is set to cancel all currently running jobs.");
-			}
-			else if (flow.getFailureAction() == FailureAction.FINISH_ALL_POSSIBLE){
-				message.println("This flow is set to complete all jobs that aren't blocked by the failure.");
-			}
-			else {
-				message.println("This flow is set to complete all currently running jobs before stopping.");
-			}
-			
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
-			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			message.println("");
-			message.println("<h3>Reason</h3>");
-			List<String> failedJobs = findFailedJobs(flow);
-			message.println("<ul>");
-			for (String jobId : failedJobs) {
-				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
-			}
-			
-			message.println("</ul>");
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
-			}
-		}
-	}
-	
-	public void sendErrorEmail(ExecutableFlow flow, String ... extraReasons) {
-		List<String> emailList = flow.getFailureEmails();
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has failed on " + azkabanName);
-			
-			message.println("<h2 style=\"color:#FF0000\"> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has failed on " + azkabanName + "</h2>");
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
-			message.println("<a href='\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			message.println("");
-			message.println("<h3>Reason</h3>");
-			List<String> failedJobs = findFailedJobs(flow);
-			message.println("<ul>");
-			for (String jobId : failedJobs) {
-				message.println("<li><a href=\"" + executionUrl + "&job=" + jobId + "\">Failed job '" + jobId + "' Link</a></li>" );
-			}
-			for (String reasons: extraReasons) {
-				message.println("<li>" + reasons + "</li>");
-			}
-			
-			message.println("</ul>");
-			
-			
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
-			}
-		}
-	}
-
-	public void sendSuccessEmail(ExecutableFlow flow) {
-		List<String> emailList = flow.getSuccessEmails();
-
-		int execId = flow.getExecutionId();
-		
-		if (emailList != null && !emailList.isEmpty()) {
-			EmailMessage message = new EmailMessage(mailHost, mailUser, mailPassword);
-			message.setFromAddress(mailSender);
-			message.addAllToAddress(emailList);
-			message.setMimeType("text/html");
-			message.setSubject("Flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName);
-			
-			message.println("<h2> Execution '" + flow.getExecutionId() + "' of flow '" + flow.getFlowId() + "' has succeeded on " + azkabanName + "</h2>");
-			message.println("<table>");
-			message.println("<tr><td>Start Time</td><td>" + flow.getStartTime() +"</td></tr>");
-			message.println("<tr><td>End Time</td><td>" + flow.getEndTime() +"</td></tr>");
-			message.println("<tr><td>Duration</td><td>" + Utils.formatDuration(flow.getStartTime(), flow.getEndTime()) +"</td></tr>");
-			message.println("</table>");
-			message.println("");
-			String executionUrl = "https://" + clientHostname + ":" + clientPortNumber + "/" + "executor?" + "execid=" + execId;
-			message.println("<a href=\"" + executionUrl + "\">" + flow.getFlowId() + " Execution Link</a>");
-			
-			if (!testMode) {
-				try {
-					message.sendEmail();
-				} catch (MessagingException e) {
-					logger.error("Email message send failed" , e);
-				}
-			}
-		}
-	}
-	
-	private List<String> findFailedJobs(ExecutableFlow flow) {
-		ArrayList<String> failedJobs = new ArrayList<String>();
-		for (ExecutableNode node: flow.getExecutableNodes()) {
-			if (node.getStatus() == Status.FAILED) {
-				failedJobs.add(node.getJobId());
-			}
-		}
-		
-		return failedJobs;
-	}
-
 	public void sendSlaEmail(SLA s, String ... extraReasons) {
 		List<String> emailList = s.getEmails();
 		
diff --git a/src/java/azkaban/sla/SlaOptions.java b/src/java/azkaban/sla/SlaOptions.java
new file mode 100644
index 0000000..5fb2ff6
--- /dev/null
+++ b/src/java/azkaban/sla/SlaOptions.java
@@ -0,0 +1,51 @@
+package azkaban.sla;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import azkaban.sla.SLA.SlaSetting;
+
+public class SlaOptions {
+
+	public List<String> getSlaEmails() {
+		return slaEmails;
+	}
+	public void setSlaEmails(List<String> slaEmails) {
+		this.slaEmails = slaEmails;
+	}
+	public List<SlaSetting> getSettings() {
+		return settings;
+	}
+	public void setSettings(List<SlaSetting> settings) {
+		this.settings = settings;
+	}
+	private List<String> slaEmails;
+	private List<SlaSetting> settings;
+	public Object toObject() {
+		Map<String, Object> obj = new HashMap<String, Object>();
+		obj.put("slaEmails", slaEmails);
+		List<Object> slaSettings = new ArrayList<Object>();
+		for(SlaSetting s : settings) {
+			slaSettings.add(s.toObject());
+		}
+		obj.put("settings", slaSettings);
+		return obj;
+	}
+	@SuppressWarnings("unchecked")
+	public static SlaOptions fromObject(Object object) {
+		if(object != null) {
+			SlaOptions slaOptions = new SlaOptions();
+			Map<String, Object> obj = (HashMap<String, Object>) object;
+			slaOptions.setSlaEmails((List<String>) obj.get("slaEmails"));
+			List<SlaSetting> slaSets = new ArrayList<SlaSetting>();
+			for(Object set: (List<Object>)obj.get("settings")) {
+				slaSets.add(SlaSetting.fromObject(set));
+			}
+			slaOptions.setSettings(slaSets);
+			return slaOptions;
+		}
+		return null;			
+	}
+}
\ No newline at end of file
diff --git a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
index 253c7e9..e257bc5 100644
--- a/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/AbstractAzkabanServlet.java
@@ -18,7 +18,6 @@ package azkaban.webapp.servlet;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -102,7 +101,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @return
 	 */
 	public boolean hasParam(HttpServletRequest request, String param) {
-		return request.getParameter(param) != null;
+		return HttpRequestUtils.hasParam(request, param);
 	}
 
 	/**
@@ -115,13 +114,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @throws ServletException
 	 */
 	public String getParam(HttpServletRequest request, String name) throws ServletException {
-		String p = request.getParameter(name);
-		if (p == null) {
-			throw new ServletException("Missing required parameter '" + name + "'.");
-		}
-		else {
-			return p;
-		}
+		return HttpRequestUtils.getParam(request, name);
 	}
 
 	/**
@@ -134,11 +127,7 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @return
 	 */
 	public String getParam(HttpServletRequest request, String name, String defaultVal){
-		String p = request.getParameter(name);
-		if (p == null) {
-			return defaultVal;
-		}
-		return p;
+		return HttpRequestUtils.getParam(request, name, defaultVal);
 	}
 
 	
@@ -152,54 +141,24 @@ public abstract class AbstractAzkabanServlet extends HttpServlet {
 	 * @throws ServletException
 	 */
 	public int getIntParam(HttpServletRequest request, String name) throws ServletException {
-		String p = getParam(request, name);
-		return Integer.parseInt(p);
+		return HttpRequestUtils.getIntParam(request, name);
 	}
 	
 	public int getIntParam(HttpServletRequest request, String name, int defaultVal) {
-		if (hasParam(request, name)) {
-			try {
-				return getIntParam(request, name);
-			} catch (Exception e) {
-				return defaultVal;
-			}
-		}
-		
-		return defaultVal;
+		return HttpRequestUtils.getIntParam(request, name, defaultVal);
 	}
 
 	public long getLongParam(HttpServletRequest request, String name) throws ServletException {
-		String p = getParam(request, name);
-		return Long.valueOf(p);
+		return HttpRequestUtils.getLongParam(request, name);
 	}
 	
 	public long getLongParam(HttpServletRequest request, String name, long defaultVal) {
-		if (hasParam(request, name)) {
-			try {
-				return getLongParam(request, name);
-			} catch (Exception e) {
-				return defaultVal;
-			}
-		}
-		
-		return defaultVal;
+		return HttpRequestUtils.getLongParam(request, name, defaultVal);
 	}
 
 	
 	public Map<String, String> getParamGroup(HttpServletRequest request, String groupName)  throws ServletException {
-		@SuppressWarnings("unchecked")
-		Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
-		String matchString = groupName + "[";
-
-		HashMap<String, String> groupParam = new HashMap<String, String>();
-		while( enumerate.hasMoreElements() ) {
-			String str = (String)enumerate.nextElement();
-			if (str.startsWith(matchString)) {
-				groupParam.put(str.substring(matchString.length(), str.length() - 1), request.getParameter(str));
-			}
-			
-		}
-		return groupParam;
+		return HttpRequestUtils.getParamGroup(request, groupName);
 	}
 	
 	/**
diff --git a/src/java/azkaban/webapp/servlet/ExecutorServlet.java b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
index 524c223..f5f8c51 100644
--- a/src/java/azkaban/webapp/servlet/ExecutorServlet.java
+++ b/src/java/azkaban/webapp/servlet/ExecutorServlet.java
@@ -18,22 +18,19 @@ package azkaban.webapp.servlet;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.lang.StringUtils;
-
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorManager;
 import azkaban.executor.ExecutorManagerException;
 import azkaban.executor.Status;
@@ -158,6 +155,7 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 		
+		page.add("projectId", project.getId());
 		page.add("projectName", project.getName());
 		page.add("flowid", flow.getFlowId());
 		
@@ -485,11 +483,13 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 		
-		ret.put("successEmails", exflow.getSuccessEmails());
-		ret.put("failureEmails", exflow.getFailureEmails());
-		ret.put("flowParam", exflow.getFlowParameters());
+		ExecutionOptions options = exflow.getExecutionOptions();
+		
+		ret.put("successEmails", options.getSuccessEmails());
+		ret.put("failureEmails", options.getFailureEmails());
+		ret.put("flowParam", options.getFlowParameters());
 		
-		FailureAction action = exflow.getFailureAction();
+		FailureAction action = options.getFailureAction();
 		String failureAction = null;
 		switch (action) {
 			case FINISH_CURRENTLY_RUNNING:
@@ -504,21 +504,23 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		}
 		ret.put("failureAction", failureAction);
 		
-		ret.put("notifyFailureFirst", exflow.getNotifyOnFirstFailure());
-		ret.put("notifyFailureLast", exflow.getNotifyOnLastFailure());
+		ret.put("notifyFailureFirst", options.getNotifyOnFirstFailure());
+		ret.put("notifyFailureLast", options.getNotifyOnLastFailure());
 		
-		ret.put("concurrentOptions", exflow.getConcurrentOption());
-		ret.put("pipelineLevel", exflow.getPipelineLevel());
-		ret.put("pipelineExecution", exflow.getPipelineExecutionId());
-		ret.put("queueLevel", exflow.getQueueLevel());
+		ret.put("concurrentOptions", options.getConcurrentOption());
+		ret.put("pipelineLevel", options.getPipelineLevel());
+		ret.put("pipelineExecution", options.getPipelineExecutionId());
+		ret.put("queueLevel", options.getQueueLevel());
 		
 		HashMap<String, String> nodeStatus = new HashMap<String,String>();
 		for(ExecutableNode node : exflow.getExecutableNodes()) {
 			nodeStatus.put(node.getJobId(), node.getStatus().toString());
 		}
 		ret.put("nodeStatus", nodeStatus);
+		ret.put("disabled", options.getDisabledJobs());
+		
+		Schedule sflow = scheduleManager.getSchedule(project.getId(), exflow.getFlowId());
 		
-		Schedule sflow = null;
 		for (Schedule sched: scheduleManager.getSchedules()) {
 			if (sched.getProjectId() == project.getId() && sched.getFlowName().equals(exflow.getFlowId())) {
 				sflow = sched;
@@ -715,85 +717,8 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		exflow.setSubmitUser(user.getUserId());
 		exflow.setProxyUsers(project.getProxyUsers());
 
-		if (hasParam(req, "failureAction")) {
-			String option = getParam(req, "failureAction");
-			if (option.equals("finishCurrent") ) {
-				exflow.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
-			}
-			else if (option.equals("cancelImmediately")) {
-				exflow.setFailureAction(FailureAction.CANCEL_ALL);
-			}
-			else if (option.equals("finishPossible")) {
-				exflow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
-			}
-		}
-
-		String concurrentOption = "skip";
-		if (hasParam(req, "failureEmails")) {
-			String emails = getParam(req, "failureEmails");
-			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-			exflow.setFailureEmails(Arrays.asList(emailSplit));
-		}
-		if (hasParam(req, "successEmails")) {
-			String emails = getParam(req, "successEmails");
-			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-			exflow.setSuccessEmails(Arrays.asList(emailSplit));
-		}
-		if (hasParam(req, "notifyFailureFirst")) {
-			exflow.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
-		}
-		if (hasParam(req, "notifyFailureLast")) {
-			exflow.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
-		}
-		if (hasParam(req, "concurrentOption")) {
-			concurrentOption = getParam(req, "concurrentOption");
-			exflow.setConcurrentOption(concurrentOption);
-			if (concurrentOption.equals("pipeline")) {
-				int pipelineLevel = getIntParam(req, "pipelineLevel");
-				exflow.setPipelineLevel(pipelineLevel);
-			}
-			else if (concurrentOption.equals("queue")) {
-				// Not yet implemented
-				int queueLevel = getIntParam(req, "queueLevel", 1);
-				exflow.setPipelineLevel(queueLevel);
-			}
-		}
-		
-		Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
-		exflow.addFlowParameters(flowParamGroup);
-		
-		if (hasParam(req, "job")) {
-			// Disable everything.
-			for(ExecutableNode node : exflow.getExecutableNodes()) {
-				node.setStatus(Status.DISABLED);
-			}
-			
-			String jobId = getParam(req, "job");
-			ExecutableNode job = exflow.getExecutableNode(jobId);
-			if (job == null) {
-				ret.put("error", "Job " + jobId + " doesn't exist in flow.");
-				return;
-			}
-			
-			job.setStatus(Status.READY);
-			
-			if (hasParam(req, "withDep")) {
-				boolean withDep = "true".equals(getParam(req, "withDep"));
-				if (withDep) {
-					enableAllAncestors(job, exflow);
-				}
-			}
-		}
-		else if (hasParam(req, "disabled")) {
-			String disabled = getParam(req, "disabled");
-			String[] disabledNodes = disabled.split("\\s*,\\s*");
-			
-			for (String node: disabledNodes) {
-				if (!node.isEmpty()) {
-					exflow.setNodeStatus(node, Status.DISABLED);
-				}
-			}
-		}
+		ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
+		exflow.setExecutionOptions(options);
 		
 		try {
 			String message = executorManager.submitExecutableFlow(exflow);
@@ -807,19 +732,6 @@ public class ExecutorServlet extends LoginAbstractAzkabanServlet {
 		ret.put("execid", exflow.getExecutionId());
 	}
 	
-	private void enableAllAncestors(ExecutableNode node, ExecutableFlow flow) {
-		Set<String> inNodes = node.getInNodes();
-		if (inNodes != null) {
-			for (String inNode: inNodes) {
-				ExecutableNode job = flow.getExecutableNode(inNode);
-				if (job != null) {
-					job.setStatus(Status.READY);
-					enableAllAncestors(job, flow);
-				}
-			}
-		}
-	}
-	
 	public class ExecutorVelocityHelper {
 		public String getProjectName(int id) {
 			Project project = projectManager.getProject(id);
diff --git a/src/java/azkaban/webapp/servlet/HttpRequestUtils.java b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
new file mode 100644
index 0000000..d1e55da
--- /dev/null
+++ b/src/java/azkaban/webapp/servlet/HttpRequestUtils.java
@@ -0,0 +1,183 @@
+package azkaban.webapp.servlet;
+
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+
+import azkaban.executor.ExecutionOptions;
+import azkaban.executor.ExecutionOptions.FailureAction;
+
+public class HttpRequestUtils {
+	public static ExecutionOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
+		ExecutionOptions execOptions = new ExecutionOptions();
+		
+		if (hasParam(req, "failureAction")) {
+			String option = getParam(req, "failureAction");
+			if (option.equals("finishCurrent") ) {
+				execOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
+			}
+			else if (option.equals("cancelImmediately")) {
+				execOptions.setFailureAction(FailureAction.CANCEL_ALL);
+			}
+			else if (option.equals("finishPossible")) {
+				execOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+			}
+		}
+
+		if (hasParam(req, "failureEmails")) {
+			String emails = getParam(req, "failureEmails");
+			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			execOptions.setFailureEmails(Arrays.asList(emailSplit));
+		}
+		if (hasParam(req, "successEmails")) {
+			String emails = getParam(req, "successEmails");
+			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
+			execOptions.setSuccessEmails(Arrays.asList(emailSplit));
+		}
+		if (hasParam(req, "notifyFailureFirst")) {
+			execOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
+		}
+		if (hasParam(req, "notifyFailureLast")) {
+			execOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
+		}
+
+		String concurrentOption = "skip";
+		if (hasParam(req, "concurrentOption")) {
+			concurrentOption = getParam(req, "concurrentOption");
+			execOptions.setConcurrentOption(concurrentOption);
+			if (concurrentOption.equals("pipeline")) {
+				int pipelineLevel = getIntParam(req, "pipelineLevel");
+				execOptions.setPipelineLevel(pipelineLevel);
+			}
+			else if (concurrentOption.equals("queue")) {
+				// Not yet implemented
+				int queueLevel = getIntParam(req, "queueLevel", 1);
+				execOptions.setPipelineLevel(queueLevel);
+			}
+		}
+		
+		Map<String, String> flowParamGroup = getParamGroup(req, "flowOverride");
+		execOptions.setFlowParameters(flowParamGroup);
+		
+		if (hasParam(req, "disabled")) {
+			String disabled = getParam(req, "disabled");
+			String[] disabledNodes = disabled.split("\\s*,\\s*");
+			
+			execOptions.setDisabledJobs(Arrays.asList(disabledNodes));
+		}
+		return execOptions;
+	}
+	
+	/**
+	 * Checks for the existance of the parameter in the request
+	 * 
+	 * @param request
+	 * @param param
+	 * @return
+	 */
+	public static boolean hasParam(HttpServletRequest request, String param) {
+		return request.getParameter(param) != null;
+	}
+
+	/**
+	 * Retrieves the param from the http servlet request. Will throw an
+	 * exception if not found
+	 * 
+	 * @param request
+	 * @param name
+	 * @return
+	 * @throws ServletException
+	 */
+	public static String getParam(HttpServletRequest request, String name) throws ServletException {
+		String p = request.getParameter(name);
+		if (p == null) {
+			throw new ServletException("Missing required parameter '" + name + "'.");
+		}
+		else {
+			return p;
+		}
+	}
+
+	/**
+	 * Retrieves the param from the http servlet request.
+	 * 
+	 * @param request
+	 * @param name
+	 * @param default
+	 * 
+	 * @return
+	 */
+	public static String getParam(HttpServletRequest request, String name, String defaultVal){
+		String p = request.getParameter(name);
+		if (p == null) {
+			return defaultVal;
+		}
+		return p;
+	}
+
+	
+	/**
+	 * Returns the param and parses it into an int. Will throw an exception if
+	 * not found, or a parse error if the type is incorrect.
+	 * 
+	 * @param request
+	 * @param name
+	 * @return
+	 * @throws ServletException
+	 */
+	public static int getIntParam(HttpServletRequest request, String name) throws ServletException {
+		String p = getParam(request, name);
+		return Integer.parseInt(p);
+	}
+	
+	public static int getIntParam(HttpServletRequest request, String name, int defaultVal) {
+		if (hasParam(request, name)) {
+			try {
+				return getIntParam(request, name);
+			} catch (Exception e) {
+				return defaultVal;
+			}
+		}
+		
+		return defaultVal;
+	}
+
+	public static long getLongParam(HttpServletRequest request, String name) throws ServletException {
+		String p = getParam(request, name);
+		return Long.valueOf(p);
+	}
+	
+	public static long getLongParam(HttpServletRequest request, String name, long defaultVal) {
+		if (hasParam(request, name)) {
+			try {
+				return getLongParam(request, name);
+			} catch (Exception e) {
+				return defaultVal;
+			}
+		}
+		
+		return defaultVal;
+	}
+
+	
+	public static Map<String, String> getParamGroup(HttpServletRequest request, String groupName)  throws ServletException {
+		@SuppressWarnings("unchecked")
+		Enumeration<Object> enumerate = (Enumeration<Object>)request.getParameterNames();
+		String matchString = groupName + "[";
+
+		HashMap<String, String> groupParam = new HashMap<String, String>();
+		while( enumerate.hasMoreElements() ) {
+			String str = (String)enumerate.nextElement();
+			if (str.startsWith(matchString)) {
+				groupParam.put(str.substring(matchString.length(), str.length() - 1), request.getParameter(str));
+			}
+			
+		}
+		return groupParam;
+	}
+	
+}
diff --git a/src/java/azkaban/webapp/servlet/ScheduleServlet.java b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
index 13de687..c382bbe 100644
--- a/src/java/azkaban/webapp/servlet/ScheduleServlet.java
+++ b/src/java/azkaban/webapp/servlet/ScheduleServlet.java
@@ -31,46 +31,35 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
-import org.joda.time.Hours;
 import org.joda.time.LocalDateTime;
 import org.joda.time.Minutes;
 import org.joda.time.ReadablePeriod;
 import org.joda.time.format.DateTimeFormat;
 
-import azkaban.executor.ExecutableFlow;
-import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutorManagerException;
-import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionOptions;
 import azkaban.flow.Flow;
 import azkaban.flow.Node;
 import azkaban.project.Project;
 import azkaban.project.ProjectManager;
 import azkaban.project.ProjectLogEvent.EventType;
 import azkaban.user.Permission;
-import azkaban.user.Role;
 import azkaban.user.User;
 import azkaban.user.Permission.Type;
-import azkaban.user.UserManager;
-import azkaban.utils.Pair;
 import azkaban.webapp.AzkabanWebServer;
 import azkaban.webapp.session.Session;
 import azkaban.scheduler.Schedule;
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
 import azkaban.scheduler.ScheduleManager;
 import azkaban.sla.SLA;
 import azkaban.sla.SLA.SlaRule;
-import azkaban.sla.SLAManager;
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaSetting;
+import azkaban.sla.SlaOptions;
 
 public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private static final long serialVersionUID = 1L;
 	private static final Logger logger = Logger.getLogger(ScheduleServlet.class);
 	private ProjectManager projectManager;
 	private ScheduleManager scheduleManager;
-	private SLAManager slaManager;
-	private UserManager userManager;
 
 	@Override
 	public void init(ServletConfig config) throws ServletException {
@@ -78,8 +67,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		AzkabanWebServer server = (AzkabanWebServer)getApplication();
 		projectManager = server.getProjectManager();
 		scheduleManager = server.getScheduleManager();
-		userManager = server.getUserManager();
-		slaManager = server.getSLAManager();
 	}
 	
 	@Override
@@ -103,8 +90,8 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		else if(ajaxName.equals("setSla")) {
 			ajaxSetSla(req, ret, session.getUser());
 		}
-		else if(ajaxName.equals("advSchedule")) {
-			ajaxAdvSchedule(req, ret, session.getUser());
+		else if(ajaxName.equals("scheduleFlow")) {
+			ajaxScheduleFlow(req, ret, session.getUser());
 		}
 
 		if (ret != null) {
@@ -124,7 +111,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 				return;
 			}
 			
-			Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
+			Schedule sched = scheduleManager.getSchedule(projectId, flowName);
 			
 			SlaOptions slaOptions= new SlaOptions();
 			
@@ -167,7 +154,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		}
 		
 	}
-
 	
 	private SlaSetting parseSlaSetting(String set) {
 		// "" + Duration + EmailAction + KillAction
@@ -202,7 +188,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		return Minutes.minutes(min+hour*60).toPeriod();
 	}
 
-	@SuppressWarnings("unchecked")
 	private void ajaxSlaInfo(HttpServletRequest req, HashMap<String, Object> ret, User user) {
 		int projId;
 		String flowName;
@@ -222,10 +207,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 				return;
 			}
 			
-			Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projId, flowName));
+			Schedule sched = scheduleManager.getSchedule(projId, flowName);
 			
 			SlaOptions slaOptions = sched.getSlaOptions();
-			FlowOptions flowOptions = sched.getFlowOptions();
+			ExecutionOptions flowOptions = sched.getExecutionOptions();
 			
 			if(slaOptions != null) {
 				ret.put("slaEmails", slaOptions.getSlaEmails());
@@ -333,8 +318,7 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 	private void ajaxRemoveSched(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException{
 		int projectId = getIntParam(req, "projectId");
 		String flowName = getParam(req, "flowName");
-		Pair<Integer, String> scheduleId = new Pair<Integer, String>(projectId, flowName);
-		Schedule sched = scheduleManager.getSchedule(scheduleId);
+		Schedule sched = scheduleManager.getSchedule(projectId, flowName);
 
 //		int projectId = sched.getProjectId();
 
@@ -352,98 +336,18 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			return;
 		}
 
-		scheduleManager.removeSchedule(scheduleId);
+		scheduleManager.removeSchedule(projectId, flowName);
 		logger.info("User '" + user.getUserId() + " has removed schedule " + sched.getScheduleName());
 		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + sched.toString() + " has been removed.");
 		
 		ret.put("status", "success");
-		ret.put("message", "flow " + scheduleId.getSecond() + " removed from Schedules.");
+		ret.put("message", "flow " + flowName + " removed from Schedules.");
 		return;
 	}
-	
-	private void ajaxScheduleFlow(HttpServletRequest req, Map<String, Object> ret, User user) throws ServletException {
-		String projectName = getParam(req, "projectName");
-		String flowName = getParam(req, "flowName");
-		int projectId = getIntParam(req, "projectId");
-		
-		Project project = projectManager.getProject(projectId);
-			
-		if (project == null) {
-			ret.put("message", "Project " + projectName + " does not exist");
-			ret.put("status", "error");
-			return;
-		}
-		
-		if (!hasPermission(project, user, Type.SCHEDULE)) {
-			ret.put("status", "error");
-			ret.put("message", "Permission denied. Cannot execute " + flowName);
-			return;
-		}
-
-		Flow flow = project.getFlow(flowName);
-		if (flow == null) {
-			ret.put("status", "error");
-			ret.put("message", "Flow " + flowName + " cannot be found in project " + project);
-			return;
-		}
-		
-		int hour = getIntParam(req, "hour");
-		int minutes = getIntParam(req, "minutes");
-		boolean isPm = getParam(req, "am_pm").equalsIgnoreCase("pm");
-		
-		DateTimeZone timezone = getParam(req,  "timezone").equals("UTC") ? DateTimeZone.UTC : DateTimeZone.forID("America/Los_Angeles");
-
-		String scheduledDate = req.getParameter("date");
-		DateTime day = null;
-		if(scheduledDate == null || scheduledDate.trim().length() == 0) {
-			day = new LocalDateTime().toDateTime();
-		} else {
-		    try {
-		    	day = DateTimeFormat.forPattern("MM/dd/yyyy").withZone(timezone).parseDateTime(scheduledDate);
-		    } catch(IllegalArgumentException e) {
-		      	ret.put("error", "Invalid date: '" + scheduledDate + "'");
-		      	return;
-		      }
-		}
-
-		ReadablePeriod thePeriod = null;
-		try {
-			if(hasParam(req, "is_recurring"))
-			    thePeriod = Schedule.parsePeriodString(getParam(req, "period")+getParam(req,"period_units"));	
-		}
-		catch(Exception e){
-			ret.put("error", e.getMessage());
-		}
-
-		if(isPm && hour < 12)
-		    hour += 12;
-		hour %= 24;
-
-		DateTime submitTime = new DateTime();
-		DateTime firstSchedTime = day.withHourOfDay(hour).withMinuteOfHour(minutes).withSecondOfMinute(0);
-		
-		Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
-		FlowOptions flowOptions = null;
-		SlaOptions slaOptions = null;
-		if(sched != null) {
-			if(sched.getFlowOptions() != null) {
-				flowOptions = sched.getFlowOptions();
-			}
-			if(sched.getSlaOptions() != null) {
-				slaOptions = sched.getSlaOptions();
-			}
-		}
-		Schedule schedule = scheduleManager.scheduleFlow(projectId, projectName, flowName, "ready", firstSchedTime.getMillis(), timezone, thePeriod, submitTime.getMillis(), firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
-		logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName +  " (" + projectId +")" + "].");
-		projectManager.postProjectEvent(project, EventType.SCHEDULE, user.getUserId(), "Schedule " + schedule.toString() + " has been added.");
-		
-		ret.put("status", "success");
-		ret.put("message", projectName + "." + flowName + " scheduled.");
-	}
 
-	private void ajaxAdvSchedule(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
+	private void ajaxScheduleFlow(HttpServletRequest req, HashMap<String, Object> ret, User user) throws ServletException {
 		String projectName = getParam(req, "projectName");
-		String flowName = getParam(req, "flowName");
+		String flowName = getParam(req, "flow");
 		int projectId = getIntParam(req, "projectId");
 		
 		Project project = projectManager.getProject(projectId);
@@ -488,10 +392,10 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 			ret.put("error", e.getMessage());
 		}
 		
-		Schedule sched = scheduleManager.getSchedule(new Pair<Integer, String>(projectId, flowName));
-		FlowOptions flowOptions = null;
+		Schedule sched = scheduleManager.getSchedule(projectId, flowName);
+		ExecutionOptions flowOptions = null;
 		try {
-			flowOptions = parseFlowOptions(req);
+			flowOptions = HttpRequestUtils.parseFlowOptions(req);
 		}
 		catch (Exception e) {
 			ret.put("error", e.getMessage());
@@ -510,54 +414,6 @@ public class ScheduleServlet extends LoginAbstractAzkabanServlet {
 		ret.put("message", projectName + "." + flowName + " scheduled.");
 	}
 	
-	private FlowOptions parseFlowOptions(HttpServletRequest req) throws ServletException {
-		FlowOptions flowOptions = new FlowOptions();
-		if (hasParam(req, "failureAction")) {
-			String option = getParam(req, "failureAction");
-			if (option.equals("finishCurrent") ) {
-				flowOptions.setFailureAction(FailureAction.FINISH_CURRENTLY_RUNNING);
-			}
-			else if (option.equals("cancelImmediately")) {
-				flowOptions.setFailureAction(FailureAction.CANCEL_ALL);
-			}
-			else if (option.equals("finishPossible")) {
-				flowOptions.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
-			}
-		}
-
-		if (hasParam(req, "failureEmails")) {
-			String emails = getParam(req, "failureEmails");
-			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-			flowOptions.setFailureEmails(Arrays.asList(emailSplit));
-		}
-		if (hasParam(req, "successEmails")) {
-			String emails = getParam(req, "successEmails");
-			String[] emailSplit = emails.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-			flowOptions.setSuccessEmails(Arrays.asList(emailSplit));
-		}
-		if (hasParam(req, "notifyFailureFirst")) {
-			flowOptions.setNotifyOnFirstFailure(Boolean.parseBoolean(getParam(req, "notifyFailureFirst")));
-		}
-		if (hasParam(req, "notifyFailureLast")) {
-			flowOptions.setNotifyOnLastFailure(Boolean.parseBoolean(getParam(req, "notifyFailureLast")));
-		}
-		if (hasParam(req, "executingJobOption")) {
-			//String option = getParam(req, "jobOption");
-			// Not set yet
-		}
-		
-		Map<String, String> flowParamGroup = this.getParamGroup(req, "flowOverride");
-		flowOptions.setFlowOverride(flowParamGroup);
-		
-		if (hasParam(req, "disabledJobs")) {
-			String disable = getParam(req, "disabledJobs");
-			String[] disableSplit = disable.split("\\s*,\\s*|\\s*;\\s*|\\s+");
-			List<String> jobs = (List<String>) Arrays.asList(disableSplit);
-			flowOptions.setDisabledJobs(jobs.subList(1, jobs.size()));
-		}
-		return flowOptions;
-	}
-
 	private DateTime parseDateTime(String scheduleDate, String scheduleTime) {
 		// scheduleTime: 12,00,pm,PDT
 		String[] parts = scheduleTime.split(",", -1);
diff --git a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
index fea2b3a..de86051 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executingflowpage.vm
@@ -41,7 +41,7 @@
 			var timezone = "${timezone}";
 			var errorMessage = null;
 			var successMessage = null;;
-			
+			var projectId = "${projectId}";
 			var projectName = "${projectName}";
 			var flowId = "${flowid}";
 			var execId = "${execid}";
@@ -50,6 +50,8 @@
 	</head>
 	<body>
 #set($current_page="all")
+#set($show_schedule="false")
+
 #parse( "azkaban/webapp/servlet/velocity/nav.vm" )
 		<div class="messaging"><p id="messageClose">X</p><p id="message"></p></div>  
 		<div class="content">
diff --git a/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm b/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
index 16f8df2..806465f 100644
--- a/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/flowexecutionpanel.vm
@@ -123,14 +123,19 @@
 	</div>
 	
 	<div class="actions">
+		#if(!$show_schedule || $show_schedule == 'true') 
 		<a class="btn2" id="schedule-btn">Schedule</a>
+		#end
+
 		<a class="yes btn1" id="execute-btn">Execute</a>
 		<a class="no simplemodal-close btn3 closeExecPanel">Cancel</a>
 	</div>
 </div>
 </div>
 
+#if(!$show_schedule || $show_schedule == 'true') 
 #parse( "azkaban/webapp/servlet/velocity/schedulepanel.vm" )
+#end
 
 <div id="contextMenu">
 	
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
index 32fc700..6c82bfb 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectpage.vm
@@ -38,6 +38,8 @@
 			var errorMessage = null;
 			var successMessage = null;
 
+			var projectId = ${project.id};
+
 			var execAccess = ${exec};
 			var projectName = "$project.name";
 		</script>
diff --git a/src/java/azkaban/webapp/servlet/velocity/schedulepanel.vm b/src/java/azkaban/webapp/servlet/velocity/schedulepanel.vm
index ea46998..002b188 100644
--- a/src/java/azkaban/webapp/servlet/velocity/schedulepanel.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/schedulepanel.vm
@@ -1,4 +1,3 @@
-
 <script type="text/javascript" src="${context}/js/azkaban.date.utils.js"></script>  
 <script type="text/javascript" src="${context}/js/azkaban.schedule.panel.view.js"></script>
 
@@ -12,24 +11,24 @@
 				<dl>
 					<dt>Schedule Time</dt>
 					<dd>
-						<input id="advhour" type="text" size="2" value="12"/>
-						<input id="advminutes" type="text" size="2" value="00"/>
-						<select id="advam_pm">
+						<input id="hour" type="text" size="2" value="12"/>
+						<input id="minutes" type="text" size="2" value="00"/>
+						<select id="am_pm">
 						  <option>pm</option>
 						  <option>am</option>
 						</select>
-						<select id="advtimezone">
+						<select id="timezone">
 						  <option>PDT</option>
 						  <option>UTC</option>
 						</select>
 					</dd>
-					<dt>Schedule Date</dt><dd><input type="text" id="advdatepicker" /></dd>
+					<dt>Schedule Date</dt><dd><input type="text" id="datepicker" /></dd>
 					<dt>Recurrence</dt>
 					<dd>
-						<input id="advis_recurring" type="checkbox" checked  />
+						<input id="is_recurring" type="checkbox" checked  />
 						<span>repeat every</span>
-					 	<input id="advperiod" type="text" size="2" value="1"/>
-						<select id="advperiod_units">
+					 	<input id="period" type="text" size="2" value="1"/>
+						<select id="period_units">
 						  <option value="d">Days</option>
 						  <option value="h">Hours</option>
 						  <option value="m">Minutes</option>
diff --git a/src/web/js/azkaban.flow.execute.view.js b/src/web/js/azkaban.flow.execute.view.js
index a59714c..82fc060 100644
--- a/src/web/js/azkaban.flow.execute.view.js
+++ b/src/web/js/azkaban.flow.execute.view.js
@@ -34,6 +34,62 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
   },
   render: function() {
   },
+  getExecutionOptionData: function() {
+	var failureAction = $('#failureAction').val();
+	var failureEmails = $('#failureEmails').val();
+	var successEmails = $('#successEmails').val();
+	var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
+	var notifyFailureLast = $('#notifyFailureLast').is(':checked');
+
+	var flowOverride = {};
+	var editRows = $(".editRow");
+	for (var i = 0; i < editRows.length; ++i) {
+		var row = editRows[i];
+		var td = $(row).find('td');
+		var key = $(td[0]).text();
+		var val = $(td[1]).text();
+		
+		if (key && key.length > 0) {
+			flowOverride[key] = val;
+		}
+	}
+	
+	var disabled = "";
+	var disabledMap = this.model.get('disabled');
+	for (var dis in disabledMap) {
+		if (disabledMap[dis]) {
+			disabled += dis + ",";
+		}
+	}
+	
+	var executingData = {
+		projectId: projectId,
+		project: this.projectName,
+		ajax: "executeFlow",
+		flow: this.flowId,
+		disabled: disabled,
+		failureAction: failureAction,
+		failureEmails: failureEmails,
+		successEmails: successEmails,
+		notifyFailureFirst: notifyFailureFirst,
+		notifyFailureLast: notifyFailureLast,
+		flowOverride: flowOverride
+	};
+	
+	// Set concurrency option, default is skip
+
+	var concurrentOption = $('input[name=concurrent]:checked').val();
+	executingData.concurrentOption = concurrentOption;
+	if (concurrentOption == "pipeline") {
+		var pipelineLevel = $("#pipelineLevel").val();
+		executingData.pipelineLevel = pipelineLevel;
+	}
+	else if (concurrentOption == "queue") {
+		executingData.queueLevel = $("#queueLevel").val();
+	}
+	
+	return executingData;
+  },
   changeFlowInfo: function() {
   	var successEmails = this.model.get("successEmails");
   	var failureEmails = this.model.get("failureEmails");
@@ -176,58 +232,7 @@ azkaban.FlowExecuteDialogView= Backbone.View.extend({
   },
   handleExecuteFlow: function(evt) {
   	  	var executeURL = contextURL + "/executor";
-	  	var failureAction = $('#failureAction').val();
-	  	var failureEmails = $('#failureEmails').val();
-	  	var successEmails = $('#successEmails').val();
-	  	var notifyFailureFirst = $('#notifyFailureFirst').is(':checked');
-	  	var notifyFailureLast = $('#notifyFailureLast').is(':checked');
-
-	  	var flowOverride = {};
-	  	var editRows = $(".editRow");
-		for (var i = 0; i < editRows.length; ++i) {
-			var row = editRows[i];
-			var td = $(row).find('td');
-			var key = $(td[0]).text();
-			var val = $(td[1]).text();
-			
-			if (key && key.length > 0) {
-				flowOverride[key] = val;
-			}
-		}
-	  	
-	  	var disabled = "";
-	  	var disabledMap = this.model.get('disabled');
-	  	for (var dis in disabledMap) {
-	  		if (disabledMap[dis]) {
-	  			disabled += dis + ",";
-	  		}
-	  	}
-	  	
-	  	var executingData = {
-	  		project: this.projectName,
-	  		ajax: "executeFlow",
-	  		flow: this.flowId,
-	  		disabled: disabled,
-	  		failureAction: failureAction,
-	  		failureEmails: failureEmails,
-	  		successEmails: successEmails,
-	  		notifyFailureFirst: notifyFailureFirst,
-	  		notifyFailureLast: notifyFailureLast,
-	  		flowOverride: flowOverride
-	  	};
-	  	
-	    // Set concurrency option, default is skip
-
-	  	var concurrentOption = $('input[name=concurrent]:checked').val();
-		executingData.concurrentOption = concurrentOption;
-	  	if (concurrentOption == "pipeline") {
-	  		var pipelineLevel = $("#pipelineLevel").val();
-	  		executingData.pipelineLevel = pipelineLevel;
-	  	}
-	  	else if (concurrentOption == "queue") {
-	  		executingData.queueLevel = $("#queueLevel").val();
-	  	}
-	  	
+		var executingData = this.getExecutionOptionData();
 		executeFlow(executingData);
   }
 });
diff --git a/src/web/js/azkaban.schedule.panel.view.js b/src/web/js/azkaban.schedule.panel.view.js
index 4fc10ad..e445a5e 100644
--- a/src/web/js/azkaban.schedule.panel.view.js
+++ b/src/web/js/azkaban.schedule.panel.view.js
@@ -19,13 +19,14 @@ $.namespace('azkaban');
 var schedulePanelView;
 azkaban.SchedulePanelView= Backbone.View.extend({
   events : {
-  	"click .closeSchedule": "hideSchedulePanel"
+  	"click .closeSchedule": "hideSchedulePanel",
+  	"click #schedule-button": "scheduleFlow"
   },
   initialize : function(settings) {
- 	$("#advdatepicker").css("backgroundColor", "transparent");
-	$( "#advdatepicker" ).datepicker();
-	$( "#advdatepicker" ).datepicker('setDate', new Date());
-	$( "#advdatepicker" ).datepicker("hide");
+ 	$("#datepicker").css("backgroundColor", "transparent");
+	$( "#datepicker" ).datepicker();
+	$( "#datepicker" ).datepicker('setDate', new Date());
+	$( "#datepicker" ).datepicker("hide");
   },
   render: function() {
   },
@@ -36,6 +37,50 @@ azkaban.SchedulePanelView= Backbone.View.extend({
   hideSchedulePanel: function() {
   	$('#scheduleModalBackground').hide();
   	$('#schedule-panel').hide();
+  },
+  scheduleFlow: function() {
+	var hourVal = $('#hour').val();
+	var minutesVal = $('#minutes').val();
+	var ampmVal = $('#am_pm').val();
+	var timezoneVal = $('#timezone').val();
+	var dateVal = $('#datepicker').val();
+	var is_recurringVal = $('#is_recurring').val();
+	var periodVal = $('#period').val();
+	var periodUnits = $('#period_units').val();
+  
+  	var scheduleURL = contextURL + "/schedule"
+  	
+  	var scheduleData = flowExecuteDialogView.getExecutionOptionData();
+  	 console.log("Creating schedule for "+projectName+"."+scheduleData.flow);
+	var scheduleTime = $('#hour').val() + "," + $('#minutes').val() + "," + $('#am_pm').val() + "," + $('#timezone').val();
+	var scheduleDate = $('#datepicker').val();
+	var is_recurring = $('#is_recurring').val();
+	var period = $('#period').val() + $('#period_units').val();
+	
+	scheduleData.ajax = "scheduleFlow";
+	scheduleData.projectName = projectName;
+	scheduleData.scheduleTime = scheduleTime;
+	scheduleData.scheduleDate = scheduleDate;
+	scheduleData.is_recurring = is_recurring;
+	scheduleData.period = period;
+
+	$.post(
+			scheduleURL,
+			scheduleData,
+			function(data) {
+				if (data.error) {
+					messageDialogView.show("Error Scheduling Flow", data.message);
+				}
+				else {
+					messageDialogView.show("Flow Scheduled", data.message,
+					function() {
+						window.location.href = scheduleURL;
+					}
+					);
+				}
+			},
+			"json"
+	);
   }
 });
 
diff --git a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
index 4a9491c..91a5e6e 100644
--- a/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
+++ b/unit/java/azkaban/scheduler/JdbcScheduleLoaderTest.java
@@ -26,13 +26,11 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-
-import azkaban.executor.ExecutableFlow.FailureAction;
-import azkaban.scheduler.Schedule.FlowOptions;
-import azkaban.scheduler.Schedule.SlaOptions;
+import azkaban.executor.ExecutionOptions;
 import azkaban.sla.SLA.SlaAction;
 import azkaban.sla.SLA.SlaRule;
 import azkaban.sla.SLA.SlaSetting;
+import azkaban.sla.SlaOptions;
 import azkaban.utils.DataSourceUtils;
 import azkaban.utils.Props;
 
@@ -131,7 +129,7 @@ public class JdbcScheduleLoaderTest {
 		set1.setDuration(Schedule.parsePeriodString("1h"));
 		set1.setRule(SlaRule.FINISH);
 		slaSets.add(set1);
-		FlowOptions flowOptions = new FlowOptions();
+		ExecutionOptions flowOptions = new ExecutionOptions();
 		flowOptions.setFailureEmails(emails);
 		flowOptions.setDisabledJobs(disabledJobs);
 		SlaOptions slaOptions = new SlaOptions();
@@ -159,7 +157,7 @@ public class JdbcScheduleLoaderTest {
 		Assert.assertEquals("America/Los_Angeles", sched.getTimezone().getID());
 		Assert.assertEquals(44444, sched.getSubmitTime());
 		Assert.assertEquals("1d", Schedule.createPeriodString(sched.getPeriod()));
-		FlowOptions fOpt = sched.getFlowOptions();
+		ExecutionOptions fOpt = sched.getExecutionOptions();
 		SlaOptions sOpt = sched.getSlaOptions();
 		Assert.assertEquals(SlaAction.EMAIL, sOpt.getSettings().get(0).getActions().get(0));
 		Assert.assertEquals("", sOpt.getSettings().get(0).getId());
@@ -168,8 +166,8 @@ public class JdbcScheduleLoaderTest {
 		Assert.assertEquals(2, fOpt.getFailureEmails().size());
 		Assert.assertEquals(null, fOpt.getSuccessEmails());
 		Assert.assertEquals(2, fOpt.getDisabledJobs().size());
-		Assert.assertEquals(FailureAction.FINISH_CURRENTLY_RUNNING, fOpt.getFailureAction());
-		Assert.assertEquals(null, fOpt.getFlowOverride());
+		Assert.assertEquals(ExecutionOptions.FailureAction.FINISH_CURRENTLY_RUNNING, fOpt.getFailureAction());
+		Assert.assertEquals(null, fOpt.getFlowParameters());
 	}
 	
 	@Test
@@ -196,7 +194,7 @@ public class JdbcScheduleLoaderTest {
 		set1.setDuration(Schedule.parsePeriodString("1h"));
 		set1.setRule(SlaRule.FINISH);
 		slaSets.add(set1);
-		FlowOptions flowOptions = new FlowOptions();
+		ExecutionOptions flowOptions = new ExecutionOptions();
 		flowOptions.setFailureEmails(emails);
 		flowOptions.setDisabledJobs(disabledJobs);
 		SlaOptions slaOptions = new SlaOptions();
@@ -257,7 +255,7 @@ public class JdbcScheduleLoaderTest {
 			set1.setDuration(Schedule.parsePeriodString("1h"));
 			set1.setRule(SlaRule.FINISH);
 			slaSets.add(set1);
-			FlowOptions flowOptions = new FlowOptions();
+			ExecutionOptions flowOptions = new ExecutionOptions();
 			flowOptions.setFailureEmails(emails);
 			flowOptions.setDisabledJobs(disabledJobs);
 			SlaOptions slaOptions = new SlaOptions();
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index 452be05..2bdfc7a 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -15,6 +15,7 @@ import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.LocalFlowWatcher;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
@@ -205,9 +206,10 @@ public class LocalFlowWatcherTest {
 	private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow exFlow = prepareExecDir(workingDir, testDir, flowName, execId);
+		ExecutionOptions option = exFlow.getExecutionOptions();
 		if (watcher != null) {
-			exFlow.setPipelineLevel(pipeline);
-			exFlow.setPipelineExecutionId(watcher.getExecId());
+			option.setPipelineLevel(pipeline);
+			option.setPipelineExecutionId(watcher.getExecId());
 		}
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index 1fddfc7..edf520c 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -15,6 +15,7 @@ import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
@@ -205,9 +206,10 @@ public class RemoteFlowWatcherTest {
 	private FlowRunner createFlowRunner(File workingDir, ExecutorLoader loader, EventCollectorListener eventCollector, String flowName, int execId, FlowWatcher watcher, Integer pipeline) throws Exception {
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow exFlow = prepareExecDir(workingDir, testDir, flowName, execId);
+		ExecutionOptions options = exFlow.getExecutionOptions();
 		if (watcher != null) {
-			exFlow.setPipelineLevel(pipeline);
-			exFlow.setPipelineExecutionId(watcher.getExecId());
+			options.setPipelineLevel(pipeline);
+			options.setPipelineExecutionId(watcher.getExecId());
 		}
 		//MockProjectLoader projectLoader = new MockProjectLoader(new File(exFlow.getExecutionPath()));
 		
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 54dc49c..9cdca2f 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -16,7 +16,7 @@ import azkaban.execapp.event.Event;
 import azkaban.execapp.event.Event.Type;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableNode;
-import azkaban.executor.ExecutableFlow.FailureAction;
+import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 
@@ -79,7 +79,6 @@ public class FlowRunnerTest {
 		testStatus(exFlow, "job6", Status.SUCCEEDED);
 		testStatus(exFlow, "job7", Status.SUCCEEDED);
 		testStatus(exFlow, "job8", Status.SUCCEEDED);
-		testStatus(exFlow, "job9", Status.SUCCEEDED);
 		testStatus(exFlow, "job10", Status.SUCCEEDED);
 		
 		try {
@@ -125,7 +124,6 @@ public class FlowRunnerTest {
 		testStatus(exFlow, "job6", Status.SKIPPED);
 		testStatus(exFlow, "job7", Status.SUCCEEDED);
 		testStatus(exFlow, "job8", Status.SUCCEEDED);
-		testStatus(exFlow, "job9", Status.SUCCEEDED);
 		testStatus(exFlow, "job10", Status.SKIPPED);
 		
 		try {
@@ -181,7 +179,8 @@ public class FlowRunnerTest {
 		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow flow = prepareExecDir(testDir, "exec2", 1);
-		flow.setFailureAction(FailureAction.CANCEL_ALL);
+		flow.getExecutionOptions().setFailureAction(FailureAction.CANCEL_ALL);
+		
 		
 		FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
 		
@@ -220,8 +219,7 @@ public class FlowRunnerTest {
 		eventCollector.setEventFilterOut(Event.Type.JOB_FINISHED, Event.Type.JOB_STARTED, Event.Type.JOB_STATUS_CHANGED);
 		File testDir = new File("unit/executions/exectest1");
 		ExecutableFlow flow = prepareExecDir(testDir, "exec3", 1);
-		flow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
-		
+		flow.getExecutionOptions().setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
 		FlowRunner runner = createFlowRunner(flow, loader, eventCollector);
 		
 		runner.run();
@@ -288,7 +286,6 @@ public class FlowRunnerTest {
 		testStatus(exFlow, "job5", Status.KILLED);
 		testStatus(exFlow, "job7", Status.KILLED);
 		testStatus(exFlow, "job8", Status.KILLED);
-		testStatus(exFlow, "job9", Status.KILLED);
 		testStatus(exFlow, "job10", Status.KILLED);
 		testStatus(exFlow, "job3", Status.FAILED);
 		testStatus(exFlow, "job4", Status.FAILED);
diff --git a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
index 56ffcc4..1fcd9ee 100644
--- a/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
+++ b/unit/java/azkaban/test/executor/JdbcExecutorLoaderTest.java
@@ -185,7 +185,7 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals(flow.getFlowId(), fetchFlow.getFlowId());
 		Assert.assertEquals(flow.getProjectId(), fetchFlow.getProjectId());
 		Assert.assertEquals(flow.getVersion(), fetchFlow.getVersion());
-		Assert.assertEquals(flow.getFailureAction(), fetchFlow.getFailureAction());
+		Assert.assertEquals(flow.getExecutionOptions().getFailureAction(), fetchFlow.getExecutionOptions().getFailureAction());
 		Assert.assertEquals(new HashSet<String>(flow.getEndNodes()), new HashSet<String>(fetchFlow.getEndNodes()));
 	}
 	
@@ -217,7 +217,7 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals(flow.getFlowId(), fetchFlow.getFlowId());
 		Assert.assertEquals(flow.getProjectId(), fetchFlow.getProjectId());
 		Assert.assertEquals(flow.getVersion(), fetchFlow.getVersion());
-		Assert.assertEquals(flow.getFailureAction(), fetchFlow.getFailureAction());
+		Assert.assertEquals(flow.getExecutionOptions().getFailureAction(), fetchFlow.getExecutionOptions().getFailureAction());
 		Assert.assertEquals(new HashSet<String>(flow.getEndNodes()), new HashSet<String>(fetchFlow.getEndNodes()));
 	}
 	
@@ -298,7 +298,7 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals(flow1.getFlowId(), flow1Result.getFlowId());
 		Assert.assertEquals(flow1.getProjectId(), flow1Result.getProjectId());
 		Assert.assertEquals(flow1.getVersion(), flow1Result.getVersion());
-		Assert.assertEquals(flow1.getFailureAction(), flow1Result.getFailureAction());
+		Assert.assertEquals(flow1.getExecutionOptions().getFailureAction(), flow1Result.getExecutionOptions().getFailureAction());
 		
 		ExecutableFlow flow1Result2 = activeFlows1.get(flow2.getExecutionId()).getSecond();
 		Assert.assertNotNull(flow1Result2);
@@ -310,7 +310,7 @@ public class JdbcExecutorLoaderTest {
 		Assert.assertEquals(flow2.getFlowId(), flow1Result2.getFlowId());
 		Assert.assertEquals(flow2.getProjectId(), flow1Result2.getProjectId());
 		Assert.assertEquals(flow2.getVersion(), flow1Result2.getVersion());
-		Assert.assertEquals(flow2.getFailureAction(), flow1Result2.getFailureAction());
+		Assert.assertEquals(flow2.getExecutionOptions().getFailureAction(), flow1Result2.getExecutionOptions().getFailureAction());
 		
 		loader.removeActiveExecutableReference(flow2.getExecutionId());
 		Map<Integer, Pair<ExecutionReference,ExecutableFlow>> activeFlows2 = loader.fetchActiveFlows();