azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index f4d7ff7..01898bb 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -703,7 +703,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	private void reEnableDependents(ExecutableNode node) {
 		for(String dependent: node.getOutNodes()) {
-			ExecutableNode dependentNode = flow.getExecutableNode(dependent);
+			ExecutableNode dependentNode = node.getParentFlow().getExecutableNode(dependent);
 			
 			if (dependentNode.getStatus() == Status.KILLED) {
 				dependentNode.setStatus(Status.READY);
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 6f794d0..312dea1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -594,6 +594,225 @@ public class FlowRunnerTest2 {
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	@Test
+	public void testRetryOnFailure() throws Exception {
+		// Test propagation of KILLED status to embedded flows different branch
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		flow.getExecutableNode("joba").setStatus(Status.DISABLED);
+		((ExecutableFlowBase)flow.getExecutableNode("jobb")).getExecutableNode("innerFlow").setStatus(Status.DISABLED);
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.SKIPPED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobC").failJob();
+		pause(250);
+		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
+		expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
+		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+		
+		runner.retryFailures("me");
+		pause(500);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobb:innerFlow", Status.DISABLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+		Assert.assertEquals(Status.RUNNING, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertTrue(thread.isAlive());
+		
+		InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
+		InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+		InteractiveTestJob.getTestJob("jobc").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
+		expectedStateMap.put("jobb", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		expectedStateMap.put("jobd", Status.SUCCEEDED);
+		expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobe").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobe", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobf", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobf").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobf", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	@Test
+	public void testCancel() throws Exception {
+		// Test propagation of KILLED status to embedded flows different branch
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.CANCEL_ALL);
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB in subflow FAILS
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		runner.cancel("me");
+		pause(1000);
+		
+		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
+		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobc", Status.FAILED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobe", Status.KILLED);
+		expectedStateMap.put("jobf", Status.KILLED);
+		
+		Assert.assertEquals(Status.KILLED, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	@Test
+	public void testManualCancelOnFailure() throws Exception {
+		// Test propagation of KILLED status to embedded flows different branch
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB in subflow FAILS
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+		expectedStateMap.put("jobb", Status.FAILED_FINISHING);
+		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+		
+		runner.cancel("me");
+		pause(1000);
+		
+		expectedStateMap.put("jobb", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
+		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobc", Status.FAILED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobe", Status.KILLED);
+		expectedStateMap.put("jobf", Status.KILLED);
+		
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	// @TODO write test for:
+	// Pipeline
+	
 	private Thread runFlowRunnerInThread(FlowRunner runner) {
 		Thread thread = new Thread(runner);
 		thread.start();