azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 491189b..5cb7188 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -334,11 +334,11 @@ public class FlowRunner extends EventHandler implements Runnable {
 				}
 				else {
 					if (!progressGraph(flow)) {
-						if (flow.isFlowFinished() || flowCancelled ) {
+						if (flow.isFlowFinished() ) {
 							flowFinished = true;
 							break;
 						}
-					
+						
 						try {
 							mainSyncObj.wait(CHECK_WAIT_MS);
 						} catch (InterruptedException e) {
@@ -348,27 +348,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 		}
 		
-		if (flowCancelled) {
-			try {
-				logger.info("Flow was force cancelled cleaning up.");
-				for(JobRunner activeRunner : activeJobRunners) {
-					activeRunner.cancel();
-				}
-
-				flow.killNode(System.currentTimeMillis());
-			} catch (Exception e) {
-				logger.error(e);
-			}
-	
-			updateFlow();
-		}
-		
 		logger.info("Finishing up flow. Awaiting Termination");
 		executorService.shutdown();
 		
-		synchronized(mainSyncObj) {
-			finalizeFlow(flow);
-		}
+		finalizeFlow(flow);
+		updateFlow();
+		logger.info("Finished Flow");
 	}
 	
 	private boolean progressGraph(ExecutableFlowBase flow) throws IOException {
@@ -405,6 +390,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private void finalizeFlow(ExecutableFlowBase flow) {
 		String id = flow == this.flow ? "" : flow.getPrintableId() + " ";
 		
+		flow.setEndTime(System.currentTimeMillis());
 		switch(flow.getStatus()) {
 		case FAILED_FINISHING:
 			logger.info("Setting flow " + id + "status to Failed.");
@@ -752,7 +738,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 					
 					String id = node.getPrintableId(":");
 					logger.info("Job Finished " + id + " with status " + node.getStatus());
-					if (node.getOutputProps() != null) {
+					if (node.getOutputProps() != null && node.getOutputProps().size() > 0) {
 						logger.info("Job " + id + " had output props.");
 					}
 
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index cde9b6e..7013e4a 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -355,7 +355,7 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			// If it's an embedded flow, we'll add the nested flow info to the job conf
 			if (node.getExecutableFlow() != node.getParentFlow()) {
-				String subFlow = node.getExecutableFlow().getId() + ":" + node.getPrintableId(":");
+				String subFlow = node.getPrintableId(":");
 				props.put(CommonJobProperties.NESTED_FLOW_PATH, subFlow);
 			}
 			
diff --git a/unit/executions/embedded2/innerFlow2.job b/unit/executions/embedded2/innerFlow2.job
new file mode 100644
index 0000000..35cbccb
--- /dev/null
+++ b/unit/executions/embedded2/innerFlow2.job
@@ -0,0 +1,2 @@
+type=test
+dependencies=innerJobA
\ No newline at end of file
diff --git a/unit/executions/embedded2/jobb.job b/unit/executions/embedded2/jobb.job
index 8844d8c..4531028 100644
--- a/unit/executions/embedded2/jobb.job
+++ b/unit/executions/embedded2/jobb.job
@@ -1,3 +1,5 @@
 type=flow
 flow.name=innerFlow
 dependencies=joba
+testprops=moo
+output.override=jobb
\ No newline at end of file
diff --git a/unit/executions/embedded2/jobc.job b/unit/executions/embedded2/jobc.job
index 8844d8c..2bfc5ff 100644
--- a/unit/executions/embedded2/jobc.job
+++ b/unit/executions/embedded2/jobc.job
@@ -1,3 +1,2 @@
-type=flow
-flow.name=innerFlow
+type=test
 dependencies=joba
diff --git a/unit/executions/embedded2/jobd.job b/unit/executions/embedded2/jobd.job
index 8844d8c..e80f82b 100644
--- a/unit/executions/embedded2/jobd.job
+++ b/unit/executions/embedded2/jobd.job
@@ -1,3 +1,4 @@
 type=flow
-flow.name=innerFlow
+flow.name=innerFlow2
 dependencies=joba
+jobdprop=poop
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index 7634a56..d7fe287 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -39,11 +39,13 @@ public class FlowRunnerTest {
 	@Before
 	public void setUp() throws Exception {
 		System.out.println("Create temp dir");
-		workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
-		if (workingDir.exists()) {
-			FileUtils.deleteDirectory(workingDir);
+		synchronized ( this) {
+			workingDir = new File("_AzkabanTestDir_" + System.currentTimeMillis());
+			if (workingDir.exists()) {
+				FileUtils.deleteDirectory(workingDir);
+			}
+			workingDir.mkdirs();
 		}
-		workingDir.mkdirs();
 		jobtypeManager = new JobTypeManager(null, this.getClass().getClassLoader());
 		jobtypeManager.registerJobType("java", JavaJob.class);
 		jobtypeManager.registerJobType("test", InteractiveTestJob.class);
@@ -55,9 +57,11 @@ public class FlowRunnerTest {
 	@After
 	public void tearDown() throws IOException {
 		System.out.println("Teardown temp dir");
-		if (workingDir != null) {
-			FileUtils.deleteDirectory(workingDir);
-			workingDir = null;
+		synchronized ( this) {
+			if (workingDir != null) {
+				FileUtils.deleteDirectory(workingDir);
+				workingDir = null;
+			}
 		}
 	}
 	
@@ -194,8 +198,16 @@ public class FlowRunnerTest {
 		
 		Assert.assertTrue(runner.isCancelled());
 		
-		Assert.assertTrue("Expected flow " + Status.KILLED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.KILLED);
+		Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
 		
+		synchronized(this) {
+			try {
+				wait(500);
+			} catch(InterruptedException e) {
+				
+			}
+		}
+
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
 		testStatus(exFlow, "job3", Status.KILLED);
@@ -231,6 +243,14 @@ public class FlowRunnerTest {
 		ExecutableFlow exFlow = runner.getExecutableFlow();
 		Assert.assertTrue("Expected flow " + Status.FAILED + " instead " + exFlow.getStatus(), exFlow.getStatus() == Status.FAILED);
 		
+		synchronized(this) {
+			try {
+				wait(500);
+			} catch(InterruptedException e) {
+				
+			}
+		}
+		
 		testStatus(exFlow, "job1", Status.SUCCEEDED);
 		testStatus(exFlow, "job2d", Status.FAILED);
 		testStatus(exFlow, "job3", Status.SUCCEEDED);
@@ -279,7 +299,7 @@ public class FlowRunnerTest {
 		synchronized(this) {
 			// Wait for cleanup.
 			try {
-				wait(1000);
+				wait(2000);
 			} catch (InterruptedException e) {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
@@ -345,7 +365,9 @@ public class FlowRunnerTest {
 	}
 	
 	private ExecutableFlow prepareExecDir(File execDir, String flowName, int execId) throws IOException {
-		FileUtils.copyDirectory(execDir, workingDir);
+		synchronized ( this) {
+			FileUtils.copyDirectory(execDir, workingDir);
+		}
 		
 		File jsonFlowFile = new File(workingDir, flowName + ".flow");
 		@SuppressWarnings("unchecked")
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index a8f6d34..909d490 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -84,7 +84,7 @@ public class FlowRunnerTest2 {
 		ExecutableFlow flow = runner.getExecutableFlow();
 		createExpectedStateMap(flow, expectedStateMap, nodeMap);
 		runFlowRunnerInThread(runner);
-		pause(500);
+		pause(250);
 		
 		// After it starts up, only joba should be running
 		expectedStateMap.put("joba", Status.RUNNING);
@@ -112,22 +112,20 @@ public class FlowRunnerTest2 {
 		Assert.assertEquals("test2.8", joba1.get("param8"));
 		
 		// 2. JOB A COMPLETES SUCCESSFULLY 
-		InteractiveTestJob testJoba = InteractiveTestJob.getTestJob("joba");
-		Props jobAOut = new Props();
-		jobAOut.put("output.joba", "joba");
-		testJoba.succeedJob(jobAOut);
-		pause(500);
+		InteractiveTestJob.getTestJob("joba").succeedJob(Props.of("output.joba", "joba", "output.override", "joba"));
+		pause(250);
 		expectedStateMap.put("joba", Status.SUCCEEDED);
 		expectedStateMap.put("joba1", Status.RUNNING);
 		expectedStateMap.put("jobb", Status.RUNNING);
 		expectedStateMap.put("jobc", Status.RUNNING);
 		expectedStateMap.put("jobd", Status.RUNNING);
-		expectedStateMap.put("joba:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
 		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
-		expectedStateMap.put("jobc:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
 		
 		Props jobb = nodeMap.get("jobb").getInputProps();
 		Assert.assertEquals("test1.1", jobb.get("param1"));
+		Assert.assertEquals("test1.1", jobb.get("param1"));
 		Assert.assertEquals("test1.2", jobb.get("param2"));
 		Assert.assertEquals("test1.3", jobb.get("param3"));
 		Assert.assertEquals("override.4", jobb.get("param4"));
@@ -136,6 +134,9 @@ public class FlowRunnerTest2 {
 		Assert.assertEquals("test2.7", jobb.get("param7"));
 		Assert.assertEquals("test2.8", jobb.get("param8"));
 		Assert.assertEquals("test2.8", jobb.get("param8"));
+		// Test that jobb properties overwrites the output properties
+		Assert.assertEquals("moo", jobb.get("testprops"));
+		Assert.assertEquals("jobb", jobb.get("output.override"));
 		Assert.assertEquals("joba", jobb.get("output.joba"));
 		
 		Props jobbInnerJobA = nodeMap.get("jobb:innerJobA").getInputProps();
@@ -148,6 +149,79 @@ public class FlowRunnerTest2 {
 		Assert.assertEquals("test2.7", jobbInnerJobA.get("param7"));
 		Assert.assertEquals("test2.8", jobbInnerJobA.get("param8"));
 		Assert.assertEquals("joba", jobbInnerJobA.get("output.joba"));
+		
+		// 3. jobb:Inner completes
+		/// innerJobA completes
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		Props jobbInnerJobB = nodeMap.get("jobb:innerJobB").getInputProps();
+		Assert.assertEquals("test1.1", jobbInnerJobB.get("param1"));
+		Assert.assertEquals("override.4", jobbInnerJobB.get("param4"));
+		Assert.assertEquals("jobb.innerJobA", jobbInnerJobB.get("output.jobb.innerJobA"));
+		Assert.assertEquals("moo", jobbInnerJobB.get("testprops"));
+		/// innerJobB, C completes
+		InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
+		InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
+		pause(250);
+		expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerFlow", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		Props jobbInnerJobD = nodeMap.get("jobb:innerFlow").getInputProps();
+		Assert.assertEquals("test1.1", jobbInnerJobD.get("param1"));
+		Assert.assertEquals("override.4", jobbInnerJobD.get("param4"));
+		Assert.assertEquals("jobb.innerJobB", jobbInnerJobD.get("output.jobb.innerJobB"));
+		Assert.assertEquals("jobb.innerJobC", jobbInnerJobD.get("output.jobb.innerJobC"));
+		
+		// 4. Finish up on inner flow for jobb
+		InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
+		pause(250);
+		expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		Props jobbOutput = nodeMap.get("jobb").getOutputProps();
+		Assert.assertEquals("test1", jobbOutput.get("output1.jobb"));
+		Assert.assertEquals("test2", jobbOutput.get("output2.jobb"));
+		
+		// 5. Finish jobc, jobd
+		InteractiveTestJob.getTestJob("jobc").succeedJob(Props.of("output.jobc", "jobc"));
+		pause(250);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+		pause(250);
+		InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+		expectedStateMap.put("jobd", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		Props jobd = nodeMap.get("jobe").getInputProps();
+		Assert.assertEquals("test1", jobd.get("output1.jobb"));
+		Assert.assertEquals("jobc", jobd.get("output.jobc"));
+		
+		// 6. Finish off flow
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		pause(250);
+		InteractiveTestJob.getTestJob("jobe").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.SUCCEEDED);
+		expectedStateMap.put("jobf", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobf").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobf", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
 	}
 	
 	private Thread runFlowRunnerInThread(FlowRunner runner) {
@@ -225,4 +299,5 @@ public class FlowRunnerTest2 {
 		
 		return runner;
 	}
+
 }
diff --git a/unit/java/azkaban/test/executor/InteractiveTestJob.java b/unit/java/azkaban/test/executor/InteractiveTestJob.java
index 41d525c..f117d97 100644
--- a/unit/java/azkaban/test/executor/InteractiveTestJob.java
+++ b/unit/java/azkaban/test/executor/InteractiveTestJob.java
@@ -29,7 +29,7 @@ public class InteractiveTestJob extends AbstractProcessJob {
 	@Override
 	public void run() throws Exception {
 		String nestedFlowPath = this.getJobProps().get(CommonJobProperties.NESTED_FLOW_PATH);
-		String id = nestedFlowPath == null ? this.getId() : nestedFlowPath + ":" + this.getId();
+		String id = nestedFlowPath == null ? this.getId() : nestedFlowPath;
 		testJobs.put(id, this);
 		
 		while(isWaiting) {