azkaban-memoizeit
Changes
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 219(+219 -0)
Details
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index f4d7ff7..01898bb 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -703,7 +703,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private void reEnableDependents(ExecutableNode node) {
for(String dependent: node.getOutNodes()) {
- ExecutableNode dependentNode = flow.getExecutableNode(dependent);
+ ExecutableNode dependentNode = node.getParentFlow().getExecutableNode(dependent);
if (dependentNode.getStatus() == Status.KILLED) {
dependentNode.setStatus(Status.READY);
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 219(+219 -0)
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 6f794d0..312dea1 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -594,6 +594,225 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ @Test
+ public void testRetryOnFailure() throws Exception {
+ // Test propagation of KILLED status to embedded flows different branch
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+ flow.getExecutableNode("joba").setStatus(Status.DISABLED);
+ ((ExecutableFlowBase)flow.getExecutableNode("jobb")).getExecutableNode("innerFlow").setStatus(Status.DISABLED);
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.SKIPPED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobC").failJob();
+ pause(250);
+ InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobC", Status.FAILED);
+ expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
+ expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.retryFailures("me");
+ pause(500);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobb:innerFlow", Status.DISABLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+ Assert.assertEquals(Status.RUNNING, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertTrue(thread.isAlive());
+
+ InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
+ InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+ InteractiveTestJob.getTestJob("jobc").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerFlow", Status.SKIPPED);
+ expectedStateMap.put("jobb", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ expectedStateMap.put("jobd", Status.SUCCEEDED);
+ expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobe").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobe", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobf", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobf").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobf", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ @Test
+ public void testCancel() throws Exception {
+ // Test propagation of KILLED status to embedded flows different branch
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.CANCEL_ALL);
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB in subflow FAILS
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.cancel("me");
+ pause(1000);
+
+ expectedStateMap.put("jobb", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobC", Status.FAILED);
+ expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+ expectedStateMap.put("jobc", Status.FAILED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+ expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobe", Status.KILLED);
+ expectedStateMap.put("jobf", Status.KILLED);
+
+ Assert.assertEquals(Status.KILLED, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ @Test
+ public void testManualCancelOnFailure() throws Exception {
+ // Test propagation of KILLED status to embedded flows different branch
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB in subflow FAILS
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+ expectedStateMap.put("jobb", Status.FAILED_FINISHING);
+ Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.cancel("me");
+ pause(1000);
+
+ expectedStateMap.put("jobb", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobC", Status.FAILED);
+ expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+ expectedStateMap.put("jobc", Status.FAILED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+ expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobe", Status.KILLED);
+ expectedStateMap.put("jobf", Status.KILLED);
+
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ // @TODO write test for:
+ // Pipeline
+
private Thread runFlowRunnerInThread(FlowRunner runner) {
Thread thread = new Thread(runner);
thread.start();