azkaban-aplcache

Merge pull request #115 from azkaban/killCancelChanges Added

1/27/2014 5:53:59 PM

Details

diff --git a/src/java/azkaban/execapp/FlowRunner.java b/src/java/azkaban/execapp/FlowRunner.java
index 44b97a3..77c9016 100644
--- a/src/java/azkaban/execapp/FlowRunner.java
+++ b/src/java/azkaban/execapp/FlowRunner.java
@@ -400,7 +400,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		resetFailedState(this.flow, retryJobs);
 		
 		for (ExecutableNode node: retryJobs) {
-			if(node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
+			if (node.getStatus() == Status.READY || node.getStatus() == Status.DISABLED) {
 				runReadyJob(node);
 			}
 			else if (node.getStatus() == Status.SUCCEEDED){
@@ -570,10 +570,12 @@ public class FlowRunner extends EventHandler implements Runnable {
 		boolean succeeded = true;
 		Props previousOutput = null;
 		
-		for(String end: flow.getEndNodes()) {
+		for (String end: flow.getEndNodes()) {
 			ExecutableNode node = flow.getExecutableNode(end);
 
-			if (node.getStatus() == Status.KILLED || node.getStatus() == Status.FAILED || node.getStatus() == Status.CANCELLED) {
+			if (node.getStatus() == Status.KILLED || 
+				node.getStatus() == Status.FAILED || 
+				node.getStatus() == Status.CANCELLED) {
 				succeeded = false;
 			}
 			
@@ -683,7 +685,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 		}
 		
 		File path = new File(execDir, source);
-		if(props == null) {
+		if (props == null) {
 			// if no override prop, load the original one on disk
 			try {
 				props = new Props(null, path);				
@@ -742,7 +744,9 @@ public class FlowRunner extends EventHandler implements Runnable {
 			if (!Status.isStatusFinished(depStatus)) {
 				return null;
 			}
-			else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) {
+			else if (depStatus == Status.FAILED || 
+					depStatus == Status.CANCELLED || 
+					depStatus == Status.KILLED) {
 				// We propagate failures as KILLED states.
 				shouldKill = true;
 			}
@@ -908,18 +912,21 @@ public class FlowRunner extends EventHandler implements Runnable {
 			else if (node instanceof ExecutableFlowBase) {
 				ExecutableFlowBase base = (ExecutableFlowBase)node;
 				switch (base.getStatus()) {
-				case KILLED:
-				case FAILED:
-				case FAILED_FINISHING:
-					resetFailedState(base, nodesToRetry);
-					continue;
 				case CANCELLED:
 					node.setStatus(Status.READY);
 					node.setEndTime(-1);
 					node.setStartTime(-1);
 					node.setUpdateTime(currentTime);
+					// Break out of the switch. We'll reset the flow just like a normal node
 					break;
+				case KILLED:
+				case FAILED:
+				case FAILED_FINISHING:
+					resetFailedState(base, nodesToRetry);
+					continue;
 				default:
+					// Continue the while loop. If the job is in a finished state that's not
+					// a failure, we don't want to reset the job.
 					continue;
 				}
 			}
@@ -930,7 +937,7 @@ public class FlowRunner extends EventHandler implements Runnable {
 				node.setEndTime(-1);
 				node.setUpdateTime(currentTime);
 			}
-			else if(node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
+			else if (node.getStatus() == Status.FAILED || node.getStatus() == Status.KILLED) {
 				node.resetForRetry();
 				nodesToRetry.add(node);
 			}
@@ -939,12 +946,13 @@ public class FlowRunner extends EventHandler implements Runnable {
 				logger.info("Resetting job '" + node.getNestedId() + "' from " + oldStatus + " to " + node.getStatus());
 			}
 			
-			for(String inId: node.getInNodes()) {
+			for (String inId: node.getInNodes()) {
 				ExecutableNode nodeUp = flow.getExecutableNode(inId);
 				queue.add(nodeUp);
 			}
 		}
 		
+		// At this point, the following code will reset the flow
 		Status oldFlowState = flow.getStatus();
 		if (maxStartTime == -1) {
 			// Nothing has run inside the flow, so we assume the flow hasn't even started running yet.