azkaban-memoizeit
Details
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 2afed2a..ffe5add 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -296,6 +296,7 @@ public class FlowRunner extends EventHandler implements Runnable {
Props outputProps = collectOutputProps(node);
node.setStatus(Status.QUEUED);
JobRunner runner = createJobRunner(node, outputProps);
+ logger.info("Submitting job " + node.getJobId() + " to run.");
try {
executorService.submit(runner);
jobRunners.put(node.getJobId(), runner);
@@ -305,10 +306,12 @@ public class FlowRunner extends EventHandler implements Runnable {
} // If killed, then auto complete and KILL
else if (node.getStatus() == Status.KILLED) {
+ logger.info("Killing " + node.getJobId() + " due to prior errors.");
node.setStartTime(currentTime);
node.setEndTime(currentTime);
} // If disabled, then we auto skip
else if (node.getStatus() == Status.DISABLED) {
+ logger.info("Skipping disabled job " + node.getJobId() + ".");
node.setStartTime(currentTime);
node.setEndTime(currentTime);
node.setStatus(Status.SKIPPED);
src/java/azkaban/executor/ExecutorManager.java 43(+23 -20)
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 83aa3fe..f1c369b 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -351,30 +351,33 @@ public class ExecutorManager {
ExecutionOptions options = exflow.getExecutionOptions();
- // Disable jobs
- for(String disabledId : options.getDisabledJobs()) {
- ExecutableNode node = exflow.getExecutableNode(disabledId);
- node.setStatus(Status.DISABLED);
- }
-
String message = "";
- if (!running.isEmpty()) {
- if (options.getConcurrentOption().equals("pipeline")) {
- Collections.sort(running);
- Integer runningExecId = running.get(running.size() - 1);
-
- options.setPipelineExecutionId(runningExecId);
- message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
- }
- else if (options.getConcurrentOption().equals("skip")) {
- throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
+ if (options != null) {
+ if (options.getDisabledJobs() != null) {
+ // Disable jobs
+ for(String disabledId : options.getDisabledJobs()) {
+ ExecutableNode node = exflow.getExecutableNode(disabledId);
+ node.setStatus(Status.DISABLED);
+ }
}
- else {
- // The settings is to run anyways.
- message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
+
+ if (!running.isEmpty()) {
+ if (options.getConcurrentOption().equals("pipeline")) {
+ Collections.sort(running);
+ Integer runningExecId = running.get(running.size() - 1);
+
+ options.setPipelineExecutionId(runningExecId);
+ message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
+ }
+ else if (options.getConcurrentOption().equals("skip")) {
+ throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.");
+ }
+ else {
+ // The settings is to run anyways.
+ message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
+ }
}
}
-
// The exflow id is set by the loader. So it's unavailable until after this call.
executorLoader.uploadExecutableFlow(exflow);
diff --git a/src/java/azkaban/scheduler/Schedule.java b/src/java/azkaban/scheduler/Schedule.java
index 2fd6074..3e84c86 100644
--- a/src/java/azkaban/scheduler/Schedule.java
+++ b/src/java/azkaban/scheduler/Schedule.java
@@ -344,6 +344,14 @@ public class Schedule{
ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("executionOptions"));
this.executionOptions = execOptions;
}
+ else if (schedObj.containsKey("flowOptions")){
+ ExecutionOptions execOptions = ExecutionOptions.createFromObject(schedObj.get("flowOptions"));
+ this.executionOptions = execOptions;
+ }
+ else {
+ this.executionOptions = new ExecutionOptions();
+ }
+
if (schedObj.containsKey("slaOptions")) {
SlaOptions slaOptions = SlaOptions.fromObject(schedObj.get("slaOptions"));
this.slaOptions = slaOptions;
src/web/css/azkaban.css 6(+5 -1)
diff --git a/src/web/css/azkaban.css b/src/web/css/azkaban.css
index a138e89..8fca496 100644
--- a/src/web/css/azkaban.css
+++ b/src/web/css/azkaban.css
@@ -2368,7 +2368,7 @@ span.sublabel {
}
#flow-status table td.PAUSED {
- color: #FF6600;
+ color: #C82123;
}
#flow-status table td.FAILED_FINISHING {
@@ -2464,6 +2464,10 @@ td .status.FAILED {
background-color: #C82123;
}
+td .status.PAUSED {
+ background-color: #C82123;
+}
+
td .status.READY {
background-color: #CCC;
}