azkaban-memoizeit

Fixing scheduler flow options.

4/9/2013 12:46:37 AM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 2afed2a..ffe5add 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -296,6 +296,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 								Props outputProps = collectOutputProps(node);
 								node.setStatus(Status.QUEUED);
 								JobRunner runner = createJobRunner(node, outputProps);
+								logger.info("Submitting job " + node.getJobId() + " to run.");
 								try {
 									executorService.submit(runner);
 									jobRunners.put(node.getJobId(), runner);
@@ -305,10 +306,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 								
 							} // If killed, then auto complete and KILL
 							else if (node.getStatus() == Status.KILLED) {
+								logger.info("Killing " + node.getJobId() + " due to prior errors.");
 								node.setStartTime(currentTime);
 								node.setEndTime(currentTime);
 							} // If disabled, then we auto skip
 							else if (node.getStatus() == Status.DISABLED) {
+								logger.info("Skipping disabled job " + node.getJobId() + ".");
 								node.setStartTime(currentTime);
 								node.setEndTime(currentTime);
 								node.setStatus(Status.SKIPPED);
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 83aa3fe..f1c369b 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -351,30 +351,33 @@ public class ExecutorManager {
 
 			ExecutionOptions options = exflow.getExecutionOptions();
 			
-			// Disable jobs
-			for(String disabledId : options.getDisabledJobs()) {
-				ExecutableNode node = exflow.getExecutableNode(disabledId);
-				node.setStatus(Status.DISABLED);
-			}
-			
 			String message = "";
-			if (!running.isEmpty()) {
-				if (options.getConcurrentOption().equals("pipeline")) {
-					Collections.sort(running);
-					Integer runningExecId = running.get(running.size() - 1);
-					
-					options.setPipelineExecutionId(runningExecId);
-					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
-				}
-				else if (options.getConcurrentOption().equals("skip")) {
-					throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
+			if (options != null) {
+				if (options.getDisabledJobs() != null) {
+					// Disable jobs
+					for(String disabledId : options.getDisabledJobs()) {
+						ExecutableNode node = exflow.getExecutableNode(disabledId);
+						node.setStatus(Status.DISABLED);
+					}
 				}
-				else {
-					// The settings is to run anyways.
-					message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
+
+				if (!running.isEmpty()) {
+					if (options.getConcurrentOption().equals("pipeline")) {
+						Collections.sort(running);
+						Integer runningExecId = running.get(running.size() - 1);
+						
+						options.setPipelineExecutionId(runningExecId);
+						message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
+					}
+					else if (options.getConcurrentOption().equals("skip")) {
+						throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
+					}
+					else {
+						// The settings is to run anyways.
+						message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
+					}
 				}
 			}
-			
 			// The exflow id is set by the loader. So it's unavailable until after this call.
 			executorLoader.uploadExecutableFlow(exflow);
 			
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 2fd6074..3e84c86 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -344,6 +344,14 @@ public class Schedule{
 			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
 			this.executionOptions = execOptions;
 		}
+		else if (schedObj.containsKey("flowOptions")){
+			ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
+			this.executionOptions = execOptions;
+		}
+		else {
+			this.executionOptions = new ExecutionOptions();
+		}
+
 		if (schedObj.containsKey("slaOptions")) {
 			SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
 			this.slaOptions = slaOptions;
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index a138e89..8fca496 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -2368,7 +2368,7 @@ span.sublabel {
 }
 
 #flow-status table td.PAUSED {
-	color: #FF6600;
+	color: #C82123;
 }
 
 #flow-status table td.FAILED_FINISHING {
@@ -2464,6 +2464,10 @@ td .status.FAILED {
 	background-color: #C82123;
 }
 
+td .status.PAUSED {
+	background-color: #C82123;
+}
+
 td .status.READY {
 	background-color: #CCC;
 }