azkaban-developers

Merge pull request #147 from rbpark/master Fixing issue

2/6/2014 2:50:17 AM
2.5.0-rc2

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 77c9016..6aba34a 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -704,7 +704,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private void runExecutableNode(ExecutableNode node) throws IOException {
 		// Collect output props from the job's dependencies.
 		prepareJobProperties(node);
-		
+
 		node.setStatus(Status.QUEUED);
 		JobRunner runner = createJobRunner(node);
 		logger.info("Submitting job '" + node.getNestedId() + "' to run.");
@@ -714,7 +714,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 		} catch (RejectedExecutionException e) {
 			logger.error(e);
 		};
-
 	}
 	
 	/**
@@ -842,11 +841,14 @@ public class FlowRunner extends EventHandler implements Runnable {
 				updateFlow();
 			}
 		}
+
+		interrupt();
 	}
 	
 	public void kill(String user) {
 		synchronized(mainSyncObj) {
 			logger.info("Flow killed by " + user);
+			flow.setStatus(Status.KILLED);
 			kill();
 			updateFlow();
 		}
@@ -856,6 +858,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private void kill() {
 		synchronized(mainSyncObj) {
 			logger.info("Kill has been called on flow " + execId);
+			
+			// If the flow is paused, then we'll also unpause
 			flowPaused = false;
 			flowKilled = true;
 			
@@ -996,6 +1000,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 				long seconds = (node.getEndTime() - node.getStartTime())/1000;
 				synchronized(mainSyncObj) {
 					logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in " + seconds + " seconds");
+					
+					// Cancellation is handled in the main thread, but if the flow is paused, the main thread is paused too.
+					// This unpauses the flow for cancellation.
+					if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
+						flowPaused = false;
+					}
+					
 					finishedNodes.add(node);
 					node.getParentFlow().setUpdateTime(System.currentTimeMillis());
 					interrupt();
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index 28a96d1..dd9ad96 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -259,14 +259,14 @@ public class JobRunner extends EventHandler implements Runnable {
 		boolean quickFinish = false;
 		long time = System.currentTimeMillis();
 		
-		if (this.isKilled() || Status.isStatusFinished(nodeStatus)) {
+		if (Status.isStatusFinished(nodeStatus)) {
 			quickFinish = true;
 		}
 		else if (nodeStatus == Status.DISABLED) {
 			changeStatus(Status.SKIPPED, time);
 			quickFinish = true;
 		} 
-		else if (this.killed) {
+		else if (this.isKilled()) {
 			changeStatus(Status.KILLED, time);
 			quickFinish = true;
 		} 
@@ -443,6 +443,9 @@ public class JobRunner extends EventHandler implements Runnable {
 		node.setEndTime(System.currentTimeMillis());
 
 		if (isKilled()) {
+			// even if it's killed, there is a chance that the job failed is marked as failure,
+			// So we set it to KILLED to make sure we know that we forced kill it rather than
+			// it being a legitimate failure.
 			changeStatus(Status.KILLED);
 		}
 		logInfo("Finishing job " + this.jobId + " at " + node.getEndTime() + " with status " + node.getStatus());
@@ -455,13 +458,13 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	private boolean prepareJob() throws RuntimeException {
 		// Check pre conditions
-		if (props == null || killed) {
+		if (props == null || this.isKilled()) {
 			logError("Failing job. The job properties don't exist");
 			return false;
 		}
 		
 		synchronized (syncObject) {
-			if (node.getStatus() == Status.FAILED || killed) {
+			if (node.getStatus() == Status.FAILED || this.isKilled()) {
 				return false;
 			}
 
@@ -559,6 +562,9 @@ public class JobRunner extends EventHandler implements Runnable {
 	
 	public void kill() {
 		synchronized (syncObject) {
+			if (Status.isStatusFinished(node.getStatus())) {
+				return;
+			}
 			logError("Kill has been called.");
 			this.killed = true;
 			
@@ -576,7 +582,7 @@ public class JobRunner extends EventHandler implements Runnable {
 				}
 				return;
 			}
-	
+
 			try {
 				job.cancel();
 			}
@@ -584,6 +590,8 @@ public class JobRunner extends EventHandler implements Runnable {
 				logError(e.getMessage());
 				logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
 			}
+			
+			this.changeStatus(Status.KILLED);
 		}
 	}
 	
@@ -654,6 +662,5 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 
 		return attempt > 0 ? "_job." + executionId + "." + attempt + "." + jobId + ".attach" : "_job." + executionId + "." + jobId + ".attach";
-
 	}
 }
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index e5fa39f..9605c12 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -30,6 +30,47 @@ import azkaban.test.executor.JavaJob;
 import azkaban.utils.DirectoryFlowLoader;
 import azkaban.utils.Props;
 
+/**
+ * Test the flow run, especially with embedded flows.
+ * 
+ * This test uses executions/embedded2. It also mainly uses the flow named jobf. The test is designed to
+ * control success/failures explicitly so we don't have to time the flow exactly.
+ * 
+ * Flow jobf looks like the following:
+ * 
+ * 
+ *       joba       joba1
+ *      /  |  \      |
+ *     /   |   \     |
+ *  jobb  jobd jobc  |
+ *     \   |   /    /
+ *      \  |  /    /
+ *        jobe    /
+ *         |     /
+ *         |    /
+ *        jobf
+ * 
+ *  The job 'jobb' is an embedded flow:
+ *  
+ *  jobb:innerFlow
+ *    
+ *        innerJobA
+ *        /       \
+ *   innerJobB   innerJobC
+ *        \       / 
+ *        innerFlow
+ *        
+ *        
+ *  The job 'jobd' is a simple embedded flow:
+ *  
+ *  jobd:innerFlow2
+ *    
+ *       innerJobA
+ *           |
+ *       innerFlow2
+ *       
+ *  The following tests checks each stage of the flow run by forcing jobs to succeed or fail.
+ */
 public class FlowRunnerTest2 {
 	private File workingDir;
 	private JobTypeManager jobtypeManager;
@@ -73,6 +114,12 @@ public class FlowRunnerTest2 {
 		}
 	}
 	
+	/**
+	 * Tests the basic successful flow run, and also tests all output variables from
+	 * each job.
+	 * 
+	 * @throws Exception
+	 */
 	@Test
 	public void testBasicRun() throws Exception {
 		EventCollectorListener eventCollector = new EventCollectorListener();
@@ -229,6 +276,10 @@ public class FlowRunnerTest2 {
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	/**
+	 * Tests a flow with Disabled jobs and flows. They should properly SKIP executions
+	 * @throws Exception
+	 */
 	@Test
 	public void testDisabledNormal() throws Exception {
 		EventCollectorListener eventCollector = new EventCollectorListener();
@@ -294,6 +345,13 @@ public class FlowRunnerTest2 {
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	/**
+	 * Tests a failure with the default FINISH_CURRENTLY_RUNNING.
+	 * After the first failure, every job that started should complete, and the
+	 * rest of the jobs should be skipped.
+	 * 
+	 * @throws Exception
+	 */
 	@Test
 	public void testNormalFailure1() throws Exception {
 		// Test propagation of KILLED status to embedded flows.
@@ -338,6 +396,10 @@ public class FlowRunnerTest2 {
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	/**
+	 * Test #2 on the default failure case.
+	 * @throws Exception
+	 */
 	@Test
 	public void testNormalFailure2() throws Exception {
 		// Test propagation of KILLED status to embedded flows different branch
@@ -468,6 +530,13 @@ public class FlowRunnerTest2 {
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	/**
+	 * Tests failures when the fail behaviour is FINISH_ALL_POSSIBLE.
+	 * In this case, all jobs which have had its pre-requisite met can continue to run.
+	 * Finishes when the failure is propagated to the last node of the flow.
+	 * 
+	 * @throws Exception
+	 */
 	@Test
 	public void testFailedFinishingFailure3() throws Exception {
 		// Test propagation of KILLED status to embedded flows different branch
@@ -540,6 +609,14 @@ public class FlowRunnerTest2 {
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	/**
+	 * Tests the failure condition when a failure invokes a cancel (or killed) on the flow.
+	 * 
+	 * Any jobs that are running will be assigned a KILLED state, and any nodes which were
+	 * skipped due to prior errors will be given a CANCELLED state.
+	 * 
+	 * @throws Exception
+	 */
 	@Test
 	public void testCancelOnFailure() throws Exception {
 		// Test propagation of KILLED status to embedded flows different branch
@@ -598,6 +675,10 @@ public class FlowRunnerTest2 {
 
 	}
 	
+	/**
+	 * Tests retries after a failure
+	 * @throws Exception
+	 */
 	@Test
 	public void testRetryOnFailure() throws Exception {
 		// Test propagation of KILLED status to embedded flows different branch
@@ -700,6 +781,12 @@ public class FlowRunnerTest2 {
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	/**
+	 * Tests the manual Killing of a flow. In this case, the flow is just fine before the cancel
+	 * is called.
+	 * 
+	 * @throws Exception
+	 */
 	@Test
 	public void testCancel() throws Exception {
 		// Test propagation of KILLED status to embedded flows different branch
@@ -758,6 +845,11 @@ public class FlowRunnerTest2 {
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	/**
+	 * Tests the manual invocation of cancel on a flow that is FAILED_FINISHING
+	 * 
+	 * @throws Exception
+	 */
 	@Test
 	public void testManualCancelOnFailure() throws Exception {
 		// Test propagation of KILLED status to embedded flows different branch
@@ -817,11 +909,387 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobe", Status.CANCELLED);
 		expectedStateMap.put("jobf", Status.CANCELLED);
 		
+		Assert.assertEquals(Status.KILLED, flow.getStatus());
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	/**
+	 * Tests that pause and resume work
+	 * 
+	 * @throws Exception
+	 */
+	@Test
+	public void testPause() throws Exception {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		ExecutableFlow flow = runner.getExecutableFlow();
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		runner.pause("test");
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		// 2.1 JOB A COMPLETES SUCCESSFULLY AFTER PAUSE
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+		
+		// 2.2 Flow is unpaused
+		runner.resume("test");
+		pause(250);
+		Assert.assertEquals(flow.getStatus(), Status.RUNNING);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);		
+		
+		// 3. jobb:Inner completes
+		runner.pause("test");
+		
+		/// innerJobA completes, but paused
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob(Props.of("output.jobb.innerJobA", "jobb.innerJobA"));
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+
+		runner.resume("test");
+		pause(250);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		/// innerJobB, C completes
+		InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob(Props.of("output.jobb.innerJobB", "jobb.innerJobB"));
+		InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob(Props.of("output.jobb.innerJobC", "jobb.innerJobC"));
+		pause(250);
+		expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerFlow", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		// 4. Finish up on inner flow for jobb
+		InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob(Props.of("output1.jobb", "test1", "output2.jobb", "test2"));
+		pause(250);
+		expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		
+		// 5. Finish jobc, jobd
+		InteractiveTestJob.getTestJob("jobc").succeedJob(Props.of("output.jobc", "jobc"));
+		pause(250);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+		pause(250);
+		InteractiveTestJob.getTestJob("jobd:innerFlow2").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+		expectedStateMap.put("jobd", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		// 6. Finish off flow
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		pause(250);
+		InteractiveTestJob.getTestJob("jobe").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.SUCCEEDED);
+		expectedStateMap.put("jobf", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+		
+		InteractiveTestJob.getTestJob("jobf").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobf", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+		
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	/**
+	 * Test the condition for a manual invocation of a KILL (cancel) on a flow that
+	 * has been paused. The flow should unpause and be killed immediately.
+	 * 
+	 * @throws Exception
+	 */
+	@Test
+	public void testPauseKill() throws Exception {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf");
+		
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		ExecutableFlow flow = runner.getExecutableFlow();
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB A COMPLETES SUCCESSFULLY 
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		runner.pause("me");
+		pause(250);
+		Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		InteractiveTestJob.getTestJob("jobd:innerJobA").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		expectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		
+		runner.kill("me");
+		pause(250);
+		expectedStateMap.put("joba1", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobd", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
+
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.KILLED, flow.getStatus());
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	/**
+	 * Tests the case where a failure occurs on a Paused flow. In this case, the flow should stay paused.
+	 * @throws Exception
+	 */
+	@Test
+	public void testPauseFail() throws Exception {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.FINISH_CURRENTLY_RUNNING);
+		
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		ExecutableFlow flow = runner.getExecutableFlow();
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB A COMPLETES SUCCESSFULLY 
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		runner.pause("me");
+		pause(250);
+		Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
+		pause(250);
+		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+		
+		runner.resume("me");
+		pause(250);
+		expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobd", Status.FAILED);
+
+		InteractiveTestJob.getTestJob("jobc").succeedJob();
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		
+		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	/**
+	 * Test the condition when a Finish all possible is called during a pause.
+	 * The Failure is not acted upon until the flow is resumed.
+	 * @throws Exception
+	 */
+	@Test
+	public void testPauseFailFinishAll() throws Exception {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.FINISH_ALL_POSSIBLE);
+		
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		ExecutableFlow flow = runner.getExecutableFlow();
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
 		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB A COMPLETES SUCCESSFULLY 
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(250);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		runner.pause("me");
+		pause(250);
+		Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+		InteractiveTestJob.getTestJob("jobb:innerJobA").succeedJob();
+		InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
+		pause(250);
+		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobA", Status.SUCCEEDED);
+		compareStates(expectedStateMap, nodeMap);
+	
+		runner.resume("me");
+		pause(250);
+		expectedStateMap.put("jobb:innerJobB", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobC", Status.RUNNING);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobd", Status.FAILED);
+
+		InteractiveTestJob.getTestJob("jobc").succeedJob();
+		InteractiveTestJob.getTestJob("joba1").succeedJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobB").succeedJob();
+		InteractiveTestJob.getTestJob("jobb:innerJobC").succeedJob();
+		pause(250);
+		InteractiveTestJob.getTestJob("jobb:innerFlow").succeedJob();
+		pause(250);
+		expectedStateMap.put("jobc", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+		expectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+		expectedStateMap.put("jobb", Status.SUCCEEDED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
+		
+		compareStates(expectedStateMap, nodeMap);
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		Assert.assertFalse(thread.isAlive());
 	}
 	
+	/**
+	 * Tests the case when a flow is paused and a failure causes a kill. The
+	 * flow should die immediately regardless of the 'paused' status.
+	 * @throws Exception
+	 */
+	@Test
+	public void testPauseFailKill() throws Exception {
+		EventCollectorListener eventCollector = new EventCollectorListener();
+		FlowRunner runner = createFlowRunner(eventCollector, "jobf", FailureAction.CANCEL_ALL);
+		
+		Map<String, Status> expectedStateMap = new HashMap<String, Status>();
+		Map<String, ExecutableNode> nodeMap = new HashMap<String, ExecutableNode>();
+		
+		// 1. START FLOW
+		ExecutableFlow flow = runner.getExecutableFlow();
+		createExpectedStateMap(flow, expectedStateMap, nodeMap);
+		Thread thread = runFlowRunnerInThread(runner);
+		pause(250);
+		// After it starts up, only joba should be running
+		expectedStateMap.put("joba", Status.RUNNING);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		// 2. JOB A COMPLETES SUCCESSFULLY 
+		InteractiveTestJob.getTestJob("joba").succeedJob();
+		pause(500);
+		expectedStateMap.put("joba", Status.SUCCEEDED);
+		expectedStateMap.put("joba1", Status.RUNNING);
+		expectedStateMap.put("jobb", Status.RUNNING);
+		expectedStateMap.put("jobc", Status.RUNNING);
+		expectedStateMap.put("jobd", Status.RUNNING);
+		expectedStateMap.put("jobd:innerJobA", Status.RUNNING);
+		expectedStateMap.put("jobb:innerJobA", Status.RUNNING);
+		compareStates(expectedStateMap, nodeMap);
+
+		runner.pause("me");
+		pause(250);
+		Assert.assertEquals(flow.getStatus(), Status.PAUSED);
+		InteractiveTestJob.getTestJob("jobd:innerJobA").failJob();
+		pause(250);
+		expectedStateMap.put("jobd:innerJobA", Status.FAILED);
+		expectedStateMap.put("jobd:innerFlow2", Status.CANCELLED);
+		expectedStateMap.put("jobd", Status.FAILED);
+		expectedStateMap.put("jobb:innerJobA", Status.KILLED);
+		expectedStateMap.put("jobb:innerJobB", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerJobC", Status.CANCELLED);
+		expectedStateMap.put("jobb:innerFlow", Status.CANCELLED);
+		expectedStateMap.put("jobb", Status.KILLED);
+		expectedStateMap.put("jobc", Status.KILLED);
+		expectedStateMap.put("jobe", Status.CANCELLED);
+		expectedStateMap.put("jobf", Status.CANCELLED);
+		expectedStateMap.put("joba1", Status.KILLED);
+		compareStates(expectedStateMap, nodeMap);
+
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		Assert.assertFalse(thread.isAlive());
+	}
+	
+	
 	private Thread runFlowRunnerInThread(FlowRunner runner) {
 		Thread thread = new Thread(runner);
 		thread.start();