azkaban-memoizeit

-Refactored JobRunner since it was breaking the pipeline: split run() into handleNonReadyStatus/delayExecution/blockOnPipeLine/finalizeLogFile, renamed FlowWatcher.failAllWatches to unblockAllWatches (watched jobs now unblock as SKIPPED instead of KILLED), and fixed bugs in ExecutableFlowBase.applyUpdateObject and ExecutableNode.getPrintableId

10/8/2013 11:27:58 PM

Details

diff --git a/src/java/azkaban/execapp/event/EventHandler.java b/src/java/azkaban/execapp/event/EventHandler.java
index a71de50..450a4fc 100644
--- a/src/java/azkaban/execapp/event/EventHandler.java
+++ b/src/java/azkaban/execapp/event/EventHandler.java
@@ -16,6 +16,7 @@
 
 package azkaban.execapp.event;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 
 public class EventHandler {
@@ -29,6 +30,7 @@ public class EventHandler {
 	}
 	
 	public void fireEventListeners(Event event) {
+		ArrayList<EventListener> listeners = new ArrayList<EventListener>(this.listeners);
 		for (EventListener listener: listeners) {
 			listener.handleEvent(event);
 		}
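
Note: the ArrayList copy added above guards against the listener set changing while an event is being dispatched (for example, a listener unregistering itself, or another thread registering one); iterating the HashSet directly in that situation risks a ConcurrentModificationException. A minimal standalone sketch of the same pattern, using hypothetical Dispatcher/Listener names rather than Azkaban's classes:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

public class DispatchCopyDemo {
	interface Listener {
		void handle(String event);
	}

	static class Dispatcher {
		private final Set<Listener> listeners = new HashSet<Listener>();

		void add(Listener l) { listeners.add(l); }
		void remove(Listener l) { listeners.remove(l); }

		// Iterate over a snapshot so listeners can add/remove themselves
		// (or be modified elsewhere) without disturbing the dispatch loop.
		void fire(String event) {
			for (Listener l : new ArrayList<Listener>(listeners)) {
				l.handle(event);
			}
		}
	}

	public static void main(String[] args) {
		final Dispatcher d = new Dispatcher();
		d.add(new Listener() {
			public void handle(String event) {
				// Removing mid-dispatch is safe because fire() iterates a copy;
				// iterating the HashSet directly here would risk a ConcurrentModificationException.
				d.remove(this);
				System.out.println("handled " + event + " and unregistered");
			}
		});
		d.fire("JOB_FINISHED");
	}
}
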
diff --git a/src/java/azkaban/execapp/event/FlowWatcher.java b/src/java/azkaban/execapp/event/FlowWatcher.java
index 1c24545..47ecf8f 100644
--- a/src/java/azkaban/execapp/event/FlowWatcher.java
+++ b/src/java/azkaban/execapp/event/FlowWatcher.java
@@ -78,17 +78,17 @@ public abstract class FlowWatcher {
 		return null;
 	}
 	
-	public synchronized void failAllWatches() {
-		logger.info("Failing all watches on " + execId);
+	public synchronized void unblockAllWatches() {
+		logger.info("Unblock all watches on " + execId);
 		cancelWatch = true;
 		
 		for(BlockingStatus status : map.values()) {
 			logger.info("Unblocking " + status.getJobId());
-			status.changeStatus(Status.KILLED);
+			status.changeStatus(Status.SKIPPED);
 			status.unblock();
 		}
 		
-		logger.info("Successfully failed all watches on " + execId);
+		logger.info("Successfully unblocked all watches on " + execId);
 	}
 	
 	public boolean isWatchCancelled() {
diff --git a/src/java/azkaban/execapp/event/LocalFlowWatcher.java b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
index 1dd4ba5..3907b43 100644
--- a/src/java/azkaban/execapp/event/LocalFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/LocalFlowWatcher.java
@@ -32,7 +32,7 @@ public class LocalFlowWatcher extends FlowWatcher {
 		runner = null;
 		
 		getLogger().info("Stopping watcher, and unblocking pipeline");
-		super.failAllWatches();
+		super.unblockAllWatches();
 	}
 
 	public class LocalFlowWatcherListener implements EventListener {
@@ -51,7 +51,7 @@ public class LocalFlowWatcher extends FlowWatcher {
 					// A job runner is finished
 					JobRunner runner = (JobRunner)event.getRunner();
 					ExecutableNode node = runner.getNode();
-					
+					System.out.println(node + " looks like " + node.getStatus());
 					handleJobStatusChange(node.getNestedId(), node.getStatus());
 				}
 			}
diff --git a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
index 9d1c407..811f5c6 100644
--- a/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
+++ b/src/java/azkaban/execapp/event/RemoteFlowWatcher.java
@@ -102,7 +102,7 @@ public class RemoteFlowWatcher extends FlowWatcher {
 		if (thread != null) {
 			thread.interrupt();
 		}
-		super.failAllWatches();
+		super.unblockAllWatches();
 		loader = null;
 		flow = null;
 	}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index fbf4065..d4e18ed 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -57,8 +57,8 @@ import azkaban.utils.PropsUtils;
 public class FlowRunner extends EventHandler implements Runnable {
 	private static final Layout DEFAULT_LAYOUT = new PatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
 	// We check update every 5 minutes, just in case things get stuck. But for the most part, we'll be idling.
-	private static final long CHECK_WAIT_MS = 5*60*1000;
-	
+	//private static final long CHECK_WAIT_MS = 5*60*1000;
+	private static final long CHECK_WAIT_MS = 30*1000;
 	private Logger logger;
 	private Layout loggerLayout = DEFAULT_LAYOUT;
 	private Appender flowAppender;
@@ -508,7 +508,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			}
 		}
 		// setting this fake source as this will be used to determine the location of log files.
-		props.setSource(path.getPath());
+		if (path.getPath() != null) {
+			props.setSource(path.getPath());
+		}
 		return props;
 	}
 	
@@ -794,9 +796,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 								}
 							}
 						}
+						
 					}
-					
 					updateFlow();
+
 					interrupt();
 					fireEventListeners(event);
 				}
diff --git a/src/java/azkaban/execapp/JobRunner.java b/src/java/azkaban/execapp/JobRunner.java
index f948192..a684307 100644
--- a/src/java/azkaban/execapp/JobRunner.java
+++ b/src/java/azkaban/execapp/JobRunner.java
@@ -222,94 +222,157 @@ public class JobRunner extends EventHandler implements Runnable {
 		}
 	}
 	
-	@Override
-	public void run() {
-		Thread.currentThread().setName("JobRunner-" + this.jobId + "-" + executionId);
+	/**
+	 * Handles non-ready and special statuses (e.g. KILLED). Returns true
+	 * if it handled one and the job should finish immediately.
+	 * 
+	 * @return true if a non-ready status was handled
+	 */
+	private boolean handleNonReadyStatus() {
+		Status nodeStatus = node.getStatus();
+		boolean quickFinish = false;
+		long time = System.currentTimeMillis();
 		
-		if (node.getStatus() == Status.DISABLED) {
-			node.setStartTime(System.currentTimeMillis());
-			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
-			node.setStatus(Status.SKIPPED);
-			node.setEndTime(System.currentTimeMillis());
-			fireEvent(Event.create(this, Type.JOB_FINISHED));
-			return;
-		} else if (this.cancelled) {
-			node.setStartTime(System.currentTimeMillis());
-			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
-			node.setStatus(Status.FAILED);
-			node.setEndTime(System.currentTimeMillis());
-			fireEvent(Event.create(this, Type.JOB_FINISHED));
-		} else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
-			node.setStartTime(System.currentTimeMillis());
+		if (this.isCancelled() || Status.isStatusFinished(nodeStatus)) {
+			quickFinish = true;
+		}
+		else if (nodeStatus == Status.DISABLED) {
+			changeStatus(Status.SKIPPED, time);
+			quickFinish = true;
+		} 
+		else if (this.cancelled) {
+			changeStatus(Status.FAILED, time);
+			quickFinish = true;
+		} 
+		
+		if (quickFinish) {
+			node.setStartTime(time);
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
-			node.setEndTime(System.currentTimeMillis());
+			node.setEndTime(time);
 			fireEvent(Event.create(this, Type.JOB_FINISHED));
-			return;
+			return true;
 		}
-		else {
-			createLogger();
-			node.setUpdateTime(System.currentTimeMillis());
-
-			// For pipelining of jobs. Will watch other jobs.
-			if (!pipelineJobs.isEmpty()) {
-				String blockedList = "";
-				ArrayList<BlockingStatus> blockingStatus = new ArrayList<BlockingStatus>();
-				for (String waitingJobId : pipelineJobs) {
-					Status status = watcher.peekStatus(waitingJobId);
-					if (status != null && !Status.isStatusFinished(status)) {
-						BlockingStatus block = watcher.getBlockingStatus(waitingJobId);
-						blockingStatus.add(block);
-						blockedList += waitingJobId + ",";
-					}
+		
+		return false;
+	}
+	
+	/**
+	 * If pipelining is set, will block on another flow's jobs.
+	 */
+	private boolean blockOnPipeLine() {
+		if (this.isCancelled()) {
+			return true;
+		}
+		
+		// For pipelining of jobs. Will watch other jobs.
+		if (!pipelineJobs.isEmpty()) {
+			String blockedList = "";
+			ArrayList<BlockingStatus> blockingStatus = new ArrayList<BlockingStatus>();
+			for (String waitingJobId : pipelineJobs) {
+				Status status = watcher.peekStatus(waitingJobId);
+				if (status != null && !Status.isStatusFinished(status)) {
+					BlockingStatus block = watcher.getBlockingStatus(waitingJobId);
+					blockingStatus.add(block);
+					blockedList += waitingJobId + ",";
 				}
-				if (!blockingStatus.isEmpty()) {
-					logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
-					
-					for(BlockingStatus bStatus: blockingStatus) {
-						logger.info("Waiting on pipelined job " + bStatus.getJobId());
-						currentBlockStatus = bStatus;
-						bStatus.blockOnFinishedStatus();
+			}
+			if (!blockingStatus.isEmpty()) {
+				logger.info("Pipeline job " + this.jobId + " waiting on " + blockedList + " in execution " + watcher.getExecId());
+				
+				for(BlockingStatus bStatus: blockingStatus) {
+					logger.info("Waiting on pipelined job " + bStatus.getJobId());
+					currentBlockStatus = bStatus;
+					bStatus.blockOnFinishedStatus();
+					if (this.isCancelled()) {
+						logger.info("Job was cancelled while waiting on pipeline. Quitting.");
+						return true;
+					}
+					else {
 						logger.info("Pipelined job " + bStatus.getJobId() + " finished.");
-						if (watcher.isWatchCancelled()) {
-							break;
-						}
 					}
-					writeStatus();	
-					fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED));
 				}
-				if (watcher.isWatchCancelled()) {
-					logger.info("Job was cancelled while waiting on pipeline. Quiting.");
-					node.setStartTime(System.currentTimeMillis());
-					node.setEndTime(System.currentTimeMillis());
-					node.setStatus(Status.FAILED);
-					fireEvent(Event.create(this, Type.JOB_FINISHED));
-					return;
+			}
+		}
+		
+		currentBlockStatus = null;
+		return false;
+	}
+	
+	private boolean delayExecution() {
+		if (this.isCancelled()) {
+			return true;
+		}
+		
+		long currentTime = System.currentTimeMillis();
+		if (delayStartMs > 0) {
+			logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
+			synchronized(this) {
+				try {
+					this.wait(delayStartMs);
+					logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
+				} catch (InterruptedException e) {
+					logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
 				}
 			}
 			
-			currentBlockStatus = null;
-			long currentTime = System.currentTimeMillis();
-			if (delayStartMs > 0) {
-				logger.info("Delaying start of execution for " + delayStartMs + " milliseconds.");
-				synchronized(this) {
-					try {
-						this.wait(delayStartMs);
-						logger.info("Execution has been delayed for " + delayStartMs + " ms. Continuing with execution.");
-					} catch (InterruptedException e) {
-						logger.error("Job " + this.jobId + " was to be delayed for " + delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - currentTime));
+			if (this.isCancelled()) {
+				logger.info("Job was cancelled while in delay. Quitting.");
+				return true;
+			}
+		}
+		
+		return false;
+	}
+	
+	private void finalizeLogFile() {
+		closeLogger();
+		
+		if (logFile != null) {
+			try {
+				File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
+					
+					@Override
+					public boolean accept(File dir, String name) {
+						return name.startsWith(logFile.getName());
 					}
-				}
+				} 
+				);
+				Arrays.sort(files, Collections.reverseOrder());
 				
-				if (cancelled) {
-					logger.info("Job was cancelled while in delay. Quiting.");
-					node.setStartTime(System.currentTimeMillis());
-					node.setEndTime(System.currentTimeMillis());
-					fireEvent(Event.create(this, Type.JOB_FINISHED));
-					return;
-				}
+				loader.uploadLogFile(executionId, this.jobId, node.getAttempt(), files);
+			} catch (ExecutorManagerException e) {
+				flowLogger.error("Error writing out logs for job " + this.jobId, e);
 			}
-			
-			node.setStartTime(System.currentTimeMillis());
+		}
+		else {
+			flowLogger.info("Log file for job " + this.jobId + " is null");
+		}
+	}
+	
+	/**
+	 * The main run thread.
+	 * 
+	 */
+	@Override
+	public void run() {
+		Thread.currentThread().setName("JobRunner-" + this.jobId + "-" + executionId);
+		
+		// If the job is cancelled, disabled, or already finished, quick-finish it. No log is created in this case
+		if (handleNonReadyStatus()) {
+			return;
+		}
+
+		createLogger();
+		boolean errorFound = false;
+		// Delay execution if necessary. Returns true if something went wrong.
+		errorFound |= delayExecution();
+
+		// For pipelining of jobs. Will watch other jobs. Will return true if something went wrong.
+		errorFound |= blockOnPipeLine();
+
+		// Start the node.
+		node.setStartTime(System.currentTimeMillis());
+		if (!errorFound && !isCancelled()) {
 			fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
 			try {
 				loader.uploadExecutableNode(node, props);
@@ -318,55 +381,26 @@ public class JobRunner extends EventHandler implements Runnable {
 			}
 			
 			if (prepareJob()) {
+				// Writes status to the db
 				writeStatus();
 				fireEvent(Event.create(this, Type.JOB_STATUS_CHANGED), false);
 				runJob();
+				writeStatus();
 			}
 			else {
-				node.setStatus(Status.FAILED);
+				changeStatus(Status.FAILED);
 				logError("Job run failed preparing the job.");
 			}
-			
-			node.setEndTime(System.currentTimeMillis());
-
-			logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
-
-			closeLogger();
-			writeStatus();
-			
-			if (logFile != null) {
-				try {
-					File[] files = logFile.getParentFile().listFiles(new FilenameFilter() {
-						
-						@Override
-						public boolean accept(File dir, String name) {
-							return name.startsWith(logFile.getName());
-						}
-					} 
-					);
-					Arrays.sort(files, Collections.reverseOrder());
-					
-					loader.uploadLogFile(executionId, this.jobId, node.getAttempt(), files);
-				} catch (ExecutorManagerException e) {
-					flowLogger.error("Error writing out logs for job " + this.jobId, e);
-				}
-			}
-			else {
-				flowLogger.info("Log file for job " + this.jobId + " is null");
-			}
 		}
-		fireEvent(Event.create(this, Type.JOB_FINISHED));
-	}
-	
-	private void fireEvent(Event event) {
-		fireEvent(event, true);
-	}
-	
-	private void fireEvent(Event event, boolean updateTime) {
-		if (updateTime) {
-			node.setUpdateTime(System.currentTimeMillis());
+		node.setEndTime(System.currentTimeMillis());
+
+		if (isCancelled()) {
+			changeStatus(Status.FAILED);
 		}
-		this.fireEventListeners(event);
+		logInfo("Finishing job " + this.jobId + " at " + node.getEndTime());
+		
+		fireEvent(Event.create(this, Type.JOB_FINISHED), false);
+		finalizeLogFile();
 	}
 	
 	private boolean prepareJob() throws RuntimeException {
@@ -396,8 +430,8 @@ public class JobRunner extends EventHandler implements Runnable {
 			
 			props.put(CommonJobProperties.JOB_ATTEMPT, node.getAttempt());
 			props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(executionId, this.jobId, node.getAttempt()));
-			node.setStatus(Status.RUNNING);
-
+			changeStatus(Status.RUNNING);
+			
 			// Ability to specify working directory
 			if (!props.containsKey(AbstractProcessJob.WORKING_DIR)) {
 				props.put(AbstractProcessJob.WORKING_DIR, workingDir.getAbsolutePath());
@@ -430,22 +464,45 @@ public class JobRunner extends EventHandler implements Runnable {
 			e.printStackTrace();
 			
 			if (props.getBoolean("job.succeed.on.failure", false)) {
-				node.setStatus(Status.FAILED_SUCCEEDED);
+				changeStatus(Status.FAILED_SUCCEEDED);
 				logError("Job run failed, but will treat it like success.");
 				logError(e.getMessage() + e.getCause());
 			}
 			else {
-				node.setStatus(Status.FAILED);
+				changeStatus(Status.FAILED);
 				logError("Job run failed!");
 				logError(e.getMessage() + e.getCause());
 			}
-			return;
 		}
-
-		node.setStatus(Status.SUCCEEDED);
+		
 		if (job != null) {
 			node.setOutputProps(job.getJobGeneratedProperties());
 		}
+		
+		// If the status is not already a finished state, mark the job SUCCEEDED.
+		if (!Status.isStatusFinished(node.getStatus())) {
+			changeStatus(Status.SUCCEEDED);
+		}
+	}
+	
+	private void changeStatus(Status status) {
+		changeStatus(status, System.currentTimeMillis());
+	}
+	
+	private void changeStatus(Status status, long time) {
+		node.setStatus(status);
+		node.setUpdateTime(time);
+	}
+	
+	private void fireEvent(Event event) {
+		fireEvent(event, true);
+	}
+	
+	private void fireEvent(Event event, boolean updateTime) {
+		if (updateTime) {
+			node.setUpdateTime(System.currentTimeMillis());
+		}
+		this.fireEventListeners(event);
 	}
 	
 	public void cancel() {
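
One detail of the refactored run() above worth calling out: errorFound |= delayExecution() and errorFound |= blockOnPipeLine() use the non-short-circuiting |= operator, so the pipeline wait still executes even when the delay step already reported a problem; only the block guarded by !errorFound && !isCancelled() is skipped. A small self-contained demonstration of the difference between |= and || (the step names below are illustrative):

public class OrAssignDemo {
	private static boolean step(String name, boolean fails) {
		System.out.println("running " + name);
		return fails;
	}

	public static void main(String[] args) {
		// |= always evaluates its right-hand side, so every step runs and
		// failures accumulate -- mirroring how run() collects errorFound.
		boolean errorFound = false;
		errorFound |= step("delayExecution", true);
		errorFound |= step("blockOnPipeLine", false);
		System.out.println("errorFound = " + errorFound); // true, and both steps ran

		// || short-circuits: the second step never executes here.
		boolean shortCircuited = step("delay", true) || step("pipeline", false);
		System.out.println("shortCircuited = " + shortCircuited); // true, "pipeline" never printed
	}
}
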
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index 35cd4f8..798f40a 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -305,7 +305,7 @@ public class ExecutableFlowBase extends ExecutableNode {
 					((ExecutableFlowBase)exNode).applyUpdateObject(node, updatedNodes);
 				}
 				else {
-					exNode.applyUpdateObject(updateData);
+					exNode.applyUpdateObject(node);
 				}
 			}
 		}
diff --git a/src/java/azkaban/executor/ExecutableNode.java b/src/java/azkaban/executor/ExecutableNode.java
index db5fd8d..79942e9 100644
--- a/src/java/azkaban/executor/ExecutableNode.java
+++ b/src/java/azkaban/executor/ExecutableNode.java
@@ -241,7 +241,7 @@ public class ExecutableNode {
 	}
 	
 	public String getPrintableId(String delimiter) {
-		if (this.getParentFlow() instanceof ExecutableFlow) {
+		if (this.getParentFlow() == null || this.getParentFlow() instanceof ExecutableFlow) {
 			return getId();
 		}
 		return getParentFlow().getPrintableId(delimiter) + delimiter + getId();
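
The getPrintableId change adds the recursion's missing base case: null instanceof ExecutableFlow evaluates to false, so a node with no parent flow previously fell through to getParentFlow().getPrintableId(...) and threw a NullPointerException. A toy version of the recursion, with a hypothetical Node class standing in for ExecutableNode:

public class PrintableIdDemo {
	static class Node {
		private final String id;
		private final Node parentFlow; // null for a node directly under the root flow

		Node(String id, Node parentFlow) {
			this.id = id;
			this.parentFlow = parentFlow;
		}

		// Base case: no parent flow (Azkaban also stops when the parent is the
		// root ExecutableFlow). Without the null check, the recursive call
		// below would dereference null and throw a NullPointerException.
		String getPrintableId(String delimiter) {
			if (parentFlow == null) {
				return id;
			}
			return parentFlow.getPrintableId(delimiter) + delimiter + id;
		}
	}

	public static void main(String[] args) {
		Node jobb = new Node("jobb", null);
		Node innerJobA = new Node("innerJobA", jobb);
		System.out.println(innerJobA.getPrintableId(":")); // prints jobb:innerJobA
	}
}
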
diff --git a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
index aac1fb1..c87f2c6 100644
--- a/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/LocalFlowWatcherTest.java
@@ -190,6 +190,7 @@ public class LocalFlowWatcherTest {
 						" start: " + node.getStartTime() + 
 						" dependent on " + watchedChild + " " + child.getEndTime() +
 						" diff: " + diff);
+				
 				Assert.assertTrue(node.getStartTime() >= child.getEndTime());
 			}
 			
diff --git a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
index fdff895..45c3a85 100644
--- a/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
+++ b/unit/java/azkaban/test/execapp/event/RemoteFlowWatcherTest.java
@@ -14,6 +14,7 @@ import azkaban.execapp.FlowRunner;
 import azkaban.execapp.event.FlowWatcher;
 import azkaban.execapp.event.RemoteFlowWatcher;
 import azkaban.executor.ExecutableFlow;
+import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
 import azkaban.executor.ExecutorLoader;
@@ -72,8 +73,10 @@ public class RemoteFlowWatcherTest {
 		FlowRunner runner2 = createFlowRunner(workingDir2, loader, eventCollector, "exec1", 2, watcher, 2);
 		Thread runner2Thread = new Thread(runner2);
 		
+		printCurrentState("runner1 ", runner1.getExecutableFlow());
 		runner1Thread.start();
 		runner2Thread.start();
+		
 		runner2Thread.join();
 		
 		FileUtils.deleteDirectory(workingDir1);
@@ -222,6 +225,16 @@ public class RemoteFlowWatcherTest {
 		return runner;
 	}
 	
+	private void printCurrentState(String prefix, ExecutableFlowBase flow) {
+		for(ExecutableNode node: flow.getExecutableNodes()) {
+
+			System.err.println(prefix + node.getNestedId() + "->" + node.getStatus().name());
+			if (node instanceof ExecutableFlowBase) {
+				printCurrentState(prefix, (ExecutableFlowBase)node);
+			}
+		}
+	}
+	
 	private ExecutableFlow prepareExecDir(File workingDir, File execDir, String flowName, int execId) throws IOException {
 		FileUtils.copyDirectory(execDir, workingDir);
 		
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
index d20f2f6..1ef8804 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerPipelineTest.java
@@ -20,7 +20,6 @@ import azkaban.executor.ExecutableFlow;
 import azkaban.executor.ExecutableFlowBase;
 import azkaban.executor.ExecutableNode;
 import azkaban.executor.ExecutionOptions;
-import azkaban.executor.ExecutionOptions.FailureAction;
 import azkaban.executor.ExecutorLoader;
 import azkaban.executor.Status;
 import azkaban.flow.Flow;
@@ -31,8 +30,28 @@ import azkaban.project.ProjectManagerException;
 import azkaban.test.executor.InteractiveTestJob;
 import azkaban.test.executor.JavaJob;
 import azkaban.utils.DirectoryFlowLoader;
-import azkaban.utils.Props;
 
+/**
+ * Flows in this test:
+ * joba 
+ * jobb
+ * joba1
+ * jobc->joba
+ * jobd->joba
+ * jobe->jobb,jobc,jobd
+ * jobf->jobe,joba1
+ * 
+ * jobb = innerFlow
+ * innerJobA
+ * innerJobB->innerJobA
+ * innerJobC->innerJobB
+ * innerFlow->innerJobB,innerJobC
+ * 
+ * jobd = innerFlow2
+ * innerFlow2->innerJobA
+ * @author rpark
+ *
+ */
 public class FlowRunnerPipelineTest {
 	private File workingDir;
 	private JobTypeManager jobtypeManager;
@@ -174,10 +193,70 @@ public class FlowRunnerPipelineTest {
 		compareStates(previousExpectedStateMap, previousNodeMap);
 		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
 		
-//		Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
-//		Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
-//		Assert.assertFalse(thread1.isAlive());
-//		Assert.assertFalse(thread2.isAlive());
+		InteractiveTestJob.getTestJob("prev:jobb:innerFlow").succeedJob();
+		InteractiveTestJob.getTestJob("pipe:jobc").succeedJob();
+		pause(250);
+		previousExpectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobb", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobe", Status.RUNNING);
+		pipelineExpectedStateMap.put("jobc", Status.SUCCEEDED);
+		
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+		InteractiveTestJob.getTestJob("pipe:jobb:innerJobB").succeedJob();
+		InteractiveTestJob.getTestJob("pipe:jobb:innerJobC").succeedJob();
+		InteractiveTestJob.getTestJob("prev:jobe").succeedJob();
+		pause(250);
+		previousExpectedStateMap.put("jobe", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobb:innerJobB", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobb:innerJobC", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobb:innerFlow", Status.RUNNING);
+		
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+
+		InteractiveTestJob.getTestJob("pipe:jobd:innerJobA").succeedJob();
+		InteractiveTestJob.getTestJob("pipe:jobb:innerFlow").succeedJob();
+		pause(250);
+		pipelineExpectedStateMap.put("jobb", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobd:innerJobA", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobb:innerFlow", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobd:innerFlow2", Status.RUNNING);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+		InteractiveTestJob.getTestJob("pipe:jobd:innerFlow2").succeedJob();
+		InteractiveTestJob.getTestJob("prev:joba1").succeedJob();
+		pause(250);
+		pipelineExpectedStateMap.put("jobd:innerFlow2", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobd", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobf", Status.RUNNING);
+		previousExpectedStateMap.put("joba1", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("joba1", Status.RUNNING);
+		pipelineExpectedStateMap.put("jobe", Status.RUNNING);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		
+		InteractiveTestJob.getTestJob("pipe:jobe").succeedJob();
+		InteractiveTestJob.getTestJob("prev:jobf").succeedJob();
+		pause(250);
+		pipelineExpectedStateMap.put("jobe", Status.SUCCEEDED);
+		previousExpectedStateMap.put("jobf", Status.SUCCEEDED);
+		Assert.assertEquals(Status.SUCCEEDED, previousFlow.getStatus());
+		compareStates(previousExpectedStateMap, previousNodeMap);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+		InteractiveTestJob.getTestJob("pipe:joba1").succeedJob();
+		pause(250);
+		pipelineExpectedStateMap.put("joba1", Status.SUCCEEDED);
+		pipelineExpectedStateMap.put("jobf", Status.RUNNING);
+		compareStates(pipelineExpectedStateMap, pipelineNodeMap);
+		
+		InteractiveTestJob.getTestJob("pipe:jobf").succeedJob();
+		pause(250);
+		Assert.assertEquals(Status.SUCCEEDED, pipelineFlow.getStatus());
+		Assert.assertFalse(thread1.isAlive());
+		Assert.assertFalse(thread2.isAlive());
 	}
 	
 	private Thread runFlowRunnerInThread(FlowRunner runner) {
@@ -234,6 +313,16 @@ public class FlowRunnerPipelineTest {
 		FileUtils.copyDirectory(directory, workingDir);
 	}
 	
+	private void printCurrentState(String prefix, ExecutableFlowBase flow) {
+		for(ExecutableNode node: flow.getExecutableNodes()) {
+
+			System.err.println(prefix + node.getNestedId() + "->" + node.getStatus().name());
+			if (node instanceof ExecutableFlowBase) {
+				printCurrentState(prefix, (ExecutableFlowBase)node);
+			}
+		}
+	}
+	
 	private FlowRunner createFlowRunner(EventCollectorListener eventCollector, String flowName, String groupName) throws Exception {
 		return createFlowRunner(eventCollector, flowName, groupName, new ExecutionOptions());
 	}
diff --git a/unit/java/azkaban/test/executor/ExecutableFlowTest.java b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
index 4592720..35fa39d 100644
--- a/unit/java/azkaban/test/executor/ExecutableFlowTest.java
+++ b/unit/java/azkaban/test/executor/ExecutableFlowTest.java
@@ -178,10 +178,11 @@ public class ExecutableFlowTest {
 		exFlow.setUpdateTime(133);
 		
 		// Change one job and see if it updates
-		jobe.setEndTime(System.currentTimeMillis());
-		jobe.setUpdateTime(System.currentTimeMillis());
+		long time = System.currentTimeMillis();
+		jobe.setEndTime(time);
+		jobe.setUpdateTime(time);
 		jobe.setStatus(Status.DISABLED);
-		jobe.setStartTime(System.currentTimeMillis() - 1000);
+		jobe.setStartTime(time - 1);
 		// Should be one node that was changed
 		Map<String,Object> updateObject = exFlow.toUpdateObject(0);
 		Assert.assertEquals(1, ((List)(updateObject.get("nodes"))).size());
@@ -194,16 +195,18 @@ public class ExecutableFlowTest {
 		Assert.assertNull(updateObject.get("nodes"));
 		
 		// Change inner flow
-		jobbInnerFlowA.setEndTime(System.currentTimeMillis());
-		jobbInnerFlowA.setUpdateTime(System.currentTimeMillis());
+		long currentTime = time + 1 ;
+		jobbInnerFlowA.setEndTime(currentTime);
+		jobbInnerFlowA.setUpdateTime(currentTime);
 		jobbInnerFlowA.setStatus(Status.DISABLED);
-		jobbInnerFlowA.setStartTime(System.currentTimeMillis() - 1000);
+		jobbInnerFlowA.setStartTime(currentTime - 100);
 		// We should get 2 updates if we do a toUpdateObject using 0 as the start time
 		updateObject = exFlow.toUpdateObject(0);
 		Assert.assertEquals(2, ((List)(updateObject.get("nodes"))).size());
 
 		// This should provide 1 update. That we can apply
 		updateObject = exFlow.toUpdateObject(jobe.getUpdateTime());
+		Assert.assertNotNull(updateObject.get("nodes"));
 		Assert.assertEquals(1, ((List)(updateObject.get("nodes"))).size());
 		copyFlow.applyUpdateObject(updateObject);
 		testEquals(exFlow, copyFlow);
diff --git a/unit/java/azkaban/test/executor/JavaJobRunnerMain.java b/unit/java/azkaban/test/executor/JavaJobRunnerMain.java
index 9add12a..d1766e6 100644
--- a/unit/java/azkaban/test/executor/JavaJobRunnerMain.java
+++ b/unit/java/azkaban/test/executor/JavaJobRunnerMain.java
@@ -185,7 +185,9 @@ public class JavaJobRunnerMain {
 			new RuntimeException("Unable to store output properties to: " + outputFileStr);
 		} finally {
 			try {
-				writer.close();
+				if (writer != null) {
+					writer.close();
+				}
 			} catch (IOException e) {
 			}
 		}
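
The final hunk only closes the writer if it was actually assigned; if construction failed, writer stays null and the old finally block would itself throw a NullPointerException, masking the original error. A standalone sketch of the same safe-close pattern, plus the try-with-resources form that gives the same guarantee on Java 7+ (paths and class names below are illustrative, not the project's code):

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class SafeCloseDemo {
	// The pattern the fix uses: only close what was actually opened.
	static void writeWithNullGuard(String path, String line) {
		BufferedWriter writer = null;
		try {
			writer = new BufferedWriter(new FileWriter(path));
			writer.write(line);
		} catch (IOException e) {
			System.err.println("write failed: " + e.getMessage());
		} finally {
			try {
				if (writer != null) { // stays null if the FileWriter constructor threw
					writer.close();
				}
			} catch (IOException e) {
				// ignore close failures, as the original finally block does
			}
		}
	}

	// Java 7+ equivalent: try-with-resources only closes resources that were opened.
	static void writeWithTryWithResources(String path, String line) {
		try (BufferedWriter writer = new BufferedWriter(new FileWriter(path))) {
			writer.write(line);
		} catch (IOException e) {
			System.err.println("write failed: " + e.getMessage());
		}
	}

	public static void main(String[] args) {
		writeWithNullGuard("safe-close-demo-1.txt", "hello");
		writeWithTryWithResources("safe-close-demo-2.txt", "hello again");
	}
}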