azkaban-developers
Changes
src/java/azkaban/execapp/FlowRunner.java 15(+13 -2)
src/java/azkaban/execapp/JobRunner.java 19(+13 -6)
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 468(+468 -0)
Details
src/java/azkaban/execapp/FlowRunner.java 15(+13 -2)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 77c9016..6aba34a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -704,7 +704,7 @@ public class FlowRunner extends EventHandler implements Runnable {
private void runExecutableNode(ExecutableNode node) throws IOException {
// Collect output props from the job's dependencies.
prepareJobProperties(node);
-
+
node.setStatus(Status.QUEUED);
JobRunner runner = createJobRunner(node);
logger.info("Submitting job '" + node.getNestedId() + "' to run.");
@@ -714,7 +714,6 @@ public class FlowRunner extends EventHandler implements Runnable {
} catch (RejectedExecutionException e) {
logger.error(e);
};
-
}
/**
@@ -842,11 +841,14 @@ public class FlowRunner extends EventHandler implements Runnable {
updateFlow();
}
}
+
+ interrupt();
}
public void kill(String user) {
synchronized(mainSyncObj) {
logger.info("Flow killed by " + user);
+ flow.setStatus(Status.KILLED);
kill();
updateFlow();
}
@@ -856,6 +858,8 @@ public class FlowRunner extends EventHandler implements Runnable {
private void kill() {
synchronized(mainSyncObj) {
logger.info("Kill has been called on flow " + execId);
+
+ // If the flow is paused, then we'll also unpause
flowPaused = false;
flowKilled = true;
@@ -996,6 +1000,13 @@ public class FlowRunner extends EventHandler implements Runnable {
long seconds = (node.getEndTime() - node.getStartTime())/1000;
synchronized(mainSyncObj) {
logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds + " seconds");
+
+ // Cancellation is handled in the main thread, but if the flow is paused, the main thread is paused too.
+ // This unpauses the flow for cancellation.
+ if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
+ flowPaused = false;
+ }
+
finishedNodes.add(node);
node.getParentFlow().setUpdateTime(System.currentTimeMillis());
interrupt();
src/java/azkaban/execapp/JobRunner.java 19(+13 -6)
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 28a96d1..dd9ad96 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -259,14 +259,14 @@ public class JobRunner extends EventHandler implements Runnable {
boolean quickFinish = false;
long time = System.currentTimeMillis();
- if (this.isKilled() || Status.isStatusFinished(nodeStatus)) {
+ if (Status.isStatusFinished(nodeStatus)) {
quickFinish = true;
}
else if (nodeStatus == Status.DISABLED) {
changeStatus(Status.SKIPPED, time);
quickFinish = true;
}
- else if (this.killed) {
+ else if (this.isKilled()) {
changeStatus(Status.KILLED, time);
quickFinish = true;
}
@@ -443,6 +443,9 @@ public class JobRunner extends EventHandler implements Runnable {
node.setEndTime(System.currentTimeMillis());
if (isKilled()) {
+ // even if it's killed, there is a chance that the job failed is marked as failure,
+ // So we set it to KILLED to make sure we know that we forced kill it rather than
+ // it being a legitimate failure.
changeStatus(Status.KILLED);
}
logInfo("Finishing job " + this.jobId + " at " + node.getEndTime() + " with status " + node.getStatus());
@@ -455,13 +458,13 @@ public class JobRunner extends EventHandler implements Runnable {
private boolean prepareJob() throws RuntimeException {
// Check pre conditions
- if (props == null || killed) {
+ if (props == null || this.isKilled()) {
logError("Failing job. The job properties don't exist");
return false;
}
synchronized (syncObject) {
- if (node.getStatus() == Status.FAILED || killed) {
+ if (node.getStatus() == Status.FAILED || this.isKilled()) {
return false;
}
@@ -559,6 +562,9 @@ public class JobRunner extends EventHandler implements Runnable {
public void kill() {
synchronized (syncObject) {
+ if (Status.isStatusFinished(node.getStatus())) {
+ return;
+ }
logError("Kill has been called.");
this.killed = true;
@@ -576,7 +582,7 @@ public class JobRunner extends EventHandler implements Runnable {
}
return;
}
-
+
try {
job.cancel();
}
@@ -584,6 +590,8 @@ public class JobRunner extends EventHandler implements Runnable {
logError(e.getMessage());
logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
}
+
+ this.changeStatus(Status.KILLED);
}
}
@@ -654,6 +662,5 @@ public class JobRunner extends EventHandler implements Runnable {
}
return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".attach" : "_job." + executionId + "." + jobId + ".attach";
-
}
}
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 468(+468 -0)
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index e5fa39f..9605c12 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -30,6 +30,47 @@ import azkaban.test.executor.JavaJob;
import azkaban.utils.DirectoryFlowLoader;
import azkaban.utils.Props;
+/**
+ * Test the flow run, especially with embedded flows.
+ *
+ * This test uses executions/embedded2. It also mainly uses the flow named jobf. The test is designed to
+ * control success/failures explicitly so we don't have to time the flow exactly.
+ *
+ * Flow jobf looks like the following:
+ *
+ *
+ * joba joba1
+ * / | \ |
+ * / | \ |
+ * jobb jobd jobc |
+ * \ | / /
+ * \ | / /
+ * jobe /
+ * | /
+ * | /
+ * jobf
+ *
+ * The job 'jobb' is an embedded flow:
+ *
+ * jobb:innerFlow
+ *
+ * innerJobA
+ * / \
+ * innerJobB innerJobC
+ * \ /
+ * innerFlow
+ *
+ *
+ * The job 'jobd' is a simple embedded flow:
+ *
+ * jobd:innerFlow2
+ *
+ * innerJobA
+ * |
+ * innerFlow2
+ *
+ * The following tests checks each stage of the flow run by forcing jobs to succeed or fail.
+ */
public class FlowRunnerTest2 {
private File workingDir;
private JobTypeManager jobtypeManager;
@@ -73,6 +114,12 @@ public class FlowRunnerTest2 {
}
}
+ /**
+ * Tests the basic successful flow run, and also tests all output variables from
+ * each job.
+ *
+ * @throws Exception
+ */
@Test
public void testBasicRun() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();
@@ -229,6 +276,10 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests a flow with Disabled jobs and flows. They should properly SKIP executions
+ * @throws Exception
+ */
@Test
public void testDisabledNormal() throws Exception {
EventCollectorListener eventCollector = new EventCollectorListener();
@@ -294,6 +345,13 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests a failure with the default FINISH_CURRENTLY_RUNNING.
+ * After the first failure, every job that started should complete, and the
+ * rest of the jobs should be skipped.
+ *
+ * @throws Exception
+ */
@Test
public void testNormalFailure1() throws Exception {
// Test propagation of KILLED status to embedded flows.
@@ -338,6 +396,10 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Test #2 on the default failure case.
+ * @throws Exception
+ */
@Test
public void testNormalFailure2() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -468,6 +530,13 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests failures when the fail behaviour is FINISH_ALL_POSSIBLE.
+ * In this case, all jobs which have had its pre-requisite met can continue to run.
+ * Finishes when the failure is propagated to the last node of the flow.
+ *
+ * @throws Exception
+ */
@Test
public void testFailedFinishingFailure3() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -540,6 +609,14 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the failure condition when a failure invokes a cancel (or killed) on the flow.
+ *
+ * Any jobs that are running will be assigned a KILLED state, and any nodes which were
+ * skipped due to prior errors will be given a CANCELLED state.
+ *
+ * @throws Exception
+ */
@Test
public void testCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -598,6 +675,10 @@ public class FlowRunnerTest2 {
}
+ /**
+ * Tests retries after a failure
+ * @throws Exception
+ */
@Test
public void testRetryOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -700,6 +781,12 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the manual Killing of a flow. In this case, the flow is just fine before the cancel
+ * is called.
+ *
+ * @throws Exception
+ */
@Test
public void testCancel() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -758,6 +845,11 @@ public class FlowRunnerTest2 {
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the manual invocation of cancel on a flow that is FAILED_FINISHING
+ *
+ * @throws Exception
+ */
@Test
public void testManualCancelOnFailure() throws Exception {
// Test propagation of KILLED status to embedded flows different branch
@@ -817,11 +909,387 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobe", Status.CANCELLED);
expectedStateMap.put("jobf", Status.CANCELLED);
+ Assert.assertEquals(Status.KILLED, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ /**
+ * Tests that pause and resume work
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testPause() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ ExecutableFlow flow = runner.getExecutableFlow();
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.pause("test");
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ // 2.1 JOB A COMPLETES SUCCESSFULLY AFTER PAUSE
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+
+ // 2.2 Flow is unpaused
+ runner.resume("test");
+ pause(250);
+ Assert.assertEquals(flow.getStatus(), Status.RUNNING);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 3. jobb:Inner completes
+ runner.pause("test");
+
+ /// innerJobA completes, but paused
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.resume("test");
+ pause(250);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ /// innerJobB, C completes
+ InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
+ InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
+ pause(250);
+ expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerFlow", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 4. Finish up on inner flow for jobb
+ InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
+ pause(250);
+ expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 5. Finish jobc, jobd
+ InteractiveTestJob.getTestJob("jobc").succeedJob(Props.of("output.jobc", "jobc"));
+ pause(250);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+ InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+ pause(250);
+ InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+ expectedStateMap.put("jobd", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 6. Finish off flow
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ pause(250);
+ InteractiveTestJob.getTestJob("jobe").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.SUCCEEDED);
+ expectedStateMap.put("jobf", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobf").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobf", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ /**
+ * Test the condition for a manual invocation of a KILL (cancel) on a flow that
+ * has been paused. The flow should unpause and be killed immediately.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testPauseKill() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ ExecutableFlow flow = runner.getExecutableFlow();
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB A COMPLETES SUCCESSFULLY
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.pause("me");
+ pause(250);
+ Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.kill("me");
+ pause(250);
+ expectedStateMap.put("joba1", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+ expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+ expectedStateMap.put("jobb", Status.KILLED);
+ expectedStateMap.put("jobc", Status.KILLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
+
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.KILLED, flow.getStatus());
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ /**
+ * Tests the case where a failure occurs on a Paused flow. In this case, the flow should stay paused.
+ * @throws Exception
+ */
+ @Test
+ public void testPauseFail() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ ExecutableFlow flow = runner.getExecutableFlow();
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB A COMPLETES SUCCESSFULLY
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.pause("me");
+ pause(250);
+ Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
+ pause(250);
+ expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+
+ runner.resume("me");
+ pause(250);
+ expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+ expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+ expectedStateMap.put("jobb", Status.KILLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+ expectedStateMap.put("jobd", Status.FAILED);
+
+ InteractiveTestJob.getTestJob("jobc").succeedJob();
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+
+ compareStates(expectedStateMap, nodeMap);
Assert.assertEquals(Status.FAILED, flow.getStatus());
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ /**
+ * Test the condition when a Finish all possible is called during a pause.
+ * The Failure is not acted upon until the flow is resumed.
+ * @throws Exception
+ */
+ @Test
+ public void testPauseFailFinishAll() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.FINISH_ALL_POSSIBLE);
+
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ ExecutableFlow flow = runner.getExecutableFlow();
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB A COMPLETES SUCCESSFULLY
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.pause("me");
+ pause(250);
+ Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
+ pause(250);
+ expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.resume("me");
+ pause(250);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+ expectedStateMap.put("jobd", Status.FAILED);
+
+ InteractiveTestJob.getTestJob("jobc").succeedJob();
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
+ pause(250);
+ InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
+
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
Assert.assertFalse(thread.isAlive());
}
+ /**
+ * Tests the case when a flow is paused and a failure causes a kill. The
+ * flow should die immediately regardless of the 'paused' status.
+ * @throws Exception
+ */
+ @Test
+ public void testPauseFailKill() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.CANCEL_ALL);
+
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ ExecutableFlow flow = runner.getExecutableFlow();
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+ // After it starts up, only joba should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB A COMPLETES SUCCESSFULLY
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(500);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ runner.pause("me");
+ pause(250);
+ Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+ InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
+ pause(250);
+ expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+ expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+ expectedStateMap.put("jobd", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobA", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+ expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+ expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+ expectedStateMap.put("jobb", Status.KILLED);
+ expectedStateMap.put("jobc", Status.KILLED);
+ expectedStateMap.put("jobe", Status.CANCELLED);
+ expectedStateMap.put("jobf", Status.CANCELLED);
+ expectedStateMap.put("joba1", Status.KILLED);
+ compareStates(expectedStateMap, nodeMap);
+
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
+ Assert.assertFalse(thread.isAlive());
+ }
+
+
private Thread runFlowRunnerInThread(FlowRunner runner) {
Thread thread = new Thread(runner);
thread.start();