azkaban-uncached

Fixing bugs for failed flows

9/18/2013 11:25:52 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 5cb7188..f4d7ff7 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -368,11 +368,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 				if (nextStatus == Status.KILLED) {
 					logger.info("Killing " + node.getId() + " due to prior errors.");
 					node.killNode(currentTime);
+					finalizeFlowIfFinished(node.getParentFlow());
 					fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
 				}
 				else if (nextStatus == Status.DISABLED) {
 					logger.info("Skipping disabled job " + node.getId() + ".");
 					node.skipNode(currentTime);
+					finalizeFlowIfFinished(node.getParentFlow());
 					fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
 				}
 				else {
@@ -758,13 +760,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 
 							flowFailed = true;
 							
-							ExecutionOptions options = flow.getExecutionOptions();
 							// The KILLED status occurs when cancel is invoked. We want to keep this
 							// status even in failure conditions.
 							if (flow.getStatus() != Status.KILLED && flow.getStatus() != Status.FAILED) {
 								propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
 
-								if (options.getFailureAction() == FailureAction.CANCEL_ALL && !flowCancelled) {
+								if (failureAction == FailureAction.CANCEL_ALL && !flowCancelled) {
 									logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
 									cancel();
 								}
@@ -786,35 +787,44 @@ public class FlowRunner extends EventHandler implements Runnable {
 				propagateStatus(base.getParentFlow(), status);
 			}
 		}
+	}
 
-		private void finalizeFlowIfFinished(ExecutableFlowBase base) {
-			// We let main thread finalize the main flow. 
-			if (base == flow) {
-				return;
-			}
-			
-			if (base.isFlowFinished()) {
-				Props previousOutput = null;
-				for(String end: base.getEndNodes()) {
-					ExecutableNode node = base.getExecutableNode(end);
-		
-					Props output = node.getOutputProps();
-					if (output != null) {
-						output = Props.clone(output);
-						output.setParent(previousOutput);
-						previousOutput = output;
-					}
+	private void finalizeFlowIfFinished(ExecutableFlowBase base) {
+		// We let main thread finalize the main flow. 
+		if (base == flow) {
+			return;
+		}
+		
+		if (base.isFlowFinished()) {
+			boolean succeeded = true;
+			Props previousOutput = null;
+			for(String end: base.getEndNodes()) {
+				ExecutableNode node = base.getExecutableNode(end);
+	
+				if (node.getStatus() == Status.KILLED) {
+					succeeded = false;
 				}
-				base.setOutputProps(previousOutput);
-				finalizeFlow(base);
 				
-				if (base.getParentFlow() != null) {
-					finalizeFlowIfFinished(base.getParentFlow());
+				Props output = node.getOutputProps();
+				if (output != null) {
+					output = Props.clone(output);
+					output.setParent(previousOutput);
+					previousOutput = output;
 				}
 			}
+			
+			if (!succeeded && (base.getStatus() == Status.RUNNING)) {
+				base.setStatus(Status.KILLED);
+			}
+			base.setOutputProps(previousOutput);
+			finalizeFlow(base);
+
+			if (base.getParentFlow() != null) {
+				finalizeFlowIfFinished(base.getParentFlow());
+			}
 		}
 	}
-
+	
 	public boolean isCancelled() {
 		return flowCancelled;
 	}
diff --git a/src/java/azkaban/executor/ExecutorManager.java b/src/java/azkaban/executor/ExecutorManager.java
index 74bba73..29a93a3 100644
--- a/src/java/azkaban/executor/ExecutorManager.java
+++ b/src/java/azkaban/executor/ExecutorManager.java
@@ -377,29 +377,43 @@ public class ExecutorManager {
 				options = new ExecutionOptions();
 			}
 			
+			String message = "";
 			if (options.getDisabledJobs() != null) {
 				// Disable jobs
 				for(String disabledId : options.getDisabledJobs()) {
-					ExecutableNode node = exflow.getExecutableNode(disabledId);
+					String[] splits = disabledId.split(":");
+					ExecutableNode node = exflow;
+					
+					for (String split: splits) {
+						if (node instanceof ExecutableFlowBase) {
+							node = ((ExecutableFlowBase)node).getExecutableNode(split);
+						}
+						else {
+							message = "Cannot disable job " + disabledId + " since flow " + split + " cannot be found. \n";
+						}
+					}
+
+					if (node == null) {
+						throw new ExecutorManagerException("Cannot disable job " + disabledId + ". Cannot find corresponding node.");
+					}
 					node.setStatus(Status.DISABLED);
 				}
 			}
 			
-			String message = "";
 			if (!running.isEmpty()) {
 				if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
 					Collections.sort(running);
 					Integer runningExecId = running.get(running.size() - 1);
 					
 					options.setPipelineExecutionId(runningExecId);
-					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". ";
+					message = "Flow " + flowId + " is already running with exec id " + runningExecId +". Pipelining level " + options.getPipelineLevel() + ". \n";
 				}
 				else if (options.getConcurrentOption().equals(ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
 					throw new ExecutorManagerException("Flow " + flowId + " is already running. Skipping execution.", ExecutorManagerException.Reason.SkippedExecution);
 				}
 				else {
 					// The settings is to run anyways.
-					message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. ";
+					message = "Flow " + flowId + " is already running with exec id " + StringUtils.join(running, ",") +". Will execute concurrently. \n";
 				}
 			}
 			
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 909d490..6f794d0 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -17,6 +17,7 @@ import azkaban.execapp.FlowRunner;
 import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
+import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
@@ -83,7 +84,7 @@ public class FlowRunnerTest2 {
 		// 1. START FLOW
 		ExecutableFlow flow = runner.getExecutableFlow();
 		createExpectedStateMap(flow, expectedStateMap, nodeMap);
-		runFlowRunnerInThread(runner);
+		Thread thread = runFlowRunnerInThread(runner);
 		pause(250);
 		
 		// After it starts up, only joba should be running
@@ -222,6 +223,375 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobf", Status.SUCCEEDED);
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	@Test
+	public void testDisabledNormal() throws Exception {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		flow.getExecutableNode("jobb").setStatus(Status.DISABLED);
+		((ExecutableFlowBase)flow.getExecutableNode("jobd")).getExecutableNode("innerJobA").setStatus(Status.DISABLED);
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.SKIPPED);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.SKIPPED);
+		expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.READY);
+		expectedStateMap.put("jobb:innerJobB", Status.READY);
+		expectedStateMap.put("jobb:innerJobC", Status.READY);
+		expectedStateMap.put("jobb:innerFlow", Status.READY);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 3. jobb:Inner completes
+		/// innerJobA completes
+		InteractiveTestJob.getTestJob("jobc").succeedJob();
+		InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+		expectedStateMap.put("jobd", Status.SUCCEEDED);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		InteractiveTestJob.getTestJob("jobe").succeedJob();
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobe", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobf", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		// 4. Finish up on inner flow for jobb
+		InteractiveTestJob.getTestJob("jobf").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobf", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		
+		Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	@Test
+	public void testNormalFailure1() throws Exception {
+		// Test propagation of KILLED status to embedded flows.
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
+		InteractiveTestJob.getTestJob("joba").failJob();
+		pause(250);
+		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+		expectedStateMap.put("joba", Status.FAILED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobd:innerJobA", Status.READY);
+		expectedStateMap.put("jobd:innerFlow2", Status.READY);
+		expectedStateMap.put("jobb:innerJobA", Status.READY);
+		expectedStateMap.put("jobb:innerFlow", Status.READY);
+		expectedStateMap.put("jobe", Status.KILLED);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 3. jobb:Inner completes
+		/// innerJobA completes
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobf", Status.KILLED);
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	@Test
+	public void testNormalFailure2() throws Exception {
+		// Test propagation of KILLED status to embedded flows different branch
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB A COMPLETES SUCCESSFULLY, others should be skipped
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		
+		InteractiveTestJob.getTestJob("joba1").failJob();
+		pause(250);
+		expectedStateMap.put("joba1", Status.FAILED);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 3. joba completes, everything is killed
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobC", Status.KILLED);
+		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+		
+		InteractiveTestJob.getTestJob("jobc").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.KILLED);
+		expectedStateMap.put("jobf", Status.KILLED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	@Test
+	public void testNormalFailure3() throws Exception {
+		// Test propagation of KILLED status to embedded flows different branch
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB in subflow FAILS
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+		pause(250);
+		expectedStateMap.put("jobb", Status.FAILED_FINISHING);
+		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+
+		InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
+		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobb", Status.FAILED);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 3. jobc completes, everything is killed
+		InteractiveTestJob.getTestJob("jobc").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.KILLED);
+		expectedStateMap.put("jobf", Status.KILLED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	@Test
+	public void testFailedFinishingFailure3() throws Exception {
+		// Test propagation of KILLED status to embedded flows different branch
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.FINISH_ALL_POSSIBLE);
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB in subflow FAILS
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+		pause(250);
+		expectedStateMap.put("jobb", Status.FAILED_FINISHING);
+		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+		Assert.assertEquals(Status.FAILED_FINISHING, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+
+		InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
+		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb", Status.FAILED);
+		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		compareStates(expectedStateMap, nodeMap);
+
+		InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+		expectedStateMap.put("jobd", Status.SUCCEEDED);
+		
+		// 3. jobc completes, everything is killed
+		InteractiveTestJob.getTestJob("jobc").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.KILLED);
+		expectedStateMap.put("jobf", Status.KILLED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	@Test
+	public void testCancelOnFailure() throws Exception {
+		// Test propagation of KILLED status to embedded flows different branch
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.CANCEL_ALL);
+		ExecutableFlow flow = runner.getExecutableFlow();
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB in subflow FAILS
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
+		pause(1000);
+		expectedStateMap.put("jobb", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
+		expectedStateMap.put("jobb:innerFlow", Status.KILLED);
+		expectedStateMap.put("jobc", Status.FAILED);
+		expectedStateMap.put("jobd", Status.FAILED);
+		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
+		expectedStateMap.put("jobe", Status.KILLED);
+		expectedStateMap.put("jobf", Status.KILLED);
+		
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertFalse(thread.isAlive());
 	}
 	
 	private Thread runFlowRunnerInThread(FlowRunner runner) {
@@ -279,6 +649,10 @@ public class FlowRunnerTest2 {
 	}
 	
 	private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName) throws Exception {
+		return createFlowRunner(eventCollector, flowName, FailureAction.FINISH_CURRENTLY_RUNNING);
+	}
+	
+	private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, FailureAction action) throws Exception {
 		Flow flow = flowMap.get(flowName);
 
 		int exId = id++;
@@ -291,6 +665,7 @@ public class FlowRunnerTest2 {
 		flowParam.put("param10", "override.10");
 		flowParam.put("param11", "override.11");
 		exFlow.getExecutionOptions().addAllFlowParameters(flowParam);
+		exFlow.getExecutionOptions().setFailureAction(action);
 		fakeExecutorLoader.uploadExecutableFlow(exFlow);
 	
 		FlowRunner runner = new FlowRunner(fakeExecutorLoader.fetchExecutableFlow(exId), fakeExecutorLoader, fakeProjectLoader, jobtypeManager);