azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/ExecutorServlet.java b/src/java/azkaban/execapp/ExecutorServlet.java
index 282c14c..00f19a2 100644
--- a/src/java/azkaban/execapp/ExecutorServlet.java
+++ b/src/java/azkaban/execapp/ExecutorServlet.java
@@ -201,7 +201,7 @@ public class ExecutorServlet extends HttpServlet implements ConnectorParams {
 				continue;
 			}
 			
-			if (flow.getUpdateTime() >= updateTime) {
+			if (flow.getUpdateTime() > updateTime) {
 				updateList.add(flow.toUpdateObject(updateTime));
 			}
 		}
diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 5d8b39d..c80a9ff 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -69,6 +69,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	
 	private JobRunnerEventListener listener = new JobRunnerEventListener();
 	private Map<String, JobRunner> jobRunners = new ConcurrentHashMap<String, JobRunner>();
+	private Map<String, JobRunner> activeJobRunners = new ConcurrentHashMap<String, JobRunner>();
 	
 	// Used for pipelining
 	private Integer pipelineLevel = null;
@@ -300,6 +301,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 								try {
 									executorService.submit(runner);
 									jobRunners.put(node.getJobId(), runner);
+									activeJobRunners.put(node.getJobId(), runner);
 								} catch (RejectedExecutionException e) {
 									logger.error(e);
 								};
@@ -373,6 +375,10 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 
 	private boolean isFlowFinished() {
+		if (!activeJobRunners.isEmpty()) {
+			return false;
+		}
+		
 		for (String end: flow.getEndNodes()) {
 			ExecutableNode node = flow.getExecutableNode(end);
 			if (!Status.isStatusFinished(node.getStatus()) ) {
@@ -521,7 +527,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				watcher.stopWatcher();
 			}
 			
-			for (JobRunner runner : jobRunners.values()) {
+			for (JobRunner runner : activeJobRunners.values()) {
 				runner.cancel();
 			}
 			
@@ -672,7 +678,8 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (event.getType() == Type.JOB_FINISHED) {
 				synchronized(mainSyncObj) {
 					ExecutableNode node = runner.getNode();
-	
+					activeJobRunners.remove(node.getJobId());
+					
 					logger.info("Job Finished " + node.getJobId() + " with status " + node.getStatus());
 					if (runner.getOutputProps() != null) {
 						logger.info("Job " + node.getJobId() + " had output props.");
@@ -755,6 +762,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 	}
 	
 	public int getNumRunningJobs() {
-		return jobRunners.size();
+		return activeJobRunners.size();
 	}
 }
\ No newline at end of file