azkaban-memoizeit

Fixed tests based on flow completion. Fixed problem with not

10/11/2012 12:34:46 AM

Details

diff --git a/src/java/azkaban/executor/ExecutableFlow.java b/src/java/azkaban/executor/ExecutableFlow.java
index 5591f87..c8e7080 100644
--- a/src/java/azkaban/executor/ExecutableFlow.java
+++ b/src/java/azkaban/executor/ExecutableFlow.java
@@ -34,7 +34,7 @@ public class ExecutableFlow {
 	private long endTime = -1;
 	
 	private int updateNumber = 0;
-	private Status flowStatus = Status.UNKNOWN;
+	private Status flowStatus = Status.READY;
 	private String submitUser;
 	private boolean submitted = false;
 	private boolean notifyOnFirstFailure = true;
diff --git a/src/java/azkaban/executor/FlowRunner.java b/src/java/azkaban/executor/FlowRunner.java
index a47f4a2..dd21207 100644
--- a/src/java/azkaban/executor/FlowRunner.java
+++ b/src/java/azkaban/executor/FlowRunner.java
@@ -375,23 +375,21 @@ public class FlowRunner extends EventHandler implements Runnable {
 		currentThread.interrupt();
 	}
 
-	private void handleSucceededJob(ExecutableNode node) {
+	private void queueNextJobs(ExecutableNode node) {
 		if (this.isCancelled()) {
 			return;
 		}
 
-		// Check killed case.
 		for (String dependent : node.getOutNodes()) {
 			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
-			
-			// Check all dependencies
+
 			boolean ready = true;
 			for (String dependency : dependentNode.getInNodes()) {
 				ExecutableNode dependencyNode = flow.getExecutableNode(dependency);
 				Status depStatus = dependencyNode.getStatus();
 				if (depStatus == Status.FAILED || depStatus == Status.KILLED) {
 					// We trickle failures down the graph.
-					dependencyNode.setStatus(Status.KILLED);
+					dependentNode.setStatus(Status.KILLED);
 				}
 				else if (depStatus == Status.SUCCEEDED || depStatus == Status.SKIPPED) {
 					// We do nothing here. We proceed happily.
@@ -422,8 +420,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				try {
 					runner = this.createJobRunner(dependentNode, previousOutput);
 				} catch (IOException e) {
-					logger.error("JobRunner creation failed due to "
-							+ e.getMessage());
+					logger.error("JobRunner creation failed due to " + e.getMessage());
 					dependentNode.setStatus(Status.FAILED);
 					handleFailedJob(dependentNode);
 					return;
@@ -433,8 +430,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				if (paused) {
 					dependentNode.setStatus(Status.PAUSED);
 					pausedJobsToRun.add(runner);
-					logger.info("Flow is paused so adding "
-							+ dependentNode.getId() + " to paused list.");
+					logger.info("Flow is paused so adding " + dependentNode.getId() + " to paused list.");
 				} else {
 					logger.info("Adding " + dependentNode.getId() + " to run queue.");
 					jobsToRun.add(runner);
@@ -456,6 +452,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		case FINISH_CURRENTLY_RUNNING:
 			logger.info("Failure Action: Finish up remaining running jobs.");
 			flow.setStatus(Status.FAILED_FINISHING);
+
 			runningJobs.clear();
 			executorService.shutdown();
 			
@@ -482,6 +479,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		default:
 			logger.info("Failure Action: Finishing accessible jobs.");
 			flow.setStatus(Status.FAILED_FINISHING);
+			queueNextJobs(node);
 		}
 
 		runningJobs.remove(node.getId());
@@ -499,8 +497,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 			JobRunner runner = (JobRunner) event.getRunner();
 			ExecutableNode node = runner.getNode();
 			String jobID = node.getId();
-			System.out.println("Event " + jobID + " "
-					+ event.getType().toString());
 
 			// On Job success, we add the output props and then set up the next
 			// run.
@@ -532,12 +528,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 				jobsFinished.add(jobID);
 				Props props = runner.getOutputProps();
 				outputProps.put(jobID, props);
-				flowRunner.handleSucceededJob(runner.getNode());
+				flowRunner.queueNextJobs(runner.getNode());
 			}
 
 			flowRunner.commitFlow();
 			if (runningJobs.isEmpty()) {
-				System.out.println("There are no more running jobs.");
+				logger.info("There are no more running jobs.");
 				flowRunner.interrupt();
 			}
 		}
diff --git a/src/java/azkaban/executor/JobRunner.java b/src/java/azkaban/executor/JobRunner.java
index 1e454c1..6af4d3d 100644
--- a/src/java/azkaban/executor/JobRunner.java
+++ b/src/java/azkaban/executor/JobRunner.java
@@ -156,8 +156,9 @@ public class JobRunner extends EventHandler implements Runnable {
 
 		// will just interrupt, I guess, until the code is finished.
 		this.notifyAll();
-
-		node.setStatus(Status.KILLED);
+		if (node.getStatus() != Status.FAILED) {
+			node.setStatus(Status.KILLED);
+		}
 	}
 
 	public Status getStatus() {
diff --git a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
index 5b31f73..89486ec 100644
--- a/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
+++ b/src/java/azkaban/webapp/servlet/LoginAbstractAzkabanServlet.java
@@ -28,7 +28,7 @@ public abstract class LoginAbstractAzkabanServlet extends
 
 	private static final Logger logger = Logger
 			.getLogger(LoginAbstractAzkabanServlet.class.getName());
-	private static final String SESSION_ID_NAME = "azkaban.session.id";
+	private static final String SESSION_ID_NAME = "azkaban.browser.session.id";
 
 	@Override
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
diff --git a/src/java/azkaban/webapp/servlet/velocity/executionspage.vm b/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
index 2c04c8b..8994fad 100644
--- a/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/executionspage.vm
@@ -13,8 +13,8 @@
 			var contextURL = "${context}";
 			var currentTime = ${currentTime};
 			var timezone = "${timezone}";
-			var errorMessage = ${error_message};
-			var successMessage = ${success_message};
+			var errorMessage = "${error_message}";
+			var successMessage = "${success_message}";
 		</script>
 	</head>
 	<body>
diff --git a/src/java/azkaban/webapp/servlet/velocity/historypage.vm b/src/java/azkaban/webapp/servlet/velocity/historypage.vm
index 1a94a39..3e90bbe 100644
--- a/src/java/azkaban/webapp/servlet/velocity/historypage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/historypage.vm
@@ -13,8 +13,8 @@
 			var contextURL = "${context}";
 			var currentTime = ${currentTime};
 			var timezone = "${timezone}";
-			var errorMessage = ${error_message};
-			var successMessage = ${success_message};
+			var errorMessage = "${error_message}";
+			var successMessage = "${success_message}";
 		</script>
 	</head>
 	<body>
diff --git a/src/java/azkaban/webapp/servlet/velocity/index.vm b/src/java/azkaban/webapp/servlet/velocity/index.vm
index f7c3944..3346b5d 100644
--- a/src/java/azkaban/webapp/servlet/velocity/index.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/index.vm
@@ -13,8 +13,8 @@
 			var contextURL = "${context}";
 			var currentTime = ${currentTime};
 			var timezone = "${timezone}";
-			var errorMessage = ${error_message};
-			var successMessage = ${success_message};
+			var errorMessage = "${error_message}";
+			var successMessage = "${success_message}";
 		</script>
 	</head>
 	<body>
diff --git a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
index a0da7c7..05f8918 100644
--- a/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/joblogpage.vm
@@ -14,8 +14,8 @@
 			var contextURL = "${context}";
 			var currentTime = ${currentTime};
 			var timezone = "${timezone}";
-			var errorMessage = ${error_message};
-			var successMessage = ${success_message};
+			var errorMessage = "${error_message}";
+			var successMessage = "${success_message}";
 			
 			var projectName = "${projectName}";
 			var flowName = "${flowid}";
diff --git a/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm b/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm
index 98dc455..67c0038 100644
--- a/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/projectlogpage.vm
@@ -15,8 +15,8 @@
 			var contextURL = "${context}";
 			var currentTime = ${currentTime};
 			var timezone = "${timezone}";
-			var errorMessage = ${error_message};
-			var successMessage = ${success_message};
+			var errorMessage = "${error_message}";
+			var successMessage = "${success_message}";
 			
 			var projectName = "${projectName}";
 
diff --git a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
index b0f33d3..a276b58 100644
--- a/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
+++ b/src/java/azkaban/webapp/servlet/velocity/scheduledflowpage.vm
@@ -15,8 +15,8 @@
 			var contextURL = "${context}";
 			var currentTime = ${currentTime};
 			var timezone = "${timezone}";
-			var errorMessage = ${error_message};
-			var successMessage = ${success_message};
+			var errorMessage = "${error_message}";
+			var successMessage = "${success_message}";
 	
 		</script>
 	</head>
diff --git a/unit/executions/exectest1/exec3.flow b/unit/executions/exectest1/exec3.flow
new file mode 100644
index 0000000..4a137b0
--- /dev/null
+++ b/unit/executions/exectest1/exec3.flow
@@ -0,0 +1,154 @@
+{
+  "id" : "derived-member-data",
+  "success.email" : [],
+  "edges" : [ {
+    "source" : "job1",
+    "target" : "job2d"
+  }, {
+    "source" : "job1",
+    "target" : "job3"
+  },{
+    "source" : "job2d",
+    "target" : "job4"
+  }, {
+    "source" : "job2d",
+    "target" : "job5"
+  },{
+    "source" : "job4",
+    "target" : "job6"
+  },{
+    "source" : "job5",
+    "target" : "job6"
+  },{
+    "source" : "job6",
+    "target" : "job10"
+  },{
+    "source" : "job3",
+    "target" : "job7"
+  },{
+    "source" : "job3",
+    "target" : "job8"
+  },{
+    "source" : "job7",
+    "target" : "job9"
+  },
+  {
+    "source" : "job8",
+    "target" : "job9"
+  },
+  {
+    "source" : "job9",
+    "target" : "job10"
+  }
+   ],
+  "failure.email" : [],
+  "nodes" : [ {
+    "propSource" : "prop2.properties",
+    "id" : "job1",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job1.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job2d",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job2d.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job3",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job3.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job4",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job4.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job5",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job5.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job6",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job6.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job7",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job7.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job8",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job8.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job9",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job9.job",
+    "expectedRuntime" : 1
+  },
+  {
+    "propSource" : "prop2.properties",
+    "id" : "job10",
+    "jobType" : "java",
+    "layout" : {
+      "level" : 0
+    },
+    "jobSource" : "job10.job",
+    "expectedRuntime" : 1
+  }
+   ],
+  "layedout" : false,
+  "type" : "flow",
+  "props" : [ {
+    "inherits" : "prop1.properties",
+    "source" : "prop2.properties"
+  },{
+    "source" : "prop1.properties"
+  }]
+}
\ No newline at end of file
diff --git a/unit/java/azkaban/test/executor/FlowRunnerTest.java b/unit/java/azkaban/test/executor/FlowRunnerTest.java
index 87a593c..ab0970f 100644
--- a/unit/java/azkaban/test/executor/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/executor/FlowRunnerTest.java
@@ -3,7 +3,6 @@ package azkaban.test.executor;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.HashSet;
 
 import junit.framework.Assert;
 
@@ -59,7 +58,7 @@ public class FlowRunnerTest {
 		runner.addListener(eventCollector);
 		
 		Assert.assertTrue(!runner.isCancelled());
-		Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+		Assert.assertTrue(exFlow.getStatus() == Status.READY);
 		runner.run();
 		Assert.assertTrue(exFlow.getStatus() == Status.SUCCEEDED);
 		Assert.assertTrue(runner.getJobsFinished().size() == exFlow.getExecutableNodes().size());
@@ -96,7 +95,7 @@ public class FlowRunnerTest {
 		runner.addListener(eventCollector);
 		
 		Assert.assertTrue(!runner.isCancelled());
-		Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+		Assert.assertTrue(exFlow.getStatus() == Status.READY);
 		runner.run();
 		
 		Assert.assertTrue(exFlow.getStatus() == Status.FAILED);
@@ -133,7 +132,7 @@ public class FlowRunnerTest {
 		runner.addListener(eventCollector);
 		
 		Assert.assertTrue(!runner.isCancelled());
-		Assert.assertTrue(exFlow.getStatus() == Status.UNKNOWN);
+		Assert.assertTrue(exFlow.getStatus() == Status.READY);
 		runner.run();
 		
 		Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
@@ -159,6 +158,43 @@ public class FlowRunnerTest {
 		}
 	}
 	
+	@Test
+	public void exec1FailedFinishRest() throws Exception {
+		File testDir = new File("unit/executions/exectest1");
+		ExecutableFlow exFlow = prepareExecDir(testDir, "exec3");
+		exFlow.setFailureAction(FailureAction.FINISH_ALL_POSSIBLE);
+
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = new FlowRunner(exFlow);
+		runner.addListener(eventCollector);
+		
+		Assert.assertTrue(!runner.isCancelled());
+		Assert.assertTrue(exFlow.getStatus() == Status.READY);
+		runner.run();
+		
+		Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
+		
+		testStatus(exFlow, "job1", Status.SUCCEEDED);
+		testStatus(exFlow, "job2d", Status.FAILED);
+		testStatus(exFlow, "job3", Status.SUCCEEDED);
+		testStatus(exFlow, "job4", Status.KILLED);
+		testStatus(exFlow, "job5", Status.KILLED);
+		testStatus(exFlow, "job6", Status.KILLED);
+		testStatus(exFlow, "job7", Status.SUCCEEDED);
+		testStatus(exFlow, "job8", Status.SUCCEEDED);
+		testStatus(exFlow, "job9", Status.SUCCEEDED);
+		testStatus(exFlow, "job10", Status.KILLED);
+		
+		try {
+			eventCollector.checkEventExists(new Type[] {Type.FLOW_STARTED, Type.FLOW_FAILED_FINISHING, Type.FLOW_FINISHED});
+		}
+		catch (Exception e) {
+			System.out.println(e.getMessage());
+			eventCollector.writeAllEvents();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
 	private void testStatus(ExecutableFlow flow, String name, Status status) {
 		ExecutableNode node = flow.getExecutableNode(name);
 		
@@ -171,6 +207,7 @@ public class FlowRunnerTest {
 		FileUtils.copyDirectory(execDir, workingDir);
 		
 		File jsonFlowFile = new File(workingDir, execName + ".flow");
+		@SuppressWarnings("unchecked")
 		HashMap<String, Object> flowObj = (HashMap<String, Object>) JSONUtils.parseJSONFromFile(jsonFlowFile);
 		
 		Flow flow = Flow.flowFromObject(flowObj);