azkaban-memoizeit
Changes
src/java/azkaban/executor/FlowRunner.java 22(+9 -13)
unit/executions/exectest1/exec3.flow 154(+154 -0)
Details
diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 5591f87..c8e7080 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -34,7 +34,7 @@ public class ExecutableFlow {
private long endTime = -1;
private int updateNumber = 0;
- private Status flowStatus = Status.UNKNOWN;
+ private Status flowStatus = Status.READY;
private String submitUser;
private boolean submitted = false;
private boolean notifyOnFirstFailure = true;
src/java/azkaban/executor/FlowRunner.java 22(+9 -13)
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index a47f4a2..dd21207 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -375,23 +375,21 @@ public class FlowRunner extends EventHandler implements Runnable {
currentThread.interrupt();
}
- private void handleSucceededJob(ExecutableNode node) {
+ private void queueNextJobs(ExecutableNode node) {
if (this.isCancelled()) {
return;
}
- // Check killed case.
for (String dependent : node.getOutNodes()) {
ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-
- // Check all dependencies
+
boolean ready = true;
for (String dependency : dependentNode.getInNodes()) {
ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
Status depStatus = dependencyNode.getStatus();
if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
// We trickle failures down the graph.
- dependencyNode.setStatus(Status.KILLED);
+ dependentNode.setStatus(Status.KILLED);
}
else if (depStatus == Status.SUCCEEDED || depStatus == Status.SKIPPED) {
// We do nothing here. We proceed happily.
@@ -422,8 +420,7 @@ public class FlowRunner extends EventHandler implements Runnable {
try {
runner = this.createJobRunner(dependentNode, previousOutput);
} catch (IOException e) {
- logger.error("JobRunner creation failed due to "
- + e.getMessage());
+ logger.error("JobRunner creation failed due to " + e.getMessage());
dependentNode.setStatus(Status.FAILED);
handleFailedJob(dependentNode);
return;
@@ -433,8 +430,7 @@ public class FlowRunner extends EventHandler implements Runnable {
if (paused) {
dependentNode.setStatus(Status.PAUSED);
pausedJobsToRun.add(runner);
- logger.info("Flow is paused so adding "
- + dependentNode.getId() + " to paused list.");
+ logger.info("Flow is paused so adding " + dependentNode.getId() + " to paused list.");
} else {
logger.info("Adding " + dependentNode.getId() + " to run queue.");
jobsToRun.add(runner);
@@ -456,6 +452,7 @@ public class FlowRunner extends EventHandler implements Runnable {
case FINISH_CURRENTLY_RUNNING:
logger.info("Failure Action: Finish up remaining running jobs.");
flow.setStatus(Status.FAILED_FINISHING);
+
runningJobs.clear();
executorService.shutdown();
@@ -482,6 +479,7 @@ public class FlowRunner extends EventHandler implements Runnable {
default:
logger.info("Failure Action: Finishing accessible jobs.");
flow.setStatus(Status.FAILED_FINISHING);
+ queueNextJobs(node);
}
runningJobs.remove(node.getId());
@@ -499,8 +497,6 @@ public class FlowRunner extends EventHandler implements Runnable {
JobRunner runner = (JobRunner) event.getRunner();
ExecutableNode node = runner.getNode();
String jobID = node.getId();
- System.out.println("Event " + jobID + " "
- + event.getType().toString());
// On Job success, we add the output props and then set up the next
// run.
@@ -532,12 +528,12 @@ public class FlowRunner extends EventHandler implements Runnable {
jobsFinished.add(jobID);
Props props = runner.getOutputProps();
outputProps.put(jobID, props);
- flowRunner.handleSucceededJob(runner.getNode());
+ flowRunner.queueNextJobs(runner.getNode());
}
flowRunner.commitFlow();
if (runningJobs.isEmpty()) {
- System.out.println("There are no more running jobs.");
+ logger.info("There are no more running jobs.");
flowRunner.interrupt();
}
}
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 1e454c1..6af4d3d 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -156,8 +156,9 @@ public class JobRunner extends EventHandler implements Runnable {
// will just interrupt, I guess, until the code is finished.
this.notifyAll();
-
- node.setStatus(Status.KILLED);
+ if (node.getStatus() != Status.FAILED) {
+ node.setStatus(Status.KILLED);
+ }
}
public Status getStatus() {
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 5b31f73..89486ec 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -28,7 +28,7 @@ public abstract class LoginAbstractAzkabanServlet extends
private static final Logger logger = Logger
.getLogger(LoginAbstractAzkabanServlet.class.getName());
- private static final String SESSION_ID_NAME = "azkaban.session.id";
+ private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
diff --git a/src/java/azkaban/webapp/servlet/velocity/executionspage.vm b/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
index 2c04c8b..8994fad 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -13,8 +13,8 @@
var contextURL = "${context}";
var currentTime = ${currentTime};
var timezone = "${timezone}";
- var errorMessage = ${error_message};
- var successMessage = ${success_message};
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
</script>
</head>
<body>
diff --git a/src/java/azkaban/webapp/servlet/velocity/historypage.vm b/src/java/azkaban/webapp/servlet/velocity/historypage.vm
index 1a94a39..3e90bbe 100644
--- a/src/java/azkaban/webapp/servlet/velocity/historypage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/historypage.vm
@@ -13,8 +13,8 @@
var contextURL = "${context}";
var currentTime = ${currentTime};
var timezone = "${timezone}";
- var errorMessage = ${error_message};
- var successMessage = ${success_message};
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
</script>
</head>
<body>
diff --git a/src/java/azkaban/webapp/servlet/velocity/index.vm b/src/java/azkaban/webapp/servlet/velocity/index.vm
index f7c3944..3346b5d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/index.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/index.vm
@@ -13,8 +13,8 @@
var contextURL = "${context}";
var currentTime = ${currentTime};
var timezone = "${timezone}";
- var errorMessage = ${error_message};
- var successMessage = ${success_message};
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
</script>
</head>
<body>
diff --git a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
index a0da7c7..05f8918 100644
--- a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
@@ -14,8 +14,8 @@
var contextURL = "${context}";
var currentTime = ${currentTime};
var timezone = "${timezone}";
- var errorMessage = ${error_message};
- var successMessage = ${success_message};
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
var projectName = "${projectName}";
var flowName = "${flowid}";
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm
index 98dc455..67c0038 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm
@@ -15,8 +15,8 @@
var contextURL = "${context}";
var currentTime = ${currentTime};
var timezone = "${timezone}";
- var errorMessage = ${error_message};
- var successMessage = ${success_message};
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
var projectName = "${projectName}";
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index b0f33d3..a276b58 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -15,8 +15,8 @@
var contextURL = "${context}";
var currentTime = ${currentTime};
var timezone = "${timezone}";
- var errorMessage = ${error_message};
- var successMessage = ${success_message};
+ var errorMessage = "${error_message}";
+ var successMessage = "${success_message}";
</script>
</head>
unit/executions/exectest1/exec3.flow 154(+154 -0)
diff --git a/unit/executions/exectest1/exec3.flow b/unit/executions/exectest1/exec3.flow
new file mode 100644
index 0000000..4a137b0
--- /dev/null
+++ b/unit/executions/exectest1/exec3.flow
@@ -0,0 +1,154 @@
+{
+ "id" : "derived-member-data",
+ "success.email" : [],
+ "edges" : [ {
+ "source" : "job1",
+ "target" : "job2d"
+ }, {
+ "source" : "job1",
+ "target" : "job3"
+ },{
+ "source" : "job2d",
+ "target" : "job4"
+ }, {
+ "source" : "job2d",
+ "target" : "job5"
+ },{
+ "source" : "job4",
+ "target" : "job6"
+ },{
+ "source" : "job5",
+ "target" : "job6"
+ },{
+ "source" : "job6",
+ "target" : "job10"
+ },{
+ "source" : "job3",
+ "target" : "job7"
+ },{
+ "source" : "job3",
+ "target" : "job8"
+ },{
+ "source" : "job7",
+ "target" : "job9"
+ },
+ {
+ "source" : "job8",
+ "target" : "job9"
+ },
+ {
+ "source" : "job9",
+ "target" : "job10"
+ }
+ ],
+ "failure.email" : [],
+ "nodes" : [ {
+ "propSource" : "prop2.properties",
+ "id" : "job1",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job1.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job2d",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job2d.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job3",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job3.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job4",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job4.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job5",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job5.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job6",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job6.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job7",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job7.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job8",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job8.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job9",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job9.job",
+ "expectedRuntime" : 1
+ },
+ {
+ "propSource" : "prop2.properties",
+ "id" : "job10",
+ "jobType" : "java",
+ "layout" : {
+ "level" : 0
+ },
+ "jobSource" : "job10.job",
+ "expectedRuntime" : 1
+ }
+ ],
+ "layedout" : false,
+ "type" : "flow",
+ "props" : [ {
+ "inherits" : "prop1.properties",
+ "source" : "prop2.properties"
+ },{
+ "source" : "prop1.properties"
+ }]
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/FlowRunnerTest.java b/unit/java/azkaban/test/executor/FlowRunnerTest.java
index 87a593c..ab0970f 100644
--- a/unit/java/azkaban/test/executor/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/executor/FlowRunnerTest.java
@@ -3,7 +3,6 @@ package azkaban.test.executor;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.HashSet;
import junit.framework.Assert;
@@ -59,7 +58,7 @@ public class FlowRunnerTest {
runner.addListener(eventCollector);
Assert.assertTrue(!runner.isCancelled());
- Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+ Assert.assertTrue(exFlow.getStatus() == Status.READY);
runner.run();
Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
Assert.assertTrue(runner.getJobsFinished().size() == exFlow.getExecutableNodes().size());
@@ -96,7 +95,7 @@ public class FlowRunnerTest {
runner.addListener(eventCollector);
Assert.assertTrue(!runner.isCancelled());
- Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+ Assert.assertTrue(exFlow.getStatus() == Status.READY);
runner.run();
Assert.assertTrue(exFlow.getStatus() == Status.FAILED);
@@ -133,7 +132,7 @@ public class FlowRunnerTest {
runner.addListener(eventCollector);
Assert.assertTrue(!runner.isCancelled());
- Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+ Assert.assertTrue(exFlow.getStatus() == Status.READY);
runner.run();
Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
@@ -159,6 +158,43 @@ public class FlowRunnerTest {
}
}
+ @Test
+ public void exec1FailedFinishRest() throws Exception {
+ File testDir = new File("unit/executions/exectest1");
+ ExecutableFlow exFlow = prepareExecDir(testDir, "exec3");
+ exFlow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = new FlowRunner(exFlow);
+ runner.addListener(eventCollector);
+
+ Assert.assertTrue(!runner.isCancelled());
+ Assert.assertTrue(exFlow.getStatus() == Status.READY);
+ runner.run();
+
+ Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
+
+ testStatus(exFlow, "job1", Status.SUCCEEDED);
+ testStatus(exFlow, "job2d", Status.FAILED);
+ testStatus(exFlow, "job3", Status.SUCCEEDED);
+ testStatus(exFlow, "job4", Status.KILLED);
+ testStatus(exFlow, "job5", Status.KILLED);
+ testStatus(exFlow, "job6", Status.KILLED);
+ testStatus(exFlow, "job7", Status.SUCCEEDED);
+ testStatus(exFlow, "job8", Status.SUCCEEDED);
+ testStatus(exFlow, "job9", Status.SUCCEEDED);
+ testStatus(exFlow, "job10", Status.KILLED);
+
+ try {
+ eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FAILED_FINISHING, Type.FLOW_FINISHED});
+ }
+ catch (Exception e) {
+ System.out.println(e.getMessage());
+ eventCollector.writeAllEvents();
+ Assert.fail(e.getMessage());
+ }
+ }
+
private void testStatus(ExecutableFlow flow, String name, Status status) {
ExecutableNode node = flow.getExecutableNode(name);
@@ -171,6 +207,7 @@ public class FlowRunnerTest {
FileUtils.copyDirectory(execDir, workingDir);
File jsonFlowFile = new File(workingDir, execName + ".flow");
+ @SuppressWarnings("unchecked")
HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
Flow flow = Flow.flowFromObject(flowObj);