azkaban-developers
Changes
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 120(+119 -1)
Details
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 120(+119 -1)
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index b2d2427..9605c12 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -30,6 +30,47 @@ import azkaban.test.executor.JavaJob;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
+/**
+ * Test the flow run, especially with embedded flows.
+ *
+ * This test uses executions/embedded2. It also mainly uses the flow named jobf. The test is designed to
+ * control success/failures explicitly so we don't have to time the flow exactly.
+ *
+ * Flow jobf looks like the following:
+ *
+ *
+ * joba joba1
+ * / | \ |
+ * / | \ |
+ * jobb jobd jobc |
+ * \ | / /
+ * \ | / /
+ * jobe /
+ * | /
+ * | /
+ * jobf
+ *
+ * The job 'jobb' is an embedded flow:
+ *
+ * jobb:innerFlow
+ *
+ * innerJobA
+ * / \
+ * innerJobB innerJobC
+ * \ /
+ * innerFlow
+ *
+ *
+ * The job 'jobd' is a simple embedded flow:
+ *
+ * jobd:innerFlow2
+ *
+ * innerJobA
+ * |
+ * innerFlow2
+ *
+ * The following tests checks each stage of the flow run by forcing jobs to succeed or fail.
+ */
public class FlowRunnerTest2 {
private File workingDir;
private JobTypeManager jobtypeManager;
@@ -73,6 +114,12 @@ public class FlowRunnerTest2 {
}
}
+ /**
+ * Tests the basic successful flow run, and also tests all output variables from
+ * each job.
+ *
+ * @throws Exception
+ */
@Test
public void testBasicRun() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();
@@ -229,6 +276,10 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests a flow with Disabled jobs and flows. They should properly SKIP executions
+ * @throws Exception
+ */
@Test
public void testDisabledNormal() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();
@@ -294,6 +345,13 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests a failure with the default FINISH_CURRENTLY_RUNNING.
+ * After the first failure, every job that started should complete, and the
+ * rest of the jobs should be skipped.
+ *
+ * @throws Exception
+ */
@Test
public void testNormalFailure1() throws Exception {
// Test propagation of KILLED status to embedded flows.
@@ -338,6 +396,10 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Test #2 on the default failure case.
+ * @throws Exception
+ */
@Test
public void testNormalFailure2() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -468,6 +530,13 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests failures when the fail behaviour is FINISH_ALL_POSSIBLE.
+ * In this case, all jobs which have had its pre-requisite met can continue to run.
+ * Finishes when the failure is propagated to the last node of the flow.
+ *
+ * @throws Exception
+ */
@Test
public void testFailedFinishingFailure3() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -540,6 +609,14 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the failure condition when a failure invokes a cancel (or killed) on the flow.
+ *
+ * Any jobs that are running will be assigned a KILLED state, and any nodes which were
+ * skipped due to prior errors will be given a CANCELLED state.
+ *
+ * @throws Exception
+ */
@Test
public void testCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -598,6 +675,10 @@ public class FlowRunnerTest2 {
}
+ /**
+ * Tests retries after a failure
+ * @throws Exception
+ */
@Test
public void testRetryOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -700,6 +781,12 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the manual Killing of a flow. In this case, the flow is just fine before the cancel
+ * is called.
+ *
+ * @throws Exception
+ */
@Test
public void testCancel() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -758,6 +845,11 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the manual invocation of cancel on a flow that is FAILED_FINISHING
+ *
+ * @throws Exception
+ */
@Test
public void testManualCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -822,6 +914,11 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests that pause and resume work
+ *
+ * @throws Exception
+ */
@Test
public void testPause() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();
@@ -927,6 +1024,12 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Test the condition for a manual invocation of a KILL (cancel) on a flow that
+ * has been paused. The flow should unpause and be killed immediately.
+ *
+ * @throws Exception
+ */
@Test
public void testPauseKill() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();
@@ -986,6 +1089,10 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the case where a failure occurs on a Paused flow. In this case, the flow should stay paused.
+ * @throws Exception
+ */
@Test
public void testPauseFail() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();
@@ -1026,7 +1133,8 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobd:innerJobA", Status.FAILED);
expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
compareStates(expectedStateMap, nodeMap);
-
+ Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+
runner.resume("me");
pause(250);
expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
@@ -1049,6 +1157,11 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Test the condition when a Finish all possible is called during a pause.
+ * The Failure is not acted upon until the flow is resumed.
+ * @throws Exception
+ */
@Test
public void testPauseFailFinishAll() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();
@@ -1118,6 +1231,11 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the case when a flow is paused and a failure causes a kill. The
+ * flow should die immediately regardless of the 'paused' status.
+ * @throws Exception
+ */
@Test
public void testPauseFailKill() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();