azkaban-uncached
Changes
src/java/azkaban/execapp/FlowRunner.java 58(+34 -24)
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 377(+376 -1)
Details
src/java/azkaban/execapp/FlowRunner.java 58(+34 -24)
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 5cb7188..f4d7ff7 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -368,11 +368,13 @@ public class FlowRunner extends EventHandler implements Runnable {
if (nextStatus == Status.KILLED) {
logger.info("Killing " + node.getId() + " due to prior errors.");
node.killNode(currentTime);
+ finalizeFlowIfFinished(node.getParentFlow());
fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
}
else if (nextStatus == Status.DISABLED) {
logger.info("Skipping disabled job " + node.getId() + ".");
node.skipNode(currentTime);
+ finalizeFlowIfFinished(node.getParentFlow());
fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
}
else {
@@ -758,13 +760,12 @@ public class FlowRunner extends EventHandler implements Runnable {
flowFailed = true;
- ExecutionOptions options = flow.getExecutionOptions();
// The KILLED status occurs when cancel is invoked. We want to keep this
// status even in failure conditions.
if (flow.getStatus() != Status.KILLED && flow.getStatus() != Status.FAILED) {
propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
- if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
+ if (failureAction == FailureAction.CANCEL_ALL && !flowCancelled) {
logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
cancel();
}
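The hunk above swaps the per-failure ExecutionOptions lookup for a cached failureAction field. A hedged, self-contained sketch of that decision path (stand-in fields and names, not the real FlowRunner): a failed job marks the flow FAILED_FINISHING unless it is already KILLED or FAILED, and a cached CANCEL_ALL failure action triggers a single cancel of the execution.

public class FailureActionSketch {
  enum Status { RUNNING, FAILED_FINISHING, FAILED, KILLED }
  enum FailureAction { FINISH_CURRENTLY_RUNNING, FINISH_ALL_POSSIBLE, CANCEL_ALL }

  // Assumed to be read once from the execution options when the run starts,
  // so failure handling does not refetch ExecutionOptions per failed node.
  FailureAction failureAction = FailureAction.CANCEL_ALL;
  Status flowStatus = Status.RUNNING;
  boolean flowCancelled = false;

  void onJobFailure() {
    // Keep an existing KILLED/FAILED status; otherwise mark the flow as finishing after a failure.
    if (flowStatus != Status.KILLED && flowStatus != Status.FAILED) {
      flowStatus = Status.FAILED_FINISHING;
      if (failureAction == FailureAction.CANCEL_ALL && !flowCancelled) {
        cancel();
      }
    }
  }

  void cancel() {
    // Simplified: the real cancel() also kills the jobs that are still running.
    flowCancelled = true;
  }

  public static void main(String[] args) {
    FailureActionSketch runner = new FailureActionSketch();
    runner.onJobFailure();
    // Prints: FAILED_FINISHING cancelled=true
    System.out.println(runner.flowStatus + " cancelled=" + runner.flowCancelled);
  }
}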
@@ -786,35 +787,44 @@ public class FlowRunner extends EventHandler implements Runnable {
propagateStatus(base.getParentFlow(), status);
}
}
+ }
- private void finalizeFlowIfFinished(ExecutableFlowBase base) {
- // We let main thread finalize the main flow.
- if (base == flow) {
- return;
- }
-
- if (base.isFlowFinished()) {
- Props previousOutput = null;
- for(String end: base.getEndNodes()) {
- ExecutableNode node = base.getExecutableNode(end);
-
- Props output = node.getOutputProps();
- if (output != null) {
- output = Props.clone(output);
- output.setParent(previousOutput);
- previousOutput = output;
- }
+ private void finalizeFlowIfFinished(ExecutableFlowBase base) {
+ // We let main thread finalize the main flow.
+ if (base == flow) {
+ return;
+ }
+
+ if (base.isFlowFinished()) {
+ boolean succeeded = true;
+ Props previousOutput = null;
+ for(String end: base.getEndNodes()) {
+ ExecutableNode node = base.getExecutableNode(end);
+
+ if (node.getStatus() == Status.KILLED) {
+ succeeded = false;
}
- base.setOutputProps(previousOutput);
- finalizeFlow(base);
- if (base.getParentFlow() != null) {
- finalizeFlowIfFinished(base.getParentFlow());
+ Props output = node.getOutputProps();
+ if (output != null) {
+ output = Props.clone(output);
+ output.setParent(previousOutput);
+ previousOutput = output;
}
}
+
+ if (!succeeded && (base.getStatus() == Status.RUNNING)) {
+ base.setStatus(Status.KILLED);
+ }
+ base.setOutputProps(previousOutput);
+ finalizeFlow(base);
+
+ if (base.getParentFlow() != null) {
+ finalizeFlowIfFinished(base.getParentFlow());
+ }
}
}
-
+
public boolean isCancelled() {
return flowCancelled;
}
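The rewritten finalizeFlowIfFinished now marks an embedded flow KILLED when any of its end nodes was killed, and then re-checks the parent flow. A minimal, self-contained sketch of that propagation using hypothetical stand-in types (Flow here is not Azkaban's ExecutableFlowBase):

import java.util.LinkedHashMap;
import java.util.Map;

public class SubflowKillPropagationSketch {
  enum Status { READY, RUNNING, SUCCEEDED, KILLED }

  // Hypothetical stand-in for ExecutableFlowBase: end-node statuses plus a parent link.
  static class Flow {
    final Map<String, Status> endNodes = new LinkedHashMap<String, Status>();
    Flow parent;
    Status status = Status.RUNNING;

    boolean isFlowFinished() {
      for (Status s : endNodes.values()) {
        if (s == Status.READY || s == Status.RUNNING) {
          return false;
        }
      }
      return true;
    }
  }

  // Mirrors the KILLED handling added to finalizeFlowIfFinished above.
  static void finalizeIfFinished(Flow flow) {
    if (flow == null || !flow.isFlowFinished()) {
      return;
    }
    boolean succeeded = true;
    for (Status s : flow.endNodes.values()) {
      if (s == Status.KILLED) {
        succeeded = false;
      }
    }
    if (!succeeded && flow.status == Status.RUNNING) {
      flow.status = Status.KILLED;
    } else if (flow.status == Status.RUNNING) {
      flow.status = Status.SUCCEEDED;  // simplified stand-in for finalizeFlow(base)
    }
    if (flow.parent != null) {
      // Reflect this flow's final status in the parent before re-checking it.
      flow.parent.endNodes.put("subflow", flow.status);
      finalizeIfFinished(flow.parent);
    }
  }

  public static void main(String[] args) {
    Flow parent = new Flow();
    parent.endNodes.put("subflow", Status.RUNNING);

    Flow sub = new Flow();
    sub.parent = parent;
    sub.endNodes.put("innerJobA", Status.KILLED);
    sub.endNodes.put("innerJobB", Status.SUCCEEDED);

    finalizeIfFinished(sub);
    // Prints: sub=KILLED parent=KILLED
    System.out.println("sub=" + sub.status + " parent=" + parent.status);
  }
}

In the actual change the success path is still handled by finalizeFlow(base); only the KILLED override and the upward re-check are new.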
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 74bba73..29a93a3 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -377,29 +377,43 @@ public class ExecutorManager {
options = new ExecutionOptions();
}
+ String message = "";
if (options.getDisabledJobs() != null) {
// Disable jobs
for(String disabledId : options.getDisabledJobs()) {
- ExecutableNode node = exflow.getExecutableNode(disabledId);
+ String[] splits = disabledId.split(":");
+ ExecutableNode node = exflow;
+
+ for (String split: splits) {
+ if (node instanceof ExecutableFlowBase) {
+ node = ((ExecutableFlowBase)node).getExecutableNode(split);
+ }
+ else {
+ message = "Cannot disable job " + disabledId + " since flow " + split + " cannot be found. \n";
+ }
+ }
+
+ if (node == null) {
+ throw new ExecutorManagerException("Cannot disable job " + disabledId + ". Cannot find corresponding node.");
+ }
node.setStatus(Status.DISABLED);
}
}
- String message = "";
if (!running.isEmpty()) {
if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
Collections.sort(running);
Integer runningExecId = running.get(running.size() - 1);
options.setPipelineExecutionId(runningExecId);
- message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
+ message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". \n";
}
else if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.", ExecutorManagerException.Reason.SkippedExecution);
}
else {
// The settings is to run anyways.
- message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
+ message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. \n";
}
}
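The disabled-jobs change accepts nested ids such as "jobd:innerJobA" by splitting on ':' and descending one flow level per segment; an id that cannot be resolved produces an ExecutorManagerException. A small, self-contained sketch of that lookup with a hypothetical Node type (not Azkaban's ExecutableNode/ExecutableFlowBase):

import java.util.HashMap;
import java.util.Map;

public class NestedDisabledJobLookupSketch {
  // Hypothetical stand-in for ExecutableNode / ExecutableFlowBase.
  static class Node {
    final String id;
    final Map<String, Node> children = new HashMap<String, Node>();
    Node(String id) { this.id = id; }
    Node add(Node child) { children.put(child.id, child); return child; }
  }

  // Walks "a:b:c" segment by segment from the root flow; null means not found.
  static Node resolve(Node root, String disabledId) {
    Node node = root;
    for (String split : disabledId.split(":")) {
      if (node == null) {
        return null;
      }
      node = node.children.get(split);
    }
    return node;
  }

  public static void main(String[] args) {
    Node flow = new Node("flow");
    Node jobd = flow.add(new Node("jobd"));
    jobd.add(new Node("innerJobA"));

    System.out.println(resolve(flow, "jobd:innerJobA") != null);  // true: node can be disabled
    System.out.println(resolve(flow, "jobd:missing") != null);    // false: would be rejected
  }
}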
unit/java/azkaban/test/execapp/FlowRunnerTest2.java 377(+376 -1)
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 909d490..6f794d0 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -17,6 +17,7 @@ import azkaban.execapp.FlowRunner;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions.FailureAction;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.Status;
import azkaban.flow.Flow;
@@ -83,7 +84,7 @@ public class FlowRunnerTest2 {
// 1. START FLOW
ExecutableFlow flow = runner.getExecutableFlow();
createExpectedStateMap(flow, expectedStateMap, nodeMap);
- runFlowRunnerInThread(runner);
+ Thread thread = runFlowRunnerInThread(runner);
pause(250);
// After it starts up, only joba should be running
@@ -222,6 +223,375 @@ public class FlowRunnerTest2 {
expectedStateMap.put("jobf", Status.SUCCEEDED);
compareStates(expectedStateMap, nodeMap);
Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ @Test
+ public void testDisabledNormal() throws Exception {
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+ flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
+ ((ExecutableFlowBase)flow.getExecutableNode("jobd")).getExecutableNode("innerJobA").setStatus(Status.DISABLED);
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, joba and joba1 should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB A COMPLETES SUCCESSFULLY; the disabled jobs should be skipped
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.SKIPPED);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.SKIPPED);
+ expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.READY);
+ expectedStateMap.put("jobb:innerJobB", Status.READY);
+ expectedStateMap.put("jobb:innerJobC", Status.READY);
+ expectedStateMap.put("jobb:innerFlow", Status.READY);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 3. jobc completes
+ /// jobd:innerFlow2 completes
+ InteractiveTestJob.getTestJob("jobc").succeedJob();
+ InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+ expectedStateMap.put("jobd", Status.SUCCEEDED);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobe").succeedJob();
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobe", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobf", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 4. Finish up with jobf
+ InteractiveTestJob.getTestJob("jobf").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobf", Status.SUCCEEDED);
+ compareStates(expectedStateMap, nodeMap);
+
+ Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ @Test
+ public void testNormalFailure1() throws Exception {
+ // Test propagation of KILLED status to embedded flows.
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, joba and joba1 should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB A FAILS; the jobs that depend on it should be killed
+ InteractiveTestJob.getTestJob("joba").failJob();
+ pause(250);
+ Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+ expectedStateMap.put("joba", Status.FAILED);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ expectedStateMap.put("jobb", Status.KILLED);
+ expectedStateMap.put("jobc", Status.KILLED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobd:innerJobA", Status.READY);
+ expectedStateMap.put("jobd:innerFlow2", Status.READY);
+ expectedStateMap.put("jobb:innerJobA", Status.READY);
+ expectedStateMap.put("jobb:innerFlow", Status.READY);
+ expectedStateMap.put("jobe", Status.KILLED);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 3. joba1 completes
+ /// the flow then finalizes as FAILED
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobf", Status.KILLED);
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ @Test
+ public void testNormalFailure2() throws Exception {
+ // Test propagation of KILLED status to embedded flows when a job on a different branch fails.
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, joba and joba1 should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB A COMPLETES SUCCESSFULLY; dependent jobs start running
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+
+ InteractiveTestJob.getTestJob("joba1").failJob();
+ pause(250);
+ expectedStateMap.put("joba1", Status.FAILED);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 3. The running inner jobs complete; the rest of their subflows are killed
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+ expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+ expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobb", Status.KILLED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+
+ InteractiveTestJob.getTestJob("jobc").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.KILLED);
+ expectedStateMap.put("jobf", Status.KILLED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
+
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ @Test
+ public void testNormalFailure3() throws Exception {
+ // Test propagation of KILLED status when a job inside an embedded flow fails.
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, joba and joba1 should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB in subflow FAILS
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+ pause(250);
+ expectedStateMap.put("jobb", Status.FAILED_FINISHING);
+ expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+ Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
+ InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobd", Status.KILLED);
+ expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+ expectedStateMap.put("jobb", Status.FAILED);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 3. jobc completes; the remaining jobe and jobf are killed
+ InteractiveTestJob.getTestJob("jobc").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.KILLED);
+ expectedStateMap.put("jobf", Status.KILLED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
+
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ @Test
+ public void testFailedFinishingFailure3() throws Exception {
+ // Test the FINISH_ALL_POSSIBLE failure action when a job inside an embedded flow fails.
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.FINISH_ALL_POSSIBLE);
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, joba and joba1 should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB in subflow FAILS
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+ pause(250);
+ expectedStateMap.put("jobb", Status.FAILED_FINISHING);
+ expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+ Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
+ InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb", Status.FAILED);
+ expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+ expectedStateMap.put("jobd", Status.SUCCEEDED);
+
+ // 3. jobc completes; the remaining jobe and jobf are killed
+ InteractiveTestJob.getTestJob("jobc").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobc", Status.SUCCEEDED);
+ expectedStateMap.put("jobe", Status.KILLED);
+ expectedStateMap.put("jobf", Status.KILLED);
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
+ Assert.assertFalse(thread.isAlive());
+ }
+
+ @Test
+ public void testCancelOnFailure() throws Exception {
+ // Test the CANCEL_ALL failure action: a failure cancels all jobs that are still running.
+ EventCollectorListener eventCollector = new EventCollectorListener();
+ FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.CANCEL_ALL);
+ ExecutableFlow flow = runner.getExecutableFlow();
+ Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+ Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+
+ // 1. START FLOW
+ createExpectedStateMap(flow, expectedStateMap, nodeMap);
+ Thread thread = runFlowRunnerInThread(runner);
+ pause(250);
+
+ // After it starts up, joba and joba1 should be running
+ expectedStateMap.put("joba", Status.RUNNING);
+ expectedStateMap.put("joba1", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ // 2. JOB in subflow FAILS
+ InteractiveTestJob.getTestJob("joba").succeedJob();
+ pause(250);
+ expectedStateMap.put("joba", Status.SUCCEEDED);
+ expectedStateMap.put("jobb", Status.RUNNING);
+ expectedStateMap.put("jobc", Status.RUNNING);
+ expectedStateMap.put("jobd", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+ expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("joba1").succeedJob();
+ InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+ pause(250);
+ expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+ expectedStateMap.put("joba1", Status.SUCCEEDED);
+ expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+ expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+ compareStates(expectedStateMap, nodeMap);
+
+ InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+ pause(1000);
+ expectedStateMap.put("jobb", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+ expectedStateMap.put("jobb:innerJobC", Status.FAILED);
+ expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+ expectedStateMap.put("jobc", Status.FAILED);
+ expectedStateMap.put("jobd", Status.FAILED);
+ expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+ expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+ expectedStateMap.put("jobe", Status.KILLED);
+ expectedStateMap.put("jobf", Status.KILLED);
+
+ Assert.assertEquals(Status.FAILED, flow.getStatus());
+ compareStates(expectedStateMap, nodeMap);
+ Assert.assertFalse(thread.isAlive());
}
private Thread runFlowRunnerInThread(FlowRunner runner) {
@@ -279,6 +649,10 @@ public class FlowRunnerTest2 {
}
private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName) throws Exception {
+ return createFlowRunner(eventCollector, flowName, FailureAction.FINISH_CURRENTLY_RUNNING);
+ }
+
+ private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, FailureAction action) throws Exception {
Flow flow = flowMap.get(flowName);
int exId = id++;
@@ -291,6 +665,7 @@ public class FlowRunnerTest2 {
flowParam.put("param10", "override.10");
flowParam.put("param11", "override.11");
exFlow.getExecutionOptions().addAllFlowParameters(flowParam);
+ exFlow.getExecutionOptions().setFailureAction(action);
fakeExecutorLoader.uploadExecutableFlow(exFlow);
FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);