azkaban-memoizeit

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 01898bb..dcef893 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -333,12 +333,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 					continue;
 				}
 				else {
-					if (!progressGraph(flow)) {
-						if (flow.isFlowFinished() ) {
-							flowFinished = true;
-							break;
-						}
-						
+					if (!progressGraph()) {
 						try {
 							mainSyncObj.wait(CHECK_WAIT_MS);
 						} catch (InterruptedException e) {
@@ -356,41 +351,73 @@ public class FlowRunner extends EventHandler implements Runnable {
 		logger.info("Finished Flow");
 	}
 	
-	private boolean progressGraph(ExecutableFlowBase flow) throws IOException {
+	private boolean progressGraph() throws IOException {
 		List<ExecutableNode> jobsReadyToRun = flow.findNextJobsToRun();
 
-		if (!jobsReadyToRun.isEmpty()) {
-			long currentTime = System.currentTimeMillis();
-			for (ExecutableNode node: jobsReadyToRun) {
-				Status nextStatus = getImpliedStatus(node);
-				
-				// If the flow has seen previous failures and the flow has been cancelled, than 
-				if (nextStatus == Status.KILLED) {
-					logger.info("Killing " + node.getId() + " due to prior errors.");
-					node.killNode(currentTime);
-					finalizeFlowIfFinished(node.getParentFlow());
-					fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
-				}
-				else if (nextStatus == Status.DISABLED) {
-					logger.info("Skipping disabled job " + node.getId() + ".");
-					node.skipNode(currentTime);
-					finalizeFlowIfFinished(node.getParentFlow());
-					fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
-				}
-				else {
-					runExecutableNode(node);
-				}
+		// If its the current flow
+		if (jobsReadyToRun.size() == 1 && jobsReadyToRun.get(0) == flow) {
+			flowFinished = true;
+			return true;
+		}
+
+		long currentTime = System.currentTimeMillis();
+		for (ExecutableNode node: jobsReadyToRun) {
+			Status nextStatus = getImpliedStatus(node);
+			
+			if (node instanceof ExecutableFlowBase && ((ExecutableFlowBase)node).isFlowFinished()) {
+				finalizeFlow((ExecutableFlowBase)node);
+			}
+			else if (nextStatus == Status.KILLED || isCancelled()) {
+				logger.info("Killing " + node.getId() + " due to prior errors.");
+				node.killNode(currentTime);
+				fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
 			}
+			else if (nextStatus == Status.DISABLED) {
+				logger.info("Skipping disabled job " + node.getId() + ".");
+				node.skipNode(currentTime);
+				fireEventListeners(Event.create(this, Type.JOB_FINISHED, node));
+			}
+			else {
+				runExecutableNode(node);
+			}
+		}
 			
+		if (!jobsReadyToRun.isEmpty()) {
 			updateFlow();
 			return true;
 		}
-		
-		return false;
+		else {
+			return false;
+		}
 	}
 	
 	private void finalizeFlow(ExecutableFlowBase flow) {
 		String id = flow == this.flow ? "" : flow.getPrintableId() + " ";
+
+		// If it's not the starting flow, we'll create set of output props
+		// for the finished flow.
+		boolean succeeded = true;
+		Props previousOutput = null;
+		
+		for(String end: flow.getEndNodes()) {
+			ExecutableNode node = flow.getExecutableNode(end);
+
+			if (node.getStatus() == Status.KILLED) {
+				succeeded = false;
+			}
+			
+			Props output = node.getOutputProps();
+			if (output != null) {
+				output = Props.clone(output);
+				output.setParent(previousOutput);
+				previousOutput = output;
+			}
+		}
+		
+		flow.setOutputProps(previousOutput);
+		if (!succeeded && (flow.getStatus() == Status.RUNNING)) {
+			flow.setStatus(Status.KILLED);
+		}
 		
 		flow.setEndTime(System.currentTimeMillis());
 		switch(flow.getStatus()) {
@@ -653,11 +680,6 @@ public class FlowRunner extends EventHandler implements Runnable {
 			for (JobRunner runner : activeJobRunners) {
 				runner.cancel();
 			}
-			
-			if (flow.getStatus() != Status.FAILED && flow.getStatus() != Status.FAILED_FINISHING) {
-				logger.info("Setting flow status to " + Status.KILLED.toString());
-				flow.setStatus(Status.KILLED);
-			}
 		}
 	}
 	
@@ -725,7 +747,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 	private class JobRunnerEventListener implements EventListener {
 		public JobRunnerEventListener() {
 		}
-		// TODO: HANDLE subflow execution
+		
 		@Override
 		public synchronized void handleEvent(Event event) {
 			JobRunner runner = (JobRunner)event.getRunner();
@@ -738,33 +760,33 @@ public class FlowRunner extends EventHandler implements Runnable {
 					ExecutableNode node = runner.getNode();
 					activeJobRunners.remove(node.getId());
 					
-					String id = node.getPrintableId(":");
+					String id = node.getPrintableId();
 					logger.info("Job Finished " + id + " with status " + node.getStatus());
 					if (node.getOutputProps() != null && node.getOutputProps().size() > 0) {
 						logger.info("Job " + id + " had output props.");
 					}
-
+					
 					if (node.getStatus() == Status.FAILED) {
-						// Retry failure if conditions are met.
-						if (!runner.isCancelled() && runner.getRetries() > node.getAttempt()) {
+						if (runner.getRetries() > node.getAttempt()) {
 							logger.info("Job " + id + " will be retried. Attempt " + node.getAttempt() + " of " + runner.getRetries());
 							node.setDelayedExecution(runner.getRetryBackoff());
 							node.resetForRetry();
 						}
 						else {
-							if (!runner.isCancelled() && runner.getRetries() > 0) {
+							if (runner.getRetries() > 0) {
 								logger.info("Job " + id + " has run out of retry attempts");
 								// Setting delayed execution to 0 in case this is manually re-tried.
 								node.setDelayedExecution(0);
 							}
-
+							
 							flowFailed = true;
 							
 							// The KILLED status occurs when cancel is invoked. We want to keep this
 							// status even in failure conditions.
-							if (flow.getStatus() != Status.KILLED && flow.getStatus() != Status.FAILED) {
+							if (flow.getStatus() != Status.KILLED) {
+								// During a failure, we propagate the failure to parent flows
 								propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING);
-
+	
 								if (failureAction == FailureAction.CANCEL_ALL && !flowCancelled) {
 									logger.info("Flow failed. Failure option is Cancel All. Stopping execution.");
 									cancel();
@@ -772,59 +794,24 @@ public class FlowRunner extends EventHandler implements Runnable {
 							}
 						}
 					}
-					finalizeFlowIfFinished(node.getParentFlow());
+					
 					updateFlow();
 					interrupt();
-	
 					fireEventListeners(event);
 				}
 			}
 		}
 		
 		private void propagateStatus(ExecutableFlowBase base, Status status) {
-			base.setStatus(status);
-			if (base.getParentFlow() != null) {
-				propagateStatus(base.getParentFlow(), status);
-			}
-		}
-	}
-
-	private void finalizeFlowIfFinished(ExecutableFlowBase base) {
-		// We let main thread finalize the main flow. 
-		if (base == flow) {
-			return;
-		}
-		
-		if (base.isFlowFinished()) {
-			boolean succeeded = true;
-			Props previousOutput = null;
-			for(String end: base.getEndNodes()) {
-				ExecutableNode node = base.getExecutableNode(end);
-	
-				if (node.getStatus() == Status.KILLED) {
-					succeeded = false;
+			if (!Status.isStatusFinished(base.getStatus())) {
+				base.setStatus(status);
+				if (base.getParentFlow() != null) {
+					propagateStatus(base.getParentFlow(), status);
 				}
-				
-				Props output = node.getOutputProps();
-				if (output != null) {
-					output = Props.clone(output);
-					output.setParent(previousOutput);
-					previousOutput = output;
-				}
-			}
-			
-			if (!succeeded && (base.getStatus() == Status.RUNNING)) {
-				base.setStatus(Status.KILLED);
-			}
-			base.setOutputProps(previousOutput);
-			finalizeFlow(base);
-
-			if (base.getParentFlow() != null) {
-				finalizeFlowIfFinished(base.getParentFlow());
 			}
 		}
 	}
-	
+
 	public boolean isCancelled() {
 		return flowCancelled;
 	}
diff --git a/src/java/azkaban/executor/ExecutableFlowBase.java b/src/java/azkaban/executor/ExecutableFlowBase.java
index cda8279..e99bbe2 100644
--- a/src/java/azkaban/executor/ExecutableFlowBase.java
+++ b/src/java/azkaban/executor/ExecutableFlowBase.java
@@ -317,34 +317,42 @@ public class ExecutableFlowBase extends ExecutableNode {
 	 * Finds all jobs which are ready to run. This occurs when all of its 
 	 * dependency nodes are finished running.
 	 * 
+	 * It will also return any subflow that has been completed such that the
+	 * FlowRunner can properly handle them.
+	 * 
 	 * @param flow
 	 * @return
 	 */
 	public List<ExecutableNode> findNextJobsToRun() {
 		ArrayList<ExecutableNode> jobsToRun = new ArrayList<ExecutableNode>();
 		
-		nodeloop:
-		for (ExecutableNode node: executableNodes.values()) {
-			if(Status.isStatusFinished(node.getStatus())) {
-				continue;
-			}
-
-			if ((node instanceof ExecutableFlowBase) && Status.isStatusRunning(node.getStatus())) {
-				// If the flow is still running, we traverse into the flow
-				jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
-			}
-			else if (Status.isStatusRunning(node.getStatus())) {
-				continue;
-			}
-			else {
-				for (String dependency: node.getInNodes()) {
-					// We find that the outer-loop is unfinished.
-					if (!Status.isStatusFinished(getExecutableNode(dependency).getStatus())) {
-						continue nodeloop;
+		if (isFlowFinished() && !Status.isStatusFinished(getStatus())) {
+			jobsToRun.add(this);
+		}
+		else {
+			nodeloop:
+			for (ExecutableNode node: executableNodes.values()) {
+				if(Status.isStatusFinished(node.getStatus())) {
+					continue;
+				}
+	
+				if ((node instanceof ExecutableFlowBase) && Status.isStatusRunning(node.getStatus())) {
+					// If the flow is still running, we traverse into the flow
+					jobsToRun.addAll(((ExecutableFlowBase)node).findNextJobsToRun());
+				}
+				else if (Status.isStatusRunning(node.getStatus())) {
+					continue;
+				}
+				else {
+					for (String dependency: node.getInNodes()) {
+						// We find that the outer-loop is unfinished.
+						if (!Status.isStatusFinished(getExecutableNode(dependency).getStatus())) {
+							continue nodeloop;
+						}
 					}
+	
+					jobsToRun.add(node);
 				}
-
-				jobsToRun.add(node);
 			}
 		}
 		
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest.java b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
index d7fe287..a8c620e 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest.java
@@ -285,7 +285,7 @@ public class FlowRunnerTest {
 
 		synchronized(this) {
 			try {
-				wait(4500);
+				wait(5000);
 			} catch (InterruptedException e) {
 				// TODO Auto-generated catch block
 				e.printStackTrace();
diff --git a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
index 312dea1..2cb47b8 100644
--- a/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
+++ b/unit/java/azkaban/test/execapp/FlowRunnerTest2.java
@@ -223,6 +223,7 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobf", Status.SUCCEEDED);
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertEquals(Status.SUCCEEDED, flow.getStatus());
+		
 		Assert.assertFalse(thread.isAlive());
 	}
 	
@@ -577,7 +578,7 @@ public class FlowRunnerTest2 {
 		compareStates(expectedStateMap, nodeMap);
 		
 		InteractiveTestJob.getTestJob("jobb:innerJobB").failJob();
-		pause(1000);
+		pause(250);
 		expectedStateMap.put("jobb", Status.FAILED);
 		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
 		expectedStateMap.put("jobb:innerJobC", Status.FAILED);
@@ -588,10 +589,11 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobd:innerFlow2", Status.KILLED);
 		expectedStateMap.put("jobe", Status.KILLED);
 		expectedStateMap.put("jobf", Status.KILLED);
-		
-		Assert.assertEquals(Status.FAILED, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
+		
 		Assert.assertFalse(thread.isAlive());
+		Assert.assertEquals(Status.FAILED, flow.getStatus());
+
 	}
 	
 	@Test
@@ -700,7 +702,7 @@ public class FlowRunnerTest2 {
 		// 1. START FLOW
 		createExpectedStateMap(flow, expectedStateMap, nodeMap);
 		Thread thread = runFlowRunnerInThread(runner);
-		pause(250);
+		pause(1000);
 		
 		// After it starts up, only joba should be running
 		expectedStateMap.put("joba", Status.RUNNING);
@@ -728,7 +730,7 @@ public class FlowRunnerTest2 {
 		compareStates(expectedStateMap, nodeMap);
 		
 		runner.cancel("me");
-		pause(1000);
+		pause(250);
 		
 		expectedStateMap.put("jobb", Status.KILLED);
 		expectedStateMap.put("jobb:innerJobB", Status.FAILED);
@@ -805,14 +807,11 @@ public class FlowRunnerTest2 {
 		expectedStateMap.put("jobe", Status.KILLED);
 		expectedStateMap.put("jobf", Status.KILLED);
 		
-		Assert.assertEquals(Status.FAILED, flow.getStatus());
+		Assert.assertEquals(Status.KILLED, flow.getStatus());
 		compareStates(expectedStateMap, nodeMap);
 		Assert.assertFalse(thread.isAlive());
 	}
 	
-	// @TODO write test for:
-	// Pipeline
-	
 	private Thread runFlowRunnerInThread(FlowRunner runner) {
 		Thread thread = new Thread(runner);
 		thread.start();